概要 : AMQP のプロトコルを読むと、一瞥して送信はパケットを送るだけ、受信はソケットを読み込むだけのようにも見える。しかしながら、実際に書いてみると、再送処理を自前で実装する必要があるため、現実には大変に複雑な処理が必要だ。
そもそもなぜ RabbitMQ を使うのかという話、あるいはなぜ再送が必要かという話
たんにコンポーネント同士が疎結合で通信したいのであればわざわざ MQ を使う必然性は皆無である。ごくあたりまえに TCP で通信すればそれでいい。暗号通信が必要なら当然 SSL でいいし、パケットエンティティに依存する複雑な L7 リバースプロキシを MQ を使って実現することも、不可能ではないが、普通そういうのは varnish とかでやるだろう。
MQ において優れているのはデータの durability だ。つまり、一旦キューにためておけば、その両側のコンポーネントは好き勝手に落ちたり立ち上がったり、あるいは数を増やしてスケールアウトしていい。複数のコンポーネントが複雑に入り乱れているシステムでは、その一部だけを落としてメンテナンスできるというのは非常に重要な要件だし、通信で間に MQ をかますことで、それを実現できるというのが非常に大きなアドバンテージである。
だから逆に言うと MQ を使うのであればピアのエンドポイントが突如いなくなるというのは、最初から考慮しておかなければ MQ の意味がない。システムが神のように無停止で動くのであればそもそもキューなんぞ不要である。ただのバグでプロセスが落ちるのが仮に排除されたとしても、おかんが掃除機かけたらブレーカーが落ちたとか、落雷で電話線からサージ電流が来てモデムが爆発したとか、光ファイバーだから平気だぜと思ったらセミが産卵したとか、様々な理由でシステムというのはすぐ落ちるものだし、みんな知ってるとおり 100% の SLA というものは存在しないのだ。
この節のまとめ : おまえらは本当に MQ が必要なのか? もしどうしても必要なのであれば、一切の希望を捨てよ。
再送処理の実際 (#1) 受信編
MQ から受信する側のプログラムは(比較的)そんなに難しくない。なぜかというと MQ からデータが来ていて自分のプログラムも動いているということは要するに、両側が生きていることは自明だからだ。
しかしながらそれが自明なのは受信側だけで、送信側である MQ からしたら、受信プログラムがだんまりを決め込んでしまうと、生きているか死んでるか分からんということになってしまう。そこで、プログラムは MQ からデータを受信した後に、適切に basic.ack パケットを返してあげる必要がある。逆に言うと、ack しなかったパケットはやがて MQ 側から再送されてきてしまう。
#! ruby
require 'amqp'
EM.run do
AMQP.connect $your_settings_here do |conn|
AMQP::Channel.new conn do |ch|
ch.queue $your_another_settings do |q|
q.suscribe $your_yet_another_settings do |hdr, bdy|
hdr.ack # <- don't miss it
end
end
end
end
end
Ack 戦略を考える
ここでいくつかの戦略が考えられる。つまりどこまで処理したら ack することにするかという話だ(図)。

受け取った直後に ack するというのは、 ack したあとに処理の途中で例外で死ぬこととかをかんがえると筋が悪いかのようにも見える。しかしながら、処理の最後に ack するというのだと、処理自体が無限ループしちゃったら永遠に ack できないとかありうる。一長一短と言える。どちらにせよ正常時には問題ない。が、今回考えているのはプログラムが突如死んだりする場合なので、そこを勘違いしないように。
別の考え方としては nak (not ack, 「受け取れませんでした」)を積極的に応答するというのもありうる(図)。nak されたパケットは、当然、MQ から後で再送されてくる(されてこないようにも指定できるが)。最初の受信時に処理をトリガしておいてから nak を返信して、再送時すでにトリガされた処理があれば、それが終了しているかで ack/nak を出し分けるということだ。

これは賢そうに思えるのだけれど、実際どうかというと、処理するワーカーが一個だけしか動いてなければまあ、そこそこ程度にはうまくいく。しかしながら MQ 使っててワーカーが一個というのはかなり考え辛くて、普通は横方向にたくさんスケールアウトしているだろう。そうなってくると、再送パケットがどのワーカーに届くかはロシアンルーレットだ。つまり処理をしているワーカーではないワーカーに再送されるというのが典型的なシナリオである。
この際、当然、二重に処理をしてしまってはいけないので、ちゃんとロックをとる必要がある。当然、プロセス間にまたがったロックだから、いわゆる分散 Mutex という話になるだろう。ところで、今、正常に動く場合の話はしていない。つまりプロセスが Mutex を握ったまま死んだらどうなるのか? このシナリオは、別途考える必要があるといえるし、そう一筋縄で解決できる話ではない。
この節のまとめ : 再送されてくるパケットを適切に無視するために分散 Mutex に逃げるというのは複雑さのレイヤーが MQ から Mutex に移動しただけの話で、本質的な解決になってない。
再送処理の実際 (#2) 送信編
さて、送信側では先程「 MQ からしたら、受信プログラムがだんまりを決め込んでしまうと、生きているか死んでるか分からんということになってしまう。」と書いたことの逆が発生する。つまり、 MQ が生きているかどうかを判定して、死んでそうなら手元にためておき、再起動してきたら送り直すというコードを自分で書かなくてはいけない。 Ruby の amqp ライブラリの場合で言えば、再接続まではライブラリの責任範囲だが、データの送受信は当然、クライアントプログラムの領分だ。とりうる戦略は二つある。
トランザクションを使う
MQのプロトコルにはトランザクションを宣言できる機能がある。これの流れ(図)は、まずクライアント側がトランザクションを宣言、次に通常のデータ通信を行い、最後に tx.commit を発行すると、トランザクションが有効であった間に送信したデータが確実にMQのデータストレージに書き込まれて再起動に耐えれる状態になるまで、 ブロックする。つまり間に MQ の再起動とかはさまってても、 commit-ok が返ってきたら送信できてたということをキュー側で保証してくれる(ちなみに失敗したら例外が発生)。簡単であるし、確実でもある。

#! ruby
require 'amqp'
EM.run do
AMQP.connect $your_settings_here do |conn|
AMQP::Channel.new conn do |ch|
AMQP::Exchane.new ch, $your_another_settings do |xchg|
ch.queue $your_yet_another_settings do |q|
q.bind xchg, nowait: false do
xchg.tx_select # start transactoin
$your_ary_to_send.each do |i|
xchg.publish i
xchg.tx_commit # this blocks
end
end
end
end
end
end
end
ただ問題は明らかなようにブロックするので速さが出ない。下層の RTT にもろに影響されるし、 MQ 側でも様々なロックをとったりディスク書き込みで fdatasync(2) したりするだろうから、ばーっとアクセスがくると今度はサーバのディスク速度が律速しだす。MQ に DB のような行の概念はないので、行単位にロックしたりとかということもない。
この節のまとめ : MQにもトランザクションがある。それはそれで便利ではある。が、万能ではない。
Publisher confirmation を使う
ようするにトランザクションはちょっと overkill なのである。どうせ TCP で通信しているのだからパケットの再送とかは基本は TCP でなんとかしているはずなのだ。そこはわざわざ待ち合わせる必要はない。問題はピアの MQ ブローカーが落ちたときで、それだけ検出できればいいでしょう、というのが、 RabbitMQ の拡張プロトコルである publisher confirmation だ。
Publisher confirmation はようするに MQ 側から「今ここまで読んだわー」っていう情報を送り返してくれる。そんだけである。したがって、流れ(図)としては送信すべきデータは送信後も一旦手元に残しておく。で、完了の通知を待たずにどんどん次のパケットも送ってしまう。のちに、完了通知が来たら、そのパケットはもう届いたから捨ててよくて、完了通知が来なかったら対向は死んでるので、再接続して送りなおす。

これはクライアント側に待ちが発生しないし、正常系(つまり運用時間の 99%)ではほとんど何も負荷がないので原理的には TCP のプロトコル限界までスピードが出せるし、ピアが死んでも救済できる。素晴らしそうに思えるのだけど、問題は正しく実装するのがめどい。つまりデータの再送はライブラリでは手助けしてくれないので、自力で正しく書くしかないのだけど、データの送信は exchange というエントリに向かって投げるのだけれど、どこまで読んだかの応答は channel というエントリから返ってくるという謎仕様(いや作りから考えれば謎ではないがまあ頭悪い感はある)になっているため、それをつきあわせたりしないといけない。
というわけで、たとえば以下のようなコードが必要になる。本気で必要な人以外は「ああ長くて面倒ですね」「ひどいコールバック地獄ですね」ってのが理解できればだいたいOK。興味がある人は、上と比べてみてどこがどう変わっているかを追いかけてみると良い。
#! ruby
require 'amqp'
PendingEvents = Struct.new :tag, :retry, :xchg, :msg, :block do
def publish
$publishing_events ||= []
self.xchg.publish self.msg, $your_yet_yet_another_settings
$tag += 1
self.tag = $tag
$publishing_events.push self
end
end
EM.run do
AMQP.connect $your_settings_here do |conn|
cap = conn.server_capabilities
raise 'RabbitMQ TOO OLD; please upgrade' unless cap and cap["publisher_confirms"]
AMQP::Channel.new conn do |ch|
ch.confirm_select do
$tag = 0
ch.on_ack do |ack|
n = ack.delivery_tag
ok = []
ng = []
if ack.multiple
ok, $publishing_events = $publishing_events.partition {|i| i.tag <= n }
else
ng, $publishing_events = $publishing_events.partition {|i| i.tag < n }
if $publishing_events.empty?
# the packet in quesion is lost?
elsif ev = $publishing_events.shift
ok = [ev]
end
end
ng.each do |e|
EM.next_tick do
# we need to re-send
e.retry += 1
publish e
end
end
ok.each do |e|
EM.next_tick do
# OK, reached
e.block.call if e.block
end
end
end
ch.on_nack do |nack|
n = nack.delivery_tag
ng = []
if ack.multiple
ng, $publishing_events = $publishing_events.partition {|i| i.tag <= n }
else
ng, $publishing_events = $publishing_events.partition {|i| i.tag < n }
if $publishing_events.empty?
# the packet in quesion is lost?
elsif ev = $publishing_events.shift
ng = [ev]
end
end
ng.each do |e|
EM.next_tick do
e.retry += 1
e.publish e
end
end
end
AMQP::Exchane.new ch, $your_another_settings do |xchg|
ch.queue $your_yet_another_settings do |q|
q.bind xchg, nowait: false do
$your_ary_to_send.each do |i|
$publishing_events ||= []
e = ::PendingEvents.new 0, 0, xchg, i, nil
e.publish
end
end
end
end
end
end
end
end
この節のまとめ : publisher confirmation は限界近くまでパフォーマンスを引き出しつつメッセージ到達性を確保するには最適だがライブラリ側からの支援が不足している等の理由により扱いが面倒。
まとめ: 結局どうすればいいのか?
まあ、結論はない。 RabbitMQ を導入することにより得られる利便性と、コンポーネントが増えたことによる複雑さの上昇は一種のトレードオフなので、あたりまえの話ではある。最悪なのがコンポーネントは増やして複雑さを上昇させたのに利便性はさして上がってないパターンで、これは防ぐ必要があるだろう。たとえば RabbitMQ を使わないというのは立派な選択肢の一つだ。 MongoDB にトランザクションがないからといって MongoDB がゴミではないのと同様に、 RabbitMQ を使わなかったからといってあなたのプロダクトはゴミではないだろう。
おまけ: 正しいシャットダウンのやりかた
上では「ピアがふいに死んだらどうするんだ」という話をしていたが、当然、行儀の良いプログラムとしては死ぬときはちゃんと死ぬと宣言してから死ね、というのはある。それをどうやるかというと、まずqueueから取り出すのをやめて、停止したのを確認したら次にchannelを消して、消えたのを確認したら次にconnectionを消して、消えたのを確認してから死ね。つまりこう:
#! ruby
require 'amqp'
def setup_traps conn, ch, q
%w(INT TERM HUP QUIT KILL).each do |signal|
trap(signal) do
q.unsubscribe do
ch.close do
conn.disconnect do
EM.stop
end
end
end
end
end
end