読者です 読者をやめる 読者になる 読者になる

kestrelのマニュアル和訳

Scala Kestrel

無いとばかり思ってたkestrelのそれなりにちゃんとしたマニュアルがありました。

kestrel/guide.md at master · twitter/kestrel · GitHub

とりあえずまじめに和訳します。

A working guide to kestrel

kestrelはJVM上で動作し、クライアントとmemcacheプロトコルで通信する、シンプルなメッセージキューサービスです。

1つのkestrelサーバーは、名前がついた複数のキューを持ちます。その名前は、それぞれのキューのジャーナルファイルのファイル名にもなります。(通常は/var/spool/kestrelに保存されます)
それぞれのキューは完全に順序付けされたFIFO構造で、キュー項目のバイナリデータを持ちます。これはJSONRubyのMarshalフォーマットようなデータになります。

キューの名前には半角英数字、ハイフン、アンダーラインが使用できます。実際は、"/", "~", "+", "."以外であれば制約はありません。それぞれ、"/"はファイル名に使用できないため、"~"は一時ファイルの名前として使用されるため、"+"はfanout-queuesの名前として利用されるため、"."は将来的な拡張に予約されているため利用できません。
キューの名前は大文字小文字を区別します。ただし、Mac OS XWindowsで実行している場合、ジャーナルファイル名の大文字小文字が区別されないため、キューの名前を大文字小文字だけで区別するのはやめたほうが良いです。

複数のkestrelサーバーを、memcacheクラスタのように利用することが出来ます。
各サーバーは互いに依存せず、互いに通信を行うことはありません。そのため、自由にサーバーを追加することが出来ます。クライアントはクラスタ内のサーバーのリストを持っていて、何か操作をする際はどれか1台をランダムに選んで使用します。つまり、複数のサーバーをまたいで、ゆるい順序付けの上でキューが動作することになります。

kestrelが起動すると、ジャーナルファイルの保存ディレクトリをスキャンし、見つかったすべてのジャーナルファイルを読み込み、前回のシャットダウン時(またはkillされたりダウンした場合も)の状態を復元します。
新しいキューはそれを参照したときに作成されます。(たとえば、そのキューに項目を追加しようとしたり、何か削除しようとしたとき)
キュー自体を削除するときは、DELETEコマンドを使用します。

設定

各キューの設定のデフォルトは、production.confファイルで指定できます。各キュー個別の設定は、デフォルトをオーバーライドして変更することも出来ます。production.confの中に例があるので参照してください。

キューごとの現在の設定を確認したい場合は、"dump_config"コマンドを実行してください。

実行中のサーバーで設定をリロードしたい場合は、"reload"コマンドを実行してください。変更された内容はすぐに反映され、"dump_config"コマンドで確認できます。

  • max_items (無限)

キューが保持できる項目数の最大値。キューがいっぱいだった場合の処理は discard_old_when_full の設定に応じて異なります。

  • max_size (無限)

キューが保持できる項目の合計最大バイト数。キューがいっぱいだった場合の処理は discard_old_when_full の設定に応じて異なります。

  • max_item_size (無限)

キューに入る各項目の1件あたりの最大バイト数。この設定値を超えるサイズの項目を追加しようとした場合は拒否されます。

  • discard_old_when_full (false)

この設定がfalseの場合、キューがいっぱいの状態で項目を追加しようとするとエラーとなります。trueの場合、古い項目を破棄して新しい項目を受け入れます。この設定はmax_itemsとmax_sizeのどちらも未設定の場合は無効になります。

  • journal (true)

falseに設定するとジャーナルファイルを保持しません。kestrelが終了した時点で、残っていたキューは削除されてしまいます。

  • sync_journal (false)

trueに設定すると、ジャーナルに書き込むたびにディスクと同期します。この処理は通常不要ですが、パラノイド検査のために用意されています。この設定によりサーバーの最大スループットは劣化する可能性があります。

  • max_journal_size (16MB)

ジャーナルファイルがこのサイズを超えた場合、キューが空になった時点で新しいファイルを作成します。値はバイト数で指定。

  • max_journal_overflow (10)

ジャーナルファイルがここで設定された回数大きくなって、キューの内容サイズが最大値未満の場合、ディスク容量を節約するために、ジャーナルファイルはゼロから書きなおされます。たとえばデフォルトの設定でmax_journal_sizeが16MBで、max_journal_overflowが10だった場合、ジャーナルファイルは160MB(キューの内容自体は16MB以下)になると書き直されます。

  • max_memory_size (128MB)

