⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-storage-module/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

本文主要分享 SkyWalking Collector Storage 存储组件。顾名思义,负责将调用链路、应用、应用实例等等信息存储到存储器,例如,ES 、H2 。

友情提示:建议先阅读 《SkyWalking 源码分析 —— Collector 初始化》 ,以了解 Collector 组件体系。

FROM https://github.com/apache/incubating-skywalking

下面我们来看看整体的项目结构,如下图所示 :

  • apm-collector-coredatadefine :数据的抽象。
  • collector-storage-define :定义存储组件接口。
  • collector-storage-h2-provider :基于 H2 的 存储组件实现。该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用
  • collector-storage-es-provider :基于 Elasticsearch 的集群管理实现。生产环境推荐使用

下面,我们从接口到实现的顺序进行分享。

2. apm-collector-core

apm-collector-coredatadefine ,如下图所示:

我们对类进行梳理分类,如下图:

  • Table :Data 和 TableDefine 之间的桥梁,每个 Table 定义了该表的表名字段名们
  • TableDefine :Table 的详细定义,包括表名字段定义( ColumnDefine )们。在下文中,StorageInstaller 会基于 TableDefine 初始化表的相关信息。
  • Data :数据,包括一条数据的数据值们和数据字段( Column )们。在下文中,Dao 会存储 Data 到存储器中。另外,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》 中,我们也会看到对 Data 的流式处理通用封装。

2.1 Table

org.skywalking.apm.collector.core.data.CommonTable ,通用表。

  • TABLE_TYPE 静态属性,表类型。目前只有 ES 存储组件使用到,下文详细解析。
  • COLUMN_ 前缀的静态属性,通用的字段名。

collector-storage-definetable 下,我们可以看到所有 Table 类,以 "Table" 结尾。每个 Table 的表名,在每个实现类里,例如 ApplicationTable

2.2 TableDefine

org.skywalking.apm.collector.core.data.TableDefine ,表定义抽象类

不同的存储组件实现,有不同的 TableDefine 实现类,如下图:

  • ElasticSearchTableDefine :基于 Elasticsearch 的表定义抽象类,在 collector-storage-es-providerdefine 下,我们可以看到所有 ES 的 TableDefine 类。

  • H2TableDefine :基于 H2 的表定义抽象类,在 collector-storage-h2-providerdefine 下,我们可以看到所有 H2 的 TableDefine 类。

2.2.1 ColumnDefine

org.skywalking.apm.collector.core.data.ColumnDefine ,字段定义抽象类

  • name 属性,字段名。
  • type 属性,字段类型。

collector-storage-xxx-provider 模块中,H2ColumnDefine 、ElasticSearchColumnDefine 实现 ColumnDefine 。

2.2.2 Loader

涉及到的类如下图所示:

org.skywalking.apm.collector.core.data.StorageDefineLoader ,调用 org.skywalking.apm.collector.core.define.DefinitionLoader ,从 org.skywalking.apm.collector.core.data.StorageDefinitionFile 中,加载 TableDefine 实现类数组。

另外,在 collector-storage-es-providercollector-storage-h2-provider 里都有 storage.define 文件,如下图:

  • StorageDefinitionFile 声明了读取该文件。
  • 注意,DefinitionLoader 在加载时,两个文件都会被读取,最终在 StorageInstaller#defineFilter(List<TableDefine>) 方法,进行过滤。

代码比较简单,中文注释已加,胖友自己阅读理解下。

2.3 Data

org.skywalking.apm.collector.core.data.Data ,数据抽象类

collector-storage-definetable 下,我们可以看到所有 Data 类, "Table" 结尾,例如 Application

2.3.1 Column

org.skywalking.apm.collector.core.data.Column ,字段。

  • name 属性,字段名。
  • operation 属性,操作( Operation )。

2.3.2 Operation

org.skywalking.apm.collector.core.data.Operation ,操作接口。用于两个值之间的操作,例如,相加等等。目前实现类有:

