前回 MySQL X Protocol で使用している Protobuf について書きましたが、それだけでは MySQL のプロトコルは解析できません。
TCP を流れるデータは区切りがないので、書き込み側が Protobuf データをただ垂れ流しても、読み込む側がどう読んで良いのかわかりません。
書き込むデータの大きさと、書き込む Protobuf データの型を相手に伝える必要があります。
MySQL X Protocol のパケットは次のようになっているようです。
┌────┬─────────────── │size(4) │type(1) + Protobuf(size-1) └────┴───────────────
最初の4バイト(リトルエンディアン)で続くデータ部のサイズを示します。 データ部の先頭1バイトは Protobuf データの型を示します。
Protobuf データの型は、クライアントから送るデータは ClientMessages::Type で、サーバーから送るデータは ServerMessages::Type に enum で定義されています。
TCP 上を流れるデータの形式がわかったので、あとは、どの型のデータがどのタイミングでサーバー/クライアントのどちらから送られるかがわかればいいです。
ドキュメント https://dev.mysql.com/doc/internals/en/x-protocol.html もありますが、実際に mysqlsh の通信を見てみるのが手っ取り早いかもしれません。
次のような MySQL X Protocol を中継するプログラムを作って動かしてみました。
require 'mysqlx.pb' require 'socket' ClientMessage = {} Mysqlx::ClientMessages::Type.constants.each do |c| v = Mysqlx::ClientMessages::Type.const_get(c) if v.is_a? Protobuf::Enum ClientMessage[v.to_i] = c end end ServerMessage = {} Mysqlx::ServerMessages::Type.constants.each do |c| v = Mysqlx::ServerMessages::Type.const_get(c) if v.is_a? Protobuf::Enum ServerMessage[v.to_i] = c end end localport, host, port = ARGV def relay(r, w, from) while true head = r.read(5) break unless head && head.length == 5 size, type = head.unpack('VC') if from == :client puts "C: #{ClientMessage[type] || type}" else puts "S: #{ServerMessage[type] || type}" end data = r.read(size-1) break unless data && data.length == size-1 w.write(head + data) end rescue => e p e end Socket.tcp_server_loop(localport) do |client, _addrinfo| server = TCPSocket.new(host, port) Thread.new(client) do |_client| relay(_client, server, :client) end Thread.new(client) do |_client| relay(server, _client, :server) end end
33061 ポートで待ち受けて 127.0.0.1 の 33060 に中継するように動かします。
% ruby -I. ./mysqlx-relay.rb 33061 127.0.0.1 33060
別の端末から mysqlsh を次のように起動します。
% mysqlsh --uri mysql://hoge@127.0.0.1:33061/test --sql Creating a Node Session to hoge@127.0.0.1:33061/test Enter password: Default schema `test` accessible through db.
mysql-sql>
プロンプトが出るまでのパケット。結構多い…。
C: CON_CAPABILITIES_GET S: CONN_CAPABILITIES C: SESS_AUTHENTICATE_START S: SESS_AUTHENTICATE_CONTINUE C: SESS_AUTHENTICATE_CONTINUE S: NOTICE S: SESS_AUTHENTICATE_OK C: SQL_STMT_EXECUTE S: RESULTSET_COLUMN_META_DATA S: RESULTSET_ROW S: RESULTSET_ROW S: RESULTSET_FETCH_DONE S: NOTICE S: SQL_STMT_EXECUTE_OK C: SQL_STMT_EXECUTE S: RESULTSET_COLUMN_META_DATA S: RESULTSET_COLUMN_META_DATA S: RESULTSET_ROW S: RESULTSET_FETCH_DONE S: NOTICE S: SQL_STMT_EXECUTE_OK C: SQL_STMT_EXECUTE S: RESULTSET_COLUMN_META_DATA S: RESULTSET_ROW S: RESULTSET_FETCH_DONE S: NOTICE S: SQL_STMT_EXECUTE_OK C: SQL_STMT_EXECUTE S: NOTICE S: SQL_STMT_EXECUTE_OK C: SQL_STMT_EXECUTE S: RESULTSET_COLUMN_META_DATA S: RESULTSET_ROW S: RESULTSET_FETCH_DONE S: NOTICE S: SQL_STMT_EXECUTE_OK C: SQL_STMT_EXECUTE S: RESULTSET_COLUMN_META_DATA S: RESULTSET_COLUMN_META_DATA S: RESULTSET_ROW S: RESULTSET_FETCH_DONE S: SQL_STMT_EXECUTE_OK C: SQL_STMT_EXECUTE S: RESULTSET_COLUMN_META_DATA S: RESULTSET_ROW S: RESULTSET_FETCH_DONE S: NOTICE S: SQL_STMT_EXECUTE_OK C: SQL_STMT_EXECUTE S: RESULTSET_COLUMN_META_DATA S: RESULTSET_COLUMN_META_DATA S: RESULTSET_ROW S: RESULTSET_FETCH_DONE S: SQL_STMT_EXECUTE_OK
SELECT したり、
mysql-sql> SELECT * FROM t;
C: SQL_STMT_EXECUTE S: RESULTSET_COLUMN_META_DATA S: RESULTSET_COLUMN_META_DATA S: RESULTSET_ROW S: RESULTSET_ROW S: RESULTSET_ROW S: RESULTSET_FETCH_DONE S: NOTICE S: SQL_STMT_EXECUTE_OK
INSERT したり、
mysql-sql> INSERT INTO t (id, value) VALUES (1, 'abc'),(2,'def');
C: SQL_STMT_EXECUTE S: NOTICE S: NOTICE S: SQL_STMT_EXECUTE_OK
いい感じに動いてるようなので、あとは色々試してみます。