キューの内容サイズがこの設定を超えた場合、この設定サイズ分のみがメモリに保持されます。新たに追加された項目は直接ジャーナルファイルに書き込まれ、キューが消費される際に読み戻されます。この設定によりメモリ消費量を調整します。値はバイト数で指定。

  • max_age (0 = off)

このキューの項目の有効期限をミリ病で指定します。この設定値を超えてキューに存在する項目はすべて破棄されます。クライアントは項目を追加するときに有効期限を指定できます。しかし、この設定値を超える値を指定した場合は、この設定値が優先されます。

  • move_expired_to (none)

有効期限が切れた項目を移動する先のキュー名。これが設定されていると、有効期限が切れた項目はここで設定された名前のキューにSETコマンドを通じて追加されます。これにより、簡単な遅延処理キューなど、有効期限が切れた項目に対して特別な処理をする場合に利用できます。

ジャーナルファイル

ジャーナルファイルはキュー内容のディスクストレージです。また、キューに対する追加や削除処理のシーケンシャルな記録にもなります。kestrelが起動した際、各キューのジャーナルがクライアントからのクエリと同様に読み込まれ、メモリ上のキューとして再構築されます。

ジャーナルファイルは以下の2つの状態間を遷移します。

  1. キューが空で、ジャーナルがmax_journal_sizeより大きい状態
  2. キューがmax_journal_sizeより小さいサイズだが、ジャーナルがmax_journal_overflow * max_journal_sizeより大きい状態

たとえば、max_journal_sizeが16MB(デフォルト)で、max_journal_overflowが10(デフォルト)で、キューが空でジャーナルが16MB以上だった場合、ジャーナルは新しいファイルにローテートされます。キューが16MB以だが、ジャーナルが160MB以上だった場合、ジャーナルは現在のキュー内容を元に再作成されます。

ジャーナルを無効にして、キューをメモリだけに保持するようにも出来ます。(journal = false)その場合、サーバーが再起動した時点でキューに残っていた項目は失われます。また、パフォーマンス的にはコストがかかりますが、書き込み処理をする際に毎回ディスクと同期することも出来ます。(sync_journal = true)

キューがmax_memory_size(デフォルト128MB)を超えた場合、先頭の128MBのみがメモリに保持されます。ジャーナルはメモリに保持されている以降の項目を保持します。また、項目が削除された際にメモリ内に保持する項目を128MB分まで読み込む際にもジャーナルが使用されます。これは"read-behind"モードと言われるものですが、Twitterのエンジニアはこの実装を考えるときの図の形から"square snake"と呼ぶことがあります。キューがread-behindモードのとき、キュー項目の削除処理には2回のディスク処理が発生することがあります。1つは削除処理を記録するために、もう1つはメモリに128MB分のキューを読み取るためです。これはメモリがいっぱいになってしまうことや、JVMのクラッシュ時の影響を避けるためのトレードオフです。

有効期限

クライアントから見れば、有効期限はmemcacheと同じように扱われます。100万未満の数字の場合は現在からの秒数として解釈され、それ以上の値の場合は1970/01/01からの経過時間であるUnixタイムとして解釈されます。

有効期限の指定はミリ秒単位での有効期限の絶対値として変換されます。もしその値がキューのmax_ageよりも未来の数字であれば、max_ageが実際の有効期限になります。0を指定した場合(0がデフォルトです)は、有効期限は無いものとして扱われます。

有効期限が切れた項目は、次に新たな項目が追加されたり削除されたりするタイミングで破棄されます。アイドル状態のキューの項目は有効期限が切れても破棄されませんが、"peek"コマンドを実行することで破棄することが出来ます。

expiration_timer_frequency_secondsという設定で、定期的に有効期限が切れた項目を破棄するバックグラウンド処理を動かすことが出来ます。詳細はREADME.mdを参照してください。*1

ファンアウトキュー

<parent>+<child>の形式で、キューの名前に"+"が含まれる場合("orders+audit"のように)、ファンアウトキューとして扱われます。これらのキュー(先ほどの例だと、"orders"というキュー)は親キューとなります。親キューに書き込まれたすべての項目は、その子キューにも書き込まれます。

ファンアウトキューはそれぞれのジャーナルファイルを持ちます(親キューがジャーナルファイルを持っていれば)。その他の特徴も通常のキューと同様です。子キューに対してpeekやaddコマンドを直接実行することも可能です。設定については、子キューは親キューと同じ設定を持ち、独自の設定を持つことは出来ません。

