主要思路
1. 首次连接时调用redisConnectWithTimeout或redisConnectUnixWithTimeout连接Redis服务端,若成功则保存返回的redisContext,假设为ctx
2. 发送命令数据后获取响应,如果是pipelining模式则调用redisGetReply获取响应,再检查redisContext中的错误码,如果为网络出错或关闭,则清除ctx的REDIS_CONNECTED标志
3. 在下次发送数据时,先检查ctx是否置位了REDIS_CONNECTED标志,若没有则调用redisReconnect重连Redis服务端
实现代码
自动连接
1 int redis_auto_connect(CBED_REDIS *redis)
2 {
3 if(NULL==redis->ctx){
4 redisContext *ctx;
5 if(redis->type == CONN_TCP)
6 ctx = redisConnectWithTimeout(redis->conn.tcp.ip, redis->conn.tcp.port, redis->timeout_conn);
7 else
8 ctx = redisConnectUnixWithTimeout(redis->conn.unix.path, redis->timeout_conn);
9
10 if(NULL==ctx){
11 zlog_fatal(c_redis, "redis allocate context fail");
12 return -1;
13
14 }else if(ctx->err){
15 zlog_fatal(c_redis, "redis connection %s:%d error: %s",
16 redis->type==CONN_TCP?redis->conn.tcp.ip:redis->conn.unix.path,
17 redis->type==CONN_TCP?redis->conn.tcp.port:0, ctx->errstr);
18 redisFree(ctx);
19 return -1;
20 }
21
22 if(REDIS_ERR==redisSetTimeout(ctx, redis->timeout_rw)){
23 zlog_fatal(c_redis, "redis set rw timeout error: %s", ctx->errstr);
24 redisFree(ctx);
25 return -1;
26 }
27
28 redis->ctx = ctx;
29 if(redis_auth(redis)){
30 redisFree(ctx);
31 redis->ctx = NULL;
32 return -1;
33 }
34
35 } else if(!(redis->ctx->flags & REDIS_CONNECTED)){
36 int retry = redis->reconn_max, n = 0;
37 do {
38 if(REDIS_OK==redisReconnect(redis->ctx)){
39 return redis_auth(redis);
40 }
41
42 zlog_warn(c_redis, "redis reconnect %d error: %s", ++n, redis->ctx->errstr);
43 sleep(redis->reconn_interval); //reconn_interval default is 3 seconds
44
45 }while(--retry > 0);
46
47 zlog_error(c_redis, "redis reconnect exceed max num %d", redis->reconn_max);
48 return -1;
49 }
50
51 return 0;
52 }
发送时检查错误码
1 static int redis_bulk_get_reply(CBED_REDIS *redis)
2 {
3 redisReply *r;
4 int i = 0;
5 int num = redis->cmd_num;
6 redis->cmd_num = 0;
7
8 for(; i<num; ++i){
9 if(REDIS_OK==redisGetReply(redis->ctx, (void**)&r)){
10 if(r->type == REDIS_REPLY_ERROR){
11 zlog_error(c_redis, "redis get reply error: %.*s", r->len, r->str);
12 freeReplyObject(r);
13 return -1;
14 }
15 freeReplyObject(r);
16
17 }else{
18 if(redis->ctx->err==REDIS_ERR_IO||redis->ctx->err==REDIS_ERR_EOF)
19 redis->ctx->flags &= ~REDIS_CONNECTED;
20 zlog_fatal(c_redis, "redis get reply fail: %s", redis->ctx->errstr);
21 return -1;
22 }
23 }
24
25 return 0;
26 }
27
28 int redis_send(CBED_REDIS *redis, unsigned char *data, unsigned int size, int force)
29 {
30 if(redis_auto_connect(redis))
31 return -1;
32
33 int i;
34
35 if(redis->max_cmd_num > 1){ //pipelining
36 for(i=0; i<redis->queue_num; ++i){
37 if(REDIS_ERR == redisAppendCommand(redis->ctx, "RPUSH %s %b", redis->queue[i], data, size)) {
38 zlog_fatal(c_redis, "redis append command rpush %s len %u fail: %s", redis->queue[i], size, redis->ctx->errstr);
39 return -1;
40 }
41
42 ++redis->cmd_num;
43 if((!force && redis->cmd_num==redis->max_cmd_num) || force){
44 if(redis_bulk_get_reply(redis))
45 return -1;
46 }
47 }
48
49 }else{
50 for(i=0; i<redis->queue_num; ++i){
51 redisReply *r = redisCommand(redis->ctx, "RPUSH %s %b", redis->queue[i], data, size);
52 if(NULL==r){
53 if(redis->ctx->err==REDIS_ERR_IO||redis->ctx->err==REDIS_ERR_EOF)
54 redis->ctx->flags &= ~REDIS_CONNECTED;
55 zlog_fatal(c_redis, "redis command rpush %s len %u fail: %s", redis->queue[i], size, redis->ctx->errstr);
56 return -1;
57 }
58
59 if(r->type == REDIS_REPLY_ERROR){
60 zlog_error(c_redis, "redis reply rpush %s len %u error: %.*s", redis->queue[i], size, r->len, r->str);
61 freeReplyObject(r);
62 return -1;
63 }
64
65 freeReplyObject(r);
66 }
67 }
68
69 return 0;
70 }
posted on 2021-02-25 15:51
春秋十二月 阅读(6353)
评论(0) 编辑 收藏 引用 所属分类:
Network