3. collector-storage-define

collector-cluster-define :定义存储组件接口。项目结构如下 :

3.1 StorageModule

org.skywalking.apm.collector.storage.StorageModule ,实现 Module 抽象类,集群管理 Module 。

#name() 实现方法,返回模块名为 "storage"

#services() 实现方法,返回 Service 类名:在 org.skywalking.apm.collector.storage.dao 下的所有类 和 IBatchDAO。

3.2 table 包

org.skywalking.apm.collector.storage.table 包下,定义了存储模块所有的 Table 和 Data 实现类。

3.3 StorageInstaller

org.skywalking.apm.collector.storage.StorageInstaller ,存储安装器抽象类,基于 TableDefine ,初始化存储组件的表。

3.4 dao 包

collector-storage-define 项目结构图,我们看到一共有bao 包:

  • org.skywalking.apm.collector.storage.base.dao系统的 DAO 接口。
  • org.skywalking.apm.collector.storage.dao业务的 DAO 接口。
    • 继承系统的 DAO 接口。
    • collector-storage-xxx-providerdao实现

3.4.1 系统 DAO

org.skywalking.apm.collector.storage.base.dao.DAO ,继承 Service 接口,DAO 接口

无任何方法。

3.4.1.1 AbstractDAO

org.skywalking.apm.collector.storage.base.dao.AbstractDAO ,实现 DAO 接口,DAO 抽象基类。

  • client 属性,数据操作客户端。例如,H2Client 、ElasticSearchClient 。

collector-storage-xxx-provider 模块中,H2DAO 、EsDAO 实现 AbstractDAO 。

3.4.1.2 IPersistenceDAO

org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO ,实现 DAO 接口,持久化 DAO 接口,定义了 Data 的增删改查操作。

3.4.1.3 IBatchDAO

org.skywalking.apm.collector.storage.base.dao.IBatchDAO ,实现 DAO 接口,批量操作 DAO 接口

collector-storage-xxx-provider 模块中,BatchH2DAO 、BatchEsDAO 实现 IBatchDAO 。

3.4.2 业务 DAO

StorageModule#services() 方法里,我们可以看到,业务 DAO 按照用途可以拆分成四种

  • Cache :缓存应用、应用实例、服务名
  • Register :注册应用、应用实例、服务名
  • Persistence :持久化,实际可以理解成批量持久化
  • UI :SkyWaling UI 查询使用。

那么整理如下:

Package Data Cache / Register Persistence UI 关联文章
register Application
register Instance
register ServiceName
jvm CpuMetric
jvm CMetric
jvm MemoryMetric
jvm MemoryPoolMetric
global GlobalTrace
instance InstPerformance
node NodeComponent
node NodeMapping
noderef NodeReference
segment SegmentCost
segment Segment
service ServiceEntry
serviceref ServiceReference

4. collector-storage-h2-provider

collector-storage-h2-provider ,基于 H2 的存储组件实现。项目结构如下 :

该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用

由于生产环境主要使用 ES 的存储组件实现,所以本文暂不解析相关实现,感兴趣的胖友自己嗨起来。

5. collector-storage-es-provider

collector-storage-es-provider ,基于 ES 的存储组件实现。项目结构如下 :

实际使用时,通过 application.yml 配置如下:

storage:
elasticsearch:
cluster_name: elasticsearch
cluster_transport_sniffer: true
cluster_nodes: 127.0.0.1:9300
index_shards_number: 2
index_replicas_number: 0
ttl: 7

  • 生产环境下,推荐 Elasticsearch 配置成集群。
  • cluster_namecluster_transport_sniffercluster_nodesindex_shards_numberindex_replicas_number 参数,Elasticsearch 相关参数。
  • ttl :保留 N 天内的数据。超过 N 天的数据,将被自动滚动删除。
    • 该功能目前版本暂未发布,需要等到 5.0 版本后。
  • 《部署集群collector》

5.1 StorageModuleEsProvider

