loop_in_codes

低调做技术__欢迎移步我的独立博客 codemaro.com 微博 kevinlynx

#

写了一个分布式名字服务JCM

之前在公司里维护了一个名字服务,这个名字服务日常管理了近4000台机器,有4000个左右的客户端连接上来获取机器信息,由于其基本是一个单点服务,所以某些模块接近瓶颈。后来倒是有重构计划,详细设计做了,代码都写了一部分,结果由于某些原因重构就被终止了。

JCM是我业余时间用Java重写的一个版本,功能上目前只实现了基础功能。由于它是个完全分布式的架构,所以理论上可以横向扩展,大大增强系统的服务能力。

名字服务

在分布式系统中,某个服务为了提升整体服务能力,通常部署了很多实例。这里我把这些提供相同服务的实例统称为集群(cluster),每个实例称为一个节点(Node)。一个应用可能会使用很多cluster,每次访问一个cluster时,就通过名字服务获取该cluster下一个可用的node。那么,名字服务至少需要包含的功能:

  • 根据cluster名字获取可用的node
  • 对管理的所有cluster下的所有node进行健康度的检测,以保证始终返回可用的node

有些名字服务仅对node管理,不参与应用与node间的通信,而有些则可能作为应用与node间的通信转发器。虽然名字服务功能简单,但是要做一个分布式的名字服务还是比较复杂的,因为数据一旦分布式了,就会存在同步、一致性问题的考虑等。

What’s JCM

JCM围绕前面说的名字服务基础功能实现。包含的功能:

  • 管理cluster到node的映射
  • 分布式架构,可水平扩展以实现管理10,000个node的能力,足以管理一般公司的后台服务集群
  • 对每个node进行健康检查,健康检查可基于HTTP协议层的检测或TCP连接检测
  • 持久化cluster/node数据,通过zookeeper保证数据一致性
  • 提供JSON HTTP API管理cluster/node数据,后续可提供Web管理系统
  • 以库的形式提供与server的交互,库本身提供各种负载均衡策略,保证对一个cluster下node的访问达到负载均衡

项目地址git jcm

JCM主要包含两部分:

  • jcm.server,JCM名字服务,需要连接zookeeper以持久化数据
  • jcm.subscriber,客户端库,负责与jcm.server交互,提供包装了负载均衡的API给应用使用

架构

基于JCM的系统整体架构如下:

simple-arch.jpg

cluster本身是不需要依赖JCM的,要通过JCM使用这些cluster,只需要通过JCM HTTP API注册这些cluster到jcm.server上。要通过jcm.server使用这些cluster,则是通过jcm.subscriber来完成。

使用

可参考git READMe.md

需要jre1.7+

  1. 启动zookeeper
  2. 下载jcm.server git jcm.server-0.1.0.jar
  3. jcm.server-0.1.0.jar目录下建立config/application.properties文件进行配置,参考config/application.properties
  4. 启动jcm.server

     java -jar jcm.server-0.1.0.jar
    
  5. 注册需要管理的集群,参考cluster描述:doc/cluster_sample.json,通过HTTP API注册:

     curl -i -X POST http://10.181.97.106:8080/c -H "Content-Type:application/json" --data-binary @./doc/cluster_sample.json
    

部署好了jcm.server,并注册了cluster后,就可以通过jcm.subscriber使用:

// 传入需要使用的集群名hello9/hello,以及传入jcm.server地址,可以多个:127.0.0.1:8080
Subscriber subscriber = new Subscriber( Arrays.asList("127.0.0.1:8080"), Arrays.asList("hello9", "hello"));
// 使用轮询负载均衡策略
RRAllocator rr = new RRAllocator();
subscriber.addListener(rr);
subscriber.startup();
for (int i = 0; i < 2; ++i) {
  // rr.alloc 根据cluster名字获取可用的node
  System.out.println(rr.alloc("hello9", ProtoType.HTTP));
}
subscriber.shutdown();

JCM实现

JCM目前的实现比较简单,参考模块图:

impl-module

  • model,即cluster/node这些数据结构的描述,同时被jcm.server和jcm.subscriber依赖
  • storage,持久化数据到zookeeper,同时包含jcm.server实例之间的数据同步
  • health check,健康检查模块,对各个node进行健康检查

以上模块都不依赖Spring,基于以上模块又有:

  • http api,使用spring-mvc,包装了一些JSON HTTP API
  • Application,基于spring-boot,将各个基础模块组装起来,提供standalone的模式启动,不用部署到tomcat之类的servlet容器中

jcm.subscriber的实现更简单,主要是负责与jcm.server进行通信,以更新自己当前的model层数据,同时提供各种负载均衡策略接口:

  • subscriber,与jcm.server通信,定期增量拉取数据
  • node allocator,通过listener方式从subscriber中获取数据,同时实现各种负载均衡策略,对外统一提供alloc node的接口

接下来看看关键功能的实现

数据同步

既然jcm.server是分布式的,每一个jcm.server instance(实例)都是支持数据读和写的,那么当jcm.server管理着一堆cluster上万个node时,每一个instance是如何进行数据同步的?jcm.server中的数据主要有两类:

  • cluster本身的数据,包括cluster/node的描述,例如cluster name、node IP、及其他附属数据
  • node健康检查的数据

对于cluster数据,因为cluster对node的管理是一个两层的树状结构,而对cluster有增删node的操作,所以并不能在每一个instance上都提供真正的数据写入,这样会导致数据丢失。假设同一时刻在instance A和instance B上同时对cluster c1添加节点N1和N2,那么instance A写入c1(N1),而instance B还没等到数据同步就写入c1(N2),那么c1(N1)就被覆盖为c1(N2),从而导致添加的节点N1丢失。

所以,jcm.server instance是分为leaderfollower的,真正的写入操作只有leader进行,follower收到写操作请求时转发给leader。leader写数据优先更新内存中的数据再写入zookeeper,内存中的数据更新当然是需要加锁互斥的,从而保证数据的正确性。

write-watch.jpg

leader和follower是如何确定角色的?这个很简单,标准的利用zookeeper来进行主从选举的实现

jcm.server instance数据间的同步是基于zookeeper watch机制的。这个可以算做是一个JCM的一个瓶颈,每一个instance都会作为一个watch,使得实际上jcm.server并不能无限水平扩展,扩展到一定程度后,watch的效率就可能不足以满足性能了,参考zookeeper节点数与watch的性能测试 (那个时候我就在考虑对我们系统的重构了) 。

jcm.server中对node健康检查的数据采用同样的同步机制,但node健康检查数据是每一个instance都会写入的,下面看看jcm.server是如何通过分布式架构来分担压力的。

健康检查

jcm.server的另一个主要功能的是对node的健康检查,jcm.server集群可以管理几万的node,既然已经是分布式了,那么显然是要把node均分到多个instance的。这里我是以cluster来分配的,方法就是简单的使用一致性哈希。通过一致性哈希,决定一个cluster是否属于某个instance负责。每个instance都有一个server spec,也就是该instance对外提供服务的地址(IP+port),这样在任何一个instance上,它看到的所有instance server spec都是相同的,从而保证在每一个instance上计算cluster的分配得到的结果都是一致的。

健康检查按cluster划分,可以简化数据的写冲突问题,在正常情况下,每个instance写入的健康检查结果都是不同的。

health-check.jpg

健康检查一般以1秒的频率进行,jcm.server做了优化,当检查结果和上一次一样时,并不写入zookeeper。写入的数据包含了node的完整key (IP+Port+Spec),这样可以简化很多地方的数据同步问题,但会增加写入数据的大小,写入数据的大小是会影响zookeeper的性能的,所以这里简单地对数据进行了压缩。

健康检查是可以支持多种检查实现的,目前只实现了HTTP协议层的检查。健康检查自身是单个线程,在该线程中基于异步HTTP库,发起异步请求,实际的请求在其他线程中发出。

jcm.subscriber通信

jcm.subscriber与jcm.server通信,主要是为了获取最新的cluster数据。subscriber初始化时会拿到一个jcm.server instance的地址列表,访问时使用轮询策略以平衡jcm.server在处理请求时的负载。subscriber每秒都会请求一次数据,请求中描述了本次请求想获取哪些cluster数据,同时携带一个cluster的version。每次cluster在server变更时,version就变更(时间戳)。server回复请求时,如果version已最新,只需要回复node的状态。

subscriber可以拿到所有状态的node,后面可以考虑只拿正常状态的node,进一步减少数据大小。

压力测试

目前只对健康检查部分做了压测,详细参考test/benchmark.md。在A7服务器上测试,发现写zookeeper及zookeeper的watch足以满足要求,jcm.server发起的HTTP请求是主要的性能热点,单jcm.server instance大概可以承载20000个node的健康监测。

网络带宽:

1
2
3
4
5
Time              ---------------------traffic--------------------
Time               bytin  bytout   pktin  pktout  pkterr  pktdrp
01/07/15-21:30:48   3.2M    4.1M   33.5K   34.4K    0.00    0.00
01/07/15-21:30:50   3.3M    4.2M   33.7K   35.9K    0.00    0.00
01/07/15-21:30:52   2.8M    4.1M   32.6K   41.6K    0.00    0.00

CPU,通过jstack查看主要的CPU消耗在HTTP库实现层,以及健康检查线程:

1
2
3
4
  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
13301 admin     20   0 13.1g 1.1g  12m R 76.6  2.3   2:40.74 java         httpchecker
13300 admin     20   0 13.1g 1.1g  12m S 72.9  2.3   0:48.31 java
13275 admin     20   0 13.1g 1.1g  12m S 20.1  2.3   0:18.49 java

代码中增加了些状态监控:

1
checker HttpChecker stat count 20 avg check cost(ms) 542.05, avg flush cost(ms) 41.35

表示平均每次检查耗时542毫秒,写数据因为开启了cache没有参考价值。

虽然还可以从我自己的代码中做不少优化,但既然单机可以承载20000个节点的检测,一般的应用远远足够了。

总结

名字服务在分布式系统中虽然是基础服务,但往往承担了非常重要的角色,数据同步出现错误、节点状态出现瞬时的错误,都可能对整套系统造成较大影响,业务上出现较大故障。所以名字服务的健壮性、可用性非常重要。实现中需要考虑很多异常情况,包括网络不稳定、应用层的错误等。为了提高足够的可用性,一般还会加多层的数据cache,例如subscriber端的本地cache,server端的本地cache,以保证在任何情况下都不会影响应用层的服务。

posted @ 2015-07-04 17:50 Kevin Lynx 阅读(1725) | 评论 (0)编辑 收藏

无锁有序链表的实现

无锁有序链表可以保证元素的唯一性,使其可用于哈希表的桶,甚至直接作为一个效率不那么高的map。普通链表的无锁实现相对简单点,因为插入元素可以在表头插,而有序链表的插入则是任意位置。

本文主要基于论文High Performance Dynamic Lock-Free Hash Tables实现。

主要问题

链表的主要操作包含insertremove,先简单实现一个版本,就会看到问题所在,以下代码只用作示例:

struct node_t {
        key_t key;
        value_t val;
        node_t *next;
    };

    int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
        node_t *pred = head;
        node_t *item = head->next;
        while (item) {
            int d = KEY_CMP(item->key, key);
            if (d >= 0) {
                *pred_ptr = pred;
                *item_ptr = item;
                return d == 0 ? TRUE : FALSE;
            }
            pred = item;
            item = item->next;
        } 
        *pred_ptr = pred;
        *item_ptr = NULL;
        return FALSE;
    }

    int l_insert(node_t *head, key_t key, value_t val) {
        node_t *pred, *item, *new_item;
        while (TRUE) {
            if (l_find(&pred, &item, head, key)) {
                return FALSE;
            }
            new_item = (node_t*) malloc(sizeof(node_t));
            new_item->key = key;
            new_item->val = val;
            new_item->next = item;
            // A. 如果pred本身被移除了
            if (CAS(&pred->next, item, new_item)) {
                return TRUE;
            }
            free(new_item);
        }
    }

    int l_remove(node_t *head, key_t key) {
        node_t *pred, *item;
        while (TRUE) {
            if (!l_find(&pred, &item, head, key)) {
                return TRUE;
            }
            // B. 如果pred被移除;如果item也被移除
            if (CAS(&pred->next, item, item->next)) {
                haz_free(item);
                return TRUE;
            }
        }
    }

l_find函数返回查找到的前序元素和元素本身,代码A和B虽然拿到了preditem,但在CAS的时候,其可能被其他线程移除。甚至,在l_find过程中,其每一个元素都可能被移除。问题在于,任何时候拿到一个元素时,都不确定其是否还有效。元素的有效性包括其是否还在链表中,其指向的内存是否还有效。

解决方案

通过为元素指针增加一个有效性标志位,配合CAS操作的互斥性,就可以解决元素有效性判定问题。

因为node_t放在内存中是会对齐的,所以指向node_t的指针值低几位是不会用到的,从而可以在低几位里设置标志,这样在做CAS的时候,就实现了DCAS的效果,相当于将两个逻辑上的操作变成了一个原子操作。想象下引用计数对象的线程安全性,其内包装的指针是线程安全的,但对象本身不是。

CAS的互斥性,在若干个线程CAS相同的对象时,只有一个线程会成功,失败的线程就可以以此判定目标对象发生了变更。改进后的代码(代码仅做示例用,不保证正确):

typedef size_t markable_t;
    // 最低位置1,表示元素被删除
    #define HAS_MARK(p) ((markable_t)p & 0x01)
    #define MARK(p) ((markable_t)p | 0x01)
    #define STRIP_MARK(p) ((markable_t)p & ~0x01)

    int l_insert(node_t *head, key_t key, value_t val) {
        node_t *pred, *item, *new_item;
        while (TRUE) {
            if (l_find(&pred, &item, head, key)) { 
                return FALSE;
            }
            new_item = (node_t*) malloc(sizeof(node_t));
            new_item->key = key;
            new_item->val = val;
            new_item->next = item;
            // A. 虽然find拿到了合法的pred,但是在以下代码之前pred可能被删除,此时pred->next被标记
            //    pred->next != item,该CAS会失败,失败后重试
            if (CAS(&pred->next, item, new_item)) {
                return TRUE;
            }
            free(new_item);
        }
        return FALSE;
    }

    int l_remove(node_t *head, key_t key) {
        node_t *pred, *item;
        while (TRUE) {
            if (!l_find(&pred, &item, head, key)) {
                return FALSE;
            }
            node_t *inext = item->next;
            // B. 删除item前先标记item->next,如果CAS失败,那么情况同insert一样,有其他线程在find之后
            //    删除了item,失败后重试
            if (!CAS(&item->next, inext, MARK(inext))) {
                continue;
            }
            // C. 对同一个元素item删除时,只会有一个线程成功走到这里
            if (CAS(&pred->next, item, STRIP_MARK(item->next))) {
                haz_defer_free(item);
                return TRUE;
            }
        }
        return FALSE;
    }

    int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
        node_t *pred = head;
        node_t *item = head->next;
        hazard_t *hp1 = haz_get(0);
        hazard_t *hp2 = haz_get(1);
        while (item) {
            haz_set_ptr(hp1, pred);
            haz_set_ptr(hp2, item);
            /* 
             如果已被标记,那么紧接着item可能被移除链表甚至释放,所以需要重头查找
            */
            if (HAS_MARK(item->next)) { 
                return l_find(pred_ptr, item_ptr, head, key);
            }
            int d = KEY_CMP(item->key, key);
            if (d >= 0) {
                *pred_ptr = pred;
                *item_ptr = item;
                return d == 0 ? TRUE : FALSE;
            }
            pred = item;
            item = item->next;
        } 
        *pred_ptr = pred;
        *item_ptr = NULL;
        return FALSE;
    }

haz_gethaz_set_ptr之类的函数是一个hazard pointer实现,用于支持多线程下内存的GC。上面的代码中,要删除一个元素item时,会标记item->next,从而使得insert时中那个CAS不需要做任何调整。总结下这里的线程竞争情况:

  • insertfind到正常的preditempred->next == item,然后在CAS前有线程删除了pred,此时pred->next == MARK(item)CAS失败,重试;删除分为2种情况:a) 从链表移除,得到标记,pred可继续访问;b) pred可能被释放内存,此时再使用pred会错误。为了处理情况b,所以引入了类似hazard pointer的机制,可以有效保障任意一个指针p只要还有线程在使用它,它的内存就不会被真正释放
  • insert中有多个线程在pred后插入元素,此时同样由insert中的CAS保证,这个不多说
  • remove中情况同insertfind拿到了有效的prednext,但在CAS的时候pred被其他线程删除,此时情况同insertCAS失败,重试
  • 任何时候改变链表结构时,无论是remove还是insert,都需要重试该操作
  • find中遍历时,可能会遇到被标记删除的item,此时item根据remove的实现很可能被删除,所以需要重头开始遍历

ABA问题

ABA问题还是存在的,insert中:

if (CAS(&pred->next, item, new_item)) {
        return TRUE;
    }

如果CAS之前,pred后的item被移除,又以相同的地址值加进来,但其value变了,此时CAS会成功,但链表可能就不是有序的了。pred->val < new_item->val > item->val

为了解决这个问题,可以利用指针值地址对齐的其他位来存储一个计数,用于表示pred->next的改变次数。当insert拿到pred时,pred->next中存储的计数假设是0,CAS之前其他线程移除了pred->next又新增回了item,此时pred->next中的计数增加,从而导致insertCAS失败。

// 最低位留作删除标志
    #define MASK ((sizeof(node_t) - 1) & ~0x01)

    #define GET_TAG(p) ((markable_t)p & MASK)
    #define TAG(p, tag) ((markable_t)p | (tag))
    #define MARK(p) ((markable_t)p | 0x01)
    #define HAS_MARK(p) ((markable_t)p & 0x01)
    #define STRIP_MARK(p) ((node_t*)((markable_t)p & ~(MASK | 0x01)))

remove的实现:

/* 先标记再删除 */
    if (!CAS(&sitem->next, inext, MARK(inext))) {
        continue;
    }
    int tag = GET_TAG(pred->next) + 1;
    if (CAS(&pred->next, item, TAG(STRIP_MARK(sitem->next), tag))) {
        haz_defer_free(sitem);
        return TRUE;
    }

insert中也可以更新pred->next的计数。

总结

无锁的实现,本质上都会依赖于CAS的互斥性。从头实现一个lock free的数据结构,可以深刻感受到lock free实现的tricky。最终代码可以从这里github获取。代码中为了简单,实现了一个不是很强大的hazard pointer,可以参考之前的博文

posted @ 2015-05-05 19:47 Kevin Lynx 阅读(22605) | 评论 (0)编辑 收藏

并行编程中的内存回收Hazard Pointer

接上篇使用RCU技术实现读写线程无锁,在没有GC机制的语言中,要实现Lock free的算法,就免不了要自己处理内存回收的问题。

Hazard Pointer是另一种处理这个问题的算法,而且相比起来不但简单,功能也很强大。锁无关的数据结构与Hazard指针中讲得很好,Wikipedia Hazard pointer也描述得比较清楚,所以我这里就不讲那么细了。

一个简单的实现可以参考我的github haz_ptr.c

原理

基本原理无非也是读线程对指针进行标识,指针(指向的内存)要释放时都会缓存起来延迟到确认没有读线程了才对其真正释放。

<Lock-Free Data Structures with Hazard Pointers>中的描述:

Each reader thread owns a single-writer/multi-reader shared pointer called “hazard pointer.” When a reader thread assigns the address of a map to its hazard pointer, it is basically announcing to other threads (writers), “I am reading this map. You can replace it if you want, but don’t change its contents and certainly keep your deleteing hands off it.”

关键的结构包括:Hazard pointerThread Free list

Hazard pointer:一个读线程要使用一个指针时,就会创建一个Hazard pointer包装这个指针。一个Hazard pointer会被一个线程写,多个线程读。

struct HazardPointer {
        void *real_ptr; // 包装的指针
        ... // 不同的实现有不同的成员
    };

    void func() {
        HazardPointer *hp = accquire(_real_ptr);
        ... // use _real_ptr
        release(hp);
    }

Thread Free List:每个线程都有一个这样的列表,保存着将要释放的指针列表,这个列表仅对应的线程读写

void defer_free(void *ptr) {
        _free_list.push_back(ptr);
    }

当某个线程要尝试释放Free List中的指针时,例如指针ptr,就检查所有其他线程使用的Hazard pointer,检查是否存在包装了ptr的Hazard pointer,如果没有则说明没有读线程正在使用ptr,可以安全释放ptr

void gc() {
        for(ptr in _free_list) {
            conflict = false
            for (hp in _all_hazard_pointers) {
                if (hp->_real_ptr == ptr) {
                    confilict = true
                    break
                }
            }
            if (!conflict)
                delete ptr
        }
    }

以上,其实就是Hazard Pointer的主要内容。

Hazard Pointer的管理

上面的代码中没有提到_all_hazard_pointersaccquire的具体实现,这就是Hazard Pointer的管理问题。

《锁无关的数据结构与Hazard指针》文中创建了一个Lock free的链表来表示这个全局的Hazard Pointer List。每个Hazard Pointer有一个成员标识其是否可用。这个List中也就保存了已经被使用的Hazard Pointer集合和未被使用的Hazard Pointer集合,当所有Hazard Pointer都被使用时,就会新分配一个加进这个List。当读线程不使用指针时,需要归还Hazard Pointer,直接设置可用成员标识即可。要gc()时,就直接遍历这个List。

要实现一个Lock free的链表,并且仅需要实现头插入,还是非常简单的。本身Hazard Pointer标识某个指针时,都是用了后立即标识,所以这个实现直接支持了动态线程,支持线程的挂起等。

nbds项目中也有一个Hazard Pointer的实现,相对要弱一点。它为每个线程都设置了自己的Hazard Pointer池,写线程要释放指针时,就访问所有其他线程的Hazard Pointer池。

