⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-remote-module/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 SkyWalking Collector Remote 远程通信服务。该服务用于 Collector 集群内部通信。

目前集群内部通信的目的,跨节点的流式处理。Remote Module 应用在 SkyWalking 架构图如下位置( 红框 ) :

FROM https://github.com/apache/incubating-skywalking

下面我们来看看整体的项目结构,如下图所示 :

  • collector-remote-define :定义远程通信接口。
  • collector-remote-kafka-provider :基于 Kafka 的远程通信实现。目前暂未完成
  • collector-remote-grpc-provider :基于 Google gRPC 的远程通信实现。生产环境目前使用

下面,我们从接口到实现的顺序进行分享。

2. collector-remote-define

collector-remote-define :定义远程通信接口。项目结构如下 :

整体流程如下图:

我们按照整个流程的处理顺序,逐个解析涉及到的类与接口。

2.1 RemoteModule

org.skywalking.apm.collector.remote.RemoteModule ,实现 Module 抽象类,远程通信 Module 。

#name() 实现方法,返回模块名为 "remote"

#services() 实现方法,返回 Service 类名:RemoteSenderService 、RemoteDataRegisterService 。

2.2 RemoteSenderService

org.skywalking.apm.collector.remote.service.RemoteSenderService ,继承 Service 接口,远程发送服务接口,定义了 #send(graphId, nodeId, data, selector) 接口方法,调用 RemoteClient ,发送数据。

2.3 RemoteClientService

org.skywalking.apm.collector.remote.service.RemoteClientService ,继承 Service 接口,远程客户端服务接口,定义了 #create(host, port, channelSize, bufferSize) 接口方法,创建 RemoteClient 对象。

2.4 RemoteClient

org.skywalking.apm.collector.remote.service.RemoteClient ,继承 java.lang.Comparable 接口,远程客户端接口。定义了如下接口方法:

2.5 CommonRemoteDataRegisterService

在说 CommonRemoteDataRegisterService 之前,首先来说下 CommonRemoteDataRegisterService 的意图。

在上文中,我们可以看到发送给 Collector 是 Data 对象,而 Data 是数据的抽象类,在具体反序列化 Data 对象之前,程序是无法得知它是 Data 的哪个实现对象。这个时候,我们可以给 Data 对象的每个实现类,生成一个对应的数据协议编号

  • 在发送数据之前,序列化 Data 对象时,增加该 Data 对应的协议编号,一起发送。
  • 在接收数据之后,反序列化数据时,根据协议编号,创建 Data 对应的实现类对象。

org.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService ,通用远程数据注册服务。

  • id 属性,数据协议自增编号。
  • dataClassMapping 属性,数据类型( Class<? extends Data> )与数据协议编号的映射。
  • dataInstanceCreatorMapping 属性,数据协议编号与数据对象创建器( RemoteDataInstanceCreator )的映射。

2.5.1 RemoteDataRegisterService

org.skywalking.apm.collector.remote.service.RemoteDataRegisterService ,继承 Service 接口,远程客户端服务接口,定义了 #register(Class<? extends Data>, RemoteDataInstanceCreator) 接口方法,注册数据类型对应的远程数据创建器( RemoteDataRegisterService.RemoteDataInstanceCreator )对象。

CommonRemoteDataRegisterService 实现了 RemoteDataRegisterService 接口,#register(Class<? extends Data>, RemoteDataInstanceCreator) 实现方法。

另外,AgentStreamRemoteDataRegister 会调用 RemoteDataRegisterService#register(Class<? extends Data>, RemoteDataInstanceCreator) 方法,注册每个数据类型的 RemoteDataInstanceCreator 对象。注意,例如 Application::new 是 RemoteDataInstanceCreator 的匿名实现类

2.5.2 RemoteDataIDGetter

org.skywalking.apm.collector.remote.service.RemoteDataIDGetter ,继承 Service 接口,远程数据协议编号获取器接口,定义了 #getRemoteDataId(Class<? extends Data>) 接口方法,根据数据类型获取数据协议编号。

CommonRemoteDataRegisterService 实现了 RemoteDataIDGetter 接口,#getRemoteDataId(Class<? extends Data>) 实现方法。

