MySQLからRedshiftへLoadするタスクを作ってみた。その1(MySQL → S3) | Luigi Advent Calendar 2016 #23

MySQLからRedshiftへLoadするタスクを作ってみた。その1(MySQL → S3) | Luigi Advent Calendar 2016 #23

Clock Icon2016.12.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』23日目の内容となります。
今回と明日で実際にMySQLのデータソースをRedshiftに取り込むタスクを作っていきたいと思います。

先日22日目はCentral Schedulerを使ってみた でした。

基本MySQL等のデータソースからRedshiftにデータを取り込む際はS3に取り込み対象のCSV/TSVを配置し、S3に配置したファイルをRedshiftのCopyコマンドで取り込むといった流れが一般的です。 その為タスクとしてはMySQLからS3への配置・S3からRedshiftの取り込みといった種類のタスクを作成し、requireで繋いでいくと言ったことになりそうです。 今回はその中のMySQL抽出を行いS3への配置を行いたいと思います。

MySQLの抽出結果をS3に配置する

今までやってきたLuigiの特集でMySQLの抽出・S3の配置は各々やっていたので、それらをつなぎ合わせていくことになります。 また、その中で設定ファイルについても作成していき、同一のタスクが異なる環境でも動くようにしていきたいと思います。

参考エントリしては以下になります。
タスクの結果をS3に保存してみた | Luigi Advent Calendar 2016 #08 | Developers.IO
設定ファイルを記載してみた | Luigi Advent Calendar 2016 #07 | Developers.IO
MySQLからデータ取得してみた | Luigi Advent Calendar 2016 #11 | Developers.IO

コードの全容としては以下になります。

# -*- coding: utf-8 -*-
from logging import getLogger, StreamHandler, DEBUG

import luigi
import luigi.s3
import luigi.contrib.mysqldb
import os
import csv
logger = getLogger(__name__)
handler = StreamHandler()
handler.setLevel(DEBUG)
logger.setLevel(DEBUG)
logger.addHandler(handler)
try:
    import mysql
    from mysql.connector import errorcode
except ImportError as e:
    logger.warning(
        "Loading MySQL module without the python package mysql-connector-python. \
        This will crash at runtime if MySQL functionality is used.")


class SQLFile(luigi.ExternalTask):
    sql_file = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(
            os.path.join(
                '{sql_file}'.format(
                    sql_file=self.sql_file
                )
            ), format=luigi.format.UTF8
        )


class extractMySQLTableToS3(luigi.Task):
    sql_file = luigi.Parameter(default="employee.sql")
    bucket_name = luigi.Parameter()
    host = luigi.Parameter()
    database = luigi.Parameter()
    password = luigi.Parameter()
    user = luigi.Parameter()

    def requires(self):
        return SQLFile(self.sql_file)

    def run(self):
        print("run start")
        mySqlTarget = luigi.contrib.mysqldb.MySqlTarget(
            host=self.host,
            database=self.database,
            password=self.password,
            user=self.user,
            table="dummy",
            update_id="sample")
        connection = mySqlTarget.connect()

        sql_list = []
        with self.input().open('r') as file_in:
            for line in file_in:
                sql_list.append(line)
        sql = ' '.join(sql_list)

        cursor = connection.cursor()
        try:
            cursor.execute(sql)
            with self.output().open('w') as out_file:
                writer = csv.writer(out_file, delimiter='\t', lineterminator='\n')
                writer.writerow([i[0] for i in cursor.description])
                while True:
                    rows = cursor.fetchmany(1000)
                    if not rows:
                        break
                    writer.writerows(rows)

        except mysql.connector.Error as e:
            if e.errno != errorcode.ER_NO_SUCH_TABLE:
                raise

    def output(self):
        return luigi.s3.S3Target("s3://{}/{}".format(self.bucket_name, self.sql_file.replace('.sql', '.tsv.gz')), format=luigi.format.Gzip)


if __name__ == '__main__':
    luigi.run()

今回の作例で新しく使ったluigiの機能としては、23行目に出てくるluigi.ExternalTaskがあります。 何者かというと当該のソースコードに下のような記載があります。

Subclass for references to external dependencies.

An ExternalTask's does not have a run implementation, which signifies to the framework that this Task's :py:meth:output is generated outside of Luigi.

Google翻訳をかけてみました。

外部依存関係への参照のためのサブクラス。 ExternalTask​​には run実装がありません。 このタスクの:py:meth: outputがLuigiの外部で生成されるというフレームワークです。

要はrunメソッドの無いtaskの実装ということになるようです。 今回のケースようにファイルを外部参照させたりする時に用いたりすることができそうです。

タスクの挙動としては参照元のMySQLの情報や配置先のバケット名はすべて設定ファイルに持っておき、 起動時は参照させたいSQLが記載してあるファイルをパラメータとして指定することで、 指定したSQLで抽出した結果のTSVファイルがS3に配されることになります。

実行結果は以下のようになります。

$ LUIGI_CONFIG_PATH=./luigi.cfg python ./MySQLtoRedshiftWithETL.py extractMySQLTableToS3 --sql-file titles.sql
$ aws s3 ls s3://cm-kajiwara-luigi-study
2016-12-23 07:12:00    1314935 titles.tsv.gz

まとめ

明日はこのタスクの後処理として、S3からRedshiftへの取り込みを実施し、ETL処理として完成させます。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.