php使用stomp操作ActiveMQ
最近終於可以抽了點時間試一下php和ActiveMQ這樣的組合。遇到了些狀況,在大家解決這些狀況的過程中,挖到更多的相關資訊。所以,做個記錄…
這次的測試,覺得ActiveMQ和JAVA比較親,其他的語言大部分須要靠STOMP(全名為Simple (or Streaming) Text Orientated Messaging Protocol。)然而,有些功能在STOMP上並沒有實做、或者處理方式不太相同。
既然是php和ActiveMQ的組合,就必須先瞭解一下STOMP。寫這記錄時,一般使用的STOMP都是STOMP 1.0。以php所提供的extension stomp pecl來說,它文件中雖然沒提到所採用的版本,但以它目前最新的版本-Release 1.0.3來說,是在2010所出。當時還沒出STOMP Protocol Specification, Version 1.1(官網提到1.1 Released on 2011/3/31)。推算時間,應該是採用STOMP 1.0
寫這記錄時,官網文件 - stomp Implementations提到,目前支援STOMP 1.1的僅Apache Apollo。文件中是對Apache Apollo的描述如下…感覺上對STOMP的支援程度似乎比較好?
在Apache Apollo官網上,是如此描述自身產品
php如何使用stomp操作ActiveMQ?其實很簡單,大概就是下面的程式碼(範例為Procedural style,另有Object oriented style)。不過,使用ActiveMQ的重點在於…要明確知道自己要如何使用ActiveMQ,再決定程式的運作邏輯以及相關的參數設定。
附註:
但是customer在取得資料時,處理速度異常的緩慢…和JAVA所寫的customer處理速度相比,只能說慢到令人無法接受。嘗試使用fork()方式用multi process,同時間用20個process處理。處理速度雖然有增快,卻依舊很慢…
雖然,預期中速度可能會『比較』慢。但慢成這樣的速度根本無法採用。查了一下ActiveMQ文件,終於在ActiveMQ extensions to Stomp這篇文章中找到了解決方案。於SUBSCRIBE設定activemq.prefetchSize。設定方式如下…
什麼是prefetchSize?文中是這樣說明的…
這樣的解釋,一時之間不太明瞭真正的含意。由已經深入研究ActiveMQ的同事解說後,才能瞭解(有興趣的人可以參考書籍-ActiveMQ in Action,內有詳細說明。或參考下面slide於P34 PreFetch limit的內容),卻也引發我其他的疑問,特別是-acknowledges(下篇再述)
將prefetchSize設定和JAVA相同的預設值1000後,使用php所寫的customer,處理速度和JAVA相比,『感覺上』已經不分軒輊。其實,prefetchSize由1改為2時,customer的處理速度就改善幾十倍
最後提一下在實務使用上會使用的設定-failover。我們可以指定一台以上的ActiveMQ操作。除此,也可能會需要設定timeout時間。詳細的設定值與說明,可參考Apache ActiveMQ ™ -- Failover Transport Reference
這次的測試,覺得ActiveMQ和JAVA比較親,其他的語言大部分須要靠STOMP(全名為Simple (or Streaming) Text Orientated Messaging Protocol。)然而,有些功能在STOMP上並沒有實做、或者處理方式不太相同。
既然是php和ActiveMQ的組合,就必須先瞭解一下STOMP。寫這記錄時,一般使用的STOMP都是STOMP 1.0。以php所提供的extension stomp pecl來說,它文件中雖然沒提到所採用的版本,但以它目前最新的版本-Release 1.0.3來說,是在2010所出。當時還沒出STOMP Protocol Specification, Version 1.1(官網提到1.1 Released on 2011/3/31)。推算時間,應該是採用STOMP 1.0
寫這記錄時,官網文件 - stomp Implementations提到,目前支援STOMP 1.1的僅Apache Apollo。文件中是對Apache Apollo的描述如下…感覺上對STOMP的支援程度似乎比較好?
Apache Apollo a redesigned version of ActiveMQ focused on STOMP messaging.
在Apache Apollo官網上,是如此描述自身產品
ActiveMQ Apollo is a faster, more reliable, easier to maintain messaging broker built from the foundations of the original ActiveMQ. It accomplishes this using a radically different threading and message dispatching architecture.
至今,居然只有支援STOMP。之後才會加上其他的protocol。感覺…Apache Apollo似乎沒有獨厚JAVA…不過,沒實際測過,就不下結論…php如何使用stomp操作ActiveMQ?其實很簡單,大概就是下面的程式碼(範例為Procedural style,另有Object oriented style)。不過,使用ActiveMQ的重點在於…要明確知道自己要如何使用ActiveMQ,再決定程式的運作邏輯以及相關的參數設定。
- //send a message to the queue
- function sendMQ($data) {
- $link = openMQ();
- foreach ($data as $pitem) {
- //使用 persistent message
- $result = stomp_send($link, $amq['queue'], $pitem, array("persistent" => "true"));
- if (FALSE === $result) {
- //do something
- }
- }
- }
- //receive message
- function receiveMQ() {
- $link = openMQ($queue);
- stomp_subscribe($link, $queue);
- while (1) {
- if (TRUE === stomp_has_frame($link)) {
- $frame = stomp_read_frame($link);
- if (FALSE !== $frame) {
- stomp_ack($link, $frame['headers']['message-id']);
- } else {
- //do something
- break;
- }
- } else {
- break;
- }
- }
- stomp_unsubscribe($link, $queue);
- stomp_close($link);
- }
- //connection ActiveMQ
- function openMQ(&$queue=FALSE) {
- $amq = array(
- 'url' => 'tcp://127.0.0.1:61613',
- 'id' => 'xxx',
- 'pswd' => 'xxx',
- 'queue' => '/queue/mytest',
- 'enable' => TRUE
- );
- $link = stomp_connect($amq['url'], $amq['id'], $amq['pswd']);
- if (!$link) {
- die("Can't connect MQ !!");
- } else {
- return $link;
- }
- }
- 上面範例是採用Persistent
- 雖然producer設定為persistent,但如果ActiveMQ的設定不對,還是可能會遺失資料
但是customer在取得資料時,處理速度異常的緩慢…和JAVA所寫的customer處理速度相比,只能說慢到令人無法接受。嘗試使用fork()方式用multi process,同時間用20個process處理。處理速度雖然有增快,卻依舊很慢…
雖然,預期中速度可能會『比較』慢。但慢成這樣的速度根本無法採用。查了一下ActiveMQ文件,終於在ActiveMQ extensions to Stomp這篇文章中找到了解決方案。於SUBSCRIBE設定activemq.prefetchSize。設定方式如下…
- stomp_subscribe($link, $queue, array("activemq.prefetchSize" => 1000));
Specifies the maximum number of pending messages that will be dispatched to the client. Once this maximum is reached no more messages are dispatched until the client acknowledges a message. Set to 1 for very fair distribution of messages across consumers where processing messages can be slow.
這樣的解釋,一時之間不太明瞭真正的含意。由已經深入研究ActiveMQ的同事解說後,才能瞭解(有興趣的人可以參考書籍-ActiveMQ in Action,內有詳細說明。或參考下面slide於P34 PreFetch limit的內容),卻也引發我其他的疑問,特別是-acknowledges(下篇再述)
將prefetchSize設定和JAVA相同的預設值1000後,使用php所寫的customer,處理速度和JAVA相比,『感覺上』已經不分軒輊。其實,prefetchSize由1改為2時,customer的處理速度就改善幾十倍
最後提一下在實務使用上會使用的設定-failover。我們可以指定一台以上的ActiveMQ操作。除此,也可能會需要設定timeout時間。詳細的設定值與說明,可參考Apache ActiveMQ ™ -- Failover Transport Reference
參考資料
- ActiveMQ in Action所提到不同consumer的預設值
- Queue consumer default prefetch size = 1000
- Queue browser consumer default prefetch size = 500
- Persistent topic consumer default prefetch size = 100
- Nonpersistent topic consumer default prefetch size = 32766
- stomp 1.1和stomp 1.0的差異
- ActiveMQ參考書籍-ActiveMQ in Action
- php使用stomp操作ActiveMQ
- stomp進階說明-prefetchSize、ack header
- stomp failover作法
留言