FeelingLife FeelingLife
首页
  • Go

    • Go基础知识
  • Python

    • Python进阶
  • 操作系统
  • 计算机网络
  • MySQL
  • 学习笔记
  • 常用到的算法
  • Docker
  • Kubernetes
  • Observability
  • 容器底层
其他技术
  • 友情链接
  • 收藏
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

xuqil

一介帆夫
首页
  • Go

    • Go基础知识
  • Python

    • Python进阶
  • 操作系统
  • 计算机网络
  • MySQL
  • 学习笔记
  • 常用到的算法
  • Docker
  • Kubernetes
  • Observability
  • 容器底层
其他技术
  • 友情链接
  • 收藏
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Redis

  • Nginx

  • Linux

  • SSO和OAuth2

  • 分布式

    • 分布式锁

    • 服务注册与发现

      • Golang 基于 Zookeeper 实现的服务注册与发现
        • 需求
        • 服务注册与发现的流程
        • 服务注册的实现
        • 服务发现的实现
        • 测试
  • Other

  • F5

  • 其他技术
  • 分布式
  • 服务注册与发现
xuqil
2023-05-21
目录

Golang 基于 Zookeeper 实现的服务注册与发现

# 基于 Zookeeper 实现服务注册与发现

代码地址:https://github.com/xuqil/experiments/tree/main/zookeeper-client/registry

# 需求

在分布式系统中,一个集群可能会有多个 Server ,例如100个 Server 组成的集群,此时如果集群中有几台 Server 挂了,或者又有新的 Server 上线了,如果 Client 不清楚哪些 Server 是可用的,哪些是不可用的,就会导致服务异常(访问到异常的 Server 了)。那么 Client 怎么知道新上线了哪些 Server ,又怎么知道下线了哪些 Server 呢?

此时就需要用到 Server 的动态上下线,任意一台 Client 都能实时感知到主节点服务器的下线。负载均衡器(LoadBalancer,例如 NGINX、F5)可以实现, 使用 ETCD 和 Zookeeper 的 watch 机制也可以实现。

此处将使用 Zookeeper 实现一个服务发现与注册的案例。

# 服务注册与发现的流程

Zookeeper 实现服务注册与发现的流程:

  1. Server 启动时主动在/servers(可以自定义)下注册信息,在 Zookeeper 指定的 node 上创建临时节点,数据可以是 server 的一些关键信息;
  2. Client 启动后,会从 Zookeeper 获取的/servers获取子节点,于获取当前在线的服务器列表,然后注册监听/servers;
  3. 当有 Server 下线时,由于 Server 在 Zookeeper 上创建的是临时节点,此时 Server 与 Zookeeper 的连接端开,Zookeeper 上/servers下对应的临时节点就会自动被删除;
  4. 由于/servers下有临时节点删除事件发生,Zookeeper 就会通知 Client,有 Server 下线了;此时 Client 可以获取到最新的在线服务器列表;
  5. 最后,Client 会重新注册监听/servers,如此往复。

image-20230521000819960

# 服务注册的实现


// Registry 注册中心接口
type Registry interface {
	Register(key string, data []byte) error
	Unregister(key string) error
}

// ZkRegistry 实现了 Registry
type ZkRegistry struct {
	RootPath string
	Conn     *zk.Conn
}

func NewZkRegistry(rootPath string, conn *zk.Conn) *ZkRegistry {
	return &ZkRegistry{
		RootPath: fmt.Sprintf("/%s", rootPath),
		Conn:     conn,
	}
}

func (r *ZkRegistry) getAcl() []zk.ACL {
	return zk.WorldACL(zk.PermAll)
}

func (r *ZkRegistry) getAbsPath(path string) string {
	return fmt.Sprintf("%s/%s", r.RootPath, path)
}

// Register 向 zookeeper 创建一个临时节点
func (r *ZkRegistry) Register(path string, data []byte) error {
	_, err := r.Conn.Create(r.getAbsPath(path), data, 3, r.getAcl())
	return err
}

// Unregister 显式删除一个临时节点
func (r *ZkRegistry) Unregister(path string) error {
	path = r.getAbsPath(path)
	_, stat, err := r.Conn.Get(path)
	if err != nil {
		if errors.Is(err, zk.ErrNodeExists) {
			return nil
		}
		return err
	}
	return r.Conn.Delete(path, stat.Version)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

上面代码定义了一个Registry接口,包含了两个函数Register(服务注册)与Unregister(服务注销)。ZkRegistry实现了Registry接口,Server端可以使用ZkRegistry来实现服务的上下线。

# 服务发现的实现

// Client 客户端
type Client struct {
	zk         *ZkRegistry
	serverChan chan []string
}

func NewClient(registry *ZkRegistry) *Client {
	return &Client{
		zk:         registry,
		serverChan: make(chan []string),
	}
}

// GetServerList 获取服务列表
func (c *Client) GetServerList() <-chan []string {
	go c.Watch()
	return c.serverChan
}

func (c *Client) Business(fn func()) {
	fn()
}

// Watch 监听一个节点的子节点,获取所有子节点的数据
func (c *Client) Watch() {
	children, _, event, err := c.zk.Conn.ChildrenW(c.zk.RootPath)
	if err != nil {
		return
	}

	data := make([]string, 0)
	for _, child := range children {
		res, _, er := c.zk.Conn.Get(fmt.Sprintf("%s/%s", c.zk.RootPath, child))
		if er != nil {
			log.Println(er)
			continue
		}
		data = append(data, string(res))
	}
	go c.watchChildren(event)

	c.serverChan <- data
}

func (c *Client) watchChildren(event <-chan zk.Event) {
	<-event
	// 再次监听
	c.Watch()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

跟上面所述服务发现的流程一样,Client的GetServerList方法可以获取在线的服务列表,Watch方法可以监听 Zookeeper 上指定的 Node 下的所有的子节点的增删改事件。

# 测试

func getConn() *zk.Conn {
	conn, _, err := zk.Connect([]string{"192.168.122.20:2181", "192.168.122.21:2181", "192.168.122.22:2181"},
		5*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	return conn
}

var registry = NewZkRegistry("servers", getConn())

// TestClientWatch 测试客户端的 watch
func TestClientWatch(t *testing.T) {
	// 1. 初始化 Client ,连接 zookeeper
	client := NewClient(registry)
	//	2. 获取 servers 的子节点信息,从中获取服务器信息列表
	go func() {
		for server := range client.GetServerList() {
			log.Println(server)
		}
	}()

	// 3. 业务进程启动
	client.Business(func() {
		for {
			time.Sleep(1 * time.Second)
		}
	})
}

// TestServerRegister 测试服务端上线
func TestServerRegister(t *testing.T) {
	// 依次启动5个服务
	for i := 0; i < 5; i++ {
		// 1. 初始化 server,连接 zk
		server := NewServer("server1", registry)
		// 2. 注册服务器
		server.Register()
		// 3. 业务进程启动
		server.Business(func() {
			for {
				time.Sleep(10 * time.Second)
			}
		})
		time.Sleep(5 * time.Second)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
上次更新: 2024/05/29, 06:25:22
Redis分布式锁
WSL 中配置 Ubuntu 22.04 开发环境

← Redis分布式锁 WSL 中配置 Ubuntu 22.04 开发环境→

最近更新
01
ip命令的使用
11-02
02
veth-pair容器中的网络
11-02
03
sendfile、aio和directIO
09-25
更多文章>
Theme by Vdoing | Copyright © 2018-2025 FeelingLife | 粤ICP备2022093535号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式