Mysql, BigQuery, AthenaをCLI上で処理する
はじめに
社内のデータを見てトラブルシュートすることが多いのですが、コマンド1つでこれらのデータを元にほしい情報が得られる調査用scriptを作成し、自分含め他メンバーにも配布して利用してもらうということを行いました。 その際に得た知見や工夫した点をまとめます。
Mysql
gem mysql2
を利用しました。
MysqlはSSH gateway経由で接続しますが、利用頻度が高く複数のscriptで利用されるため、ラッパークラスを作成しました。
class HogeMysqlClient # 各種設定は別途yamlで読み込む CONFIG_DATASOURCE = YAML::load(ERB.new(IO.read(File.expand_path('../../config/datasource.yml', __FILE__))).result)[ENV['STAGE']].freeze def initialize # CONFIG_DATASOURCEのvalidationなど記載 ... @gateway = Net::SSH::Gateway.new( CONFIG_DATASOURCE['ssh']['host'], CONFIG_DATASOURCE['ssh']['name'], port: CONFIG_DATASOURCE['ssh']['port'], keys: CONFIG_DATASOURCE['ssh']['keys'] ) @local_port = @gateway.open(CONFIG_DATASOURCE['mysql']['host'], CONFIG_DATASOURCE['mysql']['port']) @client = Mysql2::Client.new( host: '127.0.0.1', username: CONFIG_DATASOURCE['mysql']['username'], password: CONFIG_DATASOURCE['mysql']['password'], database: CONFIG_DATASOURCE['mysql']['database'], encoding: 'utf8', port: @local_port, ) # ファイナライザを登録することでscript終了時にgatewayをcloseする ObjectSpace.define_finalizer(self) { @gateway.close(@local_port) } end end
queryメソッドを用意します。
symbolize_keys: true
をつけることでハッシュのキーがsymbolになるためscriptで扱いやすくなります。
def query(query) @client.query(query, symbolize_keys: true) end
今回はqueryをdisplayしつつ結果が欲しかったので以下のようなメソッドも用意しました。
def run_query_and_display(query) puts query query(query) end
呼び出し側
@mysql_client = HogeMysqlClient.new query = "SELECT ..." records = @mysql_client.run_query_and_display(query)
CLI上で結果を表形式で出力するため、 gem terminal-table
を使います。
rows = records.map { |record| record.values } puts Terminal::Table.new title: "User", headings: records.fields, rows: rows
ちなみに特定のカラムに対しては以下のように値を取得できたりします。
user_ids = records.map { |record| record[:user_id] }
BigQuery
今回は gem open3
を利用して bq query
の標準出力を受け取る形にしました。
※bq queryコマンドには[gcloudのsetupが必要です。] (http://cloud.google.com/sdk/docs/?hl=ja)
bq queryコマンドをそのままputsすれば表形式で結果が得られます。
query = "SELECT ..." puts Open3.capture3(%{bq query --use_legacy_sql=false "#{query}"})
ただしこれだと以下の問題があります。
- 特定のカラムを処理できない
- 返り値が表形式の文字列のため
- Titleがつけられない
- Mysqlをterminal-tebleで表示する際にはTitleをつけられる
この場合は bq query
にformatを指定することでjson形式で結果を受け取ることができます。
stdout, stderr, status = Open3.capture3(%{bq query --use_legacy_sql=false --format=prettyjson "#{query}"}) records = JSON.parse(stdout) rows = records.map { |record| record.values } puts Terminal::Table.new title: "User", headings: records.first.keys, rows: rows
こんな感じで表示しつつ、カラム毎に処理できるようになります。 が、これにもまだ欠点があります。 それはJSON.parseしたときにSELECTした順番が保証されないため、これを出力しようとするとカラムの順序がバラバラになってしまうことです。
これに対処する場合は gem google-cloud-bigquery
を利用します。
bq_cli = Google::Cloud::Bigquery.new( project: "test-bq-xxxxx", #BigQueryのプロジェクトID ) query = "SELECT ..." records = bq_cli.query query rows = records.map { |record| record.values } puts Terminal::Table.new title: "User", headings: records.headers, rows: rows
スマートですね...!
ちなみに今回なぜ gem google-cloud-bigquery
を使っていないかというと、scriptの環境構築をdocker化することを見据えたときに、gemの認証周りが面倒になりそうだったからです。
おそらく認証用JSONファイルを手元に落として、Volumeする感じで行けるのかなという感じですがまだ試せていない。
Athena
gem aws-sdk-athena
を利用しました。
ATHENA_CLIENT = Aws::Athena::Client.new ATHENA_RETRY_INTERVAL = 1 ATHENA_TIMEOUT_SEC = 300 def run_athena_query(query) start_query_response = ATHENA_CLIENT.start_query_execution({ query_string: query, query_execution_context: { database: CONFIG_DATASOURCE['athena']['database'], }, result_configuration: { output_location: CONFIG_DATASOURCE['athena']['output_location'] }, }) batch_started_time = Time.new execution_id = start_query_response.query_execution_id loop do sleep(ATHENA_RETRY_INTERVAL) resp = ATHENA_CLIENT.batch_get_query_execution({ query_execution_ids: [execution_id] }) break unless resp.query_executions[0].status.state == "RUNNING" if (Time.now - batch_started_time).to_i >= ATHENA_TIMEOUT_SEC ATHENA_CLIENT.stop_query_execution({ query_execution_id: execution_id }) raise RuntimeError, "queryの実行時間が長すぎるため処理を中断しました。" end end ATHENA_CLIENT.get_query_results({ query_execution_id: execution_id }) end query = "SELECT ..." query_response = run_athena_query(query)
急にコード量増えたぞオイという感じですが、ここではqueryを投げつつ以下の処理も実装しています。
- 1秒毎にjobのstatusを確認して完了を待つ
- 300秒経っても完了しない場合はtimeoutする
AWS SDKにはWaiterというclassがあり、これを継承しているClientは wait_untill
という上記機能を持ったmethodが使えるのですが、現時点ではAthenaは未対応だったため自前で行っています。
また、 get_query_results
の返り値は非常に扱いづらいです。
そのためMysqlなどと同じように扱えるようformatしました。
def parse_athena_query_response(query_response) rows = query_response.result_set.rows # headerのみの場合 return [] if rows.size == 1 header = rows[0].data.map { |row| row.var_char_value } rows[1..-1].map do |row| row.data.each_with_index.reduce({}) { |acc, (value, idx)| acc[header[idx].to_sym] = value.var_char_value; acc; } end end
こうしちゃえば後は同じように扱えます。
records = parse_athena_query_response(query_response) puts Terminal::Table.new title: "User", headings: records.first.keys, rows: records.map { |record| record.values }