/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/#include <zookeeper.h>
#include <proto.h>
#include <stdlib.h>
#include <stdio.h>
#include <
string.h>
#ifndef WIN32
#include <sys/time.h>
#include <unistd.h>
#include <sys/select.h>
#else#include "winport.h"
//#include <io.h> <-- can't include, conflicting definitions of close()
int read(
int _FileHandle,
void * _DstBuf, unsigned
int _MaxCharCount);
int write(
int _Filehandle,
const void * _Buf, unsigned
int _MaxCharCount);
#define ctime_r(tctime, buffer) ctime_s (buffer, 40, tctime)
#endif#include <time.h>
#include <errno.h>
#include <assert.h>
#define _LL_CAST_ (long long)
static zhandle_t *zh;
static clientid_t myid;
static const char *clientIdFile = 0;
struct timeval startTime;
static int shutdownThisThing = 0;
static const char* state2String(
int state)
{
if (state == 0)
return "CLOSED_STATE";
if (state == ZOO_CONNECTING_STATE)
return "CONNECTING_STATE";
if (state == ZOO_ASSOCIATING_STATE)
return "ASSOCIATING_STATE";
if (state == ZOO_CONNECTED_STATE)
return "CONNECTED_STATE";
if (state == ZOO_EXPIRED_SESSION_STATE)
return "EXPIRED_SESSION_STATE";
if (state == ZOO_AUTH_FAILED_STATE)
return "AUTH_FAILED_STATE";
return "INVALID_STATE";
}
static const char* type2String(
int state)
{
if (state == ZOO_CREATED_EVENT)
return "CREATED_EVENT";
if (state == ZOO_DELETED_EVENT)
return "DELETED_EVENT";
if (state == ZOO_CHANGED_EVENT)
return "CHANGED_EVENT";
if (state == ZOO_CHILD_EVENT)
return "CHILD_EVENT";
if (state == ZOO_SESSION_EVENT)
return "SESSION_EVENT";
if (state == ZOO_NOTWATCHING_EVENT)
return "NOTWATCHING_EVENT";
return "UNKNOWN_EVENT_TYPE";
}
void dumpStat(
const struct Stat *stat)
{
char tctimes[40];
char tmtimes[40];
time_t tctime;
time_t tmtime;
if (!stat)
{
fprintf(stderr, "null\n");
return;
}
tctime = stat->ctime / 1000;
tmtime = stat->mtime / 1000;
ctime_r(&tmtime, tmtimes);
ctime_r(&tctime, tctimes);
fprintf(stderr, "\tctime = %s\tczxid=%llx\n"
"\tmtime=%s\tmzxid=%llx\n"
"\tversion=%x\taversion=%x\n"
"\tephemeralOwner = %llx\n", tctimes, _LL_CAST_ stat->czxid,
tmtimes,
_LL_CAST_ stat->mzxid, (unsigned
int) stat->version,
(unsigned
int) stat->aversion,
_LL_CAST_ stat->ephemeralOwner);
}
void my_data_completion(
int rc,
const char *value,
int value_len,
const struct Stat *stat,
const void *data)
{
struct timeval tv;
int sec;
int usec;
gettimeofday(&tv, 0);
sec = tv.tv_sec - startTime.tv_sec;
usec = tv.tv_usec - startTime.tv_usec;
fprintf(stderr, "time = %d msec\n", sec * 1000 + usec / 1000);
fprintf(stderr, "%s: rc = %d\n", (
char*) data, rc);
if (value)
{
fprintf(stderr, " value_len = %d\n", value_len);
assert(write(2, value, value_len) == value_len);
}
fprintf(stderr, "\nStat:\n");
dumpStat(stat);
free((
void*) data);
}
void my_strings_completion(
int rc,
const struct String_vector *strings,
const void *data)
{
struct timeval tv;
int sec;
int usec;
int i;
gettimeofday(&tv, 0);
sec = tv.tv_sec - startTime.tv_sec;
usec = tv.tv_usec - startTime.tv_usec;
fprintf(stderr, "time = %d msec\n", sec * 1000 + usec / 1000);
fprintf(stderr, "%s: rc = %d\n", (
char*) data, rc);
if (strings)
for (i = 0; i < strings->count; i++)
{
fprintf(stderr, "\t%s\n", strings->data[i]);
}
free((
void*) data);
gettimeofday(&tv, 0);
sec = tv.tv_sec - startTime.tv_sec;
usec = tv.tv_usec - startTime.tv_usec;
fprintf(stderr, "time = %d msec\n", sec * 1000 + usec / 1000);
}
void watcher(zhandle_t *zzh,
int type,
int state,
const char *path,
void* context)
{
/* Be careful using zh here rather than zzh - as this may be mt code
* the client lib may call the watcher before zookeeper_init returns */ fprintf(stderr, "Watcher %s state = %s", type2String(type),
state2String(state));
if (path && strlen(path) > 0)
{
fprintf(stderr, " for path %s", path);
}
fprintf(stderr, "\n");
if (type == ZOO_SESSION_EVENT)
{
if (state == ZOO_CONNECTED_STATE)
{
const clientid_t *id = zoo_client_id(zzh);
if (myid.client_id == 0 || myid.client_id != id->client_id)
{
myid = *id;
fprintf(stderr, "Got a new session id: 0x%llx\n",
_LL_CAST_ myid.client_id);
if (clientIdFile)
{
FILE *fh = fopen(clientIdFile, "w");
if (!fh)
{
perror(clientIdFile);
}
else {
int rc = fwrite(&myid,
sizeof(myid), 1, fh);
if (rc !=
sizeof(myid))
{
perror("writing client id");
}
fclose(fh);
}
}
}
}
else if (state == ZOO_AUTH_FAILED_STATE)
{
fprintf(stderr, "Authentication failure. Shutting down
\n");
zookeeper_close(zzh);
shutdownThisThing = 1;
zh = 0;
}
else if (state == ZOO_EXPIRED_SESSION_STATE)
{
fprintf(stderr, "Session expired. Shutting down
\n");
zookeeper_close(zzh);
shutdownThisThing = 1;
zh = 0;
}
}
else if (type == ZOO_CHILD_EVENT)
{
gettimeofday(&startTime, 0);
const char *path = "/";
int rc = zoo_aget_children(zh, path, 1, my_strings_completion, strdup(path));
if (rc)
{
fprintf(stderr, "Error %d for zoo_aget_children %s\n", rc, path);
}
}
else if (type == ZOO_CHANGED_EVENT)
{
gettimeofday(&startTime, 0);
const char *sub_path = "/zk_test";
int rc = zoo_aget(zh, sub_path, 1, my_data_completion, strdup(sub_path));
if (rc)
{
fprintf(stderr, "Error %d for zoo_aget %s\n", rc, sub_path);
}
}
}
int main(
int argc,
char **argv)
{
fd_set rfds, wfds, efds;
FILE *fh;
if (argc < 2)
{
fprintf(stderr, "USAGE %s zookeeper_host_list [clientid_file]\n", argv[0]);
fprintf(stderr, "Version: ZooKeeper cli (c client) version %d.%d.%d\n",
ZOO_MAJOR_VERSION, ZOO_MINOR_VERSION, ZOO_PATCH_VERSION);
return 2;
}
if (argc > 2)
{
clientIdFile = argv[2];
fh = fopen(clientIdFile, "r");
if (fh)
{
if (fread(&myid,
sizeof(myid), 1, fh) !=
sizeof(myid))
{
memset(&myid, 0,
sizeof(myid));
}
fclose(fh);
}
}
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
zoo_deterministic_conn_order(1);
// enable deterministic order
const char *hostPort = argv[1];
zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
if (!zh)
{
return errno;
}
int rc;
gettimeofday(&startTime, 0);
const char *path = "/";
rc = zoo_aget_children(zh, path, 1, my_strings_completion, strdup(path));
if (rc)
{
fprintf(stderr, "Error %d for zoo_aget_children %s\n", rc, path);
}
const char *sub_path = "/zk_test";
rc = zoo_aget(zh, sub_path, 1, my_data_completion, strdup(sub_path));
if (rc)
{
fprintf(stderr, "Error %d for zoo_aget %s\n", rc, sub_path);
}
FD_ZERO(&rfds);
FD_ZERO(&wfds);
FD_ZERO(&efds);
while (1)
{
if (shutdownThisThing)
// reinit
{
zh = zookeeper_init(hostPort, watcher, 30000, &myid, 0, 0);
if (!zh)
{
return errno;
}
gettimeofday(&startTime, 0);
const char *path = "/";
rc = zoo_aget_children(zh, path, 1, my_strings_completion, strdup(path));
if (rc)
{
fprintf(stderr, "Error %d for zoo_aget_children %s\n", rc, path);
}
}
int fd;
int interest;
int events;
struct timeval tv;
rc = zookeeper_interest(zh, &fd, &interest, &tv);
if (rc != ZOK)
{
fprintf(stderr, "fail to interest zookeeper, return %d.\n", rc);
}
if (fd != -1)
{
if (interest & ZOOKEEPER_READ)
{
FD_SET(fd, &rfds);
}
else {
FD_CLR(fd, &rfds);
}
if (interest & ZOOKEEPER_WRITE)
{
FD_SET(fd, &wfds);
}
else {
FD_CLR(fd, &wfds);
}
}
else {
fd = 0;
}
rc = select(fd + 1, &rfds, &wfds, &efds, &tv);
events = 0;
if (rc > 0)
{
if (FD_ISSET(fd, &rfds))
{
events |= ZOOKEEPER_READ;
}
if (FD_ISSET(fd, &wfds))
{
events |= ZOOKEEPER_WRITE;
}
}
// fprintf(stderr, "event %d has happened.\n", events);
zookeeper_process(zh, events);
}
zookeeper_close(zh);
return 0;
}
1. 监视根目录下znode的变化情况。
2. 监控znode /zk_test 的值的变化情况。
2. 在创建完成之后,调用 zoo_aget_children("/_locknode_") 可以获得该目录下的znode列表,比较自己创建的path name和这个列表,看看自己的序列号是不是最小的,如果是,那么就认为自己获得了锁。