Fluentdの作者の古橋さんが作って、Treasure Dataで使われているというメッセージキュー、 perfectqueue。 動かしてみようと思ったら全然ネットに資料がなかったので頑張って動かしてみました。 大きな特徴としてはMySQL上でキューイングシステムを構築するところでしょうか。
インストール
$ git clone https://github.com/treasure-data/perfectqueue.git $ cd perfectqueue $ bundle install
設定
$ mysql -h localhost -u root -e 'create database perfectqueue;' $ mkdir config $ cat config/perfectqueue.yml #以下のような内容でファイル作成 development: type: rdb_compat url: mysql2://root:@localhost:3306/perfectqueue table: queues $ bundle exec perfectqueue init # テーブルを初期構築
実装
$ cat app/workers/dispatch/test_handler.rb class TestHandler < PerfectQueue::Application::Base # implement run method def run # do something ... puts "#### acquired task: #{task.inspect}" # call task.finish!, task.retry! or task.release! task.finish! end end
$ cat app/workers/dispatch/dispatch.rb $:.unshift File.dirname(__FILE__) require 'test_handler.rb' class Dispatch < PerfectQueue::Application::Dispatch # describe routing route "type1" => TestHandler route /^regexp-.*$/ => :TestHandler # String or Regexp => Class or Symbol end
キューにメッセージを詰めてみる
$ bundle exec perfectqueue submit k1 type1 '{"uid":1}' -u user_1 $ bundle exec perfectqueue list key type user status created_at timeout data k1 user_task user_1 waiting 2016-03-10 22:15:12 +0900 2016-03-10 22:15:12 +0900 {"uid"=>1}
perfectqueueを起動させる
$ bundle exec perfectqueue run -I. -rapp/workers/dispatch/dispatch.rb Dispatch I, [2016-03-10T22:21:11.842687 #3933] INFO -- : PerfectQueue 0.8.45 I, [2016-03-10T22:21:11.852560 #3933] INFO -- : Worker process started. pid=3934 I, [2016-03-10T22:21:11.972423 #3934] INFO -- : acquired task task=k1 id=1 #### acquired task: #<PerfectQueue::AcquiredTask @key="k1" @attributes={:status=>:running, :created_at=>1457616062, :data=>{"uid"=>1}, :type=>"type1", :user=>"user_1", :timeout=>1457616062, :max_running=>nil, :message=>nil, :node=>nil}> I, [2016-03-10T22:21:11.973519 #3934] INFO -- : finished task=k1 I, [2016-03-10T22:21:11.974812 #3934] INFO -- : completed processing task=k2 id=1: $ bundle exec perfectqueue list key type user status created_at timeout data k1 type1 finished 1984-07-02 20:39:31 +0900 {"uid"=>1}
起動中にキューを追加
$ bundle exec perfectqueue submit k5 type1 '{"uid":5}' -u user_5
I, [2016-03-10T22:26:43.535768 #4234] INFO -- : acquired task task=k5 id=1 #### acquired task: #<PerfectQueue::AcquiredTask @key="k5" @attributes={:status=>:running, :created_at=>1457616402, :data=>{"uid"=>5}, :type=>"type1", :user=>"user_5", :timeout=>1457616402, :max_running=>nil, :message=>nil, :node=>nil}> I, [2016-03-10T22:26:43.536079 #4234] INFO -- : finished task=k5 I, [2016-03-10T22:26:43.544347 #4234] INFO -- : completed processing task=k5 id=1:
ちゃんと処理されてますね。