⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/MyCAT/single-db-single-table-insert/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 MyCAT 1.6.5 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

内容形态以 顺序图 + 核心代码 为主。
如果有地方表述不错误或者不清晰,欢迎留言。
对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。

本文讲解 【单库单表】插入 所涉及到的代码。交互如下图:

单库单表插入简图

整个过程,MyCAT Server 流程如下:

  1. 接收 MySQL Client 请求,解析 SQL。
  2. 获得路由结果,进行路由。
  3. 获得 MySQL 连接,执行 SQL。
  4. 响应执行结果,发送结果给 MySQL Client。

我们逐个步骤分析,一起来看看源码。

2. 接收请求,解析 SQL

【单库单表】插入(01主流程)

【 1 - 2 】

接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。

【 3 】

不同 MySQL 命令,分发到不同的方法执行。核心代码如下:

 1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】
2: public class FrontendCommandHandler implements NIOHandler {
3:
4: @Override
5: public void handle(byte[] data) {
6:
7: // .... 省略部分代码
8: switch (data[4]) //
9: {
10: case MySQLPacket.COM_INIT_DB:
11: commands.doInitDB();
12: source.initDB(data);
13: break;
14: case MySQLPacket.COM_QUERY: // 查询命令
15: // 计数查询命令
16: commands.doQuery();
17: // 执行查询命令
18: source.query(data);
19: break;
20: case MySQLPacket.COM_PING:
21: commands.doPing();
22: source.ping();
23: break;
24: // .... 省略部分case
25: }
26: }
27:
28: }

INSERT/SELECT/UPDATE/DELETE 等 SQL 归属于 MySQLPacket.COM_QUERY,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 -> 服务器)》

##【 4 】

将 二进制数组 解析成 SQL。核心代码如下:

 1: // ⬇️⬇️⬇️【FrontendConnection.java】
2: public void query(byte[] data) {
3: // 取得语句
4: String sql = null;
5: try {
6: MySQLMessage mm = new MySQLMessage(data);
7: mm.position(5);
8: sql = mm.readString(charset);
9: } catch (UnsupportedEncodingException e) {
10: writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
11: return;
12: }
13: // 执行语句
14: this.query( sql );
15: }

##【 5 】

解析 SQL 类型。核心代码如下:

 1: // ⬇️⬇️⬇️【ServerQueryHandler.java】
2: @Override
3: public void query(String sql) {
4: // 解析 SQL 类型
5: int rs = ServerParse.parse(sql);
6: int sqlType = rs & 0xff;
7:
8: switch (sqlType) {
9: //explain sql
10: case ServerParse.EXPLAIN:
11: ExplainHandler.handle(sql, c, rs >>> 8);
12: break;
13: // .... 省略部分case
14: break;
15: case ServerParse.SELECT:
16: SelectHandler.handle(sql, c, rs >>> 8);
17: break;
18: // .... 省略部分case
19: default:
20: if(readOnly){
21: LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
22: c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
23: break;
24: }
25: c.execute(sql, rs & 0xff);
26: }
27: }
28:
29:
30: // ⬇️⬇️⬇️【ServerParse.java】
31: public static int parse(String stmt) {
32: int length = stmt.length();
33: //FIX BUG FOR SQL SUCH AS /XXXX/SQL
34: int rt = -1;
35: for (int i = 0; i < length; ++i) {
36: switch (stmt.charAt(i)) {
37: // .... 省略部分case case 'I':
38: case 'i':
39: rt = insertCheck(stmt, i);
40: if (rt != OTHER) {
41: return rt;
42: }
43: continue;
44: // .... 省略部分case
45: case 'S':
46: case 's':
47: rt = sCheck(stmt, i);
48: if (rt != OTHER) {
49: return rt;
50: }
51: continue;
52: // .... 省略部分case
53: default:
54: continue;
55: }
56: }
57: return OTHER;
58: }

##【 6 】

执行 SQL,详细解析见下文,核心代码如下:

 1: // ⬇️⬇️⬇️【ServerConnection.java】
2: public class ServerConnection extends FrontendConnection {
3: public void execute(String sql, int type) {
4: // .... 省略代码
5: SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
6: if (schema == null) {
7: writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
8: "Unknown MyCAT Database '" + db + "'");
9: return;
10: }
11:
12: // .... 省略代码
13:
14: // 路由到后端数据库,执行 SQL
15: routeEndExecuteSQL(sql, type, schema);
16: }
17:
18: public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
19: // 路由计算
20: RouteResultset rrs = null;
21: try {
22: rrs = MycatServer
23: .getInstance()
24: .getRouterservice()
25: .route(MycatServer.getInstance().getConfig().getSystem(),
26: schema, type, sql, this.charset, this);
27:
28: } catch (Exception e) {
29: StringBuilder s = new StringBuilder();
30: LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);
31: String msg = e.getMessage();
32: writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
33: return;
34: }
35:
36: // 执行 SQL
37: if (rrs != null) {
38: // session执行
39: session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
40: }
41:
42: }
43:
44: }

3. 获得路由结果

【单库单表】插入(02获取路由)

【 1 - 2 】【 12 】

获得路由主流程。核心代码如下:

 1: // ⬇️⬇️⬇️【RouteService.java】