クライアントから最初にファンアウトキューが参照されたとき、ジャーナルファイルが作成されます。(設定に応じて)
そして、親キューに書き込まれる項目を受け取るようになります。既にある項目についてはコピーされません。ファンアウトキューが削除されたとき、新しい項目を受け取る処理が停止します。

Memcacheコマンド
  • SET <# bytes>

項目をキューに追加します。キューの数やサイズに制限があり、いっぱいになっている場合は失敗します。

  • GET [options]

キューから項目を削除して返します。もしキューが空の場合は、すぐに空のレスポンスを返します。queue-nameが"/"で区切られている場合は以下の通りです。

    • /t=

指定された時間、新しい項目が追加されるまで待ちます。時間内に新しい項目が追加された場合、通常通りにレスポンスが帰ります。タイムアウト時間が経過した場合は空のレスポンスを返します。

    • /open

キューから項目を削除して返します。ただし、実際は削除せずに一時的に退避します。クライアントが"/close"リクエストを送る前に切断された場合、退避した項目はキューの先頭に戻されます。(Reliable readsの項を参照)

    • /close

開いた項目を閉じます。(Reliable readsの項を参照)

    • /abort

開いた項目をキャンセルし、キューの先頭に戻します。この項目は次のフェッチ時に取得されます。(Reliable readsの項を参照)

    • /peek

キューの先頭の項目を削除せずに返します。Reliable readsのコマンドと併用できます。

たとえば、新たな項目を500ミリ秒待って開く場合は

GET work/t=500/open

となります。もしくは、すでに開いている項目を閉じてから開く場合は、伊かのようになります。

GET work/close/open
  • DELETE

キューを削除し、すべての項目を破棄し、関連するジャーナルファイルをすべて削除します。

  • FLUSH

指定されたキューに含まれる項目をすべて破棄します。キュー自体は引き続き有効で、新たな項目を追加することも出来ます。FLUSHにかかる時間はキューのサイズに依存します。また処理中はキューに対するその他の操作がブロックされます。

  • FLUSH_ALL

すべてのキューのすべての項目を破棄します。各キューにFLUSHコマンドを実行した場合と同様に、全部のキューをFLUSHします。

  • VERSION

memcacheと同様の方法でkestrelのバージョンを表示します。

  • SHUTDOWN

サーバーをシャットダウンし、プロセスを終了します。

  • RELOAD

すべてのキューの設定をリロードします。サーバーのレスポンスには一切影響しません。

  • DUMP_CONFIG

現在の各キューの設定を以下のフォーマットでダンプします。

 queue 'master' {
    max_items=2147483647
    max_size=9223372036854775807
    max_age=0
    max_journal_size=16277216
    max_memory_size=134217728
    max_journal_overflow=10
    max_journal_size_absolute=9223372036854775807
    discard_old_when_full=false
    journal=true
    sync_journal=false
  }

最後のキューの後に END という行が入ります。

  • STATS

サーバーの状態をmemcacheと同様の形式で表示します。(状態確認の項を参照)

  • DUMP_STATS

サーバーの状態を読みやすい形式で表示します。

Reliable reads

クライアントから普通にキューから項目を削除した場合、kestrelは即座にその項目を削除してクライアントにすべてをゆだねます。これはいつもうまくいくとは限りません。クライアントがクラッシュしたり、データを受け取る前にネットワーク接続がロストしてしまう場合もありえます。そのため、kestrelはGETコマンドで"/open"と"/close"という2つのオプションを取ることにより、"reliable read"をサポートします。

"/open"が使用されると、キューに項目がある場合は、その項目を開き、通常通りキューの項目を削除してクライアントに送ります。しかし、削除した項目は退避させておくだけです。もしクライアントが項目を開いたまま切断された場合は退避しておいた項目をキューに戻し、次回のフェッチ時に再度取得できる状態にします。1つの項目は1つのクライアントからしか開くことは出来ません。

先ほど開いたリクエストは"/close"コマンドで閉じることが出来ます。サーバーは既に開かれている項目がある場合、ほかの読み取りリクエストを拒否します。利便性のため、開かれている項目が1つも無い場合でも"/close"リクエストは常に許可します。

何らかの理由で接続を切らないまま、開いた項目をキャンセルしたい場合は"/abort"オプションが利用できます。しかしキャンセルした項目はキューの先頭に戻されます。これはクライアント側のエラーに対してはあまり良い処理ではありません。エラーがその項目が原因で起きているならば、次にその項目を処理しようとしても同様のエラーが起きる可能性があるからです。他のクライアントに同じ処理を繰り返させてしまわないようにしてください。

