はじめてのEMR/fluentdでS3にアップロードしたログをElastic MapReduceで検索・集計する
今回解決したい課題
こんにちは植木和樹です。本日の課題はタイトルの通り「fluentdでS3のバケットにアップロードしたログを検索・集計する」です。EC2でサーバを構築した場合、インスタンスがTerminateした後もログが参照できるようfluentdを使ってS3にファイルをアップロードしておくのがAWSのベストプラクティスです。
「Amazon Recommends Fluentd as “Best Practice for Data Collection” over Flume and Scribe」
しかしS3にアップロードしたログファイルはツールを用いなければアクセスすることができず、このままでは容易に検索できません。EC2からS3をs3fsでマウントしてgrepという方法はありますが、遅すぎて実用的とはいえません。s3cmdでいったんローカルにファイルをダウンロードしてから検索するという方法もありますが、ダウンロードの手間がかかってしまいます。
そこで今回はAmazon EMR(Elastic MapReduce)使ってログを検索・集計する方法を試してみました。ちなみ筆者はEMRを使うのは今回が初めてです。EMRについての技術的な解説はできませんので、行った手順だけご紹介したいと思います。
準備するもの
- AWSアカウント
- EC2のsshログインに使うキーペア
- fluentdを使ってS3のバケットにいくつかのログファイルが保存されていること(扱うのはApacheのアクセスログ)
なおS3には/<bucknet>/yyyy/mm/dd
というパスで、1日ごとにフォルダをわけてログが保存されているものとします。1日ごとのフォルダには、fluentdがログを書き込むごとにファイルが出力されるため複数のファイルがあります。
EMRのクラスターを作成する
まずはEMRのクラスターを作成します。マネージメントコンソールのダッシュボードから「Elastic MapReduce」を選択しEMRの画面を表示します。
[Create New Job Flow]をクリックして新しいジョブフローを作成します。
[Job Flow Name]を入力し[Run your own application]を選択します。アプリケーションのタイプは[Hive Program]を選択します。
今回はバッチモードでなく、コマンドを入力しながらの対話モードで実行するので[Start an Interactive Hive Session]を選択します。
EMRのクラスター全体をコントロールする役目をもつ"Master Instance"と実際に処理を行う"Core Instance"のインスタンスタイプと数を入力します。今回はお試しなのでインスタンスタイプはm1.small、コアインスタンスの数は1にします。
キーペアを選択します。このキーでEMRのマスターインスタンスにsshでログインすることになります。次にVPCのサブネットを選択しますが、PublicDNSで直接ログインしたいため今回はVPCは使いません。念のためデバッグログを有効にし”Termination Protection"や"Visible To All IAM Users"をチェックしておきました。
クラスター構築時の処理は特に必要ないので[Proceed with no Bootstrap Actions]を選択します。
[Create Job Flow]ボタンをクリックしてEMRクラスターを作成します。
[Close]ボタンをクリックしてジョブフローの作成を完了します。これでEMRのクラスターの構築が始まりますが、5〜10分程度時間がかかるので、しばらく待ちます。
EMRの画面に戻り、一覧に表示されたジョブフローのステータスが"STARTING"から"RUNNING"に変わったら準備完了です。
作成したジョブフローを選択するとマスターノードのエンドポイントが表示されます。このホスト名にsshで接続するとEMRのマスターノードにログインし、Hiveコマンドでの操作ができるようになります。sshでの接続はec2-userではなくhadoopユーザーになります。
ssh [email protected] $ hive
HiveコマンドでS3のログをテーブルにマッピングする
以降の作業は「EMRってなんじゃ?(ログ、ゆりかごから墓場まで)」を参考にさせていただきました。
Hiveを使うと処理対象のログをSQLのようなクエリーで操作できます。
まずはS3のログをテーブルにマッピングします。hiveプロンプトで以下のコマンドを実行します。S3に格納されたログファイルは1日ごとにフォルダが分かれていますが、クエリーするときには、それらをまとめて扱いたいと思います。そこでテーブルは1つにし、パーティションという形で1日ごとのフォルダを対応付けることで実現しています。
なおfluentdで出力されるApacheのアクセスログはタイムスタンプ < TAB > タグ < TAB > JSONというようにタブで分割された3つのフィールドになっています。
hive> CREATE EXTERNAL TABLE IF NOT EXISTS fluentLog (dt string, tag string, json string) PARTITIONED BY ( PT STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'; ALTER TABLE fluentLog ADD PARTITION ( pt='2013-08-28' ) LOCATION 's3://my-fluentd-test/2013/08/28'; ALTER TABLE fluentLog ADD PARTITION ( pt='2013-08-29' ) LOCATION 's3://my-fluentd-test/2013/08/29'; ALTER TABLE fluentLog ADD PARTITION ( pt='2013-08-30' ) LOCATION 's3://my-fluentd-test/2013/08/30'; ALTER TABLE fluentLog ADD PARTITION ( pt='2013-08-31' ) LOCATION 's3://my-fluentd-test/2013/08/31'; hive> show tables; OK fluentlog Time taken: 10.91 seconds, Fetched: 2 row(s) hive> desc fluentlog; OK dt string None tag string None json string None pt string None # Partition Information # col_name data_type comment pt string None Time taken: 0.534 seconds, Fetched: 9 row(s) hive> show partitions fluentlog; OK pt=2013-08-28 pt=2013-08-29 pt=2013-08-30 pt=2013-08-31 Time taken: 0.445 seconds, Fetched: 5 row(s)
年月日でわかれたS3上のフォルダをパーティションに対応させることができました。これで日付をまたいだ検索もできますし、この後にご紹介するようパーティション(年月日)を指定して検索することもできます。
Hiveコマンドでログを検索する
Hive QLというSQLによく似たクエリーでログファイルを検索することができます。まずはCSVを検索してみましょう。次のクエリーを実行します。
SELECT unix_timestamp(), dt, host, user, method, path, code, size, referer, CONCAT('"', agent ,'"') FROM fluentLog LATERAL VIEW json_tuple(fluentLog.json, 'host', 'user', 'method', 'path', 'code', 'size', 'referer', 'agent') j AS host, user, method, path, code, size, referer, agent WHERE agent LIKE '%AppleWebKit%' ORDER BY dt;
SQLでは見かけない「LATERAL VIEW」と「json_tuple」キーワードが出てきています。LATERAL VIEWはユーザー関数の実行結果をビューとして扱うためのキーワードで、今回の場合はjson_tupleで展開されたJSONをビューとして参照できます。json_tupleはfluentdテーブルのjson列に格納されたJSON文字列をパースして値を返す関数です。つまりこのクエリーではカラムに含まれたJSONを分解して、あたかも事前定義されたテーブルのように扱っています。
Hiveコマンドでログを検索し結果をS3のファイルへ出力する
検索した結果は画面に表示されますが、「出力テーブル」を定義することでS3に保存することができます。まずは下記を実行してarchivelog_201308テーブルをS3上のフォルダs3://my-fluentd-emr/archives/2013/08に紐付けます。
CREATE EXTERNAL TABLE IF NOT EXISTS archivelog_201308 ( version bigint, dt string, host string, user string, method string, path string, code string, size string, referer string, agent string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 's3://my-fluentd-emr/archives/2013/08';
次に先ほどのSELECTクエリーを実行しますが、SELECT文の前にINSERT句を加えることで実行結果をテーブルに出力します。
INSERT OVERWRITE TABLE archivelog_201308 SELECT unix_timestamp(), dt, host, user, method, path, code, size, referer, agent FROM fluentLog LATERAL VIEW json_tuple(fluentLog.json, 'host', 'user', 'method', 'path', 'code', 'size', 'referer', 'agent') j AS host, user, method, path, code, size, referer, agent WHERE pt >= '2013-08-01' AND pt <= '2013-08-31' ORDER BY dt;
Hiveコマンドでログを集計し結果をS3のファイルへ出力する
集計もできます。Apacheのステータスコードごとに件数を集計してみましょう。まずは下記を実行してstatusSummary_201308テーブルをS3上のフォルダs3://my-fluentd-emr/summary/2013/08に紐付けます。
hive> CREATE EXTERNAL TABLE IF NOT EXISTS statusSummary_201308 (code string, count bigint) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 's3://my-fluentd-emr/summary/2013/08';
次に作成した出力テーブルにクエリーの実行結果をINSERTすると、S3にファイルが出力されます。
INSERT OVERWRITE TABLE statusSummary_201308 SELECT code,count(1) FROM fluentLog LATERAL VIEW json_tuple(fluentLog.json, 'host', 'user', 'method', 'path', 'code', 'size', 'referer', 'agent') j AS host,user,method, path, code, size, referer, agent WHERE pt >= '2013-08-28' AND pt <= '2013-08-31' GROUP BY code;
S3に接続してs3://my-fluentd-emr/summary/2013/08に出力されたCSVファイルを見てみましょう。
\N,1 200,1 304,2 400,5 403,11 404,136
ちゃんと集計結果が出力されていますね!
まとめ
Elastic MapReduceとHiveを使ってfluentdのログを直接検索する方法を試してみました。EMRは初めて触ってみたのですが、Hiveを使うと見慣れたSQLを使ってデータ操作が行えるため、当初想像していたよりも簡単にMapReduceを経験することができました。ただHiveでS3のファイルを検索する方法は簡単は簡単なのですが、やはり直接S3のファイルを操作しているので遅いです。手軽にgrep間隔で使うにはコストが高すぎます。
やはりEMRは大量のデータを一斉に処理して別の形式に変換する目的で使うのが良さそうですね。最近ではAmazon RedshiftというデータウェアハウスにデータをインポートしてBIツールで検索するのが良さそうです。今度はそちらも試してみたいと思います。