前言
什么是oracle数据库的高级队列:oracle内部管理的一个队列,队列元素的格式,可以是自定义或者系统默认类型,oracle的存储过程,触发器或者某个外部应用可以往队列中加入元素,也可以从队列中取出某个元素,出队列的顺序是先进先出(FIFO).例如,某个表格有更新,可以往队列里面加入下面一条消息“actiontype=update;col1='1';col2='name'.......”,
Oracle高级队列管理提供了单消费者队列和多消费者队列。单消费者队列只面向单一的接收者。多消费者队列可以被多个接收者使用。当把消息放入多消费者队列时,应用程序的程序员必须显式地在消息属性中指定这些接收者,或者建立决定每条消息的接收者的基于规则的订阅过程。下面,我们只讨论单消费者队列的情况。
第一步,安装oracle 10g client 程序(oracle 8i以上的版本才支持C接口访问高级队列),安装后,在oracle运行目录下面有一个oci文件夹,里面有include和lib目录,oci.h和oci.lib文件在后面的工程中,都会用上。
第二步,初始化oci
ret=OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0, (dvoid * (*)(void *,size_t)) 0,(dvoid * (*)(void *,void *,size_t)) 0, (void (*)(void *,void *)) 0 );
OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp,(ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp);
ret=OCIEnvInit(&envhp,(ub4)OCI_DEFAULT,21,(dvoid **) &tmp);
/* allocate a error report handle */
ret=OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp,(ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp);
/* allocate a server context handle */
ret=OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp,(ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp);
第三步,连接oracle服务器,例如数据库的服务名为"SID"
ret=OCIServerAttach(srvhp, errhp, (text *) "SID", (sb4) strlen("SID"), (ub4) OCI_DEFAULT);
第四步, /* allocate a service context handle */
ret=OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp,(ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp);
/* set attribute server context in the service context */
ret=OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp,(ub4) 0 ,(ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
第五步, /* allocate a user session handle */
ret=OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,(size_t) 0, (dvoid **) 0);
第六步,例如访问数据库的用户名和密码为 “test”和“pwd”
ret=OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"test",(ub4)strlen("test"), OCI_ATTR_USERNAME, errhp);
ret=OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,(dvoid *)"pwd",(ub4)strlen("pwd"),OCI_ATTR_PASSWORD,errhp);
第七步, ret=OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS,OCI_DEFAULT);
ret=OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,(dvoid *) usrhp, (ub4) 0,(ub4) OCI_ATTR_SESSION, errhp);
至此,初始化和连接数据库服务器过程就完成,
第八步,执行非查询sql语句:
//分配资源句柄
retcode =OCIHandleAlloc( (dvoid *)envhp, (dvoid **) &stmthp,OCI_HTYPE_STMT, (size_t) 0, (dvoid **) 0);
//准备sql语句
retcode = OCIStmtPrepare (stmthp,errhp,(unsigned char *)sqlcmd,(ub4)strlen((char *)sqlcmd),(ub4) OCI_NTV_SYNTAX, (ub4) OCI_DEFAULT);
//执行SQL语句
retcode=OCIStmtExecute(svchp,stmthp,errhp, (ub4) 1, (ub4) 0,(CONST OCISnapshot *) NULL, (OCISnapshot *) NULL, OCI_DEFAULT );
第九步,执行查询sql语句
retcode = OCIStmtPrepare (stmthp,errhp,(unsigned char *)sqlcmd,(ub4)strlen((char *)sqlcmd),(ub4) OCI_NTV_SYNTAX, (ub4) OCI_DEFAULT);
//定义输出变量 validtime
retcode=OCIDefineByPos(stmthp, &defcolp[0], errhp, (ub4)1, (dvoid *)&validtime,(sb4)sizeof(int), SQLT_INT , (dvoid *)&indp[0], (ub2 *)0,(ub2 *)0, (ub4) OCI_DEFAULT);
//定义输出变量 keepday
retcode=OCIDefineByPos(stmthp, &defcolp[0],errhp, (ub4)2, (dvoid *)&keepday,(sb4)sizeof(int), SQLT_INT , (dvoid *)&indp[1], (ub2 *)0,(ub2 *)0, (ub4) OCI_DEFAULT);
//定义输出变量 text
retcode=OCIDefineByPos(stmthp, &defcolp[0],errhp, (ub4)3, (dvoid *)text,(sb4)sizeof(text),SQLT_CHR,(dvoid *)&indp[2], (ub2 *)0,(ub2 *)0, (ub4) OCI_DEFAULT);
//执行SQL语句
retcode=OCIStmtExecute(svchp,stmthp,errhp, (ub4) 1, (ub4) 0,(CONST OCISnapshot *) NULL, (OCISnapshot *) NULL, OCI_DEFAULT );
如果执行成功,前面绑定的变量都会赋予正确的值。
第十步,访问oracle高级队列(才入正题)
ub4 wait = OCI_DEQ_NO_WAIT; /* timeout after 0 seconds */
ub4 navigation = OCI_DEQ_FIRST_MSG;/* always get head of q */
ub4 deq_mode =OCI_DEQ_REMOVE;
/* Get the type descriptor object for the type SCOTT.MESSAGE: */
checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"SYS", strlen("SYS"),(CONST text *)"RAW", strlen("RAW"),(text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo));
/* Set wait time, navigation in dequeue options: */
checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,(dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp));//如果没有数据,多长时间返回
checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,(dvoid *)&navigation, 0,OCI_ATTR_NAVIGATION, errhp));
checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,(dvoid *)&deq_mode, 0, OCI_ATTR_DEQ_MODE, errhp));
sword ret=OCIAQDeq(svchp, errhp, (OraText *)qName, deqopt, 0, mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&indptr, 0, 0);
if(ret ==0)
{
OCITransCommit(svchp, errhp, (ub4) 0);
int msgSize=OCIRawSize(envhp, deqmesg);//消息长度
ub1* msgAddr=OCIRawPtr(envhp, deqmesg);//消息缓存区
//处理消息内容
}
待续.....