2.5.3 RemoteDataInstanceCreatorGetter

org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorGetter ,继承 Service 接口,远程数据创建器的获取器接口,定义了 #getInstanceCreator(remoteDataId 接口方法,根据数据协议编号获得远程数据创建器( RemoteDataInstanceCreator )。

CommonRemoteDataRegisterService 实现了 RemoteDataInstanceCreatorGetter 接口,#getInstanceCreator(remoteDataId) 实现方法。

2.6 RemoteSerializeService

org.skywalking.apm.collector.remote.service.RemoteSerializeService ,远程通信序列化服务接口,定义了 #serialize(Data) 接口方法,序列化数据,生成 Builder 对象。

2.7 RemoteSerializeService

org.skywalking.apm.collector.remote.service.RemoteDeserializeService ,远程通信序反列化服务接口,定义了 #deserialize(RemoteData, Data) 接口方法,反序列化传输数据。

3. collector-remote-grpc-provider

collector-remote-grpc-provider ,基于 Google gRPC 的远程通信实现。

项目结构如下 :

默认配置,在 application-default.yml 已经配置如下:

remote:
gRPC:
host: localhost
port: 11800

3.1 RemoteModuleGRPCProvider

org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider ,实现 ModuleProvider 抽象类,基于 gRPC 的组件服务提供者实现类。

#name() 实现方法,返回组件服务提供者名为 "gRPC"

module() 实现方法,返回组件类为 RemoteModule 。

#requiredModules() 实现方法,返回依赖组件为 clustergRPC_manager


#prepare(Properties) 实现方法,执行准备阶段逻辑。

  • 第 53 至 56 行 :创建 CommonRemoteDataRegisterService 、GRPCRemoteSenderService 对象,并调用 #registerServiceImplementation() 父类方法,注册到 services

#start() 实现方法,执行启动阶段逻辑。

#notifyAfterCompleted() 实现方法,方法为空。

3.2 GRPCRemoteSenderService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService ,继承 ClusterModuleListener 抽象类,实现 RemoteSenderService 接口,基于 gPRC 的远程发送服务实现类。

3.2.1 注册发现

通过继承 ClusterModuleListener 抽象类,实现了监听 Collector 集群节点的加入或离开。

  • remoteClients 属性,连接 Collector 集群节点的客户端数组。每个 Collector 集群节点,对应一个客户端。
  • #path() 实现方法,返回监听的目录 "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME 。Collector 集群中,每个节点的 Remote Server 都会注册到该目录下。
  • #serverJoinNotify(serverAddress) 实现方法,当新的节点加入,创建新的客户端连接。
  • #serverQuitNotify(serverAddress) 实现方法,当老的节点离开,移除对应的客户端连接。

3.2.2 负载均衡

RemoteModuleGRPCProvider 基于不同的选择器 ( Selector ) ,提供不同的客户端选择( org.skywalking.apm.collector.remote.grpc.service.selector.RemoteClientSelector )实现 :

3.3 GRPCRemoteClientService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService ,实现 RemoteClientService 接口,基于 gRPC 的远程客户端服务实现类。

#create(host, port, channelSize, bufferSize) 实现方法,创建 GRPCRemoteClient 对象。

3.4 GRPCRemoteClient

友情提示:本小节会涉及较多 gRPC 相关的知识,建议不熟悉的胖友自己 Google ,补充下姿势。

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClient ,实现 RemoteClient 接口,基于 gRPC 的远程客户端实现类。

  • client 属性,GRPCClient 对象。相比来说,GRPCRemoteClient 偏业务的封装,内部调用 GRPCClient 对象。
  • carrier 属性,DataCarrier 对象,本地消息队列。GRPCRemoteClient 在被调用发送数据时,先提交到本地队列,异步消费进行发送到远程 Collector 节点。DataCarrier 在 《SkyWalking 源码分析 —— DataCarrier 异步处理库》 详细解析。
    • 第 63 行:调用 DataCarrier#consume(IConsumer, num) 方法,设置消费者为 RemoteMessageConsumer 对象。