org.skywalking.apm.collector.storage.es.StorageModuleEsProvider ,实现 ModuleProvider 抽象类,基于 ES 的存储组件服务提供者。

#name() 实现方法,返回组件服务提供者名为 "elasticsearch"

module() 实现方法,返回组件类为 StorageModule 。

#requiredModules() 实现方法,返回依赖组件为 "cluster"


#prepare(Properties) 实现方法,执行准备阶段逻辑。

#start() 实现方法,执行启动阶段逻辑。

#notifyAfterCompleted() 实现方法,执行启动完成逻辑。

  • 第 115 行 :调用 DataTTLKeeperTimer#start() 方法,启动 DataTTLKeeperTimer 。在本文 「5.4 DataTTLKeeperTimer」 详细解析。

5.2 define 包

collector-storage-es-provider 项目结构图,我们看到一共有define 包:

  • org.skywalking.apm.collector.storage.es.base.define系统的 TableDefine 抽象类。
  • org.skywalking.apm.collector.storage.es.define业务的 TableDefine 实现类。
    • 继承系统的 TableDefine 抽象类。

5.2.1 ElasticSearchTableDefine

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine ,实现 TableDefine 接口,基于 Elasticsearch 的表定义抽象类

5.2.2 ElasticSearchColumnDefine

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine ,实现 ColumnDefine 抽象类,基于 ES 的字段定义。

  • Type 枚举类:枚举 ES 字段类型。

5.2.3 业务 TableDefine 实现类

org.apache.skywalking.apm.collector.storage.es.define 里,我们可以看到,所有基于 ES 的业务 TableDefine 实现类。例如:ApplicationEsTableDefine

整体 #refreshInterval() 方法返回的结果如下:

  • 1 s
    • CpuMetricEsTableDefine
    • GCMetricEsTableDefine
    • MemoryMetricEsTableDefine
    • MemoryPoolMetricEsTableDefine
  • 2 s
    • InstPerformanceEsTableDefine
    • NodeComponentEsTableDefine
    • NodeMappingEsTableDefine
    • NodeReferenceEsTableDefine
    • ServiceEntryEsTableDefine
    • ServiceReferenceEsTableDefine
  • 2 s && WriteRequest.RefreshPolicy.IMMEDIATE
  • 5 s
    • GlobalTraceEsTableDefine
    • SegmentCostEsTableDefine
  • 10 s
    • SegmentEsTableDefine

5.2.4 ElasticSearchStorageInstaller

友情提示:ElasticSearchStorageInstaller 主要是对 Elasticsearch Java API 的使用,所以不熟悉的胖友,可以 Google 下。

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller ,实现 StorageInstaller 抽象类, 基于 ES 存储安装器实现类。

  • #defineFilter(List<TableDefine>) 实现方法,过滤数组中,非 ElasticSearchTableDefine 的元素。
  • #createTable(Client, TableDefine) 实现方法,创建 Elasticsearch 索引。
    • 文档数据结构如下:
      • _id :数据编号,String 类型。
      • _type"type"
      • _index :TableDefine 定义的表名
      • source :Data 数据。
    • 了解 Elasticsearch 的胖友可能有和笔者一样的疑惑,网络上很多文章把 _index 类比成关系数据库的 DB ,_type 类比成关系数据库的 Table ,和 SkyWalking 目前使用的方式不一致
      • SkyWalking 彭勇升_index_type 是 ES 特有的,考虑其他数据库接入,所以没有用他这个特性。
      • SkyWalking QQ交流群( 392443393 ) ,小心 群友 :_type 本来就没做物理隔离,Lucene 层面也不存在,ES 6.x 已经废弃了。
      • 《Elasticsearch 6.0 将移除 Type》
  • #deleteTable(Client, TableDefine) 实现方法,删除 Elasticsearch 索引。
  • #isExists(Client, TableDefine) 实现方法,判断 Elasticsearch 索引是否存在。
  • 在方法里,笔者添加了一些 API 的说明,不熟悉的胖友,可以仔细阅读理解。

