to myself 的分类学习日志

做自己想做的事
posts - 232, comments - 6, trackbacks - 0, articles - 0

ZooKeeper的使用

Posted on 2015-02-27 11:15 kongkongzi 阅读(1090) 评论(0)  编辑 收藏 引用 所属分类: search engine
总的使用场景:


使用设计1(/Configuration):


OurService的代码示例:
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     
http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 
*/

#include <zookeeper.h>
#include <proto.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#ifndef WIN32
#include <sys/time.h>
#include <unistd.h>
#include <sys/select.h>
#else
#include "winport.h"
//#include <io.h> <-- can't include, conflicting definitions of close()
int read(int _FileHandle, void * _DstBuf, unsigned int _MaxCharCount);
int write(int _Filehandle, const void * _Buf, unsigned int _MaxCharCount);
#define ctime_r(tctime, buffer) ctime_s (buffer, 40, tctime)
#endif

#include <time.h>
#include <errno.h>
#include <assert.h>

#define _LL_CAST_ (long long)

static zhandle_t *zh;
static clientid_t myid;
static const char *clientIdFile = 0;
struct timeval startTime;
static int shutdownThisThing = 0;

static const char* state2String(int state)
{
    if (state == 0)
        return "CLOSED_STATE";
    if (state == ZOO_CONNECTING_STATE)
        return "CONNECTING_STATE";
    if (state == ZOO_ASSOCIATING_STATE)
        return "ASSOCIATING_STATE";
    if (state == ZOO_CONNECTED_STATE)
        return "CONNECTED_STATE";
    if (state == ZOO_EXPIRED_SESSION_STATE)
        return "EXPIRED_SESSION_STATE";
    if (state == ZOO_AUTH_FAILED_STATE)
        return "AUTH_FAILED_STATE";

    return "INVALID_STATE";
}

static const char* type2String(int state)
{
    if (state == ZOO_CREATED_EVENT)
        return "CREATED_EVENT";
    if (state == ZOO_DELETED_EVENT)
        return "DELETED_EVENT";
    if (state == ZOO_CHANGED_EVENT)
        return "CHANGED_EVENT";
    if (state == ZOO_CHILD_EVENT)
        return "CHILD_EVENT";
    if (state == ZOO_SESSION_EVENT)
        return "SESSION_EVENT";
    if (state == ZOO_NOTWATCHING_EVENT)
        return "NOTWATCHING_EVENT";

    return "UNKNOWN_EVENT_TYPE";
}

void dumpStat(const struct Stat *stat)
{
    char tctimes[40];
    char tmtimes[40];
    time_t tctime;
    time_t tmtime;

    if (!stat)
    {
        fprintf(stderr, "null\n");
        return;
    }
    tctime = stat->ctime / 1000;
    tmtime = stat->mtime / 1000;

    ctime_r(&tmtime, tmtimes);
    ctime_r(&tctime, tctimes);

    fprintf(stderr, "\tctime = %s\tczxid=%llx\n"
            "\tmtime=%s\tmzxid=%llx\n"
            "\tversion=%x\taversion=%x\n"
            "\tephemeralOwner = %llx\n", tctimes, _LL_CAST_ stat->czxid,
            tmtimes,
            _LL_CAST_ stat->mzxid, (unsigned int) stat->version,
            (unsigned int) stat->aversion,
            _LL_CAST_ stat->ephemeralOwner);
}

void my_data_completion(int rc, const char *value, int value_len,
        const struct Stat *stat, const void *data)
{
    struct timeval tv;
    int sec;
    int usec;
    gettimeofday(&tv, 0);
    sec = tv.tv_sec - startTime.tv_sec;
    usec = tv.tv_usec - startTime.tv_usec;
    fprintf(stderr, "time = %d msec\n", sec * 1000 + usec / 1000);
    fprintf(stderr, "%s: rc = %d\n", (char*) data, rc);
    if (value)
    {
        fprintf(stderr, " value_len = %d\n", value_len);
        assert(write(2, value, value_len) == value_len);
    }
    fprintf(stderr, "\nStat:\n");
    dumpStat(stat);
    free((void*) data);
}


