命短し学べよTechを

Tech系について学んだことやあれやそれを記録します。

Mysql, BigQuery, AthenaをCLI上で処理する

はじめに

社内のデータを見てトラブルシュートすることが多いのですが、コマンド1つでこれらのデータを元にほしい情報が得られる調査用scriptを作成し、自分含め他メンバーにも配布して利用してもらうということを行いました。 その際に得た知見や工夫した点をまとめます。

Mysql

gem mysql2 を利用しました。 MysqlSSH gateway経由で接続しますが、利用頻度が高く複数のscriptで利用されるため、ラッパークラスを作成しました。

class HogeMysqlClient

  # 各種設定は別途yamlで読み込む
  CONFIG_DATASOURCE = YAML::load(ERB.new(IO.read(File.expand_path('../../config/datasource.yml', __FILE__))).result)[ENV['STAGE']].freeze

  def initialize

    # CONFIG_DATASOURCEのvalidationなど記載
    ...

    @gateway = Net::SSH::Gateway.new(
      CONFIG_DATASOURCE['ssh']['host'],
      CONFIG_DATASOURCE['ssh']['name'],
      port: CONFIG_DATASOURCE['ssh']['port'],
      keys: CONFIG_DATASOURCE['ssh']['keys']
    )
    @local_port = @gateway.open(CONFIG_DATASOURCE['mysql']['host'], CONFIG_DATASOURCE['mysql']['port'])

    @client = Mysql2::Client.new(
      host: '127.0.0.1',
      username: CONFIG_DATASOURCE['mysql']['username'],
      password: CONFIG_DATASOURCE['mysql']['password'],
      database: CONFIG_DATASOURCE['mysql']['database'],
      encoding: 'utf8',
      port: @local_port,
    )
    # ファイナライザを登録することでscript終了時にgatewayをcloseする
    ObjectSpace.define_finalizer(self) { @gateway.close(@local_port) }
  end
end

queryメソッドを用意します。 symbolize_keys: true をつけることでハッシュのキーがsymbolになるためscriptで扱いやすくなります。

  def query(query)
    @client.query(query, symbolize_keys: true)
  end

今回はqueryをdisplayしつつ結果が欲しかったので以下のようなメソッドも用意しました。

  def run_query_and_display(query)
    puts query
    query(query)
  end

呼び出し側

@mysql_client = HogeMysqlClient.new

query = "SELECT ..."
records = @mysql_client.run_query_and_display(query)

CLI上で結果を表形式で出力するため、 gem terminal-table を使います。

rows = records.map { |record| record.values }
puts Terminal::Table.new title: "User", headings: records.fields, rows: rows

ちなみに特定のカラムに対しては以下のように値を取得できたりします。

user_ids = records.map { |record| record[:user_id] }

BigQuery

今回は gem open3 を利用して bq query の標準出力を受け取る形にしました。

※bq queryコマンドには[gcloudのsetupが必要です。] (http://cloud.google.com/sdk/docs/?hl=ja)

bq queryコマンドをそのままputsすれば表形式で結果が得られます。

query = "SELECT ..."
puts Open3.capture3(%{bq query --use_legacy_sql=false "#{query}"})

ただしこれだと以下の問題があります。

  • 特定のカラムを処理できない
    • 返り値が表形式の文字列のため
  • Titleがつけられない
    • Mysqlをterminal-tebleで表示する際にはTitleをつけられる

この場合は bq query にformatを指定することでjson形式で結果を受け取ることができます。

stdout, stderr, status = Open3.capture3(%{bq query --use_legacy_sql=false --format=prettyjson "#{query}"})
records = JSON.parse(stdout)

rows = records.map { |record| record.values }
puts Terminal::Table.new title: "User", headings: records.first.keys, rows: rows

こんな感じで表示しつつ、カラム毎に処理できるようになります。 が、これにもまだ欠点があります。 それはJSON.parseしたときにSELECTした順番が保証されないため、これを出力しようとするとカラムの順序がバラバラになってしまうことです。

これに対処する場合は gem google-cloud-bigquery を利用します。

bq_cli = Google::Cloud::Bigquery.new(
  project: "test-bq-xxxxx", #BigQueryのプロジェクトID
)

query = "SELECT ..."
records = bq_cli.query query

rows = records.map { |record| record.values }
puts Terminal::Table.new title: "User", headings: records.headers, rows: rows

スマートですね...! ちなみに今回なぜ gem google-cloud-bigquery を使っていないかというと、scriptの環境構築をdocker化することを見据えたときに、gemの認証周りが面倒になりそうだったからです。 おそらく認証用JSONファイルを手元に落として、Volumeする感じで行けるのかなという感じですがまだ試せていない。

Athena

gem aws-sdk-athena を利用しました。

ATHENA_CLIENT = Aws::Athena::Client.new
ATHENA_RETRY_INTERVAL = 1
ATHENA_TIMEOUT_SEC = 300

def run_athena_query(query)
  start_query_response = ATHENA_CLIENT.start_query_execution({
    query_string: query,
    query_execution_context: {
      database: CONFIG_DATASOURCE['athena']['database'],
    },
    result_configuration: {
      output_location: CONFIG_DATASOURCE['athena']['output_location']
    },
  })

  batch_started_time = Time.new
  execution_id = start_query_response.query_execution_id
  loop do
    sleep(ATHENA_RETRY_INTERVAL)
    resp = ATHENA_CLIENT.batch_get_query_execution({ query_execution_ids: [execution_id] })
    break unless resp.query_executions[0].status.state == "RUNNING"

    if (Time.now - batch_started_time).to_i >= ATHENA_TIMEOUT_SEC
      ATHENA_CLIENT.stop_query_execution({ query_execution_id: execution_id })
      raise RuntimeError, "queryの実行時間が長すぎるため処理を中断しました。"
    end
  end

  ATHENA_CLIENT.get_query_results({ query_execution_id: execution_id })
end

query = "SELECT ..."
query_response = run_athena_query(query)

急にコード量増えたぞオイという感じですが、ここではqueryを投げつつ以下の処理も実装しています。

  • 1秒毎にjobのstatusを確認して完了を待つ
  • 300秒経っても完了しない場合はtimeoutする

AWS SDKにはWaiterというclassがあり、これを継承しているClientは wait_untill という上記機能を持ったmethodが使えるのですが、現時点ではAthenaは未対応だったため自前で行っています。

また、 get_query_results の返り値は非常に扱いづらいです。 そのためMysqlなどと同じように扱えるようformatしました。

def parse_athena_query_response(query_response)
  rows = query_response.result_set.rows

  # headerのみの場合
  return [] if rows.size == 1

  header = rows[0].data.map { |row| row.var_char_value }
  rows[1..-1].map do |row|
    row.data.each_with_index.reduce({}) { |acc, (value, idx)| acc[header[idx].to_sym] = value.var_char_value; acc; }
  end
end

こうしちゃえば後は同じように扱えます。

records = parse_athena_query_response(query_response)
puts Terminal::Table.new title: "User", headings: records.first.keys, rows: records.map { |record| record.values }