2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
3: int sqlType, String stmt, String charset, ServerConnection sc)
4: throws SQLNonTransientException {
5: RouteResultset rrs = null;
6: // .... 省略代码
7: int hintLength = RouteService.isHintSql(stmt);
8: if(hintLength != -1){ // TODO 待读:hint
9: // .... 省略代码
10: }
11: } else {
12: stmt = stmt.trim();
13: rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,
14: charset, sc, tableId2DataNodeCache);
15: }
16:
17: // .... 省略代码 return rrs;
18: }
19: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
20: @Override
21: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
22: String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
23:
24: // .... 省略代码
25:
26: // 处理一些路由之前的逻辑;全局序列号,父子表插入
27: if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
28: return null;
29: }
30:
31: // .... 省略代码
32:
33: // 检查是否有分片
34: if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
35: rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
36: } else {
37: RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
38: if (returnedSet == null) {
39: rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
40: }
41: }
42:
43: return rrs;
44: }

路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。

【 3 - 6 】

路由前置处理。当符合如下三种情况下,进行处理:

{ 1 } 使用全局序列号

insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')

{ 2 } ER 子表插入
{ 3 } 主键使用自增 ID 插入:

insert into table (name) values ('name')
===>
insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
```

情况 { 1 } { 3 } 情况类似,使用全局序列号。

核心代码如下:

```Java
1: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
2: private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)
3: throws SQLNonTransientException {
4: return // 处理 id 使用 全局序列号
5: RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)
6: // 处理 ER 子表
7: || (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))
8: // 处理 id 自增长
9: || (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc));
10: }

RouterUtil.java 处理 SQL 考虑性能,实现会比较 C-style,代码咱就不贴了,传送门:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/route/util/RouterUtil.java。 (😈该仓库从官方 Fork,逐步完善中文注释,欢迎 Star)

【 7 - 11 】

前置路由处理全局序列号时,添加到全局序列处理器(MyCATSequnceProcessor)。该处理器会异步生成 ID,替换 SQL 内的 NEXT VALUE FOR MYCATSEQ_ 正则。例如:

insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
===>
insert into table (id, name) values (868348974560579584, 'name')

异步处理完后,调用 ServerConnection#routeEndExecuteSQL(sql, type, schema) 方法重新执行 SQL。

核心代码如下:

 1: // ⬇️⬇️⬇️【RouterUtil.java】
2: public static void processSQL(ServerConnection sc,SchemaConfig schema,String sql,int sqlType){
3: SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);
4: MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);
5: }
6: // ⬇️⬇️⬇️【MyCATSequnceProcessor.java】
7: public class MyCATSequnceProcessor {
8: private LinkedBlockingQueue<SessionSQLPair> seqSQLQueue = new LinkedBlockingQueue<SessionSQLPair>();
9: private volatile boolean running=true;
10:
11: public void addNewSql(SessionSQLPair pair) {
12: seqSQLQueue.add(pair);
13: }
14:
15: private void executeSeq(SessionSQLPair pair) {
16: try {
17:
18: // 使用Druid解析器实现sequence处理 @兵临城下
19: DruidSequenceHandler sequenceHandler = new DruidSequenceHandler(MycatServer
20: .getInstance().getConfig().getSystem().getSequnceHandlerType());
21:
22: // 生成可执行 SQL :目前主要是生成 id
23: String charset = pair.session.getSource().getCharset();
24: String executeSql = sequenceHandler.getExecuteSql(pair.sql,charset == null ? "utf-8":charset);
25:
26: // 执行 SQL
27: pair.session.getSource().routeEndExecuteSQL(executeSql, pair.type,pair.schema);
28: } catch (Exception e) {
29: LOGGER.error("MyCATSequenceProcessor.executeSeq(SesionSQLPair)",e);
30: pair.session.getSource().writeErrMessage(ErrorCode.ER_YES,"mycat sequnce err." + e);
31: return;
32: }
33: }
34:
35: class ExecuteThread extends Thread {
36:
37: public ExecuteThread() {
38: setDaemon(true); // 设置为后台线程,防止throw RuntimeExecption进程仍然存在的问题
39: }
40:
41: public void run() {
42: while (running) {
43: try {
44: SessionSQLPair pair=seqSQLQueue.poll(100,TimeUnit.MILLISECONDS);
45: if(pair!=null){
46: executeSeq(pair);
47: }
48: } catch (Exception e) {
49: LOGGER.warn("MyCATSequenceProcessor$ExecutorThread",e);
50: }
51: }
52: }
53: }
54: }

❓此处有个疑问:MyCATSequnceProcessor 是单线程,会不会插入性能有一定的影响?后续咱做下性能测试。

4. 获得 MySQL 连接,执行 SQL

【单库单表】插入(03执行 SQL)

【 1 - 8 】

获得 MySQL 连接。

  • PhysicalDBNode :物理数据库节点。
  • PhysicalDatasource :物理数据库数据源。

【 9 - 13 】

发送 SQL 到 MySQL Server,执行 SQL。

5. 响应执行 SQL 结果

【单库单表】插入(04执行响应)

【 1 - 4 】

处理 MySQL Server 响应数据包。

【 5 - 8 】

发送插入成功结果给 MySQL Client。

666. 彩蛋

知识星球

文章目录
  1. 1. 1. 概述
  2. 2. 2. 接收请求,解析 SQL
    1. 2.1. 【 1 - 2 】
    2. 2.2. 【 3 】
  3. 3. 3. 获得路由结果
    1. 3.1. 【 1 - 2 】【 12 】
    2. 3.2. 【 3 - 6 】
    3. 3.3. 【 7 - 11 】
  4. 4. 4. 获得 MySQL 连接,执行 SQL
    1. 4.1. 【 1 - 8 】
    2. 4.2. 【 9 - 13 】
  5. 5. 5. 响应执行 SQL 结果
    1. 5.1. 【 1 - 4 】
    2. 5.2. 【 5 - 8 】
  6. 6. 666. 彩蛋