void my_strings_completion(int rc, const struct String_vector *strings,
        const void *data)
{
    struct timeval tv;
    int sec;
    int usec;
    int i;

    gettimeofday(&tv, 0);
    sec = tv.tv_sec - startTime.tv_sec;
    usec = tv.tv_usec - startTime.tv_usec;
    fprintf(stderr, "time = %d msec\n", sec * 1000 + usec / 1000);
    fprintf(stderr, "%s: rc = %d\n", (char*) data, rc);
    if (strings)
        for (i = 0; i < strings->count; i++)
        {
            fprintf(stderr, "\t%s\n", strings->data[i]);
        }
    free((void*) data);
    gettimeofday(&tv, 0);
    sec = tv.tv_sec - startTime.tv_sec;
    usec = tv.tv_usec - startTime.tv_usec;
    fprintf(stderr, "time = %d msec\n", sec * 1000 + usec / 1000);
}

void watcher(zhandle_t *zzh, int type, int state, const char *path,
        void* context)
{
    /* Be careful using zh here rather than zzh - as this may be mt code
     * the client lib may call the watcher before zookeeper_init returns 
*/

    fprintf(stderr, "Watcher %s state = %s", type2String(type),
            state2String(state));
    if (path && strlen(path) > 0)
    {
        fprintf(stderr, " for path %s", path);
    }
    fprintf(stderr, "\n");

    if (type == ZOO_SESSION_EVENT)
    {
        if (state == ZOO_CONNECTED_STATE)
        {
            const clientid_t *id = zoo_client_id(zzh);
            if (myid.client_id == 0 || myid.client_id != id->client_id)
            {
                myid = *id;
                fprintf(stderr, "Got a new session id: 0x%llx\n",
                _LL_CAST_ myid.client_id);
                if (clientIdFile)
                {
                    FILE *fh = fopen(clientIdFile, "w");
                    if (!fh)
                    {
                        perror(clientIdFile);
                    }
                    else
                    {
                        int rc = fwrite(&myid, sizeof(myid), 1, fh);
                        if (rc != sizeof(myid))
                        {
                            perror("writing client id");
                        }
                        fclose(fh);
                    }
                }
            }
        }
        else if (state == ZOO_AUTH_FAILED_STATE)
        {
            fprintf(stderr, "Authentication failure. Shutting down\n");
            zookeeper_close(zzh);
            shutdownThisThing = 1;
            zh = 0;
        }
        else if (state == ZOO_EXPIRED_SESSION_STATE)
        {
            fprintf(stderr, "Session expired. Shutting down\n");
            zookeeper_close(zzh);
            shutdownThisThing = 1;
            zh = 0;
        }
    }
    else if (type == ZOO_CHILD_EVENT)
    {
        gettimeofday(&startTime, 0);
        const char *path = "/";
        int rc = zoo_aget_children(zh, path, 1, my_strings_completion, strdup(path));
        if (rc)
        {
            fprintf(stderr, "Error %d for zoo_aget_children %s\n", rc, path);
        }
    }
    else if (type == ZOO_CHANGED_EVENT)
    {
        gettimeofday(&startTime, 0);
        const char *sub_path = "/zk_test";
        int rc = zoo_aget(zh, sub_path, 1, my_data_completion, strdup(sub_path));
        if (rc)
        {
            fprintf(stderr, "Error %d for zoo_aget %s\n", rc, sub_path);
        }
    }
}