キュー項目を失ってしまうことと、同じキューを繰り返し処理してしまうことはトレードオフとなります。Reliable readはキューを失わないことを優先します。この方法で成功するためには、そのキュー項目が何回処理されても同じ結果になるように準備するべきです。外部への影響があるような処理の場合は、何回呼ばれても実際は1回しか実行しないような工夫が必要です。

たとえば以下のように。

GET dirty_jobs/close/open
(receives job 1)
GET dirty_jobs/close/open
(closes job 1, receives job 2)
...etc..
状態確認

statsコマンドで確認できるのは以下の内容です。

uptime
サーバー起動時間
time
現在日時
version
バージョン
curr_items
全キューに残っている項目の合計数
total_itmes
このサーバーが起動してから追加された項目の合計数
bytes
全キューに残っている項目の合計バイト数
curr_connections
現在のクライアント接続数
total_connections
このサーバーが起動してからのクライアント接続数の合計
cmd_get
GETリクエスト数の合計
cmd_set
SETリクエスト数の合計
cmd_peek
GETまたはPEEKリクエスト数の合計
get_hits
ヒットしたGETリクエスト数(空じゃないキューに対するGET)の合計
get_misses
ミスしたGETリクエスト数(空のキューに対するGET)の合計
bytes_read
クライアントから読み込まれたバイト数の合計
bytes_written
クライアントへ書き込んだバイト数の合計

各キューのstatsとして確認できるのは以下の内容です。

items
このキューに残ってる項目数
bytes
このキューに残ってる項目の合計バイト数
total_items
このサーバーが起動してから追加された項目の合計数
logsize
ジャーナルファイルのバイト数
expired_items
このサーバーが起動してから有効期限が切れた項目の合計数
mem_items
このキューのメモリにある項目数
mem_bytes
このキューのメモリにある項目の合計バイト数(常にmax_memory_sizeの設定値以下になります)
age
最後にフェッチされた項目がSETされてからGETされるまでの時間(ミリ秒), キューが空の場合は常に0
discarded
キューがいっぱいで破棄されたキューの数
waiters
このキューの待ち行列にいるクライアントの数
kestrelをライブラリとして利用する

classpathにjarファイルを追加することで、kestrelをライブラリとして利用することも出来ます。これはプロセス間、スレッド間でMQを実現するための手軽な方法です。各キューはPersistentQueueオブジェクトとして利用できます。

class PersistentQueue(persistencePath: String, val name: String, val config: ConfigMap)

また、以下のように初期化する必要があります。

def setuo(): Unit

ジャーナルファイルのパス(ジャーナルを作成するかどうか)や、キューの名前、設定値を指定することも出来ます。

キューに項目を追加する際は

def add(value: Array[Byte], expiry: Long): Boolean

これはキューがいっぱいの場合はfalseを返します。

キューの各項目は以下のcase classで表現されます。

case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int)

また、キューの先頭項目に対するpeekやremove操作は以下のように実行できます。

def peek(): Option[QItem]
def remove(): Option[QItem]

Reliable readで項目を開く場合は、transactionの値をtrueに設定してremoveし、confirmやunremoveにQItemのxidを渡します。

def remove(transaction: Boolean): Option[QItem]
def unremove(xid: Int)
def confirmRemove(xid: Int)

非同期でactorを利用してremoveやpeekを実行し、receiveかreactでコールバックを受け取るためには以下のようにします。

def removeReact(timeoutAbsolute: Long, transaction: Boolean)(f: Option[QItem] => Unit): Unit
def removeReceive(timeoutAbsolute: Long, transaction: Boolean): Option[QItem]
def peekReact(timeoutAbsolute: Long)(f: Option[QItem] => Unit): Unit
def peekReceive(timeoutAbsolute: Long): Option[QItem]

キューの捜査が終了したら、キューを閉じる必要があります。

def close(): Unit
def isClosed: Boolean

以下に短い例を示します。

var queue = new PersistentQueue("/var/spool/kestrel", "work", config)
queue.setup()

// add an item with no expiration:
queue.add("hello".getBytes, 0)

// start to remove it, then back out:
val item = queue.remove(true)
queue.unremove(item.xid)

// remove an item with a 500msec timeout, and confirm it:
queue.removeReact(System.currentTimeMillis + 500, true) { x =>
  x match {
    case None =>
      println("nothing. :(")
    case Some(item) =>
      println("got: " + new String(item.data))
      queue.confirmRemove(item.xid)
  }
}

queue.close()


以上です。

*1:上記の設定のところに書いてない・・・デフォルトは0 = offで、この処理の起動感覚を秒単位で指定できるようです。