|
Posted on 2009-08-03 01:26 Lemon 阅读(245) 评论(0) 编辑 收藏 引用
/*
 * Mon Aug 3, 12:41 AM
 * Producers and Consumers, Linux C
 * More than one producer may write to the buffers at the same time when
 * buffers are available, and so may consumers read from them.
 * A free linked queue and a ready (allocated) linked queue are used.
 */
#define _XOPEN_SOURCE 700 /*expose the POSIX/XSI signal and IPC interfaces*/

#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/wait.h>

/*<sys/sem.h> leaves the semctl() argument union to the caller and says so
  through this macro*/
#ifdef _SEM_SEMUN_UNDEFINED
union semun {
    int val;
    struct semid_ds *buf;
    unsigned short *array;
};
#endif

/*product information*/
struct prod_info {
    int no;
    char name[20];
    time_t tm; /*time at which producing finished*/
};

#define SEM_KEY 1024
#define SHM_KEY 1025
#define PROD_TYPE struct prod_info
#define BUF_SIZ 2
#define SEM_EMPTY 0   /*semaphore counting free buffer nodes*/
#define SEM_FULL 1    /*semaphore counting ready buffer nodes*/
#define SEM_CNT 2     /*lock protecting the product counter*/
#define SEM_FRE_QUE 3 /*lock taken around every pop or push on the free queue*/
#define SEM_RED_QUE 4 /*lock taken around every pop or push on the ready queue*/
#define SEM_NUM 5     /*number of semaphores in the set*/
#define PROD_NUM 4
#define CONS_NUM 3
#define CHLD_NUM (PROD_NUM + CONS_NUM)
#define PROD_TM 500000000L
#define CONS_TM 500000000L
#define POP_TM 20000000L
#define PUSH_TM POP_TM

const char *ss[] = { /*product names*/
    "apple",
    "banana",
    "orange",
    "water",
    "paper",
    "book",
    "pen",
    "TV",
    "Computer",
    "GCC",
    "G++",
    "JAVA",
    "C#",
    "Perl",
    "Phython"
};

#define SS_NUM ((int)(sizeof(ss) / sizeof(ss[0])))

/*linked queue node: one product plus the index of the next node (-1 = none)*/
struct lnk_nod {
    PROD_TYPE prod;
    int next;
};

/*queue header: index of the first (base) and last (top) node, -1 when empty*/
struct que {
    int base, top;
};

/*layout of the shared memory segment; using one struct lets the compiler
  insert the padding needed to keep every member correctly aligned*/
struct shm_lay {
    int no; /*next product number*/
    struct que free_que;
    struct que ready_que;
    struct lnk_nod nod[BUF_SIZ];
};

/*busy-wait for tm loop iterations; the counter is volatile so that the
  optimizer cannot remove the loop*/
void delay(int tm)
{
    volatile int left = tm;

    while (left > 0)
        left--;
}

/*
 * Append node nod_id to the tail of queue q.
 * lnk_base is the base pointer of the queue node array.
 * The caller must hold the semaphore that protects q.
 */
void push_que(struct que *q, struct lnk_nod *lnk_base, int nod_id)
{
    /*the pushed node becomes the tail, so it must never keep a stale link,
      not even when it is the only node in the queue*/
    lnk_base[nod_id].next = -1;

    if (q->base == -1) {
        q->base = nod_id;
        q->top = nod_id;
        return;
    }
    delay(PUSH_TM); /*clearly show that the operation is not atomic*/

    lnk_base[q->top].next = nod_id;
    delay(PUSH_TM);

    q->top = nod_id;
    delay(PUSH_TM);
}

/*
 * Remove the head node of queue q and return its index.
 * Prints a message and terminates the process when the queue is empty.
 * The caller must hold the semaphore that protects q.
 */
int pop_que(struct que *q, struct lnk_nod *lnk_base)
{
    int pop_id;

    if (q->base == -1) {
        fprintf(stderr, "pop_que : queue is empty.\n");
        exit(-1);
    }
    delay(POP_TM);

    pop_id = q->base;
    delay(POP_TM);

    if (pop_id == q->top) {
        /*last node removed: reset both ends so the queue reads as empty*/
        q->base = -1;
        q->top = -1;
    } else {
        q->base = lnk_base[pop_id].next;
    }
    delay(POP_TM);

    return pop_id;
}

/*
 * Add op to semaphore idx of set sem_id, blocking as semop() requires.
 * The call is retried when it is interrupted (EINTR); any other failure,
 * such as the set having been removed, terminates the calling process.
 */
static void sem_chg(int sem_id, int idx, short op)
{
    struct sembuf buf;

    buf.sem_num = (unsigned short)idx;
    buf.sem_op = op;
    buf.sem_flg = 0;
    while (semop(sem_id, &buf, 1) == -1) {
        if (errno == EINTR)
            continue;
        perror("semop");
        exit(EXIT_FAILURE);
    }
}

/*P operation: decrement semaphore idx, waiting until that is possible*/
void p(int sem_id, int idx)
{
    sem_chg(sem_id, idx, -1);
}

