fluent-plugin-pghstoreを書きました

fluentdをPostgreSQLのhstoreに書き出せるようにした、 fluent-plugin-pghstore というpluginを作成しました。

hstoreについては 前記事 を参照してください。

install

gem install fluent-plugin-pghstore

apache_log

例えば、tail pluginを使ってapacheaccess logをhstoreに出すようにする場合、こんなコンフィグを書きます。

<source>
  type tail
  path /var/log/apache/access_log_sym
  tag apache.access
  format apache
</source>

<match apache.*>
  type pghstore
  database test
</match>

そして、こんな感じになります。

       tag       |          time          |      record
 ----------------+------------------------+---------------------------------------
{apache,access} | 2012-04-01 22:55:15+09 | "code"=>"200", "host"=>"XXX.XXX.XXX.XXX", "path"=>"/", "size"=>"2608",
     "user"=>"-", "agent"=>"Mozilla/5.0 (Macintosh; Intel Mac OS X
     10_6_8) AppleWebKit/535.11 (KHTML, like Gecko)
     Chrome/17.0.963.83 Safari/535.11", "method"=>"GET", "referer"=>"-"

tagは"."でsplitして、配列として格納します。はい、PostgreSQLは配列を扱えますので。
hstore型のrecordカラムには、ちょっと見にくいですが、各種keyが値とともに入っています。

http

さらに、こんなコンフィグを追加して、

<source>
  type http
  port 9880
</source>

curlで叩いてみると、

curl -F 'json={"log":"hoge"}' "http://localhost:9880/apache.curl"
      tag       |          time          |      record
----------------+------------------------+---------------------------------------
{apache,access} | 2012-04-01 22:55:15+09 | "code"=>"200", "host"=>"XXX.XXX.XXX.XXX",
{apache,curl}   | 2012-04-01 23:28:44+09 | "log"=>"hoge"

同じテーブルに追加されましたね。 hstoreではkeyは動的に追加できますので、どんな形式のinput pluginでも大丈夫です。

例えば

一度PostgreSQLに入ってしまえばあとはSQLでいろいろなことができます。

UserAgentの数:

SELECT
  COUNT(*) AS c,
  record->'agent'
FROM apache_log
GROUP BY record->'agent'
ORDER BY c;

過去10分間のアクセス数:

SELECT count(*) FROM apache_log WHERE time > (CURRENT_TIMESTAMP - interval '10 min')

過去10分間のstatus codeの数:

SELECT
  count(CASE WHEN record->'code' = '200' THEN 1 ELSE NULL END) AS OK_200,
  count(CASE WHEN record->'code' = '301' THEN 1 ELSE NULL END) AS MOVED_301,
  count(CASE WHEN record->'code' = '302' THEN 1 ELSE NULL END) AS FOUND_302,
  count(CASE WHEN record->'code' = '304' THEN 1 ELSE NULL END) AS NOTMODIFIED_304,
  count(CASE WHEN record->'code' = '401' THEN 1 ELSE NULL END) AS UNAUTHORIZED_401
FROM apache_log
WHERE time > (CURRENT_TIMESTAMP - interval '10 min')

制限

ただし、こんな感じの多段のJSONを入れようとすると、だめです。

'json={"log":"hoge", "nest":{"a":"hoge", "b":"hige"}}'

hstoreが対応していないので難しいです。これは9.2のJSON型を待つのがいいのなあぁと思っているところです。

また、コネクションを一つだけしか使っていないので、おそらく高負荷環境では取りこぼしなどが発生する可能性があります。コネクションプールなどを使えばいいと思うので、patch大歓迎です!

こういう感じで

mongodbなどもいいですが、PostgreSQLもいいですよ。