typedef struct haz_local {
        // Free List
        pending_t *pending; // to be freed
        int pending_size;
        int pending_count;

        // Hazard Pointer 池,动态和静态两种
        haz_t static_haz[STATIC_HAZ_PER_THREAD];

        haz_t **dynamic;
        int dynamic_size;
        int dynamic_count;

    } __attribute__ ((aligned(CACHE_LINE_SIZE))) haz_local_t;

    static haz_local_t haz_local_[MAX_NUM_THREADS] = {};

每个线程当然就涉及到haz_local_索引(ID)的分配,就像使用RCU技术实现读写线程无锁中的一样。这个实现为了支持线程动态创建,就需要一套线程ID的重用机制,相对复杂多了。

附录

最后,附上一些并行编程中的一些概念。

Lock Free & Wait Free

常常看到Lock FreeWait Free的概念,这些概念用于衡量一个系统或者说一段代码的并行级别,并行级别可参考并行编程——并发级别。总之Wait Free是一个比Lock Free更牛逼的级别。

我自己的理解,例如《锁无关的数据结构与Hazard指针》中实现的Hazard Pointer链表就可以说是Lock Free的,注意它在插入新元素到链表头时,因为使用CAS,总免不了一个busy loop,有这个特征的情况下就算是Lock Free,虽然没锁,但某个线程的执行情况也受其他线程的影响。

相对而言,Wait Free则是每个线程的执行都是独立的,例如《锁无关的数据结构与Hazard指针》中的Scan函数。“每个线程的执行时间都不依赖于其它任何线程的行为”

锁无关(Lock-Free)意味着系统中总存在某个线程能够得以继续执行;而等待无关(Wait-Free)则是一个更强的条件,它意味着所有线程都能往下进行。

ABA问题

在实现Lock Free算法的过程中,总是要使用CAS原语的,而CAS就会带来ABA问题。

在进行CAS操作的时候,因为在更改V之前,CAS主要询问“V的值是否仍然为A”,所以在第一次读取V之后以及对V执行CAS操作之前,如果将值从A改为B,然后再改回A,会使基于CAS的算法混乱。在这种情况下,CAS操作会成功。这类问题称为ABA问题。

Wiki Hazard Pointer提到了一个ABA问题的好例子:在一个Lock free的栈实现中,现在要出栈,栈里的元素是[A, B, C]head指向栈顶,那么就有compare_and_swap(target=&head, newvalue=B, expected=A)。但是在这个操作中,其他线程把A B都出栈,且删除了B,又把A压入栈中,即[A, C]。那么前一个线程的compare_and_swap能够成功,此时head指向了一个已经被删除的B。stackoverflow上也有个例子 Real-world examples for ABA in multithreading

对于CAS产生的这个ABA问题,通常的解决方案是采用CAS的一个变种DCAS。DCAS,是对于每一个V增加一个引用的表示修改次数的标记符。对于每个V,如果引用修改了一次,这个计数器就加1。然后再这个变量需要update的时候,就同时检查变量的值和计数器的值。

但也早有人提出DCAS也不是ABA problem 的银弹

posted @ 2015-05-03 20:46 Kevin Lynx 阅读(19961) | 评论 (0)编辑 收藏

使用RCU技术实现读写线程无锁

在一个系统中有一个写线程和若干个读线程,读写线程通过一个指针共用了一个数据结构,写线程改写这个结构,读线程读取该结构。在写线程改写这个数据结构的过程中,加锁情况下读线程由于等待锁耗时会增加。

可以利用RCU (Read Copy Update What is rcu)的思想来去除这个锁。本文提到的主要实现代码:gist

RCU

RCU可以说是一种替代读写锁的方法。其基于一个事实:当写线程在改变一个指针时,读线程获取这个指针,要么获取到老的值,要么获取到新的值。RCU的基本思想其实很简单,参考What is RCU中Toy implementation可以很容易理解。一种简单的RCU流程可以描述为:

写线程:

old_ptr = _ptr
tmp_ptr = copy(_ptr)     // copy
change(tmp_ptr)          // change 
_ptr = tmp_ptr           // update
synchroize(tmp_ptr)

写线程要更新_ptr指向的内容时,先复制一份新的,基于新的进行改变,更新_ptr指针,最后同步释放老的内存。

读线程:

tmp_ptr = _ptr
use(tmp_ptr)
dereference(tmp_ptr)

读线程直接使用_ptr,使用完后需要告诉写线程自己不再使用_ptr。读线程获取_ptr时,可能会获取到老的也可能获取到新的,无论哪种RCU都需要保证这块内存是有效的。重点在synchroizedereferencesynchroize会等待所有使用老的_ptr的线程dereference,对于新的_ptr使用者其不需要等待。这个问题说白了就是写线程如何知道old_ptr没有任何读线程在使用,可以安全地释放。

这个问题实际上在wait-free的各种实现中有好些解法,how-when-to-release-memory-in-wait-free-algorithms这里有人总结了几种方法,例如Hazard pointersQuiescence period based reclamation

简单地使用引用计数智能指针是无法解决这个问题的,因为智能指针自己不是线程安全的,例如:

tmp_ptr = _ptr      // 1
tmp_ptr->addRef()   // 2
use
tmp_ptr->release()

代码1/2行不是原子的,所以当取得tmp_ptr准备addRef时,tmp_ptr可能刚好被释放了。

Quiescence period based reclamation方法指的是读线程需要声明自己处于Quiescence period,也就是不使用_ptr的时候,当其使用_ptr的时候实际是进入了一个逻辑上的临界区,当所有读线程都不再使用_ptr的时候,写线程就可以对内存进行安全地释放。

本文正是描述了一种Quiescence period based reclamation实现。这个实现可以用于有一个写线程和多个读线程共用若干个数据的场景。

实现

该方法本质上把数据同步分解为基本的内存单元读写。使用方式上可描述为:

读线程:

tmp_ptr = _ptr
use
update() // 标识自己不再使用任何共享数据

写线程:

old_ptr = _ptr
tmp_ptr = copy(_ptr)
change(tmp_ptr)
_ptr = tmp_ptr
gc()
defer_free(old_ptr)

以下具体描述读写线程的实现。

写线程

写线程负责标识内存需要被释放,以及检查何时可以真正释放内存。其维护了一个释放内存队列:

void *_pending[8]
    uint64_t _head, _tail

    void defer_free(void *p) {
        _head ++
        _pending[PENDING_POS(_head)] = p
    }

    gc() {
        for (_tail -> find_free_pos())
            free(_pending[_tail])
    }

find_free_pos找到一个可释放内存位置,在[_tail, find_free_pos())这个区间内所有内存是可以安全被释放的。

队列位置_head/_tail一直增大,PENDING_POS就是对这个位置取模,限定在队列大小范围内也是可行的,无论哪种方式,_head从逻辑上说一直>=_tail,但在实际中可能小于_tail,所以实现时不使用大小判定,而是:

gc() {
        pos = find_free_pos()
        while (_tail != pos) {
            free(_pending[PENDING_POS(_tail)])
            _tail ++
        }
    }

读线程

读线程不再使用共享内存时,就标识自己:

update() {
        static __thread int tid
        _tmark[tid] = _head
    }

读线程的状态会影响写线程的回收逻辑,其状态分为:

  • 初始
  • 活跃,会调用到update
  • 暂停,其他地方同步,或被挂起
  • 退出

读线程处于活跃状态时,它会不断地更新自己可释放内存位置(_tmark[tid])。写线程检查所有读线程的_tmark[tid][_tail, min(_tmark[]))是所有读线程都不再使用的内存区间,可以被安全释放。

find_free_pos() {
        min = MAX_INTEGER
        pos = 0
        for (tid = 0; tid < max_threads; ++tid) {
            tpos = _tmark[tid]
            offset = tpos - tail
            if (offset < min) {
                min = offset
                pos = tpos
            }
        }
        return pos
    }

当读线程暂停时,其_tmark[tid]可能会在很长一段时间里得不到更新,此时会阻碍写线程释放内存。所以需要方法来标识读线程是否进入暂停状态。通过设置一个上次释放内存位置_tfreeds[tid],标识每个线程当前内存释放到的位置。如果一个线程处于暂停状态了,那么在一定时间后,_tfreeds[tid] == _tmark[tid]。在查找可释放位置时,就需要忽略暂停状态的读线程:

find_free_pos() {
        min = MAX_INTEGER
        pos = _head
        for (tid = 0; tid < max_threads; ++tid) {
            tpos = _tmark[tid]
            if (tpos == _tfreeds[tid]) continue
            offset = tpos - tail
            if (offset < min) {
                min = offset
                pos = tpos
            }
        }
        for (tid = 0; tid < max_threads; ++tid) {
            if (_tfreeds[tid] != _tmark[tid]) 
                _tfreeds[tid] = pos
        }
        return pos
    }

但是当所有线程都处于暂停状态时,写线程可能还在工作,上面的实现就会返回_head,此时写线程依然可以正常释放内存。

小结,该方法原理可用下图表示:

线程动态增加/减少

如果读线程可能中途退出,中途动态增加,那么_tmark[]就需要被复用,此时线程tid的分配调整为动态的即可:

class ThreadIdPool {
    public:
        // 动态获取一个线程tid,某线程每次调用该接口返回相同的值
        int get()
        // 线程退出时回收该tid
        void put(int id)
    }

ThreadIdPool的实现无非就是利用TLS,以及在线程退出时得到通知以回收tid。那么对于读线程的update实现变为:

update() {
        tid = _idPool->get()
        _tmark[tid] = _head
    }

当某个线程退出时,_tmark[tid]_tfreeds[tid]不需要做任何处理,当新创建的线程复用了该tid时,可以立即复用_tmark[tid]_tfreeds[tid],此时这2个值必然是相等的。

以上,就是整个方法的实现。

线程可读可写

以上方法适用场景还是不够通用。在nbds项目(实现了一些无锁数据结构的toy project)中有一份虽然简单但也有启发的实现(rcu.c)。该实现支持任意线程defer_free,所有线程updateupdate除了声明不再使用任何共享内存外,还可能回收内存。任意线程都可能维护一些待释放的内存,任意一块内存可能被任意其他线程使用。那么它是如何内存回收的?

本文描述的方法是所有读线程自己声明自己,然后由写线程主动来检查。不同于此方法, nbds的实现,基于一种通知扩散的方式。该方式以这样一种方式工作:

当某个线程尝试内存回收时,它需要知道所有其他线程的空闲位置(相当于_tmark[tid]),它通知下一个线程我需要释放的范围。当下一个线程update时(离开临界区),它会将上个线程的通知继续告诉下一个线程,直到最后这个通知回到发起线程。那么对于发起线程而言,这个释放请求在所有线程中走了一遍,得到了大家的认可,可以安全释放。每个线程都以这样的方式工作。

void rcu_defer_free (void *x) {
        ...
        rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = pending_[tid_]->head;
        ...
    }

    void rcu_update (void) {
        ...
        for (i = 0; i < num_threads_; ++i) {
            ...     
            uint64_t x = rcu_[tid_][i]; // 其它线程发给自己的通知
            rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x; // 扩散出去
            ...
        }
        ...
        while (q->tail != rcu_[tid_][tid_]) {
            free
        }     
        ...
    }

