Fluentdの作者の古橋さんが作って、Treasure Dataで使われているというメッセージキュー、 perfectqueue。
動かしてみようと思ったら全然ネットに資料がなかったので頑張って動かしてみました。
大きな特徴としてはMySQL上でキューイングシステムを構築するところでしょうか。
インストール
$ git clone https://github.com/treasure-data/perfectqueue.git
$ cd perfectqueue
$ bundle install
設定
$ mysql -h localhost -u root -e 'create database perfectqueue;'
$ mkdir config
$ cat config/perfectqueue.yml #以下のような内容でファイル作成
development:
type: rdb_compat
url: mysql2://root:@localhost:3306/perfectqueue
table: queues
$ bundle exec perfectqueue init # テーブルを初期構築
実装
$ cat app/workers/dispatch/test_handler.rb
class TestHandler < PerfectQueue::Application::Base
def run
puts "#### acquired task: #{task.inspect}"
task.finish!
end
end
$ cat app/workers/dispatch/dispatch.rb
$:.unshift File.dirname(__FILE__)
require 'test_handler.rb'
class Dispatch < PerfectQueue::Application::Dispatch
route "type1" => TestHandler
route /^regexp-.*$/ => :TestHandler
end
キューにメッセージを詰めてみる
$ bundle exec perfectqueue submit k1 type1 '{"uid":1}' -u user_1
$ bundle exec perfectqueue list
key type user status created_at timeout data
k1 user_task user_1 waiting 2016-03-10 22:15:12 +0900 2016-03-10 22:15:12 +0900 {"uid"=>1}
perfectqueueを起動させる
$ bundle exec perfectqueue run -I. -rapp/workers/dispatch/dispatch.rb Dispatch
I, [2016-03-10T22:21:11.842687 #3933] INFO -- : PerfectQueue 0.8.45
I, [2016-03-10T22:21:11.852560 #3933] INFO -- : Worker process started. pid=3934
I, [2016-03-10T22:21:11.972423 #3934] INFO -- : acquired task task=k1 id=1
#### acquired task: #<PerfectQueue::AcquiredTask @key="k1" @attributes={:status=>:running, :created_at=>1457616062, :data=>{"uid"=>1}, :type=>"type1", :user=>"user_1", :timeout=>1457616062, :max_running=>nil, :message=>nil, :node=>nil}>
I, [2016-03-10T22:21:11.973519 #3934] INFO -- : finished task=k1
I, [2016-03-10T22:21:11.974812 #3934] INFO -- : completed processing task=k2 id=1:
$ bundle exec perfectqueue list
key type user status created_at timeout data
k1 type1 finished 1984-07-02 20:39:31 +0900 {"uid"=>1}
起動中にキューを追加
$ bundle exec perfectqueue submit k5 type1 '{"uid":5}' -u user_5
I, [2016-03-10T22:26:43.535768 #4234] INFO -- : acquired task task=k5 id=1
#### acquired task: #<PerfectQueue::AcquiredTask @key="k5" @attributes={:status=>:running, :created_at=>1457616402, :data=>{"uid"=>5}, :type=>"type1", :user=>"user_5", :timeout=>1457616402, :max_running=>nil, :message=>nil, :node=>nil}>
I, [2016-03-10T22:26:43.536079 #4234] INFO -- : finished task=k5
I, [2016-03-10T22:26:43.544347 #4234] INFO -- : completed processing task=k5 id=1:
ちゃんと処理されてますね。