Rails 4.0 先睹為快:作業佇列

類別: IT
標籤: rails

Rails 最近增加了一個作業佇列系統,讓我們來看看如何使用。

Run, baby, run!

這個佇列 API 非常簡單,你將物件放到佇列中,而這個物件需要提供一個名為 run 的方法,下面是個簡單例子:

class TestJob
  def run
    puts "I am running!"
  end
end

Rails.queue.push(TestJob.new)
=> "I am running!"

對大多數人來說,這已足夠。佇列是在一個獨立的執行緒中執行,因此你的應用不會因為一些很複雜的作業而導致無響應

Rails 中的基本佇列並不是一個長期的解決方案,其目的僅僅是提供一個通用的 API 可以用來支援更可靠的佇列系統。例如當你需要從 Resque 切換到 Sidekiq,你不需要更改你應用程式碼,你只需關心物件進入佇列以及編組。

你也可以編寫自己的佇列,下面是一個簡單的佇列實現:

class MyQueue
  def push(job)
    job.run
  end
end

要使用你自定義的佇列,只需要在 application.rb 中設定:

config.queue = MyQueue 

上面例子來源於 Rails 的測試套件,它定義了一個非非同步的佇列,當作業被放到佇列中便立即執行。下面讓我們開發一個實際的作業,無需依賴於 Queue 類。

class MyQueue
  def initialize
    @queue = []
  end

  def push(job)
    @queue.push(job)
  end

  def pop
    @queue.pop
  end
end

這個例子我們實現了一個簡單的佇列,接下來你需要告訴 Rails 的 QueueConsumer 來使用這個佇列,可以在 application.rb 的 initializer 塊中設定:

intializer 'start queue consumer' do |app|
  qpp.queue_consumer = config.queue_consumer.start(app.queue)
  at_exit { app.queue.consumer.shutdown }
end

然後我們將新的作業放到佇列中:

Rails.queue.push(TestJob.new) 

啥也沒有,為什麼?檢查 QueueConsumer:

Rails.application.queue_consumer
=> #<Rails::Queueing::ThreadedConsumer @queue=#<MyQueue @queue=[]>, @thread=#<Thread dead>>

然後你會發現執行緒死了,我們可以強行要求佇列處理:

Rails.application.queue_consumer.start
=> "I am running!"

回過頭來看看到底發生了什麼。首先我們找到 ThreadedConsumer#start

def start
  @thread = Thread.new do
    while job = @queue.pop
      begin
        job.run
      rescue Exception => e
        handle_exception e
      end
    end
  end
  self
end

這個執行緒只有在 @queue.pop 返回一個 true 值的時候才會執行,這不太合理,我們需要不斷的將物件推到佇列中,讓我們來看看 Queue#pop 發生了什麼:

# Retrieves data from the queue.  If the queue is empty, the calling thread is
# suspended until data is pushed onto the queue.  If +non_block+ is true, the
# thread isn't suspended, and an exception is raised.
#
def pop(non_block=false)
  while true
    @mutex.synchronize do
      @waiting.delete(Thread.current)
      if @que.empty?
        raise ThreadError, "queue empty" if non_block
        @waiting.push Thread.current
        @resource.wait(@mutex)
      else
        retval = @que.shift
        @resource.signal
        return retval
      end
    end
  end
end

終於有點明白了,Queue#pop 是一個無限的迴圈在等待其需要的內容。而我們簡單的 MyQueue 類在 ThreadConsumer#start 呼叫的時候會返回 nil,因此佇列裡沒物件,執行緒就結束了。甚至當我們往佇列裡放物件時,再次 pop 操作後也會結束。

簡單起見,只需要讓 MyQueue 繼承 Queue 即可:

class MyQueue < Queue
end

現在我們可以:

Rails.queue.push(TestJob.new)
=> "I am running!"

Rails 4.0 中的佇列系統是一個非常簡單的解決方案,我們期待正式版的釋出,能夠支援更多更好的後臺作業處理庫。

需要注意的是,目前的佇列還是 beta 版本,API 可能還有有所更改。

Rails 4.0 先睹為快:作業佇列原文請看這裡

推薦文章