这个实现相对简单,不支持线程暂停,以及线程动态增加和减少。

posted @ 2015-04-19 19:10 Kevin Lynx 阅读(11394) | 评论 (3)编辑 收藏

记一次tcmalloc分配内存引起的coredump

现象

线上的服务出现coredump,堆栈为:

#0  0x000000000045d145 in GetStackTrace(void**, int, int) ()
#1  0x000000000045ec22 in tcmalloc::PageHeap::GrowHeap(unsigned long) ()
#2  0x000000000045eeb3 in tcmalloc::PageHeap::New(unsigned long) ()
#3  0x0000000000459ee8 in tcmalloc::CentralFreeList::Populate() ()
#4  0x000000000045a088 in tcmalloc::CentralFreeList::FetchFromSpansSafe() ()
#5  0x000000000045a10a in tcmalloc::CentralFreeList::RemoveRange(void**, void**, int) ()
#6  0x000000000045c282 in tcmalloc::ThreadCache::FetchFromCentralCache(unsigned long, unsigned long) ()
#7  0x0000000000470766 in tc_malloc ()
#8  0x00007f75532cd4c2 in __conhash_get_rbnode (node=0x22c86870, hash=30)
        at build/release64/cm_sub/conhash/conhash_inter.c:88
#9  0x00007f75532cd76e in __conhash_add_replicas (conhash=0x24fbc7e0, iden=<value optimized out>)
        at build/release64/cm_sub/conhash/conhash_inter.c:45
#10 0x00007f75532cd1fa in conhash_add_node (conhash=0x24fbc7e0, iden=0) at build/release64/cm_sub/conhash/conhash.c:72
#11 0x00007f75532c651b in cm_sub::TopoCluster::initLBPolicyInfo (this=0x2593a400)
        at build/release64/cm_sub/topo_cluster.cpp:114
#12 0x00007f75532cad73 in cm_sub::TopoClusterManager::processClusterMapTable (this=0xa219e0, ref=0x267ea8c0)
        at build/release64/cm_sub/topo_cluster_manager.cpp:396
#13 0x00007f75532c5a93 in cm_sub::SubRespMsgProcess::reinitCluster (this=0x9c2f00, msg=0x4e738ed0)
        at build/release64/cm_sub/sub_resp_msg_process.cpp:157
...

查看了应用层相关数据结构,基本数据都是没有问题的。所以最初怀疑是tcmalloc内部维护了错误的内存,在分配内存时出错,这个堆栈只是问题的表象。几天后,线上的另一个服务,基于同样的库,也core了,堆栈还是一样的。

最初定位问题都是从最近更新的东西入手,包括依赖的server环境,但都没有明显的问题,所以最后只能从core的直接原因入手。

分析GetStackTrace

确认core的详细位置:

# core在该指令
0x000000000045d145 <_Z13GetStackTracePPvii+21>: mov    0x8(%rax),%r9

(gdb) p/x $rip              # core 的指令位置
$9 = 0x45d145
(gdb) p/x $rax              
$10 = 0x4e73aa58
(gdb) x/1a $rax+0x8         # rax + 8 = 0x4e73aa60
0x4e73aa60:     0x0

该指令尝试从[0x4e73aa60]处读取内容,然后出错,这个内存单元不可读。但是具体这个指令在代码中是什么意思,需要将这个指令对应到代码中。获取tcmalloc的源码,发现GetStackTrace根据编译选项有很多实现,所以这里选择最可能的实现,然后对比汇编以确认代码是否匹配。最初选择的是stacktrace_x86-64-inl.h,后来发现完全不匹配,又选择了stacktrace_x86-inl.h。这个实现版本里也有对64位平台的支持。

stacktrace_x86-inl.h里使用了一些宏来生成函数名和参数,精简后代码大概为:

int GET_STACK_TRACE_OR_FRAMES {
      void **sp;
      unsigned long rbp;
      __asm__ volatile ("mov %%rbp, %0" : "=r" (rbp));
      sp = (void **) rbp;

      int n = 0;
      while (sp && n < max_depth) {
        if (*(sp+1) == reinterpret_cast<void *>(0)) {
          break;
        }
        void **next_sp = NextStackFrame<!IS_STACK_FRAMES, IS_WITH_CONTEXT>(sp, ucp);
        if (skip_count > 0) {
          skip_count--;
        } else {
          result[n] = *(sp+1);
          n++;
        }
        sp = next_sp;
      }
      return n;
    }

NextStackFrame是一个模板函数,包含一大堆代码,精简后非常简单:

template<bool STRICT_UNWINDING, bool WITH_CONTEXT>
    static void **NextStackFrame(void **old_sp, const void *uc) {
      void **new_sp = (void **) *old_sp;
      if (STRICT_UNWINDING) {
        if (new_sp <= old_sp) return NULL;
        if ((uintptr_t)new_sp - (uintptr_t)old_sp > 100000) return NULL;
      } else {
        if (new_sp == old_sp) return NULL;
        if ((new_sp > old_sp)
            && ((uintptr_t)new_sp - (uintptr_t)old_sp > 1000000)) return NULL;
      }
      if ((uintptr_t)new_sp & (sizeof(void *) - 1)) return NULL;

      return new_sp;
    }

上面这个代码到汇编的对比过程还是花了些时间,其中汇编中出现的一些常量可以大大缩短对比时间,例如上面出现了100000,汇编中就有:

0x000000000045d176 <_Z13GetStackTracePPvii+70>: cmp    $0x186a0,%rbx  # 100000=0x186a0

注意NextStackFrame中的 if (STRICT_UNWINDING)使用的是模板参数,这导致生成的代码中根本没有else部分,也就没有1000000这个常量

在对比代码的过程中,可以知道关键的几个寄存器、内存位置对应到代码中的变量,从而可以还原core时的现场环境。分析过程中不一定要从第一行汇编读,可以从较明显的位置读,从而还原整个代码,函数返回指令、跳转指令、比较指令、读内存指令、参数寄存器等都是比较明显对应的地方。

另外注意GetStackTraceRecordGrowth中调用,传入了3个参数:

GetStackTrace(t->stack, kMaxStackDepth-1, 3); // kMaxStackDepth = 31

以下是我分析的简单注解:

(gdb) disassemble
Dump of assembler code for function _Z13GetStackTracePPvii:
0x000000000045d130 <_Z13GetStackTracePPvii+0>:  push   %rbp
0x000000000045d131 <_Z13GetStackTracePPvii+1>:  mov    %rsp,%rbp
0x000000000045d134 <_Z13GetStackTracePPvii+4>:  push   %rbx
0x000000000045d135 <_Z13GetStackTracePPvii+5>:  mov    %rbp,%rax
0x000000000045d138 <_Z13GetStackTracePPvii+8>:  xor    %r8d,%r8d
0x000000000045d13b <_Z13GetStackTracePPvii+11>: test   %rax,%rax
0x000000000045d13e <_Z13GetStackTracePPvii+14>: je     0x45d167 <_Z13GetStackTracePPvii+55>
0x000000000045d140 <_Z13GetStackTracePPvii+16>: cmp    %esi,%r8d        # while ( .. max_depth > n ?
0x000000000045d143 <_Z13GetStackTracePPvii+19>: jge    0x45d167 <_Z13GetStackTracePPvii+55>
0x000000000045d145 <_Z13GetStackTracePPvii+21>: mov    0x8(%rax),%r9    # 关键位置:*(sp+1) -> r9, rax 对应 sp变量
0x000000000045d149 <_Z13GetStackTracePPvii+25>: test   %r9,%r9          # *(sp+1) == 0 ?
0x000000000045d14c <_Z13GetStackTracePPvii+28>: je     0x45d167 <_Z13GetStackTracePPvii+55>
0x000000000045d14e <_Z13GetStackTracePPvii+30>: mov    (%rax),%rcx      # new_sp = *old_sp,这里已经是NextStackFrame的代码
0x000000000045d151 <_Z13GetStackTracePPvii+33>: cmp    %rcx,%rax        # new_sp <= old_sp ? 
0x000000000045d154 <_Z13GetStackTracePPvii+36>: jb     0x45d170 <_Z13GetStackTracePPvii+64>  # new_sp > old_sp 跳转
0x000000000045d156 <_Z13GetStackTracePPvii+38>: xor    %ecx,%ecx
0x000000000045d158 <_Z13GetStackTracePPvii+40>: test   %edx,%edx        # skip_count > 0 ?
0x000000000045d15a <_Z13GetStackTracePPvii+42>: jle    0x45d186 <_Z13GetStackTracePPvii+86>
0x000000000045d15c <_Z13GetStackTracePPvii+44>: sub    $0x1,%edx        # skip_count--
0x000000000045d15f <_Z13GetStackTracePPvii+47>: mov    %rcx,%rax        
0x000000000045d162 <_Z13GetStackTracePPvii+50>: test   %rax,%rax        # while (sp ?
0x000000000045d165 <_Z13GetStackTracePPvii+53>: jne    0x45d140 <_Z13GetStackTracePPvii+16>
0x000000000045d167 <_Z13GetStackTracePPvii+55>: pop    %rbx
0x000000000045d168 <_Z13GetStackTracePPvii+56>: leaveq 
0x000000000045d169 <_Z13GetStackTracePPvii+57>: mov    %r8d,%eax        # r8 存储了返回值,r8=n
0x000000000045d16c <_Z13GetStackTracePPvii+60>: retq                    # return n
0x000000000045d16d <_Z13GetStackTracePPvii+61>: nopl   (%rax)
0x000000000045d170 <_Z13GetStackTracePPvii+64>: mov    %rcx,%rbx        
0x000000000045d173 <_Z13GetStackTracePPvii+67>: sub    %rax,%rbx        # offset = new_sp - old_sp
0x000000000045d176 <_Z13GetStackTracePPvii+70>: cmp    $0x186a0,%rbx    # offset > 100000 ?
0x000000000045d17d <_Z13GetStackTracePPvii+77>: ja     0x45d156 <_Z13GetStackTracePPvii+38> # return NULL
0x000000000045d17f <_Z13GetStackTracePPvii+79>: test   $0x7,%cl         # new_sp & (sizeof(void*) - 1)
0x000000000045d182 <_Z13GetStackTracePPvii+82>: je     0x45d158 <_Z13GetStackTracePPvii+40>
0x000000000045d184 <_Z13GetStackTracePPvii+84>: jmp    0x45d156 <_Z13GetStackTracePPvii+38>
0x000000000045d186 <_Z13GetStackTracePPvii+86>: movslq %r8d,%rax        # rax = n
0x000000000045d189 <_Z13GetStackTracePPvii+89>: add    $0x1,%r8d        # n++
0x000000000045d18d <_Z13GetStackTracePPvii+93>: mov    %r9,(%rdi,%rax,8)# 关键位置:result[n] = *(sp+1)
0x000000000045d191 <_Z13GetStackTracePPvii+97>: jmp    0x45d15f <_Z13GetStackTracePPvii+47>

