embulkでMySQLに出力してみました

  • #技術ブログ 

今年も秋口に喉をやられております。ダニアレルギー疑いのうえピーです。
布団のダニ除去には、50度以上の熱乾燥とその後の掃除機掛けが効果的らしいです。
※くれぐれも掃除機掛けをお忘れなく、秋口にダニアレルギーが増えるのは、活動を停止したダニを吸い込むことが原因らしいので。

小ねたを挟んだ今回は、前々回「Batch版flentd、embulkでリベンジ!」の続きということで、MySQLに出力をしてみようと思います。

前々回紹介した以下の手順で、input-mysqlとoutput-mysqlのプラグインはGemで導入済み前提です。

$ cd ~
$ embulk mkbundle embulk_bundle
$ vim embulk_bundle/Gemfile
下記を追記。
gem 'embulk-output-command'
gem 'embulk-input-mysql'
gem 'embulk-output-mysql'

ymlの記載方法は、以下のプラグインREADMEを見ていただければ、問題ないかと思います。
となると、ありゃ、ブログに書くことなくなっちゃう??
MySQL input plugin for Embulk
MySQL output plugin for Embulk

実際書くことあまりないですが、今回作成したymlと実行コマンドのサンプル、ちょっとだけ工夫したところを語らせて下さいませ。

とりあえず、作成したyml以下2種類と実行コマンドです。

[ Salesforce -> MySQL ]
account_salesforce.yml
in:
  type: salesforce_bulk
  userName: ひ・み・つ
  password: ひ・み・つ
  authEndpointUrl: https://login.salesforce.com/services/Soap/u/34.0
  objectType: Account
  pollingIntervalMillisecond: 5000
  querySelectFrom: SELECT Id,IsDeleted,Name,CreatedDate,LastModifiedDate FROM Account
  queryOrder: Name desc
  columns:
  - {type: string, name: Id}
  - {type: boolean, name: IsDeleted}
  - {type: string, name: Name}
  - {type: timestamp, name: CreatedDate, format: '%FT%T.%L%Z'}
  - {type: timestamp, name: LastModifiedDate, format: '%FT%T.%L%Z'}
  startRowMarkerName: LastModifiedDate

out:
  type: mysql
  host: localhost
  user: ひ・み・つ
  password: "ひ・み・つ"
  database: ひ・み・つ
  table: test_account
  options: {connectTimeout: 20000}
  mode: merge

[ MySQL -> MySQL ]
test_mysql.yml
in:
  type: mysql
  host: ひ・み・つ
  user: ひ・み・つ
  password: "ひ・み・つ"
  database: ひ・み・つ
  table: "test"
  select: "uniqueid, name, logdate, upd_dtm"
  incremental: true
  incremental_columns: [uniqueid]
  column_options:
   uniqueid: {type: long}
   name: {type: string}
   logdate: {type: timestamp}
   upd_dtm: {type: timestamp}

out:
  type: mysql
  host: localhost
  user: ひ・み・つ
  password: "ひ・み・つ"
  database: ひ・み・つ
  table: test_merge
  options: {connectTimeout: 20000}
  mode: merge

[ 実行コマンド ]
$ embulk run -L ../embulk_bundle/embulk-input-salesforce_bulk-master/ -b ../embulk_bundle ./account_salesforce.yml -c ./next_salesforce.yml
$ embulk run -b ../embulk_bundle ./test_mysql.yml -c ./next_test_mysql.yml

工夫したところは、embulk のオプション -c で前回からの差分取り込みができるので、それを意識して yml を作成しているところです。
in:に
Salesforceであれば、startRowMarkerName: LastModifiedDate、
MySQLは、incremental: true、incremental_columns: [uniqueid]
を定義したって感じ。
※細かい説明は、READMEをご参照くださいませ。

そうだ、忘れてた、出力先テーブルカラム属性は、inと合わせております。

よし、ちょうどいい時間なので、今回はこれにて。
See you !!