参考文章:
http://blog.chinaunix.net/u1/53810/showart_425856.html1.创建消息队列
int msgget(key_t key, int msgflg);
通常是msgflg =IPC_CREAT| IPC_EXCL|0666
通过key_t ftok(const char *pathname, int proj_id);创建key_t
2.队列读写
ssize_t msgrcv(int msqid, struct msgbuf *msgp, size_t msgsz, long msgtyp, int msgflg);
3.消息队列控制
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
进程间通讯--消息队列服务端:
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <errno.h>
5 #include <sys/types.h>
6 #include <sys/ipc.h>
7 #include <sys/msg.h>
8 #include <sys/stat.h>
9 #include <pthread.h>
10 #include <iostream>
11
12 using namespace std;
13
14 #define MSG_FILE "/boot"
15 #define BUFSIZE 255
16
17 /* 服务端创建的消息队列最后没有删除,我们要使用ipcrm命令来删除的 */
18 /* ipcrm -q <msqid> i ipcs -q*/
19
20 typedef struct msgtype
21 {
22 long mtype;
23 char buffer[BUFSIZE+1];
24 };
25
26 void* RecvThreadProc(void* lpPara)
27 {
28 int msgid = (int)lpPara;
29 msgtype msg;
30
31 while( true )
32 {
33 int Ret = msgrcv(msgid, &msg, sizeof(msg.buffer), 9999, 0);
34
35 if( Ret < 0 )
36 {
37 fprintf(stderr, "Receive Message Error:%s\n", strerror(errno));
38 break;
39 }
40
41 if( strncmp(msg.buffer, "exit", 4) == 0 )
42 {
43 continue;
44 }
45
46 std::cout << msg.buffer << std::endl;
47 }
48
49 return NULL;
50 }
51
52 void* SendThreadProc(void* lpPara)
53 {
54 int msgid = (int)lpPara;
55 msgtype msg;
56 char buf[BUFSIZE];
57
58 while( true )
59 {
60 memset( buf, 0x00, sizeof(buf) );
61 cin.getline(buf, BUFSIZE);
62
63 msg.mtype = 8888;
64 strcpy( msg.buffer, buf);
65
66 int Ret = msgsnd(msgid, &msg, sizeof(msg.buffer), 0);
67
68 if( Ret != 0 )
69 {
70 fprintf(stderr,"Send Message Error:%s\n", strerror(errno));
71 break;
72 }
73
74 if( strncmp(msg.buffer, "exit", 4) == 0 )
75 {
76 break;
77 }
78 }
79
80 return NULL;
81 }
82
83 int main(int argc, char* argv[])
84 {
85 key_t key;
86 int msgid;
87
88 key = ftok(MSG_FILE, 'a');
89 if( -1 == key )
90 {
91 fprintf(stderr,"Creat Key Error:%s\n", strerror(errno));
92 exit(1);
93 }
94
95 msgid = msgget(key, S_IRUSR | S_IWUSR|IPC_CREAT | IPC_EXCL );
96
97 if( -1 == msgid )
98 {
99 fprintf(stderr, "Creat Message Error:%s\n", strerror(errno));
100 exit(1);
101 }
102 printf("msqid = %d\n", msgid);
103
104 pthread_t pthread_recv;
105 pthread_t pthread_send;
106
107 if ( pthread_create( &pthread_recv, NULL, RecvThreadProc, (void*)msgid) != 0 )
108 {
109 fprintf(stderr, "Creat Recveive Thread Error:%s\n", strerror(errno));
110 exit(1);
111 }
112
113 if ( pthread_create( &pthread_send, NULL, SendThreadProc, (void*)msgid) != 0 )
114 {
115 fprintf(stderr, "Creat Send Thread Error:%s\n", strerror(errno));
116 exit(1);
117 }
118
119 std::cout << "Start message queue server successful" << std::endl;
120
121 pthread_join(pthread_send, NULL);
122 msgctl ( msgid, IPC_RMID, NULL );
123
124 return 0;
125 }
进程间通讯--消息队列客户端:
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <errno.h>
5 #include <sys/types.h>
6 #include <sys/ipc.h>
7 #include <sys/msg.h>
8 #include <sys/stat.h>
9 #include <pthread.h>
10 #include <iostream>
11
12 using namespace std;
13
14 #define MSG_FILE "/boot"
15 #define BUFSIZE 255
16
17 /* 服务端创建的消息队列最后没有删除,我们要使用ipcrm命令来删除的 */
18 /* ipcrm -q <msqid> i ipcs -q*/
19
20 typedef struct msgtype
21 {
22 long mtype;
23 char buffer[BUFSIZE+1];
24 };
25
26 void* RecvThreadProc(void* lpPara)
27 {
28 int msgid = (int)lpPara;
29 msgtype msg;
30
31 while( true )
32 {
33 int Ret = msgrcv(msgid, &msg, sizeof(msg.buffer), 8888, 0);
34
35 if( Ret < 0 )
36 {
37 fprintf(stderr, "Receive Message Error %s\n", strerror(errno));
38 break;
39 }
40
41 std::cout << msg.buffer << std::endl;
42 }
43
44 return NULL;
45 }
46
47 void* SendThreadProc(void* lpPara)
48 {
49 int msgid = (int)lpPara;
50 msgtype msg;
51 char buf[BUFSIZE];
52
53 while( true )
54 {
55 memset( buf, 0x00, sizeof(buf) );
56 cin.getline(buf, BUFSIZE);
57
58 msg.mtype = 9999;
59 strcpy( msg.buffer, buf);
60
61 int Ret = msgsnd(msgid, &msg, sizeof(msg.buffer), 0);
62
63 if( Ret != 0 )
64 {
65 fprintf(stderr,"Send Message Error:%s\n", strerror(errno));
66 break;
67 }
68
69 if( strncmp(msg.buffer, "exit", 4) == 0 )
70 {
71 break;
72 }
73 }
74
75 return NULL;
76 }
77
78 int main(int argc, char* argv[])
79 {
80 key_t key;
81 int msgid;
82
83 key = ftok(MSG_FILE, 'a');
84 if( -1 == key )
85 {
86 fprintf(stderr,"Creat Key Error:%s\n", strerror(errno));
87 exit(1);
88 }
89
90 msgid = msgget(key, S_IRUSR|S_IWUSR);
91
92 if( -1 == msgid )
93 {
94 fprintf(stderr, "Creat Message Error:%s\n", strerror(errno));
95 exit(1);
96 }
97 printf("msqid = %d\n", msgid);
98
99 pthread_t pthread_recv;
100 pthread_t pthread_send;
101
102 if ( pthread_create( &pthread_recv, NULL, RecvThreadProc, (void*)msgid) != 0 )
103 {
104 fprintf(stderr, "Creat Recveive Thread Error:%s\n", strerror(errno));
105 exit(1);
106 }
107
108 if ( pthread_create( &pthread_send, NULL, SendThreadProc, (void*)msgid) != 0 )
109 {
110 fprintf(stderr, "Creat Send Thread Error:%s\n", strerror(errno));
111 exit(1);
112 }
113
114 std::cout << "Start message queue client successful" << std::endl;
115
116 pthread_join(pthread_send, NULL);
117
118 return 0;
119 }
消息队列服务端启动后,
可以通过: ipcs -q查询 msqid
通过
g++ -o MsgClient MsgClient.cpp -lpthread
g++ -o MsgServer MsgServer.cpp -lpthread
分别编译服务端与客户端!
http://www.cppblog.com/Files/bujiwu/MsgQueue.rar