⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/MyCAT/single-db-single-table-select/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 MyCAT 1.6.5 正式版


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 概述

内容形态以 顺序图 + 核心代码 为主。
如果有地方表述不错误或者不清晰,欢迎留言。
对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。

本文讲解 【单库单表】查询 所涉及到的代码。

😂内容和 《MyCAT 源码分析 —— 【单库单表】插入》 超级相似,一方面本身流程基本相同,另外一方面文章结构没拆分好。我们使用 🚀 标记差异的逻辑。

交互如下图:

单库单表查询简图

整个过程,MyCAT Server 流程如下:

  1. 接收 MySQL Client 请求,解析 SQL。
  2. 获得路由结果,进行路由。
  3. 获得 MySQL 连接,执行 SQL。
  4. 响应执行结果,发送结果给 MySQL Client。

我们逐个步骤分析,一起来看看源码。

2. 接收请求,解析 SQL

【单库单表】查询(01主流程)

【1 - 2】

接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。

【3】

 1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】
2: public class FrontendCommandHandler implements NIOHandler {
3:
4: @Override
5: public void handle(byte[] data) {
6:
7: // .... 省略部分代码
8: switch (data[4]) //
9: {
10: case MySQLPacket.COM_INIT_DB:
11: commands.doInitDB();
12: source.initDB(data);
13: break;
14: case MySQLPacket.COM_QUERY: // 查询命令
15: // 计数查询命令
16: commands.doQuery();
17: // 执行查询命令
18: source.query(data);
19: break;
20: case MySQLPacket.COM_PING:
21: commands.doPing();
22: source.ping();
23: break;
24: // .... 省略部分case
25: }
26: }
27:
28: }

INSERT/SELECT/UPDATE/DELETE 等 SQL 归属于 MySQLPacket.COM_QUERY,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 -> 服务器)》

【4】

将 二进制数组 解析成 SQL。核心代码如下:

 1: // ⬇️⬇️⬇️【FrontendConnection.java】
2: public void query(byte[] data) {
3: // 取得语句
4: String sql = null;
5: try {
6: MySQLMessage mm = new MySQLMessage(data);
7: mm.position(5);
8: sql = mm.readString(charset);
9: } catch (UnsupportedEncodingException e) {
10: writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
11: return;
12: }
13: // 执行语句
14: this.query( sql );
15: }

【5】

解析 SQL 类型。核心代码如下:

 1: // ⬇️⬇️⬇️【ServerQueryHandler.java】
2: @Override
3: public void query(String sql) {
4: // 解析 SQL 类型
5: int rs = ServerParse.parse(sql);
6: int sqlType = rs & 0xff;
7:
8: switch (sqlType) {
9: //explain sql
10: case ServerParse.EXPLAIN:
11: ExplainHandler.handle(sql, c, rs >>> 8);
12: break;
13: // .... 省略部分case
14: break;
15: case ServerParse.SELECT:
16: SelectHandler.handle(sql, c, rs >>> 8);
17: break;
18: // .... 省略部分case
19: default:
20: if(readOnly){
21: LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
22: c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
23: break;
24: }
25: c.execute(sql, rs & 0xff);
26: }
27: }
28:
29:
30: // ⬇️⬇️⬇️【ServerParse.java】
31: public static int parse(String stmt) {
32: int length = stmt.length();
33: //FIX BUG FOR SQL SUCH AS /XXXX/SQL
34: int rt = -1;
35: for (int i = 0; i < length; ++i) {
36: switch (stmt.charAt(i)) {
37: // .... 省略部分case case 'I':
38: case 'i':
39: rt = insertCheck(stmt, i);
40: if (rt != OTHER) {
41: return rt;
42: }
43: continue;
44: // .... 省略部分case
45: case 'S':
46: case 's':
47: rt = sCheck(stmt, i);
48: if (rt != OTHER) {
49: return rt;
50: }
51: continue;
52: // .... 省略部分case
53: default:
54: continue;
55: }
56: }
57: return OTHER;
58: }

🚀【6】【7】

解析 Select SQL 类型,分发到对应的逻辑。核心代码如下:

 1: // ⬇️⬇️⬇️【SelectHandler.java】
