php使用stomp操作ActiveMQ

最近終於可以抽了點時間試一下php和ActiveMQ這樣的組合。遇到了些狀況,在大家解決這些狀況的過程中,挖到更多的相關資訊。所以,做個記錄…

這次的測試,覺得ActiveMQ和JAVA比較親,其他的語言大部分須要靠STOMP(全名為Simple (or Streaming) Text Orientated Messaging Protocol。)然而,有些功能在STOMP上並沒有實做、或者處理方式不太相同。

既然是php和ActiveMQ的組合,就必須先瞭解一下STOMP。寫這記錄時,一般使用的STOMP都是STOMP 1.0。以php所提供的extension stomp pecl來說,它文件中雖然沒提到所採用的版本,但以它目前最新的版本-Release 1.0.3來說,是在2010所出。當時還沒出STOMP Protocol Specification, Version 1.1(官網提到1.1 Released on 2011/3/31)。推算時間,應該是採用STOMP 1.0

寫這記錄時,官網文件 - stomp Implementations提到,目前支援STOMP 1.1的僅Apache Apollo。文件中是對Apache Apollo的描述如下…感覺上對STOMP的支援程度似乎比較好?

Apache Apollo a redesigned version of ActiveMQ focused on STOMP messaging.

Apache Apollo官網上,是如此描述自身產品
ActiveMQ Apollo is a faster, more reliable, easier to maintain messaging broker built from the foundations of the original ActiveMQ. It accomplishes this using a radically different threading and message dispatching architecture.
至今,居然只有支援STOMP。之後才會加上其他的protocol。感覺…Apache Apollo似乎沒有獨厚JAVA…不過,沒實際測過,就不下結論…

php如何使用stomp操作ActiveMQ?其實很簡單,大概就是下面的程式碼(範例為Procedural style,另有Object oriented style)。不過,使用ActiveMQ的重點在於…要明確知道自己要如何使用ActiveMQ,再決定程式的運作邏輯以及相關的參數設定。
  1. //send a message to the queue  
  2. function sendMQ($data) {  
  3.     $link = openMQ();  
  4.     foreach ($data as $pitem) {  
  5.         //使用 persistent message  
  6.         $result = stomp_send($link$amq['queue'], $pitemarray("persistent" => "true"));  
  7.         if (FALSE === $result) {  
  8.             //do something  
  9.         }  
  10.     }  
  11. }  
  12.   
  13. //receive message  
  14. function receiveMQ() {  
  15.     $link = openMQ($queue);  
  16.     stomp_subscribe($link$queue);  
  17.   
  18.     while (1) {  
  19.         if (TRUE === stomp_has_frame($link)) {  
  20.             $frame = stomp_read_frame($link);  
  21.   
  22.             if (FALSE !== $frame) {  
  23.                 stomp_ack($link$frame['headers']['message-id']);  
  24.             } else {  
  25.                 //do something  
  26.                 break;  
  27.             }  
  28.         } else {  
  29.             break;  
  30.         }  
  31.     }  
  32.     stomp_unsubscribe($link$queue);  
  33.     stomp_close($link);  
  34. }  
  35.   
  36. //connection ActiveMQ  
  37. function openMQ(&$queue=FALSE) {  
  38.     $amq = array(  
  39.         'url' => 'tcp://127.0.0.1:61613',  
  40.         'id' => 'xxx',  
  41.         'pswd' => 'xxx',  
  42.         'queue' => '/queue/mytest',  
  43.         'enable' => TRUE  
  44.     );  
  45.     $link = stomp_connect($amq['url'], $amq['id'], $amq['pswd']);  
  46.     if (!$link) {  
  47.         die("Can't connect MQ !!");  
  48.     } else {  
  49.         return $link;  
  50.     }  
  51. }  
附註:
  1. 上面範例是採用Persistent
  2. 雖然producer設定為persistent,但如果ActiveMQ的設定不對,還是可能會遺失資料
照上面的php code,producer塞15000筆資料到ActiveMQ,每筆資料的內容是strng(12)。測試的VM主機,處理時間大約都在1.5~2.5 sec內。

但是customer在取得資料時,處理速度異常的緩慢…和JAVA所寫的customer處理速度相比,只能說慢到令人無法接受。嘗試使用fork()方式用multi process,同時間用20個process處理。處理速度雖然有增快,卻依舊很慢…

雖然,預期中速度可能會『比較』慢。但慢成這樣的速度根本無法採用。查了一下ActiveMQ文件,終於在ActiveMQ extensions to Stomp這篇文章中找到了解決方案。於SUBSCRIBE設定activemq.prefetchSize。設定方式如下…
  1. stomp_subscribe($link$queuearray("activemq.prefetchSize" => 1000));  
什麼是prefetchSize?文中是這樣說明的…
Specifies the maximum number of pending messages that will be dispatched to the client. Once this maximum is reached no more messages are dispatched until the client acknowledges a message. Set to 1 for very fair distribution of messages across consumers where processing messages can be slow.

這樣的解釋,一時之間不太明瞭真正的含意。由已經深入研究ActiveMQ的同事解說後,才能瞭解(有興趣的人可以參考書籍-ActiveMQ in Action,內有詳細說明。或參考下面slide於P34 PreFetch limit的內容),卻也引發我其他的疑問,特別是-acknowledges(下篇再述)

將prefetchSize設定和JAVA相同的預設值1000後,使用php所寫的customer,處理速度和JAVA相比,『感覺上』已經不分軒輊。其實,prefetchSize由1改為2時,customer的處理速度就改善幾十倍

最後提一下在實務使用上會使用的設定-failover。我們可以指定一台以上的ActiveMQ操作。除此,也可能會需要設定timeout時間。詳細的設定值與說明,可參考Apache ActiveMQ ™ -- Failover Transport Reference

參考資料

  1. ActiveMQ in Action所提到不同consumer的預設值
    • Queue consumer default prefetch size = 1000
    • Queue browser consumer default prefetch size = 500
    • Persistent topic consumer default prefetch size = 100
    • Nonpersistent topic consumer default prefetch size = 32766
  2. stomp 1.1和stomp 1.0的差異
  3. ActiveMQ參考書籍-ActiveMQ in Action
  4. php使用stomp操作ActiveMQ
  5. stomp進階說明-prefetchSize、ack header
  6. stomp failover作法

留言