5.3 dao 包

collector-storage-es-provider 项目结构图,我们看到一共有dao 包:

  • org.skywalking.apm.collector.storage.es.base.dao系统的 DAO 抽象类。
  • org.skywalking.apm.collector.storage.es.dao业务的 DAO 实现类。
    • 继承系统的 DAO 抽象类。

5.3.1 EsDAO

org.skywalking.apm.collector.storage.es.base.dao.EsDAO ,实现 AbstractDAO 抽象类,基于 ES 的 DAO 抽象类

5.3.2 BatchEsDAO

org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO ,实现 IBatchDAO 接口,继承 EsDAO 抽象类,基于 ES 批量操作 DAO 实现类。

  • #batchPersistence(List<?>) 实现方法,将 org.elasticsearch.action.index.IndexRequestBuilderorg.elasticsearch.action.index.UpdateRequestBuilder 数组,创建成 org.elasticsearch.action.bulk.BulkRequestBuilder 对象,批量持久化。

5.3.3 业务 DAO 实现类

org.apache.skywalking.apm.collector.storage.es.dao 里,我们可以看到,所有基于 ES 的业务 DAO 实现类。

实现代码易懂,胖友可以自己阅读。良心如我们,按照 DAO 的业务用途,推荐例子如下:

5.4 DataTTLKeeperTimer

org.skywalking.apm.collector.storage.es.DataTTLKeeperTimer ,过期数据删除定时器。通过该定时器,只保留 N 天内的数据。

  • #start() 方法,启动定时任务。
    • 第 49 行:创建延迟 1 小时,每 8 小时执行一次 #delete() 方法的定时任务。目前该行代码被注释,胖友可以等待 SkyWallking 5.0 版本的发布。
  • #delete() 方法,删除过期数据。

如下是不会删除的数据的表:

  • Application
  • Instance
  • ServiceName
  • ServiceEntry

666. 彩蛋

知识星球

😈 有种自己把简单的东西写的太复杂了,悲伤。

胖友望见谅。

胖友,分享一波朋友圈可好。

文章目录
  1. 1. 1. 概述
  2. 2. 2. apm-collector-core
    1. 2.1. 2.1 Table
    2. 2.2. 2.2 TableDefine
      1. 2.2.1. 2.2.1 ColumnDefine
      2. 2.2.2. 2.2.2 Loader
    3. 2.3. 2.3 Data
      1. 2.3.1. 2.3.1 Column
      2. 2.3.2. 2.3.2 Operation
  3. 3. 3. collector-storage-define
    1. 3.1. 3.1 StorageModule
    2. 3.2. 3.2 table 包
    3. 3.3. 3.3 StorageInstaller
    4. 3.4. 3.4 dao 包
      1. 3.4.1. 3.4.1 系统 DAO
        1. 3.4.1.1. 3.4.1.1 AbstractDAO
        2. 3.4.1.2. 3.4.1.2 IPersistenceDAO
        3. 3.4.1.3. 3.4.1.3 IBatchDAO
      2. 3.4.2. 3.4.2 业务 DAO
  4. 4. 4. collector-storage-h2-provider
  5. 5. 5. collector-storage-es-provider
    1. 5.1. 5.1 StorageModuleEsProvider
    2. 5.2. 5.2 define 包
      1. 5.2.1. 5.2.1 ElasticSearchTableDefine
      2. 5.2.2. 5.2.2 ElasticSearchColumnDefine
      3. 5.2.3. 5.2.3 业务 TableDefine 实现类
      4. 5.2.4. 5.2.4 ElasticSearchStorageInstaller
    3. 5.3. 5.3 dao 包
      1. 5.3.1. 5.3.1 EsDAO
      2. 5.3.2. 5.3.2 BatchEsDAO
      3. 5.3.3. 5.3.3 业务 DAO 实现类
    4. 5.4. 5.4 DataTTLKeeperTimer
  6. 6. 666. 彩蛋