阿二的梦想船

 

2011年11月29日

redis如何删除过期数据

随着nosql风潮兴起,redis作为当中一个耀眼的明星,也越来越多的被关注和使用,我在工作中也广泛的用到了redis来充当cachekey-value DB,但当大家发现数据越来越多时,不禁有些担心,redis能撑的住吗,虽然官方已经有漂亮的benchmark,自己也可以做做压力测试,但是看看源码,也是确认问题最直接的办法之一。比如目前我们要确认的一个问题是,redis是如何删除过期数据的?

用一个可以"find reference"IDE,沿着setex(Set the value and expiration of a key)命令一窥究竟:

void setexCommand(redisClient *c) {
    c
->argv[3= tryObjectEncoding(c->argv[3]);
    setGenericCommand(c,
0,c->argv[1],c->argv[3],c->argv[2]);
}

setGenericCommand是一个实现set,setnx,setex的通用函数,参数设置不同而已。

void setCommand(redisClient *c) {
    c
->argv[2= tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,
0,c->argv[1],c->argv[2],NULL);
}
 
void setnxCommand(redisClient *c) {
    c
->argv[2= tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,
1,c->argv[1],c->argv[2],NULL);
}
 
void setexCommand(redisClient *c) {
    c
->argv[3= tryObjectEncoding(c->argv[3]);
    setGenericCommand(c,
0,c->argv[1],c->argv[3],c->argv[2]);
}

再看setGenericCommand

 1 void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire) {
 2     long seconds = 0/* initialized to avoid an harmness warning */
 3 
 4     if (expire) {
 5         if (getLongFromObjectOrReply(c, expire, &seconds, NULL) != REDIS_OK)
 6             return;
 7         if (seconds <= 0) {
 8             addReplyError(c,"invalid expire time in SETEX");
 9             return;
10         }
11     }
12 
13     if (lookupKeyWrite(c->db,key) != NULL && nx) {
14         addReply(c,shared.czero);
15         return;
16     }
17     setKey(c->db,key,val);
18     server.dirty++;
19     if (expire) setExpire(c->db,key,time(NULL)+seconds); 
20     addReply(c, nx ? shared.cone : shared.ok);
21 }
22 

13行处理"Set the value of a key, only if the key does not exist"的场景,17行插入这个key19行设置它的超时,注意时间戳已经被设置成了到期时间。这里要看一下redisDb(c->db)的定义:

typedef struct redisDb {
    dict 
*dict;                 /* The keyspace for this DB */
    dict 
*expires;              /* Timeout of keys with a timeout set */
    dict 
*blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
    dict 
*io_keys;              /* Keys with clients waiting for VM I/O */
    dict 
*watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    
int id;
} redisDb;

仅关注dictexpires,分别来存key-value和它的超时,也就是说如果一个key-value是有超时的,那么它会存在dict里,同时也存到expires里,类似这样的形式:dict[key]:value,expires[key]:timeout.

当然key-value没有超时,expires里就不存在这个key剩下setKeysetExpire两个函数无非是插数据到两个字典里,这里不再详述。


那么redis是如何删除过期key的呢。

通过查看dbDelete的调用者,首先注意到这一个函数,是用来删除过期key的。

 1 int expireIfNeeded(redisDb *db, robj *key) {
 2     time_t when = getExpire(db,key);
 3 
 4     if (when < 0return 0/* No expire for this key */
 5 
 6     /* Don't expire anything while loading. It will be done later. */
 7     if (server.loading) return 0;
 8 
 9     /* If we are running in the context of a slave, return ASAP:
10      * the slave key expiration is controlled by the master that will
11      * send us synthesized DEL operations for expired keys.
12      *
13      * Still we try to return the right information to the caller, 
14      * that is, 0 if we think the key should be still valid, 1 if
15      * we think the key is expired at this time. */
16     if (server.masterhost != NULL) {
17         return time(NULL) > when;
18     }
19 
20     /* Return when this key has not expired */
21     if (time(NULL) <= when) return 0;
22 
23     /* Delete the key */
24     server.stat_expiredkeys++;
25     propagateExpire(db,key);
26     return dbDelete(db,key);
27 }
28 

ifNeed表示能删则删,所以4行没有设置超时不删,7行在"loading"时不删,16行非主库不删,21行未到期不删。25行同步从库和文件。

再看看哪些函数调用了expireIfNeeded,有lookupKeyReadlookupKeyWritedbRandomKeyexistsCommandkeysCommand。通过这些函数命名可以看出,只要访问了某一个key,顺带做的事情就是尝试查看过期并删除,这就保证了用户不可能访问到过期的key。但是如果有大量的key过期,并且没有被访问到,那么就浪费了许多内存。Redis是如何处理这个问题的呢。


dbDelete的调用者里还发现这样一个函数:

 1 /* Try to expire a few timed out keys. The algorithm used is adaptive and
 2  * will use few CPU cycles if there are few expiring keys, otherwise
 3  * it will get more aggressive to avoid that too much memory is used by
 4  * keys that can be removed from the keyspace. */
 5 void activeExpireCycle(void) {
 6     int j;
 7 
 8     for (j = 0; j < server.dbnum; j++) {
 9         int expired;
10         redisDb *db = server.db+j;
11 
12         /* Continue to expire if at the end of the cycle more than 25%
13          * of the keys were expired. */
14         do {
15             long num = dictSize(db->expires);
16             time_t now = time(NULL);
17 
18             expired = 0;
19             if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
20                 num = REDIS_EXPIRELOOKUPS_PER_CRON;
21             while (num--) {
22                 dictEntry *de;
23                 time_t t;
24 
25                 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
26                 t = (time_t) dictGetEntryVal(de);
27                 if (now > t) {
28                     sds key = dictGetEntryKey(de);
29                     robj *keyobj = createStringObject(key,sdslen(key));
30 
31                     propagateExpire(db,keyobj);
32                     dbDelete(db,keyobj);
33                     decrRefCount(keyobj);
34                     expired++;
35                     server.stat_expiredkeys++;
36                 }
37             }
38         } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
39     }
40 }
41 

这个函数的意图已经有说明:删一点点过期key,如果过期key较少,那也只用一点点cpu25行随机取一个key38行删key成功的概率较低就退出。这个函数被放在一个cron里,每毫秒被调用一次。这个算法保证每次会删除一定比例的key,但是如果key总量很大,而这个比例控制的太大,就需要更多次的循环,浪费cpu,控制的太小,过期的key就会变多,浪费内存——这就是时空权衡了。

 

最后在dbDelete的调用者里还发现这样一个函数:

/* This function gets called when 'maxmemory' is set on the config file to limit
 * the max memory used by the server, and we are out of memory.
 * This function will try to, in order:
 *
 * - Free objects from the free list
 * - Try to remove keys with an EXPIRE set
 *
 * It is not possible to free enough memory to reach used-memory < maxmemory
 * the server will start refusing commands that will enlarge even more the
 * memory usage.
 
*/
void freeMemoryIfNeeded(void)

这个函数太长就不再详述了,注释部分说明只有在配置文件中设置了最大内存时候才会调用这个函数,而设置这个参数的意义是,你把redis当做一个内存cache而不是key-value数据库。


以上3种删除过期key的途径,第二种定期删除一定比例的key是主要的删除途径,第一种“读时删除”保证过期key不会被访问到,第三种是一个当内存超出设定时的暴力手段。由此也能看出redis设计的巧妙之处,

posted @ 2011-11-29 19:57 阿二 阅读(14848) | 评论 (1)编辑 收藏

2010年9月10日

Poco::TCPServer框架解析

     摘要: POCO C++ Libraries提供一套 C++ 的类库用以开发基于网络的可移植的应用程序,功能涉及线程、文件、流,网络协议包括:HTTP、FTP、SMTP 等,还提供 XML 的解析和 SQL 数据库的访问接口。不仅给我的工作带来极大的便利,而且设计巧妙,代码易读,注释丰富,也是非常好的学习材料。  阅读全文

posted @ 2010-09-10 01:05 阿二 阅读(18756) | 评论 (13)编辑 收藏

2008年9月9日

从海量数据中找出中位数

题目和基本思路都来源网上,本人加以整理。

题目:在一个文件中有 10G 个整数,乱序排列,要求找出中位数。内存限制为 2G。只写出思路即可(内存限制为 2G的意思就是,可以使用2G的空间来运行程序,而不考虑这台机器上的其他软件的占用内存)。

关于中位数:数据排序后,位置在最中间的数值。即将数据分成两部分,一部分大于该数值,一部分小于该数值。中位数的位置:当样本数为奇数时,中位数=(N+1)/2 ; 当样本数为偶数时,中位数为N/2与1+N/2的均值(那么10G个数的中位数,就第5G大的数与第5G+1大的数的均值了)。

分析:明显是一道工程性很强的题目,和一般的查找中位数的题目有几点不同。
1. 原数据不能读进内存,不然可以用快速选择,如果数的范围合适的话还可以考虑桶排序或者计数排序,但这里假设是32位整数,仍有4G种取值,需要一个16G大小的数组来计数。

2. 若看成从N个数中找出第K大的数,如果K个数可以读进内存,可以利用最小或最大堆,但这里K=N/2,有5G个数,仍然不能读进内存。

3. 接上,对于N个数和K个数都不能一次读进内存的情况,《编程之美》里给出一个方案:设k<K,且k个数可以完全读进内存,那么先构建k个数的堆,先找出第0到k大的数,再扫描一遍数组找出第k+1到2k的数,再扫描直到找出第K个数。虽然每次时间大约是nlog(k),但需要扫描ceil(K/k)次,这里要扫描5次。

解法:首先假设是32位无符号整数。
1. 读一遍10G个整数,把整数映射到256M个区段中,用一个64位无符号整数给每个相应区段记数。
说明:整数范围是0 - 2^32 - 1,一共有4G种取值,映射到256M个区段,则每个区段有16(4G/256M = 16)种值,每16个值算一段, 0~15是第1段,16~31是第2段,……2^32-16 ~2^32-1是第256M段。一个64位无符号整数最大值是0~8G-1,这里先不考虑溢出的情况。总共占用内存256M×8B=2GB。

2. 从前到后对每一段的计数累加,当累加的和超过5G时停止,找出这个区段(即累加停止时达到的区段,也是中位数所在的区段)的数值范围,设为[a,a+15],同时记录累加到前一个区段的总数,设为m。然后,释放除这个区段占用的内存。

3. 再读一遍10G个整数,把在[a,a+15]内的每个值计数,即有16个计数。

4. 对新的计数依次累加,每次的和设为n,当m+n的值超过5G时停止,此时的这个计数所对应的数就是中位数。

总结:
1.以上方法只要读两遍整数,对每个整数也只是常数时间的操作,总体来说是线性时间。

2. 考虑其他情况。
若是有符号的整数,只需改变映射即可。若是64为整数,则增加每个区段的范围,那么在第二次读数时,要考虑更多的计数。若过某个计数溢出,那么可认定所在的区段或代表整数为所求,这里只需做好相应的处理。噢,忘了还要找第5G+1大的数了,相信有了以上的成果,找到这个数也不难了吧。

3. 时空权衡。
花费256个区段也许只是恰好配合2GB的内存(其实也不是,呵呵)。可以增大区段范围,减少区段数目,节省一些内存,虽然增加第二部分的对单个数值的计数,但第一部分对每个区段的计数加快了(总体改变??待测)。

4. 映射时尽量用位操作,由于每个区段的起点都是2的整数幂,映射起来也很方便。

posted @ 2008-09-09 22:49 阿二 阅读(6105) | 评论 (2)编辑 收藏

基于boost::multi_array的矩阵相乘

博客第一篇,还望大家多多指点。

看了半天的boost::multi_array文档,才发现可以用shape()[]这个的东西,来取某一维的长度

而关于视图部分,小弟看的一知半解,
比如,怎么样把一个4×4的矩阵分成4个2×2的矩阵呢
虽然可以用别的途径解决,但还是想看下multi_array的视图操作

本来要实现下Strassen算法的,
下面是普通的矩阵乘法。

#include <iostream>
#include 
"boost/multi_array.hpp"
using namespace std;

typedef boost::multi_array
<int2> matrix; 

matrix matrix_multiply(matrix
& a,matrix& b)
{
    matrix::index row
=a.shape()[0];
    matrix::index col
=b.shape()[1];
    matrix c(boost::extents[row][col]);

    
for (matrix::index i=0; i!=a.shape()[0]; ++i)
        
for (matrix::index j=0; j!=b.shape()[1]; ++j)
            
for (matrix::index k=0; k!=a.shape()[1]; ++k)
                c[i][j]
+=a[i][k]*b[k][j];

    
return c;
}


void print(const matrix& m)
{
    
for (matrix::index i=0; i!=m.shape()[0]; cout<<endl,++i)
        
for (matrix::index j=0; j!=m.shape()[1]; ++j)
                cout
<<m[i][j]<<" ";    
}


int main() {   

    
int values[] = {   
        
012,   
        
345    
    }
;   
    
const int values_size = 6;   
    matrix A(boost::extents[
2][3]);  
    matrix B(boost::extents[
3][2]); 
    A.assign(values,values 
+ values_size);
    B.assign(values,values 
+ values_size);

        cout
<<"matrix A"<<endl;
        print(A);   
    cout
<<endl;cout<<"*"<<endl;cout<<"matrix B"<<endl;
        print(B);   
    cout
<<endl;cout<<"="<<endl;cout<<"matrix C"<<endl;
    print(matrix_multiply(A,B));
    cout
<<endl;  

    
return 0;
}
 

posted @ 2008-09-09 20:21 阿二 阅读(1550) | 评论 (1)编辑 收藏

仅列出标题  

导航

统计

常用链接

留言簿

随笔分类

随笔档案

搜索

最新评论

阅读排行榜

评论排行榜