Golang 基于 Zookeeper 实现的服务注册与发现
# 基于 Zookeeper 实现服务注册与发现
代码地址:https://github.com/xuqil/experiments/tree/main/zookeeper-client/registry
# 需求
在分布式系统中,一个集群可能会有多个 Server ,例如100个 Server 组成的集群,此时如果集群中有几台 Server 挂了,或者又有新的 Server 上线了,如果 Client 不清楚哪些 Server 是可用的,哪些是不可用的,就会导致服务异常(访问到异常的 Server 了)。那么 Client 怎么知道新上线了哪些 Server ,又怎么知道下线了哪些 Server 呢?
此时就需要用到 Server 的动态上下线,任意一台 Client 都能实时感知到主节点服务器的下线。负载均衡器(LoadBalancer,例如 NGINX、F5)可以实现, 使用 ETCD 和 Zookeeper 的 watch 机制也可以实现。
此处将使用 Zookeeper 实现一个服务发现与注册的案例。
# 服务注册与发现的流程
Zookeeper 实现服务注册与发现的流程:
- Server 启动时主动在
/servers
(可以自定义)下注册信息,在 Zookeeper 指定的 node 上创建临时节点,数据可以是 server 的一些关键信息; - Client 启动后,会从 Zookeeper 获取的
/servers
获取子节点,于获取当前在线的服务器列表,然后注册监听/servers
; - 当有 Server 下线时,由于 Server 在 Zookeeper 上创建的是临时节点,此时 Server 与 Zookeeper 的连接端开,Zookeeper 上
/servers
下对应的临时节点就会自动被删除; - 由于
/servers
下有临时节点删除事件发生,Zookeeper 就会通知 Client,有 Server 下线了;此时 Client 可以获取到最新的在线服务器列表; - 最后,Client 会重新注册监听
/servers
,如此往复。
# 服务注册的实现
// Registry 注册中心接口
type Registry interface {
Register(key string, data []byte) error
Unregister(key string) error
}
// ZkRegistry 实现了 Registry
type ZkRegistry struct {
RootPath string
Conn *zk.Conn
}
func NewZkRegistry(rootPath string, conn *zk.Conn) *ZkRegistry {
return &ZkRegistry{
RootPath: fmt.Sprintf("/%s", rootPath),
Conn: conn,
}
}
func (r *ZkRegistry) getAcl() []zk.ACL {
return zk.WorldACL(zk.PermAll)
}
func (r *ZkRegistry) getAbsPath(path string) string {
return fmt.Sprintf("%s/%s", r.RootPath, path)
}
// Register 向 zookeeper 创建一个临时节点
func (r *ZkRegistry) Register(path string, data []byte) error {
_, err := r.Conn.Create(r.getAbsPath(path), data, 3, r.getAcl())
return err
}
// Unregister 显式删除一个临时节点
func (r *ZkRegistry) Unregister(path string) error {
path = r.getAbsPath(path)
_, stat, err := r.Conn.Get(path)
if err != nil {
if errors.Is(err, zk.ErrNodeExists) {
return nil
}
return err
}
return r.Conn.Delete(path, stat.Version)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
上面代码定义了一个Registry
接口,包含了两个函数Register
(服务注册)与Unregister
(服务注销)。ZkRegistry
实现了Registry
接口,Server
端可以使用ZkRegistry
来实现服务的上下线。
# 服务发现的实现
// Client 客户端
type Client struct {
zk *ZkRegistry
serverChan chan []string
}
func NewClient(registry *ZkRegistry) *Client {
return &Client{
zk: registry,
serverChan: make(chan []string),
}
}
// GetServerList 获取服务列表
func (c *Client) GetServerList() <-chan []string {
go c.Watch()
return c.serverChan
}
func (c *Client) Business(fn func()) {
fn()
}
// Watch 监听一个节点的子节点,获取所有子节点的数据
func (c *Client) Watch() {
children, _, event, err := c.zk.Conn.ChildrenW(c.zk.RootPath)
if err != nil {
return
}
data := make([]string, 0)
for _, child := range children {
res, _, er := c.zk.Conn.Get(fmt.Sprintf("%s/%s", c.zk.RootPath, child))
if er != nil {
log.Println(er)
continue
}
data = append(data, string(res))
}
go c.watchChildren(event)
c.serverChan <- data
}
func (c *Client) watchChildren(event <-chan zk.Event) {
<-event
// 再次监听
c.Watch()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
跟上面所述服务发现的流程一样,Client
的GetServerList
方法可以获取在线的服务列表,Watch
方法可以监听 Zookeeper 上指定的 Node 下的所有的子节点的增删改事件。
# 测试
func getConn() *zk.Conn {
conn, _, err := zk.Connect([]string{"192.168.122.20:2181", "192.168.122.21:2181", "192.168.122.22:2181"},
5*time.Second)
if err != nil {
log.Fatal(err)
}
return conn
}
var registry = NewZkRegistry("servers", getConn())
// TestClientWatch 测试客户端的 watch
func TestClientWatch(t *testing.T) {
// 1. 初始化 Client ,连接 zookeeper
client := NewClient(registry)
// 2. 获取 servers 的子节点信息,从中获取服务器信息列表
go func() {
for server := range client.GetServerList() {
log.Println(server)
}
}()
// 3. 业务进程启动
client.Business(func() {
for {
time.Sleep(1 * time.Second)
}
})
}
// TestServerRegister 测试服务端上线
func TestServerRegister(t *testing.T) {
// 依次启动5个服务
for i := 0; i < 5; i++ {
// 1. 初始化 server,连接 zk
server := NewServer("server1", registry)
// 2. 注册服务器
server.Register()
// 3. 业务进程启动
server.Business(func() {
for {
time.Sleep(10 * time.Second)
}
})
time.Sleep(5 * time.Second)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
评论
上次更新: 2024/05/29, 14:25:22