Golang 操作 Zookeeper
# Golang 操作 Zookeeper
完整的源码地址:https://github.com/xuqil/experiments/tree/main/zookeeper-client
go-zookeeper 库地址:https://pkg.go.dev/github.com/go-zookeeper/zk@v1.0.3
# 连接 Zookeeper
conn, _, err := zk.Connect([]string{"192.168.122.20:2181", "192.168.122.21:2181", "192.168.122.22:2181"},
5*time.Second)
1
2
2
# 对 Zookeeper 的 CRUD
// getPath 获取 path 的 data 和 stat
// Cmd: get -s /path
func getPath(t *testing.T, conn *zk.Conn, path string) ([]byte, *zk.Stat) {
data, stat, err := conn.Get(path)
if err != nil {
t.Fatal(err)
}
return data, stat
}
// deletePath 删除指定的 path,如果 path 下还有 children path,会删除失败
// Cmd: delete /path
func deletePath(t *testing.T, conn *zk.Conn, path string) {
_, stat, err := conn.Get(path)
if err != nil {
t.Fatal(err)
}
// version是用于 CAS支持,可以通过此种方式保证原子性
if err := conn.Delete(path, stat.Version); err != nil {
t.Fatal(err)
}
}
// modifyData 修改节点的值
func modifyData(t *testing.T, conn *zk.Conn, path string, data []byte) {
// 获取 path 的属性
_, stat, err := conn.Get(path)
if err != nil {
t.Fatal(err)
}
stat, err = conn.Set(path, data, stat.Version)
if err != nil {
t.Fatal(err)
}
}
// createPath 创建节点
func createPath(t *testing.T, conn *zk.Conn, path string, data []byte, flags int32) string {
// 获取访问控制权限
acl := zk.WorldACL(zk.PermAll)
res, err := conn.Create(path, data, flags, acl)
if err != nil {
t.Fatal(err)
}
return res
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# CRUD 的测试示例
// TestCreatePersistentPath 创建一个持久化节点:/path1
// Cmd: create /path1
func TestCreatePersistentPath(t *testing.T) {
conn := initConn(t)
defer conn.Close()
// flags有4种取值:
// 0: 永久,除非手动删除
// zk.FlagEphemeral = 1: 短暂,session断开则该节点也被删除
// zk.FlagSequence = 2: 会自动在节点后面添加序号
// 3: Ephemeral和Sequence,即,短暂且自动添加序号
var flags int32 = 0
path := "/path1"
res := createPath(t, conn, path, []byte("data"), flags)
assert.Equal(t, path, res)
// 持久化的 path 需要显式删除
deletePath(t, conn, path)
}
// TestCreateEphemeralPath 创建一个临时节点:/path2
// Cmd: create -e /path2
func TestCreateEphemeralPath(t *testing.T) {
conn := initConn(t)
defer conn.Close()
var flags int32 = zk.FlagEphemeral
// 创建临时节点,连接断开后会自动删除
path := "/path2"
data := []byte("ephemeral")
res := createPath(t, conn, path, []byte("ephemeral"), flags)
assert.Equal(t, path, res)
// 获取刚刚创建的临时节点
resData, stat := getPath(t, conn, path)
assert.Equal(t, data, resData)
t.Logf("stat:%+v\n", *stat)
}
// TestCreatePersistentPathWithSequence 创建一个带序号的持久化节点:/path3
// Cmd: create -s /path3
func TestCreatePersistentPathWithSequence(t *testing.T) {
conn := initConn(t)
defer conn.Close()
// flags有4种取值:
// 0: 永久,除非手动删除
// zk.FlagEphemeral = 1: 短暂,session断开则该节点也被删除
// zk.FlagSequence = 2: 会自动在节点后面添加序号
// 3: Ephemeral和Sequence,即,短暂且自动添加序号
var flags int32 = zk.FlagSequence
path := "/path3"
data := []byte("data")
// 第一次创建
res := createPath(t, conn, path, data, flags)
assert.True(t, strings.HasPrefix(res, path))
// 持久化的 path 需要显式删除
deletePath(t, conn, res)
// 第二次创建
res = createPath(t, conn, path, data, flags)
assert.True(t, strings.HasPrefix(res, path))
// 持久化的 path 需要显式删除
deletePath(t, conn, res)
}
// TestCreateEphemeralPathWithSequence 创建一个临时节点:/path4
// Cmd: create -e -s /path4
func TestCreateEphemeralPathWithSequence(t *testing.T) {
conn := initConn(t)
defer conn.Close()
// flags有4种取值:
// 0: 永久,除非手动删除
// zk.FlagEphemeral = 1: 短暂,session断开则该节点也被删除
// zk.FlagSequence = 2: 会自动在节点后面添加序号
// 3: Ephemeral和Sequence,即,短暂且自动添加序号
var flags int32 = 3
// 创建临时节点,连接断开后会自动删除
path := "/path4"
data := []byte("ephemeral")
// 第一次创建
res := createPath(t, conn, path, data, flags)
assert.True(t, strings.HasPrefix(res, path))
// 第二次创建
res = createPath(t, conn, path, data, flags)
assert.True(t, strings.HasPrefix(res, path))
}
// TestModifyPathData 修改指定 path 的值
// Cmd: set /path "new data"
func TestModifyPathData(t *testing.T) {
conn := initConn(t)
defer conn.Close()
// flags有4种取值:
// 0: 永久,除非手动删除
// zk.FlagEphemeral = 1: 短暂,session断开则该节点也被删除
// zk.FlagSequence = 2: 会自动在节点后面添加序号
// 3: Ephemeral和Sequence,即,短暂且自动添加序号
var flags int32 = 0
path := "/path5"
data := []byte("data")
_ = createPath(t, conn, path, data, flags)
// 将 path5 的值修改为 new data
modifyData(t, conn, path, []byte("new data"))
afterData, _ := getPath(t, conn, path)
assert.Equal(t, []byte("new data"), afterData)
deletePath(t, conn, path)
}
// TestGetPathChildren 获取 path 下的所有 children
// Cmd: ls -s /path
func TestGetPathChildren(t *testing.T) {
conn := initConn(t)
defer conn.Close()
var flags int32 = 0
path := "/path6"
data := []byte("ephemeral")
_ = createPath(t, conn, path, data, flags)
_ = createPath(t, conn, path+"/ch1", data, flags)
_ = createPath(t, conn, path+"/ch2", data, flags)
children, _, err := conn.Children(path)
if err != nil {
t.Fatal(err)
}
assert.ElementsMatch(t, []string{"ch1", "ch2"}, children)
for _, child := range children {
deletePath(t, conn, path+"/"+child)
}
deletePath(t, conn, path)
}
// TestExists 判断节点是否存在
func TestExists(t *testing.T) {
conn := initConn(t)
defer conn.Close()
exists, _, err := conn.Exists("/noExist")
if err != nil {
t.Fatal(err)
}
assert.False(t, exists)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# Zookeeper 的 watch
在 go-zookeeper 中,golang 是通过 Event 来实现 watch 机制
# 全局监听
// TestWatchGlobal 全局 watch
func TestWatchGlobal(t *testing.T) {
// 创建监听的option,用于初始化zk
eventCallbackOption := zk.WithEventCallback(callback)
// 连接zk
conn, _, err := zk.Connect([]string{"192.168.122.20:2181", "192.168.122.21:2181", "192.168.122.22:2181"}, time.Second*5, eventCallbackOption)
defer conn.Close()
if err != nil {
t.Fatal(err)
}
data := []byte("hello")
var flags int32 = zk.FlagEphemeral
path := "/watchGlobal"
// 开始监听path
_, _, _, err = conn.ExistsW(path)
if err != nil {
t.Fatal(err)
}
// 触发创建操作
createPath(t, conn, path, data, flags)
//再次监听path
_, _, _, err = conn.ExistsW(path)
if err != nil {
t.Fatal(err)
}
// 触发删除操作
deletePath(t, conn, path)
}
// callback 回调函数
func callback(event zk.Event) {
if event.Err != nil {
log.Fatal(event.Err)
}
log.Println("发生 global watch 回调:")
log.Println("path:", event.Path)
log.Println("type:", event.Type.String())
log.Println("state:", event.State.String())
log.Println("---------------------------")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
提示:
- 如果即设置了全局监听又设置了部分监听,那么最终是都会触发的,并且全局监听会先执行
- 如果设置了监听子节点,那么事件的触发是先子节点后父节点
# 监听节点的值
// TestWatchPathData watch 节点的值
// Cmd: get -w /path
func TestWatchPathData(t *testing.T) {
conn := initConn(t)
path := "/watchData"
data := []byte("data")
var flags int32 = zk.FlagEphemeral
_ = createPath(t, conn, path, data, flags)
t.Log("before data:", string(data))
// 监听 path 的数据
_, _, event, err := conn.GetW(path)
if err != nil {
t.Error(err)
return
}
go watchData(event)
modifyData(t, conn, path, []byte("new data"))
afterData, _ := getPath(t, conn, path)
t.Log("after data:", string(afterData))
}
func watchData(e <-chan zk.Event) {
event := <-e
log.Println("发生 data watch 回调:")
log.Println("path:", event.Path)
log.Println("type:", event.Type.String())
log.Println("state:", event.State.String())
log.Println("---------------------------")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 监听节点的 Children Path
// TestWatchPathChildren watch 节点的子节点
// Cmd: ls -w /path
func TestWatchPathChildren(t *testing.T) {
conn, _, err := zk.Connect([]string{"192.168.122.20:2181", "192.168.122.21:2181", "192.168.122.22:2181"},
5*time.Second)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
path := "/watchData"
data := []byte("data")
var flags int32 = 0
_ = createPath(t, conn, path, data, flags)
// 监听 path 的数据
_, _, event, err := conn.ChildrenW(path)
if err != nil {
t.Error(err)
return
}
_ = createPath(t, conn, path+"/ch1", data, flags)
// 监听 path 的数据
_, _, event, err = conn.ChildrenW(path)
if err != nil {
t.Error(err)
return
}
_ = createPath(t, conn, path+"/ch2", data, flags)
go watchData(event)
modifyData(t, conn, path, []byte("new data"))
children, _, err := conn.Children(path)
if err != nil {
t.Fatal(err)
}
for _, child := range children {
deletePath(t, conn, path+"/"+child)
}
deletePath(t, conn, path)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
评论
上次更新: 2024/05/29, 14:25:22