⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-cluster-module/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 SkyWalking Collector Cluster Module,负责集群的管理,即 Collector 节点的注册于发现。

友情提示:建议先阅读 《SkyWalking 源码分析 —— Collector 初始化》 ,以了解 Collector 组件体系。

Cluster Module 在 SkyWalking 架构图处于如下位置( 红框 ) :

FROM https://github.com/apache/incubating-skywalking

下面我们来看看整体的项目结构,如下图所示 :

  • collector-cluster-define :定义集群管理接口。
  • collector-cluster-standalone-provider :基于 H2 的 集群管理实现。该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用
  • collector-cluster-redis-provider :基于 Redis 的集群管理实现。目前暂未完成
  • collector-cluster-zookeeper-provider :基于 Zookeeper 的集群管理实现。生产环境推荐使用

下面,我们从接口到实现的顺序进行分享。

2. collector-cluster-define

collector-cluster-define :定义集群管理接口。项目结构如下 :

  • 交互如下图 :

  • ModuleListenerService 暴露给其他 Module 注册监听器 ( ClusterModuleListener ) 到 DataMonitor 。

  • ModuleRegisterService 暴露给其他 Module 注册组件登记( ModuleRegistration ) 到 DataMonitor 。

  • 通过实现 DataMonitor 接口,基于不同的存储器实现注册发现。

2.1 ClusterModule

org.skywalking.apm.collector.cluster.ClusterModule ,实现 Module 抽象类,集群管理 Module 。

#name() 实现方法,返回模块名为 "cluster"

#services() 实现方法,返回 Service 类名:ModuleListenerService / ModuleRegisterService 。

2.2 ModuleRegisterService

org.skywalking.apm.collector.cluster.service.ModuleRegisterService ,继承 Service 接口,模块注册服务接口

#register(moduleName, providerName, registration) 接口方法,注册模块注册信息。一般情况下,实现该接口方法,调用 DataMonitor#register(path, registration) 方法。

2.2.1 ModuleRegistration

org.skywalking.apm.collector.cluster.ModuleRegistration ,模块注册信息抽象类。不同 Module 通过实现 ModuleRegistration ,将它们注册到 ModuleRegisterService。目前子类如下 :

#buildValue() 抽象方法,获得模块注册信息( Value )。

2.3 ModuleListenerService

org.skywalking.apm.collector.cluster.service.ModuleListenerService ,继承 Service 接口,注册监听器服务接口

#addListener(listener) 接口方法,添加监听器。一般情况下,实现该接口方法,调用 DataMonitor#addListener(listener) 方法。

2.3.1 ClusterModuleListener

org.skywalking.apm.collector.cluster.ClusterModuleListener ,集群组件监听器抽象类。目前子类如下 :

构造方法,创建地址数组( addresses )。该数组的读写方法如下:

  • #addAddress(address)
  • #removeAddress(address)
  • #getAddresses()

#path() 抽象方法,返回路径。该路径即为 ClusterModuleListener 监听的**"事件"**。多个 Collector 节点的相同 Module ,通过路径分组形成集群

#serverJoinNotify(serverAddress) / #serverQuitNotify(serverAddress) 抽象方法,通知服务的加入 / 下线。目前只有 GRPCRemoteSenderService 真正( 其它都是空方法 )实现该方法,在 《SkyWalking 源码分析 —— Collector Remote 远程通信服务》「3.2 GRPCRemoteSenderService」 详细解析。

2.4 DataMonitor

org.skywalking.apm.collector.cluster.DataMonitor ,数据监接口。通过实现 DataMonitor 接口,基于不同的存储器实现注册发现。目前子类如下 :

#register(path, registration) 接口方法,注册模块注册信息。

#addListener(ClusterModuleListener) 接口方法,添加监听器。

#setClient(Client) 接口方法,设置 Client 。在 client-component 有 ZookeeperClient / H2Client / ElasticSearchClient 等多种实现。

  • BASE_CATALOG 属性,基础目录为 "/skywalking" 。例如说,在 Zookeeper 为根节点的路径。
  • #createPath(path) 接口方法,使用 Client 创建路径。
  • #setData(path) 接口方法,使用 Client 设置路径的值。

3. collector-cluster-zookeeper-provider

collector-cluster-zookeeper-provider ,基于 Zookeeper 的集群管理实现。项目结构如下 :

实际使用时,通过 application.yml 配置如下:

cluster:
zookeeper:
hostPort: localhost:2181
sessionTimeout: 100000

  • 生产环境下,推荐 Zookeeper 配置成集群。

3.1 ClusterModuleZookeeperProvider

org.skywalking.apm.collector.cluster.zookeeper.ClusterModuleZookeeperProvider ,实现 ModuleProvider 抽象类,基于 Zookeeper 的集群管理服务提供者。

#name() 实现方法,返回组件服务提供者名为 "zookeeper"

module() 实现方法,返回组件类为 ClusterModule 。

#requiredModules() 实现方法,返回依赖组件为空。


#prepare(Properties) 实现方法,执行准备阶段逻辑。

  • 第 63 行 :创建 ClusterZKDataMonitor 对象。
  • 第 69 行 :创建 ZookeeperClient 对象。注意,此时并未连接 Zookeeper
  • 第 71 至 73 行 :创建 ZookeeperModuleListenerService / ZookeeperModuleRegisterService 对象,并调用 #registerServiceImplementation() 父类方法,注册到 services