分析过程比较耗时,同时还可以分析下GetStackTrace函数的实现原理,其实就是利用RBP寄存器不断回溯,从而得到整个调用堆栈各个函数的地址(严格来说是返回地址)。简单示意下函数调用中RBP的情况:

   ...
saved registers          # i.e push rbx
local variabes           # i.e sub 0x10, rsp
return address           # call xxx
last func RBP            # push rbp; mov rsp, rbp
saved registers
local variables 
return address
last func RBP
...                      # rsp

总之,一般情况下,任何一个函数中,RBP寄存器指向了当前函数的栈基址,该栈基址中又存储了调用者的栈基址,同时该栈基址前面还存储了调用者的返回地址。所以,GetStackTrace的实现,简单来说大概就是:

sp = rbp  // 取得当前函数GetStackTrace的栈基址
    while (n < max_depth) {
        new_sp = *sp
        result[n] = *(new_sp+1)
        n++
    }

以上,最终就知道了以下关键信息:

  • r8 对应变量 n,表示当前取到第几个栈帧了
  • rax 对应变量 sp,代码core在 *(sp+1)
  • rdi 对应变量 result,用于存储取得的各个地址

然后可以看看现场是怎样的:

(gdb) x/10a $rdi
0x1ffc9b98:     0x45a088 <_ZN8tcmalloc15CentralFreeList18FetchFromSpansSafeEv+40>       0x45a10a <_ZN8tcmalloc15CentralFreeList11RemoveRangeEPPvS2_i+106>
0x1ffc9ba8:     0x45c282 <_ZN8tcmalloc11ThreadCache21FetchFromCentralCacheEmm+114>      0x470766 <tc_malloc+790>
0x1ffc9bb8:     0x7f75532cd4c2 <__conhash_get_rbnode+34>        0x0
0x1ffc9bc8:     0x0     0x0
0x1ffc9bd8:     0x0     0x0

(gdb) p/x $r8
$3 = 0x5

(gdb) p/x $rax
$4 = 0x4e73aa58

小结:

GetStackTrace在取调用__conhash_get_rbnode的函数时出错,取得了5个函数地址。当前使用的RBP为0x4e73aa58

错误的RBP

RBP也是从堆栈中取出来的,既然这个地址有问题,首先想到的就是有代码局部变量/数组写越界。例如sprintf的使用。而且,一般写越界破坏堆栈,都可能是把调用者的堆栈破坏了,例如:

char s[32];
memcpy(s, p, 1024);

因为写入都是从低地址往高地址写,而调用者的堆栈在高地址。当然,也会遇到写坏调用者的调用者的堆栈,也就是跨栈帧越界写,例如以前遇到的:

len = vsnprintf(buf, sizeof(buf), fmt, wtf-long-string);
buf[len] = 0;

__conhash_get_rbnode的RBP是在tcmalloc的堆栈中取的:

(gdb) f 7
#7  0x0000000000470766 in tc_malloc ()
(gdb) x/10a $rsp
0x4e738b80:     0x4e73aa58      0x22c86870
0x4e738b90:     0x4e738bd0      0x85
0x4e738ba0:     0x4e73aa58      0x7f75532cd4c2 <__conhash_get_rbnode+34>   # 0x4e73aa58

所以这里就会怀疑是tcmalloc这个函数里有把堆栈破坏,这个时候就是读代码,看看有没有疑似危险的地方,未果。这里就陷入了僵局,怀疑又遇到了跨栈帧破坏的情况,这个时候就只能__conhash_get_rbnode调用栈中周围的函数翻翻,例如调用__conhash_get_rbnode的函数__conhash_add_replicas中恰好有字符串操作:

