posts - 7, comments - 2, trackbacks - 0, articles - 1
  C++博客 :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

Producers and Consumers

Posted on 2009-08-03 01:26 Lemon 阅读(245) 评论(0)  编辑 收藏 引用
  1 /*
  2     Mon Aug 3, 12:41 AM
  3     Producers and Consumers, Linux C
  4     More than one producers are allowed to writer to buffer at the same time if available.
  5     So are consumers to read from buffer.
  6     Free linked queue and alloced linked queue are applied.
  7 */
  8 #include <stdio.h>
  9 #include <stdlib.h>
 10 #include <string.h>
 11 #include <unistd.h>
 12 #include <signal.h>
 13 #include <time.h>
 14 #include <sys/wait.h>
 15 #include <linux/sem.h>
 16 #include <linux/shm.h>
 17 
 18 /*product infomation*/
 19 struct prod_info {
 20     int no;
 21     char name[20];
 22     time_t tm; /*time which producing is finished*/
 23 };
 24 
 25 #define SEM_KEY 1024
 26 #define SHM_KEY 1025
 27 #define PROD_TYPE struct prod_info
 28 #define BUF_SIZ 2
 29 #define SEM_EMPTY 0 /*semaphore of free buffer resourses*/
 30 #define SEM_FULL 1 /*semaphore of ready buffer resourses*/
 31 #define SEM_CNT 2 /*semaphore of product counts*/
 32 #define SEM_FRE_QUE 3 /*free queue will be locked, before pop or push operation*/
 33 #define SEM_RED_QUE 4 /*ready queue will be locked, before pop or push operation*/
 34 #define PROD_NUM 4
 35 #define CONS_NUM 3
 36 #define PROD_TM 500000000L
 37 #define CONS_TM 500000000L
 38 #define POP_TM 20000000L
 39 #define PUSH_TM POP_TM
 40 
 41 char *ss[] = { /*product name*/
 42                 "apple"
 43                 "banana"
 44                 "orange"
 45                 "water",
 46                 "paper",
 47                 "book",
 48                 "pen",
 49                 "TV",
 50                 "Computer",
 51                 "GCC",
 52                 "G++",
 53                 "JAVA",
 54                 "C#",
 55                 "Perl",
 56                 "Phython"
 57 };
 58 
 59 /*linked queue node structure*/
 60 struct lnk_nod { 
 61     PROD_TYPE prod;
 62     int next;
 63 };
 64 
 65 /*queue header structure*/
 66 struct que {
 67     int base, top;
 68 };
 69 
 70 void delay(int tm) {
 71     while (tm--);
 72 }
 73 
 74 /*lnk_base is the base pointer of queue node array*/
 75 void push_que(struct que *q, struct lnk_nod *lnk_base, int nod_id) 
 76 {
 77     if (q->base == -1) {
 78         q->base = nod_id;
 79         q->top = nod_id;
 80         return ;
 81     }
 82     delay(PUSH_TM); /*clearly show that program is not atomic*/
 83     
 84     lnk_base[q->top].next = nod_id;
 85     delay(PUSH_TM);
 86     
 87     lnk_base[nod_id].next = -1;
 88     delay(PUSH_TM);
 89     
 90     q->top = nod_id;
 91     delay(PUSH_TM);
 92 }
 93 
 94 int pop_que(struct que *q, struct lnk_nod *lnk_base) 
 95 {
 96     int pop_id;
 97     if (q->base == -1) {
 98         fprintf(stderr, "pop_que : queue is empty.\n");
 99         exit(-1);
100     }
101     delay(POP_TM);
102     
103     pop_id = q->base;
104     delay(POP_TM);
105     
106     q->base = lnk_base[pop_id].next;
107     delay(POP_TM);
108     
109     if (pop_id == q->top) q->top = -1;
110     delay(POP_TM);
111     
112     return pop_id;
113 }
114 
115 void p(int sem_id, int idx) 
116 {
117     struct sembuf buf;
118     buf.sem_num = idx;
119     buf.sem_op  = -1;
120     buf.sem_flg = 0;
121     semop(sem_id, &buf, 1);
122 }
123 
124 void v(int sem_id, int idx)
125 {
126     struct sembuf buf;
127     buf.sem_num = idx;
128     buf.sem_op  = 1;
129     buf.sem_flg = 0;
130     semop(sem_id, &buf, 1);
131 }
132 
133 void produce(int id, int sem_id, int *no, 
134                 struct que *free_que, 
135                 struct que *ready_que, 
136                 struct lnk_nod *lnk_base)
137 {
138     int pop_id;
139     printf("Producer #%d starts to produce, waits to get a free buffer.\n", id);
140     p(sem_id, SEM_EMPTY); /*wait for a free node resource*/
141     p(sem_id, SEM_FRE_QUE); /*start to get a free node, and wait for the allowance of free queue's operation*/
142     printf("Producer #%d starts to get a buffer\n", id);
143     pop_id = pop_que(free_que, lnk_base); 
144     printf("Producer #%d has got a buffer form free queue, and starts to write infomations.\n", id);
145     v(sem_id, SEM_FRE_QUE); /*release the lock of free queue*/
146     p(sem_id, SEM_CNT); /*counter lock*/
147     strcpy(lnk_base[pop_id].prod.name, ss[(*no) % 15]);
148     lnk_base[pop_id].prod.no = *no;
149     (*no) = (*no) + 1
150     v(sem_id, SEM_CNT); /*counter lock release*/
151     delay(PROD_TM);
152     time(&lnk_base[pop_id].prod.tm);
153     printf("Producer #%d has finished writing, and waits to put to ready queue.\n", id);
154     p(sem_id, SEM_RED_QUE); /*start to put product to queue, and wait for the allowance of ready queue's operation*/
155     printf("Producer #%d starts to put a buffer\n", id);
156     push_que(ready_que, lnk_base, pop_id);
157     printf("Producer #%d has put it to ready queue.\n", id);
158     v(sem_id, SEM_RED_QUE); /*release the lock of ready queue*/
159     v(sem_id, SEM_FULL); /*available products are added*/
160 }
161 
162 void consume(int id, int sem_id,
163                 struct que *free_que,
164                 struct que *ready_que,
165                 struct lnk_nod *lnk_base)
166 {
167     int pop_id;
168     printf("Consumer #%d starts to consume, and waits to get a ready buffer.\n", id);
169     p(sem_id, SEM_FULL); /*wait for a available product*/
170     p(sem_id, SEM_RED_QUE); /*start to get a product node from ready queue, wait for operation lock*/
171     printf("Consumer #%d starts to get a buffer\n", id);
172     pop_id = pop_que(ready_que, lnk_base);
173     printf("Consumer #%d has got a buffer form ready, and starts to read infomations.\n", id);
174     v(sem_id, SEM_RED_QUE); /*release ready queue lock*/
175     printf("Consumer #%d read : Product no = %d.\n", id, lnk_base[pop_id].prod.no);
176     printf("Consumer #%d read : Product name = %s.\n", id, lnk_base[pop_id].prod.name);
177     printf("Consumer #%d read : Product time = %s", id, ctime(&lnk_base[pop_id].prod.tm));
178     lnk_base[pop_id].prod.no = -1;
179     strcpy(lnk_base[pop_id].prod.name, "-----");
180     lnk_base[pop_id].prod.tm = -1;
181     printf("Consumer #%d has finished reading, and waits to put to free queue.\n", id);
182     p(sem_id, SEM_FRE_QUE); /*start to put a free node to free queue, wait for operation lock*/
183     printf("Consumer #%d starts to put a buffer\n", id);
184     push_que(free_que, lnk_base, pop_id);
185     printf("Consumer #%d has put it to free queue.\n", id);
186     v(sem_id, SEM_FRE_QUE); /*release free queue lock*/
187     v(sem_id, SEM_EMPTY); /*free node resourses are added*/
188 }
189 
190 int shm_id;
191 void *shm_add;
192 int sem_id;
193 
194 /*release shared memory and semaphore, after catch the interrupt signal*/
195 void sig_hadl(int sig_num)
196 {
197     int chld_stat;
198     union semun sem_val;
199     wait(&chld_stat); /*wait until all child is terminated*/
200     shmdt(shm_add);
201     shmctl(shm_id, IPC_RMID, NULL);
202     semctl(sem_id, 0, IPC_RMID, sem_val);
203     printf("Process %d is terminated.\n", getpid());
204 }
205 
206 int main(int argc, char **argv) 
207 {    
208     union semun sem_val;
209     int *no;
210     struct que *free_que, *ready_que;
211     struct lnk_nod *lnk_base;
212     int id, pid, count = 0, i;
213     signal(SIGINT, sig_hadl); /*set interrupt signal*/
214     srand((unsigned long)time(NULL)); /*rand seed set*/
215     /*get semaphore*/
216     sem_id = semget(SEM_KEY, 5, IPC_CREAT | 0666);
217     if (sem_id == -1) {
218         perror("semget");
219         exit(-1);
220     }
221     /*set value of semaphore*/
222     sem_val.val = BUF_SIZ;
223     semctl(sem_id, SEM_EMPTY, SETVAL, sem_val);
224     sem_val.val = 0;
225     semctl(sem_id, SEM_FULL, SETVAL, sem_val);
226     sem_val.val = 1;
227     semctl(sem_id, SEM_CNT, SETVAL, sem_val);
228     semctl(sem_id, SEM_FRE_QUE, SETVAL, sem_val);
229     semctl(sem_id, SEM_RED_QUE, SETVAL, sem_val);
230     /*get shared memory*/
231     shm_id = shmget(SHM_KEY, 
232                 sizeof(int+ sizeof(struct que) * 2 + sizeof(struct lnk_nod) * BUF_SIZ,
233                 IPC_CREAT | 0666);
234     if (shm_id == -1) {
235         perror("shmget");
236         exit(-1);
237     }
238     shm_add = (void*) shmat(shm_id, NULL, 0);
239     if (shm_add == NULL) {
240         perror("shmat");
241         exit(-1);
242     }
243     /*shared memeory manage*/
244     no = (int*) shm_add;
245     *no = 0;
246     free_que = (struct que*) (no + 1);
247     ready_que = free_que + 1;
248     lnk_base = (struct lnk_nod*) (ready_que + 1);
249     for (i = 0; i < BUF_SIZ - 1; i++) {
250         lnk_base[i].next = i + 1;
251     }
252     lnk_base[BUF_SIZ - 1].next = -1;
253     free_que->base = 0;
254     free_que->top = BUF_SIZ - 1;
255     ready_que->base = -1;
256     ready_que->top = -1;
257     /*create child process as producers and consumers*/
258     for (i = 0; i < PROD_NUM + CONS_NUM; i++) {
259         pid = fork();
260         id = count++;
261         if (pid == 0break;
262     }
263     if (pid == 0) {
264         if (id < PROD_NUM) {
265             printf("Producer #%d is created.\n", id);
266             while (1) {
267                 produce(id, sem_id, no, free_que, ready_que, lnk_base);
268             }
269         } else {
270             printf("Consumer #%d is created.\n", id - PROD_NUM);
271             while (1) {
272                 consume(id - PROD_NUM, sem_id, free_que, ready_que, lnk_base);
273             }
274         }
275         exit(0);
276     } else {
277         int chld_stat;
278         wait(&chld_stat);
279     }
280     return 0;
281 }


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理