posts - 7, comments - 2, trackbacks - 0, articles - 1
  C++博客 :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理

Producer and Consumer

Posted on 2009-08-02 01:11 Lemon 阅读(424) 评论(0)  编辑 收藏 引用
  1 /*
  2     August 2nd, 1:00 am
  3     Producer And Consumer, Linux C
  4     Shared memory represents the buffered queue.
  5     Main process represents the producer.
  6     A child process represents the consumer.
  7 */
  8 #include <stdio.h>
  9 #include <stdlib.h>
 10 #include <string.h>
 11 #include <unistd.h>
 12 #include <signal.h>
 13 #include <sys/types.h>
 14 #include <sys/wait.h>
 15 #include <linux/shm.h>
 16 #include <linux/sem.h>
 17 
 18 #define SEM_KEY 1024
 19 #define SHM_KEY 1025
 20 #define BUF_TYPE int
 21 #define BUF_SIZ 10
 22 #define SEM_FULL 0
 23 #define SEM_EMPTY 1
 24 #define PROD_TM 100000000
 25 #define CONS_TM 200000000
 26 
 27 void p(int sem_id, int idx) 
 28 {
 29     struct sembuf buf;
 30     buf.sem_num = idx;
 31     buf.sem_op = -1;
 32     buf.sem_flg = 0;
 33     semop(sem_id, &buf, 1);
 34 }
 35 
 36 void v(int sem_id, int idx)
 37 {
 38     struct sembuf buf;
 39     buf.sem_num = idx;
 40     buf.sem_op = 1;
 41     buf.sem_flg = 0;
 42     semop(sem_id, &buf, 1);
 43 }
 44 
 45 /*Write a number to buffered queue if it has available empty position.
 46 It will be blocked if queue is full.*/
 47 void prod(int sem_id, const void *shm_add)
 48 {
 49     int *put_idx = (int*)shm_add;
 50     int tm, num;
 51     BUF_TYPE *buf_base = (BUF_TYPE *)(shm_add + sizeof(int* 2);
 52     puts("Producer waits to put product to buffer."); 
 53     p(sem_id, SEM_EMPTY);
 54     num = rand();
 55     buf_base[*(put_idx)] = num;
 56     (*put_idx) = (*put_idx) + 1;
 57     printf("Producer is now puting a product value = %d to buffer.\n", num);
 58     tm = PROD_TM;
 59     while (tm--);
 60     puts("Producer finishes puting.");
 61     v(sem_id, SEM_FULL);
 62 }
 63 
 64 /*Read a number form buffered queue if it is nonempty.*/
 65 void cons(int sem_id, const void *shm_add) 
 66 {
 67     int *get_idx = (int*)shm_add + 1;
 68     BUF_TYPE *buf_base = (BUF_TYPE *)(shm_add + sizeof(int* 2);
 69     int tm, num;
 70     puts("Consumer waits to get product from buffer.");
 71     p(sem_id, SEM_FULL);
 72     num = buf_base[*get_idx];
 73     (*get_idx) = (*get_idx) + 1;
 74     printf("Consumer is now geting a product value = %d from buffer.\n", num);
 75     tm = CONS_TM;
 76     while (tm--);
 77     puts("Consumer finishes geting.");
 78     v(sem_id, SEM_EMPTY);
 79 }
 80 
 81 int shm_id;
 82 BUF_TYPE *shm_add;
 83 
 84 /*Deal with the interrput signal, and release the resources.*/
 85 void signal_handler(int sig_num) {
 86     int chld_stat;
 87     int rv;
 88     wait(&chld_stat);
 89     /*Detach shared memory.*/
 90     rv = shmdt(shm_add);
 91     if (rv == -1) {
 92         perror("shmdt");
 93         exit(-1);
 94     }
 95     /*Remove shared memory.*/
 96     shmctl(shm_id, IPC_RMID, NULL);
 97     puts("Terminated.");
 98     exit(0);
 99 }
100 
101 int main(int argc, char *argv[]) 
102 {
103     int sem_id;
104     union semun sem_val;
105     int rv, pid;
106     /*Set interrupt signal to deal with it.*/
107     signal(SIGINT, signal_handler);
108     sem_id = semget(SEM_KEY, 3, IPC_CREAT | 0666);
109     if (sem_id == -1) {
110         perror("semget");
111         exit(-1);
112     }
113 
114     sem_val.val = 0;
115     rv = semctl(sem_id, SEM_FULL, SETVAL, sem_val);
116     if (rv == -1) {
117         perror("semctl set SEM_FULL");
118         exit(-1);
119     }
120     
121     sem_val.val = BUF_SIZ;
122     rv = semctl(sem_id, SEM_EMPTY, SETVAL, sem_val);
123     if (rv == -1) {
124         perror("semctl set SEM_EMPTY");
125         exit(-1);
126     }
127     
128     shm_id = shmget(SHM_KEY, 
129                     /*The shared memory size is set at the first two pointer of buffered queue,
130                          and the following buffered queue.*/
131                     sizeof(BUF_TYPE) * BUF_SIZ + sizeof(int), 
132                     IPC_CREAT | 0666);
133     if (shm_id == -1) {
134         perror("shmget");
135         exit(-1);
136     }
137     /*Attach to memeory, and get the base pointer of shared memory.*/
138     shm_add = (BUF_TYPE *)shmat(shm_id, NULL, 0);
139     if (shm_add == NULL) {
140         perror("shmat");
141         exit(-1);
142     }
143     
144     (* ( (int*)shm_add ) ) = 0/*Base pointer of buffered queue*/
145     (* ( (int*)shm_add + 1) ) = 0/*Top pointer of buffered queue*/
146     
147     /*Create another process as consumer, while the main process do producing.*/
148     pid = fork();
149     /*Program will be terminated until receives interrupt signal(CTRL + C) from user.*/
150     if (pid) {
151         while (1) {
152             prod(sem_id, shm_add);
153         }
154     } else {
155         while (1) {
156             cons(sem_id, shm_add);
157         }
158     }
159     return 0;
160 }


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理