void __conhash_add_replicas(conhash_t *conhash, int32_t iden)
    {
        node_t* node = __conhash_create_node(iden, conhash->replica);
        ...
        char buf[buf_len]; // buf_len = 64
        ...
        snprintf(buf, buf_len, VIRT_NODE_HASH_FMT, node->iden, i);
        uint32_t hash = conhash->cb_hashfunc(buf);
        if(util_rbtree_search(&(conhash->vnode_tree), hash) == NULL)
        {
            util_rbtree_node_t* rbnode = __conhash_get_rbnode(node, hash);
            ...

这段代码最终发现是没有问题的,这里又耗费了不少时间。后来发现若干个函数里的RBP都有点奇怪,这个调用栈比较正常的范围是:0x4e738c90

(gdb) f 8
#8  0x00007f75532cd4c2 in __conhash_get_rbnode (node=0x22c86870, hash=30)
(gdb) p/x $rbp
$6 = 0x4e73aa58     # 这个还不算特别可疑
(gdb) f 9
#9  0x00007f75532cd76e in __conhash_add_replicas (conhash=0x24fbc7e0, iden=<value optimized out>)
(gdb) p/x $rbp
$7 = 0x4e738c60     # 这个也不算特别可疑
(gdb) f 10
#10 0x00007f75532cd1fa in conhash_add_node (conhash=0x24fbc7e0, iden=0) at build/release64/cm_sub/conhash/conhash.c:72
(gdb) p/x $rbp      # 可疑
$8 = 0x0
(gdb) f 11
#11 0x00007f75532c651b in cm_sub::TopoCluster::initLBPolicyInfo (this=0x2593a400)
(gdb) p/x $rbp      # 可疑
$9 = 0x2598fef0

为什么很多函数中RBP都看起来不正常? 想了想真要是代码里把堆栈破坏了,这错误得发生得多巧妙?

错误RBP的来源

然后转机来了,脑海中突然闪出-fomit-frame-pointer。编译器生成的代码中是可以不需要栈基址指针的,也就是RBP寄存器不作为栈基址寄存器。大部分函数或者说开启了frame-pointer的函数,其函数头都会有以下指令:

push   %rbp
mov    %rsp,%rbp
...

表示保存调用者的栈基址到栈中,以及设置自己的栈基址。看下__conhash系列函数;

Dump of assembler code for function __conhash_get_rbnode:
0x00007f75532cd4a0 <__conhash_get_rbnode+0>:    mov    %rbx,-0x18(%rsp)
0x00007f75532cd4a5 <__conhash_get_rbnode+5>:    mov    %rbp,-0x10(%rsp)
...

这个库是单独编译的,没有显示指定-fno-omit-frame-pointer,查阅gcc手册,o2优化是开启了omit-frame-pinter 的。

在没有RBP的情况下,tcmalloc的GetStackTrace尝试读RBP取获取调用返回地址,自然是有问题的。但是,如果整个调用栈中的函数,要么有RBP,要么没有RBP,那么GetStackTrace取出的结果最多就是跳过一些栈帧,不会出错。 除非,这中间的某个函数把RBP寄存器另作他用(编译器省出这个寄存器肯定是要另作他用的)。所以这里继续追查这个错误地址0x4e73aa58的来源。

来源已经比较明显,肯定是__conhash_get_rbnode中设置的,因为这个函数的RBP是在被调用者tcmalloc中保存的。

Dump of assembler code for function __conhash_get_rbnode:
0x00007f75532cd4a0 <__conhash_get_rbnode+0>:    mov    %rbx,-0x18(%rsp)
0x00007f75532cd4a5 <__conhash_get_rbnode+5>:    mov    %rbp,-0x10(%rsp)
0x00007f75532cd4aa <__conhash_get_rbnode+10>:   mov    %esi,%ebp                    # 改写了RBP
0x00007f75532cd4ac <__conhash_get_rbnode+12>:   mov    %r12,-0x8(%rsp)
0x00007f75532cd4b1 <__conhash_get_rbnode+17>:   sub    $0x18,%rsp
0x00007f75532cd4b5 <__conhash_get_rbnode+21>:   mov    %rdi,%r12
0x00007f75532cd4b8 <__conhash_get_rbnode+24>:   mov    $0x30,%edi
0x00007f75532cd4bd <__conhash_get_rbnode+29>:   callq  0x7f75532b98c8 <malloc@plt>  # 调用tcmalloc,汇编到这里即可

这里打印RSI寄存器的值可能会被误导,因为任何时候打印寄存器的值可能都是错的,除非它有被显示保存。不过这里可以看出RSI的值来源于参数(RSI对应第二个参数):

void __conhash_add_replicas(conhash_t *conhash, int32_t iden)
    {
        node_t* node = __conhash_create_node(iden, conhash->replica);
        ...
        char buf[buf_len]; // buf_len = 64
        ...
        snprintf(buf, buf_len, VIRT_NODE_HASH_FMT, node->iden, i);
        uint32_t hash = conhash->cb_hashfunc(buf); // hash值由一个字符串哈希函数计算
        if(util_rbtree_search(&(conhash->vnode_tree), hash) == NULL)
        {
            util_rbtree_node_t* rbnode = __conhash_get_rbnode(node, hash);  // hash值
            ...

追到__conhash_add_replicas

0x00007f75532cd764 <__conhash_add_replicas+164>:        mov    %ebx,%esi    # 来源于rbx
0x00007f75532cd766 <__conhash_add_replicas+166>:        mov    %r15,%rdi
0x00007f75532cd769 <__conhash_add_replicas+169>:        callq  0x7f75532b9e48 <__conhash_get_rbnode@plt>

(gdb) p/x $rbx
$11 = 0x4e73aa58
(gdb) p/x hash
$12 = 0x4e73aa58      # 0x4e73aa58

找到了0x4e73aa58的来源。这个地址值竟然是一个字符串哈希算法算出来的!这里还可以看看这个字符串的内容:

(gdb) x/1s $rsp
0x4e738bd0:      "conhash-00000-00133"

这个碉堡的哈希函数是conhash_hash_def

coredump的条件

以上,既然只要某个库omit-frame-pointer,那tcmalloc就可能出错,为什么发生的频率并不高呢?这个可以回到GetStackTrace尤其是NextStackFrame的实现,其中包含了几个合法RBP的判定:

if (new_sp <= old_sp) return NULL;  // 上一个栈帧的RBP肯定比当前的大
        if ((uintptr_t)new_sp - (uintptr_t)old_sp > 100000) return NULL; // 指针值范围还必须在100000内
        ...
    if ((uintptr_t)new_sp & (sizeof(void *) - 1)) return NULL; // 由于本身保存的是指针,所以还必须是sizeof(void*)的整数倍,对齐

有了以上条件,才使得这个core几率变得很低。

总结

最后,如果你很熟悉tcmalloc,整个问题估计就被秒解了:tcmalloc INSTALL

另外附上另一个有意思的东西。

在分析__conhash_add_replicas时,其内定义了一个64字节的字符数组,查看其堆栈:

(gdb) x/20a $rsp
0x4e738bd0:     0x2d687361686e6f63      0x30302d3030303030          # 这些是字符串conhash-00000-00133
0x4e738be0:     0x333331        0x0
0x4e738bf0:     0x0     0x7f75532cd69e <__conhash_create_node+78>
0x4e738c00:     0x24fbc7e0      0x4e738c60
0x4e738c10:     0x24fbc7e0      0x7f75532cd6e3 <__conhash_add_replicas+35>
0x4e738c20:     0x0     0x24fbc7e8
0x4e738c30:     0x4e738c20      0x24fbc7e0
0x4e738c40:     0x22324360      0x246632c0
0x4e738c50:     0x0     0x0
0x4e738c60:     0x0     0x7f75532cd1fa <conhash_add_node+74>

最开始我觉得buf占64字节,也就是整个[0x4e738bd0, 0x4e738c10)内存,但是这块内存里居然有函数地址,这一度使我怀疑这里有问题。后来醒悟这些地址是定义buf前调用__conhash_create_node产生的,调用过程中写到堆栈里,调用完后栈指针改变,但并不需要清空栈中的内容。

posted @ 2015-04-06 18:33 Kevin Lynx 阅读(7910) | 评论 (2)编辑 收藏

基于内存查看STL常用容器内容

有时候在线上使用gdb调试程序core问题时,可能没有符号文件,拿到的仅是一个内存地址,如果这个指向的是一个STL对象,那么如何查看这个对象的内容呢?

只需要知道STL各个容器的数据结构实现,就可以查看其内容。本文描述了SGI STL实现中常用容器的数据结构,以及如何在gdb中查看其内容。

string

string,即basic_string bits/basic_string.h

mutable _Alloc_hider  _M_dataplus;
    ... 
      const _CharT*
      c_str() const
      { return _M_data(); }
    ...    
      _CharT*
      _M_data() const 
      { return  _M_dataplus._M_p; }

    ...
      struct _Alloc_hider : _Alloc
      {
    _Alloc_hider(_CharT* __dat, const _Alloc& __a)
    : _Alloc(__a), _M_p(__dat) { }

    _CharT* _M_p; // The actual data.
      };
   
      size_type
      length() const
      { return _M_rep()->_M_length; }

      _Rep*
      _M_rep() const
      { return &((reinterpret_cast<_Rep*> (_M_data()))[-1]); }

      ...
       struct _Rep_base
      {
    size_type       _M_length;
    size_type       _M_capacity;
    _Atomic_word        _M_refcount;
      };

      struct _Rep : _Rep_base

即,string内有一个指针,指向实际的字符串位置,这个位置前面有一个_Rep结构,其内保存了字符串的长度、可用内存以及引用计数。当我们拿到一个string对象的地址时,可以通过以下代码获取相关值:

void ds_str_i(void *p) {
        char **raw = (char**)p;
        char *s = *raw;
        size_t len = *(size_t*)(s - sizeof(size_t) * 3);
        printf("str: %s (%zd)\n", s, len);
    }

    size_t ds_str() {
        std::string s = "hello";
        ds_str_i(&s);
        return s.size();
    }

在gdb中拿到一个string的地址时,可以以下打印出该字符串及长度:

(gdb) x/1a p
0x7fffffffe3a0: 0x606028
(gdb) p (char*)0x606028
$2 = 0x606028 "hello"
(gdb) x/1dg 0x606028-24
0x606010:       5

vector

众所周知vector实现就是一块连续的内存,bits/stl_vector.h

template<typename _Tp, typename _Alloc = std::allocator<_Tp> >
    class vector : protected _Vector_base<_Tp, _Alloc>

    ...
    template<typename _Tp, typename _Alloc>
    struct _Vector_base
    {
      typedef typename _Alloc::template rebind<_Tp>::other _Tp_alloc_type;

      struct _Vector_impl
      : public _Tp_alloc_type
      {
    _Tp*           _M_start;
    _Tp*           _M_finish;
    _Tp*           _M_end_of_storage;
    _Vector_impl(_Tp_alloc_type const& __a)
    : _Tp_alloc_type(__a), _M_start(0), _M_finish(0), _M_end_of_storage(0)
    { }
      };


      _Vector_impl _M_impl;

可以看出sizeof(vector<xxx>)=24,其内也就是3个指针,_M_start指向首元素地址,_M_finish指向最后一个节点+1,_M_end_of_storage是可用空间最后的位置。

iterator
      end()
      { return iterator (this->_M_impl._M_finish); }
      const_iterator
      ...
      begin() const
      { return const_iterator (this->_M_impl._M_start); }
      ...
      size_type
      capacity() const
      { return size_type(const_iterator(this->_M_impl._M_end_of_storage)
             - begin()); }

可以通过代码从一个vector对象地址输出其信息:

template <typename T>
    void ds_vec_i(void *p) {
        T *start = *(T**)p;
        T *finish = *(T**)((char*)p + sizeof(void*));
        T *end_storage = *(T**)((char*)p + 2 * sizeof(void*));
        printf("vec size: %ld, avaiable size: %ld\n", finish - start, end_storage - start); 
    }

    size_t ds_vec() {
        std::vector<int> vec;
        vec.push_back(0x11);
        vec.push_back(0x22);
        vec.push_back(0x33);
        ds_vec_i<int>(&vec);
        return vec.size();
    }

使用gdb输出一个vector中的内容:

(gdb) p p
$3 = (void *) 0x7fffffffe380
(gdb) x/1a p
0x7fffffffe380: 0x606080
(gdb) x/3xw 0x606080
0x606080:       0x00000011      0x00000022      0x00000033

list

众所周知list被实现为一个链表。准确来说是一个双向链表。list本身是一个特殊节点,其代表end,其指向的下一个元素才是list真正的第一个节点:

bits/stl_list.h

bool
      empty() const
      { return this->_M_impl._M_node._M_next == &this->_M_impl._M_node; }

      const_iterator
      begin() const
      { return const_iterator(this->_M_impl._M_node._M_next); }

      iterator
      end()
      { return iterator(&this->_M_impl._M_node); }

      ...

    struct _List_node_base
    {
        _List_node_base* _M_next;   ///< Self-explanatory
        _List_node_base* _M_prev;   ///< Self-explanatory
        ...
    };
         
    template<typename _Tp>
    struct _List_node : public _List_node_base
    {
      _Tp _M_data;                ///< User's data.
    };
      
    template<typename _Tp, typename _Alloc>
    class _List_base
    {
        ...
      struct _List_impl
      : public _Node_alloc_type
      {
    _List_node_base _M_node;
        ...
      };

      _List_impl _M_impl;

          
    template<typename _Tp, typename _Alloc = std::allocator<_Tp> >
    class list : protected _List_base<_Tp, _Alloc>

所以sizeof(list<xx>)=16,两个指针。每一个真正的节点首先是包含两个指针,然后是元素内容(_List_node)。

通过代码输出list的内容:

#define NEXT(ptr, T) do { \
        void *n = *(char**)ptr; \
        T val = *(T*)((char**)ptr + 2); \
        printf("list item %p val: 0x%x\n", ptr, val); \
        ptr = n; \
    } while (0)

    template <typename T>
    void ds_list_i(void *p) {
        void *ptr = *(char**)p;

        NEXT(ptr, T);
        NEXT(ptr, T);
        NEXT(ptr, T);
    }

    size_t ds_list() {
        std::list<int> lst;
        lst.push_back(0x11);
        lst.push_back(0x22);
        lst.push_back(0x33);
        ds_list_i<int>(&lst);
        return lst.size();
    }

在gdb中可以以下方式遍历该list:

(gdb) p p
$4 = (void *) 0x7fffffffe390
(gdb) x/1a p
0x7fffffffe390: 0x606080
(gdb) x/1xw 0x606080+16         # 元素1 
0x606090:       0x00000011
(gdb) x/1a 0x606080
0x606080:       0x6060a0
(gdb) x/1xw 0x6060a0+16         # 元素2
0x6060b0:       0x00000022

map

map使用的是红黑树实现,实际使用的是stl_tree.h实现:

bits/stl_map.h

typedef _Rb_tree<key_type, value_type, _Select1st<value_type>,
               key_compare, _Pair_alloc_type> _Rep_type;
    ...
     _Rep_type _M_t;
    ... 

      iterator
      begin()
      { return _M_t.begin(); }

bits/stl_tree.h

struct _Rb_tree_node_base
      {
        typedef _Rb_tree_node_base* _Base_ptr;
        typedef const _Rb_tree_node_base* _Const_Base_ptr;

        _Rb_tree_color  _M_color;
        _Base_ptr       _M_parent;
        _Base_ptr       _M_left;
        _Base_ptr       _M_right;
        
        ...
      };

    template<typename _Val>
    struct _Rb_tree_node : public _Rb_tree_node_base
    {
      typedef _Rb_tree_node<_Val>* _Link_type;
      _Val _M_value_field;
    };


    template<typename _Key_compare,
           bool _Is_pod_comparator = std::__is_pod<_Key_compare>::__value>
        struct _Rb_tree_impl : public _Node_allocator
        {
      _Key_compare      _M_key_compare;
      _Rb_tree_node_base    _M_header;
      size_type         _M_node_count; // Keeps track of size of tree.
      ...
        }
    
    _Rb_tree_impl<_Compare> _M_impl;
    ...

      iterator
      begin()
      {
    return iterator(static_cast<_Link_type>
            (this->_M_impl._M_header._M_left));
      }

所以可以看出,大部分时候(取决于_M_key_compare) sizeof(map<xx>)=48,主要的元素是:

_Rb_tree_color  _M_color; // 节点颜色
        _Base_ptr       _M_parent; // 父节点
        _Base_ptr       _M_left; // 左节点
        _Base_ptr       _M_right; // 右节点
        _Val            _M_value_field // 同list中节点技巧一致,后面是实际的元素

同list中的实现一致,map本身作为一个节点,其不是一个存储数据的节点,

_Rb_tree::end

iterator
      end()
      { return iterator(static_cast<_Link_type>(&this->_M_impl._M_header)); }

由于节点值在_Rb_tree_node_base后,所以任意时候拿到节点就可以偏移这个结构体拿到节点值,节点的值是一个pair,包含了key和value。

在gdb中打印以下map的内容:

size_t ds_map() {
        std::map<std::string, int> imap;
        imap["abc"] = 0xbbb;
        return imap.size();
    }
(gdb) p/x &imap
$7 = 0x7fffffffe370
(gdb) x/1a (char*)&imap+24       # _M_left 真正的节点
0x7fffffffe388: 0x606040          
(gdb) x/1xw 0x606040+32+8        # 偏移32字节是节点值的地址,再偏移8则是value的地址
0x606068:       0x00000bbb
(gdb) p *(char**)(0x606040+32)   # 偏移32字节是string的地址
$8 = 0x606028 "abc"

或者很多时候没有必要这么装逼+蛋疼:

(gdb) p *(char**)(imap._M_t._M_impl._M_header._M_left+1)
$9 = 0x606028 "abc"
(gdb) x/1xw (char*)(imap._M_t._M_impl._M_header._M_left+1)+8
0x606068:       0x00000bbb

posted @ 2014-12-03 22:08 Kevin Lynx 阅读(3813) | 评论 (2)编辑 收藏

linux动态库的种种要点

linux下使用动态库,基本用起来还是很容易。但如果我们的程序中大量使用动态库来实现各种框架/插件,那么就会遇到一些坑,掌握这些坑才有利于程序更稳健地运行。

本篇先谈谈动态库符号方面的问题。

测试代码可以在github上找到

符号查找

一个应用程序test会链接一个动态库libdy.so,如果一个符号,例如函数callfn定义于libdy.so中,test要使用该函数,简单地声明即可:

// dy.cpp libdy.so
void callfn() {
    ...
}

// main.cpp test
extern void callfn();

callfn();

在链接test的时候,链接器会统一进行检查。

同样,在libdy.so中有相同的规则,它可以使用一个外部的符号,在它被链接/载入进一个可执行程序时才会进行符号存在与否的检查。这个符号甚至可以定义在test中,形成一种双向依赖,或定义在其他动态库中:

// dy.cpp libdy.so
extern void mfunc();

mfunc();

// main.cpp test
void mfunc() {
    ...
}

在生成libdy.so时mfunc可以找不到,此时mfunc为未定义:

$ nm libdy.so | grep mfun
U _Z5mfuncv

但在libdy.so被链接进test时则会进行检查,试着把mfunc函数的定义去掉,就会得到一个链接错误:

./libdy.so: undefined reference to `mfunc()'

同样,如果我们动态载入libdy.so,此时当然可以链接通过,但是在载入时同样得到找不到符号的错误:

#ifdef DY_LOAD
    void *dp = dlopen("./libdy.so", RTLD_LAZY);
    typedef void (*callfn)();
    callfn f = (callfn) dlsym(dp, "callfn");
    f();
    dlclose(dp);
#else
    callfn();
#endif

得到错误:

./test: symbol lookup error: ./libdy.so: undefined symbol: _Z5mfuncv

结论:基于以上,我们知道,如果一个动态库依赖了一些外部符号,这些外部符号可以位于其他动态库甚至应用程序中。我们可以再链接这个动态库的时候就把依赖的其他库也链接上,或者推迟到链接应用程序时再链接。而动态加载的库,则要保证在加载该库时,进程中加载的其他动态库里已经存在该符号。

例如,通过LD_PRELOAD环境变量可以让一个进程先加载指定的动态库,上面那个动态加载启动失败的例子,可以通过预先加载包含mfunc符号的动态库解决:

$ LD_PRELOAD=libmfun.so ./test
...

但是如果这个符号存在于可执行程序中则不行:

$ nm test | grep mfunc
0000000000400a00 T _Z5mfuncv
$ nm test | grep mfunc
0000000000400a00 T _Z5mfuncv
$ ./test
...
./test: symbol lookup error: ./libdy.so: undefined symbol: _Z5mfuncv

符号覆盖

前面主要讲的是符号缺少的情况,如果同一个符号存在多分,则更能引发问题。这里谈到的符号都是全局符号,一个进程中某个全局符号始终是全局唯一的。为了保证这一点,在链接或动态载入动态库时,就会出现忽略重复符号的情况。

这里就不提同一个链接单位(如可执行程序、动态库)里符号重复的问题了

函数

当动态库和libdy.so可执行程序test中包含同名的函数时会怎样?根据是否动态加载情况还有所不同。

当直接链接动态库时,libdy.so和test都会链接包含func函数的fun.o,为了区分,我把func按照条件编译得到不同的版本:

// fun.cpp
#ifdef V2
extern "C" void func() {
    printf("func v2\n");
}
#else
extern "C" void func() {
    printf("func v1\n");
}
#endif

// Makefile
test: libdy obj.o mainfn
    g++ -g -Wall -c fun.cpp -o fun.o # 编译为fun.o
    g++ -g -Wall -c main.cpp #-DDY_LOAD
    g++ -g -Wall -o test main.o obj.o fun.o -ldl mfun.o -ldy -L.

libdy: obj
    g++ -Wall -fPIC -c fun.cpp -DV2 -o fun-dy.o  # 定义V2宏,编译为fun-dy.o
    g++ -Wall -fPIC -shared -o libdy.so dy.cpp -g obj.o fun-dy.o

这样,test中的func就会输出func v1;libdy.so中的func就会输出func v2。test和libdy.o确实都有func符号:

$ nm libdy.so | grep func
0000000000000a60 T func

$nm test | grep func
0000000000400a80 T func

在test和libdy.so中都会调用func函数:

// main.cpp test
int main(int argc, char **argv) {
    func();
    ...
    callfn(); // 调用libdy.so中的函数
    ...
}

// dy.cpp libdy.so
extern "C" void callfn() {
    ... 
    printf("callfn\n");
    func();
    ...
}

运行后发现,都调用的是同一个func

$ ./test
...
func v1
...
callfn
func v1

结论,直接链接动态库时,整个程序运行的时候符号会发生覆盖,只有一个符号被使用。在实践中,如果程序和链接的动态库都依赖了一个静态库,而后他们链接的这个静态库版本不同,则很有可能因为符号发生了覆盖而导致问题。(静态库同普通的.o性质一样,参考浅析静态库链接原理)

更复杂的情况中,多个动态库和程序都有相同的符号,情况也是一样,会发生符号覆盖。如果程序里没有这个符号,而多个动态库里有相同的符号,也会覆盖。

但是对于动态载入的情况则不同,同样的libdy.so我们在test中不链接,而是动态载入:

int main(int argc, char **argv) {
    func();
#ifdef DY_LOAD
    void *dp = dlopen("./libdy.so", RTLD_LAZY);
    typedef void (*callfn)();
    callfn f = (callfn) dlsym(dp, "callfn");
    f();
    func();
    dlclose(dp);
#else
    callfn();
#endif
    return 0;
}

运行得到:

$ ./test
func v1
...
callfn
func v2
func v1

都正确地调用到各自链接的func

结论,实践中,动态载入的动态库一般会作为插件使用,那么其同程序链接不同版本的静态库(相同符号不同实现),是没有问题的。

变量

变量本质上也是符号(symbol),但其处理规则和函数还有点不一样(是不是有点想吐槽了)。

// object.h
class Object {
public:
    Object() {
#ifdef DF
        s = malloc(32);
        printf("s addr %p\n", s);
#endif
        printf("ctor %p\n", this);
    }

    ~Object() {
        printf("dtor %p\n", this);
#ifdef DF
        printf("s addr %p\n", s);
        free(s);
#endif
    }

    void *s;
};

extern Object g_obj;

我们的程序test和动态库libdy.so都会链接object.o。首先测试test链接libdy.so,test和libdy.so中都会有g_obj这个符号:

// B g_obj 表示g_obj位于BSS段,未初始化段

$ nm test | grep g_obj
0000000000400a14 t _GLOBAL__I_g_obj
00000000006012c8 B g_obj
$ nm libdy.so | grep g_obj
000000000000097c t _GLOBAL__I_g_obj
0000000000200f30 B g_obj

运行:

$ ./test
ctor 0x6012c8
ctor 0x6012c8
...
dtor 0x6012c8
dtor 0x6012c8

g_obj被构造了两次,但地址一样。全局变量只有一个实例,似乎在情理之中。

动态载入libdy.so,变量地址还是相同的:

$ ./test
ctor 0x6012a8
...
ctor 0x6012a8
...
dtor 0x6012a8
dtor 0x6012a8

结论,不同于函数,全局变量符号重复时,不论动态库是动态载入还是直接链接,变量始终只有一个。

但诡异的情况是,对象被构造和析构了两次。构造两次倒无所谓,浪费点空间,但是析构两次就有问题。因为析构时都操作的是同一个对象,那么如果这个对象内部有分配的内存,那就会对这块内存造成double free,因为指针相同。打开DF宏实验下:

$ ./test
s addr 0x20de010
ctor 0x6012b8
s addr 0x20de040
ctor 0x6012b8
...
dtor 0x6012b8
s addr 0x20de040
dtor 0x6012b8
s addr 0x20de040

因为析构的两次都是同一个对象,所以其成员s指向的内存被释放了两次,从而产生了double free,让程序coredump了。

总结,全局变量符号重复时,始终会只使用一个,并且会被初始化/释放两次,是一种较危险的情况,应当避免在使用动态库的过程中使用全局变量。

posted @ 2014-11-04 00:55 Kevin Lynx 阅读(7909) | 评论 (1)编辑 收藏

图解zookeeper FastLeader选举算法

zookeeper配置为集群模式时,在启动或异常情况时会选举出一个实例作为Leader。其默认选举算法为FastLeaderElection

不知道zookeeper的可以考虑这样一个问题:某个服务可以配置为多个实例共同构成一个集群对外提供服务。其每一个实例本地都存有冗余数据,每一个实例都可以直接对外提供读写服务。在这个集群中为了保证数据的一致性,需要有一个Leader来协调一些事务。那么问题来了:如何确定哪一个实例是Leader呢?

问题的难点在于:

  • 没有一个仲裁者来选定Leader
  • 每一个实例本地可能已经存在数据,不确定哪个实例上的数据是最新的

分布式选举算法正是用来解决这个问题的。

本文基于zookeeper 3.4.6 的源码进行分析。FastLeaderElection算法的源码全部位于FastLeaderElection.java文件中,其对外接口为FastLeaderElection.lookForLeader,该接口是一个同步接口,直到选举结束才会返回。同样由于网上已有类似文章,所以我就从图示的角度来阐述。阅读一些其他文章有利于获得初步印象:

主要流程

阅读代码和以上推荐文章可以把整个流程梳理清楚。实现上,包括了一个消息处理主循环,也是选举的主要逻辑,以及一个消息发送队列处理线程和消息解码线程。主要流程可概括为下图:

fle-flow.png

推荐对照着推荐的文章及代码理解,不赘述。

我们从感性上来理解这个算法。

每一个节点,相当于一个选民,他们都有自己的推荐人,最开始他们都推荐自己。谁更适合成为Leader有一个简单的规则,例如sid够大(配置)、持有的数据够新(zxid够大)。每个选民都告诉其他选民自己目前的推荐人是谁,类似于出去搞宣传拉拢其他选民。每一个选民发现有比自己更适合的人时就转而推荐这个更适合的人。最后,大部分人意见一致时,就可以结束选举。

就这么简单。总体上有一种不断演化逼近结果的感觉。

当然,会有些特殊情况的处理。例如总共3个选民,1和2已经确定3是Leader,但3还不知情,此时就走入LEADING/FOLLOWING的分支,选民3只是接收结果。

代码中不是所有逻辑都在这个大流程中完成的。在接收消息线程中,还可能单独地回应某个节点(WorkerReceiver.run):

recv.png

从这里可以看出,当某个节点已经确定选举结果不再处于LOOKING状态时,其收到LOOKING消息时都会直接回应选举的最终结果。结合上面那个比方,相当于某次选举结束了,这个时候来了选民4又发起一次新的选举,那么其他选民就直接告诉它当前的Leader情况。相当于,在这个集群主从已经就绪的情况下,又开启了一个实例,这个实例就会直接使用当前的选举结果。

状态转换

每个节点上有一些关键的数据结构:

  • 当前推荐人,初始推荐自己,每次收到其他更好的推荐人时就更新
  • 其他人的投票集合,用于确定何时选举结束

每次推荐人更新时就会进行广播,正是这个不断地广播驱动整个算法趋向于结果。假设有3个节点A/B/C,其都还没有数据,按照sid关系为C>B>A,那么按照规则,C更可能成为Leader,其各个节点的状态转换为:

state.png

图中,v(A)表示当前推荐人为A;r[]表示收到的投票集合。

可以看看当其他节点已经确定投票结果时,即不再是LOOKING时的状态:

state-ret.png

代码中有一个特殊的投票集合outofelection,我理解为选举已结束的那些投票,这些投票仅用于表征选举结果。

当一个新启动的节点加入集群时,它对集群内其他节点发出投票请求,而其他节点已不处于LOOKING状态,此时其他节点回应选举结果,该节点收集这些结果到outofelection中,最终在收到合法LEADER消息且这些选票也构成选举结束条件时,该节点就结束自己的选举行为。注意到代码中会logicalclock = n.electionEpoch;更新选举轮数

posted @ 2014-10-19 15:58 Kevin Lynx 阅读(4456) | 评论 (0)编辑 收藏

图解分布式一致性协议Paxos

Paxos协议/算法是分布式系统中比较重要的协议,它有多重要呢?

<分布式系统的事务处理>

Google Chubby的作者Mike Burrows说过这个世界上只有一种一致性算法,那就是Paxos,其它的算法都是残次品。

<大规模分布式存储系统>

理解了这两个分布式协议之后(Paxos/2PC),学习其他分布式协议会变得相当容易。

学习Paxos算法有两部分:a) 算法的原理/证明;b) 算法的理解/运作。

理解这个算法的运作过程其实基本就可以用于工程实践。而且理解这个过程相对来说也容易得多。

网上我觉得讲Paxos讲的好的属于这篇:paxos图解Paxos算法详解,我这里就结合wiki上的实例进一步阐述。一些paxos基础通过这里提到的两篇文章,以及wiki上的内容基本可以理解。

算法内容

Paxos在原作者的《Paxos Made Simple》中内容是比较精简的:

Phase 1

(a) A proposer selects a proposal number n and sends a prepare request with number n to a majority of acceptors.

(b) If an acceptor receives a prepare request with number n greater than that of any prepare request to which it has already responded, then it responds to the request with a promise not to accept any more proposals numbered less than n and with the highest-numbered pro-posal (if any) that it has accepted.

Phase 2

(a) If the proposer receives a response to its prepare requests (numbered n) from a majority of acceptors, then it sends an accept request to each of those acceptors for a proposal numbered n with a value v , where v is the value of the highest-numbered proposal among the responses, or is any value if the responses reported no proposals.

(b) If an acceptor receives an accept request for a proposal numbered n, it accepts the proposal unless it has already responded to a prepare request having a number greater than n.

借用paxos图解文中的流程图可概括为:

实例及详解

Paxos中有三类角色ProposerAcceptorLearner,主要交互过程在ProposerAcceptor之间。

ProposerAcceptor之间的交互主要有4类消息通信,如下图:

这4类消息对应于paxos算法的两个阶段4个过程:

  • phase 1
    • a) proposer向网络内超过半数的acceptor发送prepare消息
    • b) acceptor正常情况下回复promise消息
  • phase 2
    • a) 在有足够多acceptor回复promise消息时,proposer发送accept消息
    • b) 正常情况下acceptor回复accepted消息

因为在整个过程中可能有其他proposer针对同一件事情发出以上请求,所以在每个过程中都会有些特殊情况处理,这也是为了达成一致性所做的事情。如果在整个过程中没有其他proposer来竞争,那么这个操作的结果就是确定无异议的。但是如果有其他proposer的话,情况就不一样了。

paxos中文wiki上的例子为例。简单来说该例子以若干个议员提议税收,确定最终通过的法案税收比例。

以下图中基本只画出proposer与一个acceptor的交互。时间标志T2总是在T1后面。propose number简称N。

情况之一如下图:

A3在T1发出accepted给A1,然后在T2收到A5的prepare,在T3的时候A1才通知A5最终结果(税率10%)。这里会有两种情况:

  • A5发来的N5小于A1发出去的N1,那么A3直接拒绝(reject)A5
  • A5发来的N5大于A1发出去的N1,那么A3回复promise,但带上A1的(N1, 10%)

这里可以与paxos流程图对应起来,更好理解。acceptor会记录(MaxN, AcceptN, AcceptV)

A5在收到promise后,后续的流程可以顺利进行。但是发出accept时,因为收到了(AcceptN, AcceptV),所以会取最大的AcceptN对应的AcceptV,例子中也就是A1的10%作为AcceptV。如果在收到promise时没有发现有其他已记录的AcceptV,则其值可以由自己决定。

针对以上A1和A5冲突的情况,最终A1和A5都会广播接受的值为10%。

其实4个过程中对于acceptor而言,在回复promise和accepted时由于都可能因为其他proposer的介入而导致特殊处理。所以基本上看在这两个时间点收到其他proposer的请求时就可以了解整个算法了。例如在回复promise时则可能因为proposer发来的N不够大而reject:

如果在发accepted消息时,对其他更大N的proposer发出过promise,那么也会reject该proposer发出的accept,如图:

这个对应于Phase 2 b):

it accepts the proposal unless it has already responded to a prepare request having a number greater than n.

总结

Leslie Lamport没有用数学描述Paxos,但是他用英文阐述得很清晰。将Paxos的两个Phase的内容理解清楚,整个算法过程还是不复杂的。

至于Paxos中一直提到的一个全局唯一且递增的proposer number,其如何实现,引用如下:

如何产生唯一的编号呢?在《Paxos made simple》中提到的是让所有的Proposer都从不相交的数据集合中进行选择,例如系统有5个Proposer,则可为每一个Proposer分配一个标识j(0~4),则每一个proposer每次提出决议的编号可以为5*i + j(i可以用来表示提出议案的次数)

参考文档

posted @ 2014-10-15 22:45 Kevin Lynx 阅读(10311) | 评论 (6)编辑 收藏

淘宝分布式配置管理服务Diamond

在一个分布式环境中,同类型的服务往往会部署很多实例。这些实例使用了一些配置,为了更好地维护这些配置就产生了配置管理服务。通过这个服务可以轻松地管理这些应用服务的配置问题。应用场景可概括为:

zookeeper的一种应用就是分布式配置管理(基于ZooKeeper的配置信息存储方案的设计与实现)。百度也有类似的实现:disconf

Diamond则是淘宝开源的一种分布式配置管理服务的实现。Diamond本质上是一个Java写的Web应用,其对外提供接口都是基于HTTP协议的,在阅读代码时可以从实现各个接口的controller入手。

分布式配置管理

分布式配置管理的本质基本上就是一种推送-订阅模式的运用。配置的应用方是订阅者,配置管理服务则是推送方。概括为下图:

其中,客户端包括管理人员publish数据到配置管理服务,可以理解为添加/更新数据;配置管理服务notify数据到订阅者,可以理解为推送。

配置管理服务往往会封装一个客户端库,应用方则是基于该库与配置管理服务进行交互。在实际实现时,客户端库可能是主动拉取(pull)数据,但对于应用方而言,一般是一种事件通知方式。

Diamond中的数据是简单的key-value结构。应用方订阅数据则是基于key来订阅,未订阅的数据当然不会被推送。数据从类型上又划分为聚合和非聚合。因为数据推送者可能很多,在整个分布式环境中,可能有多个推送者在推送相同key的数据,这些数据如果是聚合的,那么所有这些推送者推送的数据会被合并在一起;反之如果是非聚合的,则会出现覆盖。

数据的来源可能是人工通过管理端录入,也可能是其他服务通过配置管理服务的推送接口自动录入。

架构及实现

Diamond服务是一个集群,是一个去除了单点的协作集群。如图:

图中可分为以下部分讲解:

服务之间同步

Diamond服务集群每一个实例都可以对外完整地提供服务,那么意味着每个实例上都有整个集群维护的数据。Diamond有两种方式保证这一点:

  • 任何一个实例都有其他实例的地址;任何一个实例上的数据变更时,都会将改变的数据同步到mysql上,然后通知其他所有实例从mysql上进行一次数据拉取(DumpService::dump),这个过程只拉取改变了的数据
  • 任何一个实例启动后都会以较长的时间间隔(几小时),从mysql进行一次全量的数据拉取(DumpAllProcessor)

实现上为了一致性,通知其他实例实际上也包含自己。以服务器收到添加聚合数据为例,处理过程大致为:

DatumController::addDatum // /datum.do?method=addDatum
    PersistService::addAggrConfigInfo 
    MergeDatumService::addMergeTask // 添加一个MergeDataTask,异步处理

MergeTaskProcessor::process
    PersistService::insertOrUpdate
        EventDispatcher.fireEvent(new ConfigDataChangeEvent // 派发一个ConfigDataChangeEvent事件

NotifyService::onEvent // 接收事件并处理
    TaskManager::addTask(..., new NotifyTask // 由此,当数据发生变动,则最终创建了一个NoticyTask

// NotifyTask同样异步处理
NotifyTaskProcessor::process
    foreach server in serverList // 包含自己
        notifyToDump // 调用 /notify.do?method=notifyConfigInfo 从mysql更新变动的数据

虽然Diamond去除了单点问题,不过问题都下降到了mysql上。但由于其作为配置管理的定位,其数据量就mysql的应用而言算小的了,所以可以一定程度上保证整个服务的可用性。

数据一致性

由于Diamond服务器没有master,任何一个实例都可以读写数据,那么针对同一个key的数据则可能面临冲突。这里应该是通过mysql来保证数据的一致性。每一次客户端请求写数据时,Diamond都将写请求投递给mysql,然后通知集群内所有Diamond实例(包括自己)从mysql拉取数据。当然,拉取数据则可能不是每一次写入都能拉出来,也就是最终一致性。

Diamond中没有把数据放入内存,但会放到本地文件。对于客户端的读操作而言,则是直接返回本地文件里的数据。

服务实例列表

Diamond服务实例列表是一份静态数据,直接将每个实例的地址存放在一个web server上。无论是Diamond服务还是客户端都从该web server上取出实例列表。

对于客户端而言,当其取出了该列表后,则是随机选择一个节点(ServerListManager.java),以后的请求都会发往该节点。

数据同步

客户端库中以固定时间间隔从服务器拉取数据(ClientWorker::ClientWorkerClientWorker::checkServerConfigInfo)。只有应用方关心的数据才可能被拉取。另外,为了数据推送的及时,Diamond还使用了一种long polling的技术,其实也是为了突破HTTP协议的局限性。如果整个服务是基于TCP的自定义协议,客户端与服务器保持长连接则没有这些问题

数据的变更

Diamond中很多操作都会检查数据是否发生了变化。标识数据变化则是基于数据对应的MD5值来实现的。

容灾

在整个Diamond系统中,几个角色为了提高容灾性,都有自己的缓存,概括为下图:

每一个角色出问题时,都可以尽量保证客户端对应用层提供服务。

参考文档

posted @ 2014-10-12 12:57 Kevin Lynx 阅读(2698) | 评论 (4)编辑 收藏

仅列出标题
共12页: 1 2 3 4 5 6 7 8 9 Last