読者です 読者をやめる 読者になる 読者になる

perfectqueueを動かしてみる

Fluentdの作者の古橋さんが作って、Treasure Dataで使われているというメッセージキュー、 perfectqueue。 動かしてみようと思ったら全然ネットに資料がなかったので頑張って動かしてみました。 大きな特徴としてはMySQL上でキューイングシステムを構築するところでしょうか。

インストール

$ git clone https://github.com/treasure-data/perfectqueue.git
$ cd perfectqueue
$ bundle install

設定

$ mysql -h localhost -u root -e 'create database perfectqueue;'

$ mkdir config
$ cat config/perfectqueue.yml #以下のような内容でファイル作成
development:
  type: rdb_compat
  url: mysql2://root:@localhost:3306/perfectqueue
  table: queues

$ bundle exec perfectqueue init # テーブルを初期構築

実装

$ cat app/workers/dispatch/test_handler.rb
class TestHandler < PerfectQueue::Application::Base
  # implement run method
  def run
    # do something ...
    puts "#### acquired task: #{task.inspect}"

    # call task.finish!, task.retry! or task.release!
    task.finish!
  end
end
$ cat app/workers/dispatch/dispatch.rb
$:.unshift File.dirname(__FILE__)
require 'test_handler.rb'
class Dispatch < PerfectQueue::Application::Dispatch
  # describe routing
  route "type1" => TestHandler
  route /^regexp-.*$/ => :TestHandler  # String or Regexp => Class or Symbol
end

キューにメッセージを詰めてみる

$ bundle exec perfectqueue submit k1 type1 '{"uid":1}' -u user_1
$ bundle exec perfectqueue list
                           key            type               user             status                   created_at                      timeout   data
                            k1       user_task             user_1            waiting    2016-03-10 22:15:12 +0900    2016-03-10 22:15:12 +0900   {"uid"=>1}

perfectqueueを起動させる

$ bundle exec perfectqueue run -I. -rapp/workers/dispatch/dispatch.rb Dispatch
I, [2016-03-10T22:21:11.842687 #3933]  INFO -- : PerfectQueue 0.8.45
I, [2016-03-10T22:21:11.852560 #3933]  INFO -- : Worker process started. pid=3934
I, [2016-03-10T22:21:11.972423 #3934]  INFO -- : acquired task task=k1 id=1
#### acquired task: #<PerfectQueue::AcquiredTask @key="k1" @attributes={:status=>:running, :created_at=>1457616062, :data=>{"uid"=>1}, :type=>"type1", :user=>"user_1", :timeout=>1457616062, :max_running=>nil, :message=>nil, :node=>nil}>
I, [2016-03-10T22:21:11.973519 #3934]  INFO -- : finished task=k1
I, [2016-03-10T22:21:11.974812 #3934]  INFO -- : completed processing task=k2 id=1:

$ bundle exec perfectqueue list
                           key            type               user             status                   created_at                      timeout   data
                            k1           type1                              finished                                 1984-07-02 20:39:31 +0900   {"uid"=>1}

起動中にキューを追加

$ bundle exec perfectqueue submit k5 type1 '{"uid":5}' -u user_5
I, [2016-03-10T22:26:43.535768 #4234]  INFO -- : acquired task task=k5 id=1
#### acquired task: #<PerfectQueue::AcquiredTask @key="k5" @attributes={:status=>:running, :created_at=>1457616402, :data=>{"uid"=>5}, :type=>"type1", :user=>"user_5", :timeout=>1457616402, :max_running=>nil, :message=>nil, :node=>nil}>
I, [2016-03-10T22:26:43.536079 #4234]  INFO -- : finished task=k5
I, [2016-03-10T22:26:43.544347 #4234]  INFO -- : completed processing task=k5 id=1:

ちゃんと処理されてますね。