こんにちは!!こんにちは!!
インフラエンジニアのyamamotoです。
AWS CloudWatch Logs に貯めこんだログを、Kinesis Data Firehose を使って S3 に保管し、Athenaで検索しよう、と思ったらいろいろつまづいたのでまとめてみました。
きっかけ
当社の新プロジェクトで、ログをどげんかせんといかん、という話に。
ひとまず CloudWatch Logs に保存しておいて後でどうにかしようと思ったのですが、検索するにも保管するにも良くないので、S3に保管してAthenaで読めたらいいよねー、ということになりました。
しかし CloudWatch Logs のログを S3 に出そうとすると、手動での実行か、Lambdaでゴニョゴニョしないといけなさそうです。
もっとスマートに、逐次出力できないものか、と思って調べてみたところ、Kinesis Data Firehose を使ってストリーミングできるということが判明。
すごい。さすがはAWS。
ことの一部始終
Kinesis Data Firehoseのストリームを作成
まず、Kinesis Data Firehoseのストリームを作成します。
ここら辺は参考になるサイトがいっぱいあるので、そちらを見ながら設定。
CloudWatch Logsをサブスクリプション
CloudWatch Logsにはサブスクリプションという、外部出力の機能があります。
しかし、Kinesis Data Firehose に出力するにはコンソールからではできず、AWS APIで操作しなければなりません。
すごくないよAWS……
こちらのサイトを参考に登録しました。ありがとうございます。
https://christina04.hatenablog.com/entry/cloudwatch-logs-to-s3-via-firehose
ここまでは順調に進みましたが、この先でつまづきました。
S3に出力されるも……
ログがS3に出力されるようになりました。ただ、ログの内容がなんか変……
{"messageType": "DATA_MESSAGE","owner": "677754094491","logGroup": "/aws/elasticbeanstalk/hogehoge-rails","logStream": "production/rails/c6c65f81-1c88-4c02-8407-2e1d8bce69fa","subscriptionFilters": ["All"],"logEvents": [{"id": "34657843470841441528834034625987848118641882362345488384","timestamp": 1554111450640,"message": "{\"method\":\"GET\",\"path\":\"/hogehoge/products\",\"format\":\"json\",\"controller\":\"Hogehoge::ProductsController\",\"action\":\"show\",\"status\":200,\"duration\":1.41,\"view\":0.72,\"transaction_id\":\"bcfa33d20729b874d4f8\",\"params\":{},\"type\":\"RAILS\",\"host\":\"foobar.hogehoge.com\",\"remote_ip\":\"192.168.1.11\",\"ua\":\"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36\",\"request_id\":\"d2c5fd8c-a207-4fe1-b030-e58439073ace\",\"session_id\":null,\"timestamp\":\"2019-05-21T08:03:50+00:00\",\"timestamp_jst\":\"2019-05-21T17:03:50+09:00\"}"}]}
見た感じ、生ログが Cloudwatch Logs のログで包まれ、複数のログも全部が1行にまとまっていました。
Athenaで読み込ませるには、1行に一つのログがある形式にしないといけないとのことなので、このログをバラす必要があります。
Kinesis Data Firehose の Transform 機能
そこで、Kinesis Data Firehose にあるTransform機能を使うことにしました。
一言で言うと、流れてきたログをLambdaで加工できる機能です。
これを使ってログをバラそう思い、流れてくるログをキャッチしてみると、なにやらbase64っぽい文字列が……
単純にbase64デコードしても、バイナリになっていて中が読めない。ううむ。
調べてみると、CloudWatch Logがサブスクリプションとして出力されるログは、gzip圧縮されてBase64エンコードされて送られてくるようです。な、なんと……
これを解凍してJSONで読み込み、生ログ部分だけ取り出すスクリプトをLambdaに用意しました。
const zlib = require('zlib'); exports.handler = (event, context, callback) => { const output = event.records.map((record) => { const buf = zlib.gunzipSync(Buffer.from(record.data, 'base64')); const cwlogs = buf.toString('utf-8'); const cwlogsparsed = JSON.parse(cwlogs); let ret = ''; for (let i = 0; i < cwlogsparsed.logEvents.length; i += 1) { ret += `${cwlogsparsed.logEvents[i].message}\n`; } return { recordId: record.recordId, result: 'Ok', data: Buffer.from(ret, 'utf8').toString('base64'), }; }); callback(null, { records: output }); };
肝心のログは「logEvents」配列の「message」フィールドに入っており、その他の部分はCloudWatch Logsの情報になっているようです。
「logEvents」配列を for で回して「messages」フィールドの文字列だけを取り出し、改行を付けています。
で、
{"method":"GET","path":"/hogehoge/products","format":"json","controller":"Hogehoge::ProductsController","action":"show","status":200,"duration":1.41,"view":0.72,"transaction_id":"bcfa33d20729b874d4f8","params":{},"type":"RAILS","host":"foobar.hogehoge.com","remote_ip":"192.168.1.11","ua":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36","request_id":"d2c5fd8c-a207-4fe1-b030-e58439073ace","session_id":null,"timestamp":"2019-05-21T08:03:50+00:00","timestamp_jst":"2019-05-21T17:03:50+09:00"}
こんな感じに生ログが一行に一つだけになりました。
この形式であればAthenaで読み込むことができます。
なお、Kinesis Data Firehoseのオプションでgzip圧縮して保存できるので、この段階では無圧縮で渡しています。
Kinesis Data Firehoseでのデータ保存
Athenaではパーティションという機能があるので、これを活用する形で Kinesis Data Firehose でも意識してデータを保存するようにしてみました。
Kinesis Data Firehoseのストリームの設定で、指定したS3のパスの下に /year=年/month=月/day=日/ のディレクトリを掘り、その中にログを出力するようにしています。
※Firehoseの仕様上、UTCで日付が振られます
S3 bucket: hogehoge Prefix: logs/rails/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/ Error Prefix: logs/error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/!{firehose:error-output-type}
この形式にしていると、Athenaでパーティションを作成するときに便利になります。
また、データはオプション設定でgzip圧縮して保存します。
Athenaの設定
Athenaでは、下記のSQLを使ってテーブルを作成しました。
RDBと違ってデータはS3にあってテーブルはいくらでも作り直せるので、いろいろ試行錯誤してみるのもいいかもしれません。
CREATE EXTERNAL TABLE IF NOT EXISTS hogehoge.rails ( `type` string, `timestamp` string, `timestamp_jst` string, `method` string, `path` string, `controller` string, `action` string, `status` string, `duration` float, `view` float, `transaction_id` string, `request_id` string, `session_id` string, `params` string, `host` string, `remote_ip` string, `ua` string, `sql` string ) PARTITIONED BY( year int, month int, day int ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'ignore.malformed.json' = 'true' ) LOCATION 's3://hogehoge/logs/rails/' TBLPROPERTIES ('has_encrypted_data'='false');
そして、パーティションをS3から読み込ませます。
テーブル名左の点メニューから「Load Partition」を選択します。すると、自動的に年月日ディレクトリを読み込みパーティションを作ります。
あとはAthenaでコマンドを実行してデータを取り出せばいい感じです。
ログの流れ
最終的に、下記のようなログの流れになりました。
CloudWatch Logs ↓ サブスクリプション ↓ ↓ Logは、JSON形式でCloudWatch Logsの情報が付与されて、 ↓ gzip圧縮されBase64で文字列化されて送られるので解凍 ↓ ↓ Kinesis Data Firehose にある Transform 機能(Lambda)を使い、 ↓ CloudWatch Logsの情報をはがして純粋なログのみを1行ずつJSONで受け渡す Kinesis Data Firehose ↓ 年月日ディレクトリにデータを保存 ↓ gzip圧縮オン ↓ S3 ↓ ↓ 定義したテーブルに読み込まれる ↓ Athena
最後に
アクトインディでは、楽しくAWSのソリューションを活用して楽しいサービスを作るエンジニアを募集しています!