/*V operation: increment semaphore idx*/
void v(int sem_id, int idx)
{
    sem_chg(sem_id, idx, 1);
}

/*
 * Produce one product: take a node from the free queue, fill it in and
 * append it to the ready queue.
 * id is the producer number used in the log messages, no points to the
 * shared product counter.
 */
void produce(int id, int sem_id, int *no,
             struct que *free_que,
             struct que *ready_que,
             struct lnk_nod *lnk_base)
{
    int pop_id;
    PROD_TYPE *prod;

    printf("Producer #%d starts to produce, waits to get a free buffer.\n", id);
    p(sem_id, SEM_EMPTY);   /*wait for a free node resource*/
    p(sem_id, SEM_FRE_QUE); /*lock the free queue before taking a node*/
    printf("Producer #%d starts to get a buffer\n", id);
    pop_id = pop_que(free_que, lnk_base);
    printf("Producer #%d has got a buffer form free queue, and starts to write infomations.\n", id);
    v(sem_id, SEM_FRE_QUE); /*release the lock of the free queue*/

    prod = &lnk_base[pop_id].prod;
    p(sem_id, SEM_CNT);     /*counter lock*/
    snprintf(prod->name, sizeof prod->name, "%s", ss[*no % SS_NUM]);
    prod->no = *no;
    *no = *no + 1;
    v(sem_id, SEM_CNT);     /*counter lock release*/

    delay(PROD_TM);
    time(&prod->tm);
    printf("Producer #%d has finished writing, and waits to put to ready queue.\n", id);
    p(sem_id, SEM_RED_QUE); /*lock the ready queue before appending the product*/
    printf("Producer #%d starts to put a buffer\n", id);
    push_que(ready_que, lnk_base, pop_id);
    printf("Producer #%d has put it to ready queue.\n", id);
    v(sem_id, SEM_RED_QUE); /*release the lock of the ready queue*/
    v(sem_id, SEM_FULL);    /*one more product is available*/
}

/*
 * Consume one product: take a node from the ready queue, print and clear
 * it, then give the node back to the free queue.
 * id is the consumer number used in the log messages.
 */
void consume(int id, int sem_id,
             struct que *free_que,
             struct que *ready_que,
             struct lnk_nod *lnk_base)
{
    int pop_id;
    PROD_TYPE *prod;

    printf("Consumer #%d starts to consume, and waits to get a ready buffer.\n", id);
    p(sem_id, SEM_FULL);    /*wait for an available product*/
    p(sem_id, SEM_RED_QUE); /*lock the ready queue before taking a product node*/
    printf("Consumer #%d starts to get a buffer\n", id);
    pop_id = pop_que(ready_que, lnk_base);
    printf("Consumer #%d has got a buffer form ready, and starts to read infomations.\n", id);
    v(sem_id, SEM_RED_QUE); /*release the ready queue lock*/

    prod = &lnk_base[pop_id].prod;
    printf("Consumer #%d read : Product no = %d.\n", id, prod->no);
    printf("Consumer #%d read : Product name = %s.\n", id, prod->name);
    printf("Consumer #%d read : Product time = %s", id, ctime(&prod->tm));
    prod->no = -1;
    snprintf(prod->name, sizeof prod->name, "%s", "-----");
    prod->tm = (time_t)-1;

    printf("Consumer #%d has finished reading, and waits to put to free queue.\n", id);
    p(sem_id, SEM_FRE_QUE); /*lock the free queue before returning the node*/
    printf("Consumer #%d starts to put a buffer\n", id);
    push_que(free_que, lnk_base, pop_id);
    printf("Consumer #%d has put it to free queue.\n", id);
    v(sem_id, SEM_FRE_QUE); /*release the free queue lock*/
    v(sem_id, SEM_EMPTY);   /*one more free node is available*/
}

int shm_id = -1;
void *shm_add = (void *)-1;
int sem_id = -1;

/*set by sig_hadl() once the interrupt signal has been caught*/
static volatile sig_atomic_t stop_flg = 0;

/*
 * SIGINT handler. It only records the request; terminating the children
 * and releasing the shared memory and semaphores is done by main(),
 * outside of signal context.
 */
void sig_hadl(int sig_num)
{
    (void)sig_num;
    stop_flg = 1;
}

/*detach and remove the shared memory and remove the semaphore set;
  objects that were never created are skipped*/
static void rel_ipc(void)
{
    union semun sem_val;

    sem_val.val = 0;
    if (shm_add != (void *)-1)
        shmdt(shm_add);
    if (shm_id != -1)
        shmctl(shm_id, IPC_RMID, NULL);
    if (sem_id != -1)
        semctl(sem_id, 0, IPC_RMID, sem_val);
}

/*set semaphore idx of the global set to val; returns -1 on failure*/
static int set_sem(int idx, int val)
{
    union semun sem_val;

    sem_val.val = val;
    return semctl(sem_id, idx, SETVAL, sem_val);
}