#start() 实现方法,执行启动阶段逻辑。

  • 第 79 行 :调用 ZookeeperClient#initialize() 方法,初始化 ZookeeperClient ,此时会连接 Zookeeper

#notifyAfterCompleted() 实现方法,执行启动完成逻辑。

3.2 ZookeeperModuleRegisterService

org.skywalking.apm.collector.cluster.zookeeper.service.ZookeeperModuleRegisterService ,基于 Zookeeper 的模块注册服务实现类

#register(moduleName, providerName, registration) 实现方法,调用 ClusterZKDataMonitor#register(path, registration) 方法,注册模块注册信息。

3.3 ZookeeperModuleListenerService

org.skywalking.apm.collector.cluster.zookeeper.service.ZookeeperModuleListenerService ,基于 Zookeeper 的注册监听器服务实现类

#addListener(ClusterModuleListener) 实现方法,调用 ClusterZKDataMonitor#addListener(ClusterModuleListener) 方法,注册模块注册信息。

3.4 ClusterZKDataMonitor

org.skywalking.apm.collector.cluster.zookeeper.ClusterZKDataMonitor ,基于 Zookeeper 的数据监视器实现类

在看具体代码实现之前,我们先来看看 Zookeeper 是如何存储数据的,如下图所示 :

  • 紫色部分,通过调用 #createPath(path) 方法,顺着路径,逐层创建持久节点。

  • 黄色部分,通过调用 #setData(path) 方法,创建临时节点,设置 Collector 模块地址。若 Collector 集群有 N 个节点,则此处会有 N 个临时节点。

  • 打开 zkClient.sh ,我们来看一个例子 :

    [zk: localhost:2181(CONNECTED) 1] ls /skywalking
    [remote, ui, agent_jetty, agent_gRPC]

    [zk: localhost:2181(CONNECTED) 2] ls /skywalking/ui
    [jetty]

    [zk: localhost:2181(CONNECTED) 3] ls /skywalking/ui/jetty
    [localhost:12800]

    [zk: localhost:2181(CONNECTED) 4] get /skywalking/ui/jetty/localhost:12800
    /
    cZxid = 0x24
    ctime = Thu Dec 14 16:05:25 CST 2017
    mZxid = 0x24
    mtime = Thu Dec 14 16:05:25 CST 2017
    pZxid = 0x24
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x16052d8b9f40006
    dataLength = 1
    numChildren = 0


#register(path, registration) 实现方法,添加到组件注册信息集合( registrations )。

#start() 方法,启动 ClusterZKDataMonitor ,将组件注册信息( registrations ) 写到 Zookeeper 中。


#addListener(listener) 实现方法,添加到监听器集合( listeners )。

#process(WatchedEvent) 实现方法,处理有 Collector 节点的组件加入或下线。总体逻辑是,从 Zookeeper 获取变更的路径下的地址数组,和本地的地址( ClusterModuleListener.addresses )比较,处理加入或移除逻辑的地址。

  • ClusterZKDataMonitor 实现 org.apache.zookeeper.Watcher 接口,所以实现该方法。
  • 该方法是 synchronized 方法,以保证不会出现并发问题。

3.5 ZookeeperClient

org.skywalking.apm.collector.client.zookeeper.ZookeeperClient ,实现 Client 接口,Zookeeper 客户端。

代码比较简单,胖友自己阅读理解。

4. collector-cluster-standalone-provider

collector-cluster-standalone-provider.ClusterStandaloneDataMonitor ,基于 H2 的 集群管理实现。该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用。项目结构如下 :

大体实现和 collector-cluster-zookeeper-provider 差不多,差异在对 DataMonitor 的实现类 ClusterStandaloneDataMonitor 上。

在 ClusterStandaloneDataMonitor 里,实际并未使用 H2Client ,而是基于内存,胖友可以自己查看下。

5. collector-cluster-redis-provider

collector-cluster-redis-provider :基于 Redis 的集群管理实现。目前暂未完成

【TODO 4003】等实现后来写写,基于 Redis Pub Sub 保证实时性

666. 彩蛋

知识星球

有一种硬生生把很简单的东西,写的很复杂的感觉。

胖友,分享个朋友圈可好?

文章目录
  1. 1. 1. 概述
  2. 2. 2. collector-cluster-define
    1. 2.1. 2.1 ClusterModule
    2. 2.2. 2.2 ModuleRegisterService
      1. 2.2.1. 2.2.1 ModuleRegistration
    3. 2.3. 2.3 ModuleListenerService
      1. 2.3.1. 2.3.1 ClusterModuleListener
    4. 2.4. 2.4 DataMonitor
  3. 3. 3. collector-cluster-zookeeper-provider
    1. 3.1. 3.1 ClusterModuleZookeeperProvider
    2. 3.2. 3.2 ZookeeperModuleRegisterService
    3. 3.3. 3.3 ZookeeperModuleListenerService
    4. 3.4. 3.4 ClusterZKDataMonitor
    5. 3.5. 3.5 ZookeeperClient
  4. 4. 4. collector-cluster-standalone-provider
  5. 5. 5. collector-cluster-redis-provider
  6. 6. 666. 彩蛋