2: public static void handle(String stmt, ServerConnection c, int offs) {
3: int offset = offs;
4: switch (ServerParseSelect.parse(stmt, offs)) { // 解析 Select SQL 类型
5: case ServerParseSelect.VERSION_COMMENT: // select @@VERSION_COMMENT;
6: SelectVersionComment.response(c);
7: break;
8: case ServerParseSelect.DATABASE: // select DATABASE();
9: SelectDatabase.response(c);
10: break;
11: case ServerParseSelect.USER: // select CURRENT_USER();
12: SelectUser.response(c);
13: break;
14: case ServerParseSelect.VERSION: // select VERSION();
15: SelectVersion.response(c);
16: break;
17: case ServerParseSelect.SESSION_INCREMENT: // select @@session.auto_increment_increment;
18: SessionIncrement.response(c);
19: break;
20: case ServerParseSelect.SESSION_ISOLATION: // select @@session.tx_isolation;
21: SessionIsolation.response(c);
22: break;
23: case ServerParseSelect.LAST_INSERT_ID: // select LAST_INSERT_ID();
24: // ....省略代码
25: break;
26: case ServerParseSelect.IDENTITY: // select @@identity
27: // ....省略代码
28: break;
29: case ServerParseSelect.SELECT_VAR_ALL: //
30: SelectVariables.execute(c,stmt);
31: break;
32: case ServerParseSelect.SESSION_TX_READ_ONLY: //
33: SelectTxReadOnly.response(c);
34: break;
35: default: // 其他,例如 select * from table
36: c.execute(stmt, ServerParse.SELECT);
37: }
38: }
39: // ⬇️⬇️⬇️【ServerParseSelect.java】
40: public static int parse(String stmt, int offset) {
41: int i = offset;
42: for (; i < stmt.length(); ++i) {
43: switch (stmt.charAt(i)) {
44: case ' ':
45: continue;
46: case '/':
47: case '#':
48: i = ParseUtil.comment(stmt, i);
49: continue;
50: case '@':
51: return select2Check(stmt, i);
52: case 'D':
53: case 'd':
54: return databaseCheck(stmt, i);
55: case 'L':
56: case 'l':
57: return lastInsertCheck(stmt, i);
58: case 'U':
59: case 'u':
60: return userCheck(stmt, i);
61: case 'C':
62: case 'c':
63: return currentUserCheck(stmt, i);
64: case 'V':
65: case 'v':
66: return versionCheck(stmt, i);
67: default:
68: return OTHER;
69: }
70: }
71: return OTHER;
72: }

【8】

执行 SQL,详细解析见下文,核心代码如下:

 1: // ⬇️⬇️⬇️【ServerConnection.java】
2: public class ServerConnection extends FrontendConnection {
3: public void execute(String sql, int type) {
4: // .... 省略代码
5: SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
6: if (schema == null) {
7: writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
8: "Unknown MyCAT Database '" + db + "'");
9: return;
10: }
11:
12: // .... 省略代码
13:
14: // 路由到后端数据库,执行 SQL
15: routeEndExecuteSQL(sql, type, schema);
16: }
17:
18: public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
19: // 路由计算
20: RouteResultset rrs = null;
21: try {
22: rrs = MycatServer
23: .getInstance()
24: .getRouterservice()
25: .route(MycatServer.getInstance().getConfig().getSystem(),
26: schema, type, sql, this.charset, this);
27:
28: } catch (Exception e) {
29: StringBuilder s = new StringBuilder();
30: LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);
31: String msg = e.getMessage();
32: writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
33: return;
34: }
35:
36: // 执行 SQL
37: if (rrs != null) {
38: // session执行
39: session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
40: }
41:
42: }
43:
44: }

3. 获得路由结果

【单库单表】插入(02获取路由)

【 1 - 5 】

获得路由主流程。核心代码如下:

 1: // ⬇️⬇️⬇️【SelectHandler.java】