#push(graphId, nodeId, data) 实现方法,异步发送消息到远程 Collector 。

  • 第 73 行:调用 RemoteDataIDGetter#getRemoteDataId(Class<? extends Data>) 方法,获得数据协议编号
  • 第 76 至 80 行:创建传输数据( RemoteMessage.Builder ) 对象。RemoteMessage 通过 Protobuf 创建定义,如下图所示:
  • 第 83 行:调用 DataCarrier#produce(data) 方法,发送数据到本地队列。

RemoteMessageConsumer批量消费本地队列的数据,逐条发送数据到远程 Collector 节点。

  • #consume(List<RemoteMessage>) 实现方法,代码如下:
    • 第 100 行:创建 StreamObserver 对象。StreamObserver 主要是 gPRC 相关的 API 的调用。
    • 第 101 至 103 行:调用 io.grpc.stub.StreamObserver#onNext(RemoteMessage) 方法,逐条发送数据。
    • 第 106 行:调用 io.grpc.stub.StreamObserver#onCompleted() 方法,全部请求数据发送完成

3.5 RemoteCommonServiceHandler

org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler ,实现 org.skywalking.apm.collector.server.grpc.GRPCHandler 接口,继承 RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 抽象类,远程通信通用逻辑处理器。

其中,RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 在 RemoteCommonService.proto 文件的定义如下图:

#call(StreamObserver<Empty>) 实现方法,代码如下:

  • #onNext(RemoteMessage) 方法,处理每一条消息,代码如下:
    • 第 65 行:调用 RemoteDataInstanceCreatorGetter#getInstanceCreator(remoteDataId) 方法,获得数据协议编号对应的 RemoteDataInstanceCreator 对象。然后,调用 RemoteDataInstanceCreator#createInstance(id) 方法,创建数据协议编号对应的 Data 实现类对应的对象。
    • 第 70 行:调用 GraphManager#findGraph(graphId) 方法,获得 graphId 对应的 Graph 对象。然后,调动 GraphNodeFinder#findNext(nodeId) 方法,获得 Next 对象。
    • 第 71 行:调用 Next#execute(Data) 方法,继续流式处理。

3.6 GRPCRemoteSerializeService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService ,实现 RemoteSerializeService 接口,基于 gRPC 的远程通信序列化服务实现类。

3.7 GRPCRemoteDeserializeService

org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService ,实现 GRPCRemoteDeserializeService 接口,基于 gRPC 的远程通信反序列化服务实现类。

4. collector-remote-grpc-provider

collector-remote-kafka-provider :基于 Kafka 的远程通信实现。

目前暂未完成

TODO 【4005】collector-remote-grpc-provider

666. 彩蛋

知识星球

写的有丢丢烦躁,不清晰或者错误的地方,胖友望见谅。

欢迎微信我一起交流。

胖友,分享一波朋友圈可好。

文章目录
  1. 1. 1. 概述
  2. 2. 2. collector-remote-define
    1. 2.1. 2.1 RemoteModule
    2. 2.2. 2.2 RemoteSenderService
    3. 2.3. 2.3 RemoteClientService
    4. 2.4. 2.4 RemoteClient
    5. 2.5. 2.5 CommonRemoteDataRegisterService
      1. 2.5.1. 2.5.1 RemoteDataRegisterService
      2. 2.5.2. 2.5.2 RemoteDataIDGetter
      3. 2.5.3. 2.5.3 RemoteDataInstanceCreatorGetter
    6. 2.6. 2.6 RemoteSerializeService
    7. 2.7. 2.7 RemoteSerializeService
  3. 3. 3. collector-remote-grpc-provider
    1. 3.1. 3.1 RemoteModuleGRPCProvider
    2. 3.2. 3.2 GRPCRemoteSenderService
      1. 3.2.1. 3.2.1 注册发现
    3. 3.3. 3.2.2 负载均衡
    4. 3.4. 3.3 GRPCRemoteClientService
    5. 3.5. 3.4 GRPCRemoteClient
    6. 3.6. 3.5 RemoteCommonServiceHandler
    7. 3.7. 3.6 GRPCRemoteSerializeService
    8. 3.8. 3.7 GRPCRemoteDeserializeService
  4. 4. 4. collector-remote-grpc-provider
  5. 5. 666. 彩蛋