int main(int argc, char **argv)
{
    fd_set rfds, wfds, efds;
    FILE *fh;

    if (argc < 2)
    {
        fprintf(stderr, "USAGE %s zookeeper_host_list [clientid_file]\n", argv[0]);
        fprintf(stderr, "Version: ZooKeeper cli (c client) version %d.%d.%d\n",
                ZOO_MAJOR_VERSION, ZOO_MINOR_VERSION, ZOO_PATCH_VERSION);
        return 2;
    }
    if (argc > 2)
    {
            clientIdFile = argv[2];
            fh = fopen(clientIdFile, "r");
            if (fh)
            {
                if (fread(&myid, sizeof(myid), 1, fh) != sizeof(myid))
                {
                    memset(&myid, 0, sizeof(myid));
                }
                fclose(fh);
            }
    }

    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
    zoo_deterministic_conn_order(1); // enable deterministic order
    const char *hostPort = argv[1];
    zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
    if (!zh)
    {
        return errno;
    }

    int rc;

    gettimeofday(&startTime, 0);
    const char *path = "/";
    rc = zoo_aget_children(zh, path, 1, my_strings_completion, strdup(path));
    if (rc)
    {
        fprintf(stderr, "Error %d for zoo_aget_children %s\n", rc, path);
    }

    const char *sub_path = "/zk_test";
    rc = zoo_aget(zh, sub_path, 1, my_data_completion, strdup(sub_path));
    if (rc)
    {
        fprintf(stderr, "Error %d for zoo_aget %s\n", rc, sub_path);
    }

    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    while (1)
    {
        if (shutdownThisThing) // reinit
        {
            zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
            if (!zh)
            {
                return errno;
            }

            gettimeofday(&startTime, 0);
            const char *path = "/";
            rc = zoo_aget_children(zh, path, 1, my_strings_completion, strdup(path));
            if (rc)
            {
                fprintf(stderr, "Error %d for zoo_aget_children %s\n", rc, path);
            }
        }

        int fd;
        int interest;
        int events;
        struct timeval tv;

        rc = zookeeper_interest(zh, &fd, &interest, &tv);
        if (rc != ZOK)
        {
            fprintf(stderr, "fail to interest zookeeper, return %d.\n", rc);
        }

        if (fd != -1)
        {
            if (interest & ZOOKEEPER_READ)
            {
                FD_SET(fd, &rfds);
            }
            else
            {
                FD_CLR(fd, &rfds);
            }
            if (interest & ZOOKEEPER_WRITE)
            {
                FD_SET(fd, &wfds);
            }
            else
            {
                FD_CLR(fd, &wfds);
            }
        }
        else
        {
            fd = 0;
        }

        rc = select(fd + 1, &rfds, &wfds, &efds, &tv);
        events = 0;
        if (rc > 0)
        {
            if (FD_ISSET(fd, &rfds))
            {
                events |= ZOOKEEPER_READ;
            }
            if (FD_ISSET(fd, &wfds))
            {
                events |= ZOOKEEPER_WRITE;
            }
        }

//        fprintf(stderr, "event %d has happened.\n", events);
        zookeeper_process(zh, events);
    }

    zookeeper_close(zh);
    return 0;
}
OurService示例代码的功能:
   1. 监视根目录下znode的变化情况。
   2. 监控znode /zk_test 的值的变化情况。


使用设计2(/GroupMembers or /Apps):
App在启动的时候连接到ZooKeeper,并创建一个对应的临时的znode,当该App崩溃时,该znode会被删除。创建临时znode的代码示例如下:
const char *path = "/Apps/App1";
const char *value = "new";
int flags = ZOO_EPHEMERAL;
zoo_acreate(zh, path, value, strlen(value), &ZOO_OPEN_ACL_UNSAFE, flags,
                my_string_completion_free_data, strdup(path));

使用设计3(Lock):
create znode 时使用 ZOO_SEQUENCE 标志可以实现为指定的path追加序列号,序列号从“0000000000”开始,每创建一次,序列号加1。
const char *path = "/_locknode_/lock-";
const char *value = "new";
int flags = ZOO_EPHEMERAL | ZOO_SEQUENCE;
zoo_acreate(zh, path, value, strlen(value), &ZOO_OPEN_ACL_UNSAFE, flags,
                my_string_completion_free_data, strdup(path));
注:
1. zoo_acreate 之后会在回调函数 my_string_completion_free_data 中返回本次创建的path的名字。
2. 在创建完成之后,调用 zoo_aget_children("/_locknode_") 可以获得该目录下的znode列表,比较自己创建的path name和这个列表,看看自己的序列号是不是最小的,如果是,那么就认为自己获得了锁。




只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理