2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
3: int sqlType, String stmt, String charset, ServerConnection sc)
4: throws SQLNonTransientException {
5: RouteResultset rrs = null;
6:
7: // SELECT 类型的SQL, 检测缓存是否存在
8: if (sqlType == ServerParse.SELECT) {
9: cacheKey = schema.getName() + stmt;
10: rrs = (RouteResultset) sqlRouteCache.get(cacheKey);
11: if (rrs != null) {
12: checkMigrateRule(schema.getName(),rrs,sqlType);
13: return rrs;
14: }
15: }
16: }
17:
18: // .... 省略代码
19: int hintLength = RouteService.isHintSql(stmt);
20: if(hintLength != -1){ // TODO 待读:hint
21: // .... 省略代码
22: }
23: } else {
24: stmt = stmt.trim();
25: rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,
26: charset, sc, tableId2DataNodeCache);
27: }
28:
29: // 记录查询命令路由结果缓存
30: if (rrs != null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) {
31: sqlRouteCache.putIfAbsent(cacheKey, rrs);
32: }
33: // .... 省略代码 return rrs;
34: }
35: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
36: @Override
37: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
38: String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
39:
40: // .... 省略代码
41:
42: // 处理一些路由之前的逻辑;全局序列号,父子表插入
43: if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
44: return null;
45: }
46:
47: // .... 省略代码
48:
49: // 检查是否有分片
50: if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
51: rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
52: } else {
53: RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
54: if (returnedSet == null) {
55: rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
56: }
57: }
58:
59: return rrs;
60: }

🚀【3】第 7 至 16 行 :当 Select SQL 存在路由结果缓存时,直接返回缓存。 🚀【6】第 29 至 32 行 :记录 Select SQL 路由结果到缓存。

路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。

4. 获得 MySQL 连接,执行 SQL

【单库单表】查询(03执行 SQL)

【 1 - 8 】

获得 MySQL 连接。

  • PhysicalDBNode :物理数据库节点。
  • PhysicalDatasource :物理数据库数据源。

【 9 - 13 】

发送 SQL 到 MySQL Server,执行 SQL。

🚀 5. 响应执行 SQL 结果

【单库单表】查询(04执行响应)

核心代码如下:

 1: // ⬇️⬇️⬇️【MySQLConnectionHandler.java】
2: @Override
3: protected void handleData(byte[] data) {
4: switch (resultStatus) {
5: case RESULT_STATUS_INIT:
6: switch (data[4]) {
7: case OkPacket.FIELD_COUNT:
8: handleOkPacket(data);
9: break;
10: case ErrorPacket.FIELD_COUNT:
11: handleErrorPacket(data);
12: break;
13: case RequestFilePacket.FIELD_COUNT:
14: handleRequestPacket(data);
15: break;
16: default: // 初始化 header fields
17: resultStatus = RESULT_STATUS_HEADER;
18: header = data;
19: fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data,
20: 4));
21: }
22: break;
23: case RESULT_STATUS_HEADER:
24: switch (data[4]) {
25: case ErrorPacket.FIELD_COUNT:
26: resultStatus = RESULT_STATUS_INIT;
27: handleErrorPacket(data);
28: break;
29: case EOFPacket.FIELD_COUNT: // 解析 fields 结束
30: resultStatus = RESULT_STATUS_FIELD_EOF;
31: handleFieldEofPacket(data);
32: break;
33: default: // 解析 fields
34: fields.add(data);
35: }
36: break;
37: case RESULT_STATUS_FIELD_EOF:
38: switch (data[4]) {
39: case ErrorPacket.FIELD_COUNT:
40: resultStatus = RESULT_STATUS_INIT;
41: handleErrorPacket(data);
42: break;
43: case EOFPacket.FIELD_COUNT: // 解析 每行记录 结束
44: resultStatus = RESULT_STATUS_INIT;
45: handleRowEofPacket(data);
46: break;
47: default: // 每行记录
48: handleRowPacket(data);
49: }
50: break;
51: default:
52: throw new RuntimeException("unknown status!");
53: }
54: }

6. 其他 :更新 / 删除

流程基本和 《MyCAT源码分析:【单库单表】插入》 相同。我们就不另外文章解析。

666. 彩蛋

知识星球

文章目录
  1. 1. 1. 概述
  2. 2. 2. 接收请求,解析 SQL
    1. 2.1. 【1 - 2】
    2. 2.2. 【3】
    3. 2.3. 【4】
    4. 2.4. 【5】
    5. 2.5. 🚀【6】【7】
    6. 2.6. 【8】
  3. 3. 3. 获得路由结果
    1. 3.1. 【 1 - 5 】
  4. 4. 4. 获得 MySQL 连接,执行 SQL
    1. 4.1. 【 1 - 8 】
    2. 4.2. 【 9 - 13 】
  5. 5. 🚀 5. 响应执行 SQL 结果
  6. 6. 6. 其他 :更新 / 删除
  7. 7. 666. 彩蛋