/*
 * Body of child number id: ids below PROD_NUM are producers, the others
 * are consumers. sig_set is the signal mask the child runs with.
 * Never returns; the child runs until it is killed by a signal.
 */
static void run_chld(int id, const sigset_t *sig_set, struct shm_lay *shm)
{
    /*children must simply die on SIGINT: restore the default action first,
      then let a possibly pending SIGINT through*/
    signal(SIGINT, SIG_DFL);
    sigprocmask(SIG_SETMASK, sig_set, NULL);

    if (id < PROD_NUM) {
        printf("Producer #%d is created.\n", id);
        while (1) {
            produce(id, sem_id, &shm->no,
                    &shm->free_que, &shm->ready_que, shm->nod);
        }
    } else {
        printf("Consumer #%d is created.\n", id - PROD_NUM);
        while (1) {
            consume(id - PROD_NUM, sem_id,
                    &shm->free_que, &shm->ready_que, shm->nod);
        }
    }
}

/*
 * Create the semaphores and the shared memory, start PROD_NUM producers
 * and CONS_NUM consumers as child processes and wait for SIGINT. Then
 * terminate and reap every child and remove the IPC objects.
 */
int main(void)
{
    struct shm_lay *shm;
    struct sigaction sa;
    sigset_t int_set, run_set;
    pid_t chld[CHLD_NUM];
    int chld_cnt = 0;
    int ret = EXIT_SUCCESS;
    int i;

    /*several processes share stdout; flush line by line so that output is
      not lost when a child is killed*/
    setvbuf(stdout, NULL, _IOLBF, 0);

    /*keep SIGINT blocked except while waiting in sigsuspend(), so that the
      handler only ever runs in the parent and no interrupt can be missed*/
    sigemptyset(&int_set);
    sigaddset(&int_set, SIGINT);
    if (sigprocmask(SIG_BLOCK, &int_set, &run_set) == -1) {
        perror("sigprocmask");
        exit(EXIT_FAILURE);
    }
    sigdelset(&run_set, SIGINT);

    memset(&sa, 0, sizeof sa);
    sa.sa_handler = sig_hadl;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    if (sigaction(SIGINT, &sa, NULL) == -1) { /*set interrupt signal*/
        perror("sigaction");
        exit(EXIT_FAILURE);
    }

    /*get semaphore*/
    sem_id = semget(SEM_KEY, SEM_NUM, IPC_CREAT | 0666);
    if (sem_id == -1) {
        perror("semget");
        exit(EXIT_FAILURE);
    }
    /*set value of semaphore*/
    if (set_sem(SEM_EMPTY, BUF_SIZ) == -1 ||
        set_sem(SEM_FULL, 0) == -1 ||
        set_sem(SEM_CNT, 1) == -1 ||
        set_sem(SEM_FRE_QUE, 1) == -1 ||
        set_sem(SEM_RED_QUE, 1) == -1) {
        perror("semctl");
        rel_ipc();
        exit(EXIT_FAILURE);
    }

    /*get shared memory*/
    shm_id = shmget(SHM_KEY, sizeof(struct shm_lay), IPC_CREAT | 0666);
    if (shm_id == -1) {
        perror("shmget");
        rel_ipc();
        exit(EXIT_FAILURE);
    }
    shm_add = shmat(shm_id, NULL, 0);
    if (shm_add == (void *)-1) { /*shmat() reports failure with (void *)-1*/
        perror("shmat");
        rel_ipc();
        exit(EXIT_FAILURE);
    }

    /*shared memory manage: every node starts on the free queue*/
    shm = shm_add;
    shm->no = 0;
    for (i = 0; i < BUF_SIZ - 1; i++) {
        shm->nod[i].next = i + 1;
    }
    shm->nod[BUF_SIZ - 1].next = -1;
    shm->free_que.base = 0;
    shm->free_que.top = BUF_SIZ - 1;
    shm->ready_que.base = -1;
    shm->ready_que.top = -1;

    /*create child processes as producers and consumers*/
    for (i = 0; i < CHLD_NUM; i++) {
        pid_t pid = fork();

        if (pid == -1) {
            perror("fork");
            ret = EXIT_FAILURE;
            break;
        }
        if (pid == 0) {
            run_chld(i, &run_set, shm); /*never returns*/
        }
        chld[chld_cnt++] = pid;
    }

    /*sleep until the interrupt signal arrives; sigsuspend() unblocks SIGINT
      only for the duration of the wait*/
    if (ret == EXIT_SUCCESS) {
        while (!stop_flg) {
            sigsuspend(&run_set);
        }
    }

    /*stop every child (one that is already dead is harmless to signal,
      because it has not been reaped yet) and wait until all are gone*/
    for (i = 0; i < chld_cnt; i++) {
        kill(chld[i], SIGTERM);
    }
    for (i = 0; i < chld_cnt; i++) {
        while (waitpid(chld[i], NULL, 0) == -1 && errno == EINTR) {
            /*retry*/
        }
    }

    rel_ipc();
    printf("Process %d is terminated.\n", (int)getpid());
    return ret;
}
|