sjsonの使い方

ScalaJSONを扱うためのライブラリ、sjsonについて書きます。

そもそもScalaでは標準ライブラリにJSONのパーサーがついてて、JSON文字列をパースしてcase classに入れてくれるくらいのことはしてくれます。ただし、

  • 返り値の型が Option[List[Any] ] だったり
  • パース結果のJSONObject(case class)をtoStringしてもJSONにならなかったり

なんで標準ライブラリに存在しているのか不明ですが、JSONをバリバリアプリケーション内で使うための機能はそろってないです。(コップ本に載ってるサンプルまんまな感じです)

JSON.scala in scala/tags/R_2_8_0_final/src/library/scala/util/parsing/json – Scala

あとは、Javaのライブラリがいろいろあるのでそれを使ってもいいんですが、当然のことながらインターフェイスJavaっぽいので、ScalaのコレクションオブジェクトやXMLリテラルを渡してウマー、ということができません。
implicit defとかラッパーを書けばいいんですが、そこまでしなくてもScala用のJSONライブラリは他にもありますよ、というわけでsjsonです。

sbtとかmavenリポジトリ設定

sjsonのパッケージはscala-tools.orgのMavenリポジトリに入っているので、sbtの場合はリポジトリの設定が不要です。
以下のようにプロジェクトファイルにsjsonのアーティファクトID等々を書けばOKです。

val json = "net.debasishg" %% "sjson_2.8.0" % "0.8"

Mavenを使う場合も、sjsonのライブラリはScalaランタイムと同じリポジトリにあるので、Scalaランタイムへのdependencyが書けていれば、あとはsjsonへのdependencyを追加するだけでOKです。

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>
  
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.8.0.final</version>
    </dependency>
    <dependency>
      <groupId>net.debasishg</groupId>
      <artifactId>sjson_2.8.0</artifactId>
      <version>0.8</version>
    </dependency>
  </dependencies> 

オブジェクト=>JSONへのシリアライズ

scala> import sjson.json.JsonSerialization
import sjson.json.JsonSerialization

scala> JsonSerialization.tojson(Map("a" -> "apple", "b" -> "Borland", "c" -> "Citrix"))
res0: dispatch.json.JsValue = {"a" : "apple", "b" : "Borland", "c" : "Citrix"}

こんな感じで、Map, List, Tuple2〜22くらいならデフォルトで対応しています。
対応していない型を入れると、例外を出します。

scala> case class Neko(name : String, color : String, age : Int)
defined class Neko

scala> JsonSerialization.tojson(Neko("tama", "white", 666))
<<console>:34: error: could not find implicit value for parameter tjs: sjson.json.Writes[Neko]
       JsonSerialization.tojson(Neko("tama", "white", 666))
                                     ^

新たな型に対応させるためには、Protocolを書く必要があります。

scala> object NekoProtocol extends DefaultProtocol {
     |   implicit val NekoFormat : Format[Neko] = 
     |     asProduct3("name", "color", "age")(Neko)(Neko.unapply(_).get)
     | }
defined module NekoProtocol

scala> import NekoProtocol._
import NekoProtocol._

scala> JsonSerialization.tojson(Neko("tama", "white", 666))
res0: dispatch.json.JsValue = {"name" : "tama", "color" : "white", "age" : 666}

このへんの詳細については作者のブログエントリが訳されているので、そちらを参照すると良いと思います。

sjson: Scala の型クラスによる JSON シリアライゼーション | eed3si9n

JSON=>オブジェクト

さっきの逆ですが、以下のとおりです。

scala> import sjson.json.JsonSerialization
import sjson.json.JsonSerialization

scala> JsonSerialization.tojson(Neko("tama", "white", 666))
res0: dispatch.json.JsValue = {"name" : "tama", "color" : "white", "age" : 666}

scala> JsonSerialization.fromjson[Neko](res2)
res1: Neko = Neko(tama,white,666)

実装

このsjsonは、いろいろと面白い実装になっていて、任意の型に対してJSONシリアライズするためのプロトコルを定義できたり、そのプロトコルの指定をimplicit parameterでやらせてたりで、コードを読むと勉強になります。

あと sjson.json.Generic.scala を見ると、謎のマクロ的なものがあるんですが、なんだこれどうなってるんだ、という感じです。

  <#list 2..9 as i> 
  <#assign typeParams><#list 1..i as j>T${j}<#if i !=j>,</#if></#list></#assign>

  def asProduct${i}[S, ${typeParams}](<#list 1..i as j>f${j}: String<#if i != j>,</#if></#list>)(apply : (${typeParams}) => S)(unapply : S => Product${i}[${typeParams}])(implicit <#list 1..i as j>bin${j}: Format[T${j}]<#if i != j>,</#if></#list>) = new Format[S]{
    def writes(s: S) = {
      val product = unapply(s)
      JsObject(
        List(
          <#list 1..i as j>
          (tojson(f${j}).asInstanceOf[JsString], tojson(product._${j}))<#if i != j>,</#if>
          </#list>
        ))
    }
    def reads(js: JsValue) = js match {
      case JsObject(m) => // m is the Map
        apply(
          <#list 1..i as j>
          fromjson[T${j}](m(JsString(f${j})))<#if i != j>,</#if>
          </#list>
        )
      case _ => throw new RuntimeException("object expected")
    }
  }  
  </#list>

どうやら、JavaFreeMarkerというテンプレートエンジンをソースコードに適用してプリプロセッサしちゃうらしいです。
FMPP: Text file preprocessor (HTML preprocessor)

こいつを呼び出すコードがsbtのプロジェクトファイルに書かれたりしてます。

  // 〜 project / build / TemplateProject.scala から抜粋 〜
  
  // declares fmpp as a managed dependency.  By declaring it in the private 'fmpp' configuration, it doesn't get published
  val fmppDep = "net.sourceforge.fmpp" % "fmpp" % "0.9.13" % "fmpp"
  val fmppConf = config("fmpp") hide
  def fmppClasspath = configurationClasspath(fmppConf)

  // creates a task that invokes fmpp
  def fmppTask(args: => List[String], output: => Path, srcRoot: => Path, sources: PathFinder) = {
    runTask(Some("fmpp.tools.CommandLine"), fmppClasspath,
      "-U" :: "all" :: "-S" :: srcRoot.absolutePath :: "-O" :: output.absolutePath :: args ::: sources.getPaths.toList)
  }

他のJSONライブラリ

sjson以外にもTwitterのやつとか、liftのやつとか、Akkaのやつがあったりします。

kestrelのfanout-queuesについて

前回のエントリ
kestrelの作者さんによるkestrelの紹介 - na

の続きです。kestrelの作者さんによるfanout-queuesの解説です。Twitterのいわゆるファンアウト処理の根幹となる機能の説明になります。

Fanout queues in kestrel 1.2

前回紹介した記事の11ヵ月後の記事で、kestrel 1.2の話になります。kestrelのバージョンは現在1.2.2なので、そんなに古い内容ではありません。new featureとして紹介しているので、以前はない機能だったはずです。

1.2の新機能として以下のものを紹介しています。

  • 起動時、ポートをlistenする前にキューファイルをロードするようにしたよ。*1
  • "open"オプションによるトランザクションを、"abort"オプションによって明示的にロールバックできるようにしたよ。
  • "peek"オプションでキューの先頭から1件取得だけするようにしたよ。
  • max_item_sizeとsync_journalというオプションを追加したよ。*2
  • DELETEコマンドでキューが削除できるようにした。ジャーナルからも完全に消すから戻せないよ。*3
  • バグ直したよ。
  • いい感じにドキュメントを書いたよ。http://github.com/robey/kestrel/blob/master/docs/guide.md

以下、fanout-queuesについて和訳します。

ファンアウトキュー

ファンアウトキューは階層関係を持った「親」のキューになります。「親」のキューに項目が追加されると、同じ項目が自動的にすべての「子」のキューにも追加されます。子キューはそれぞれのジャーナルファイルを持った、独立したキューです。ある1つの子キューからキュー項目が削除されても、それはほかの子キュー(兄弟たち)からは削除されません。各子キューに別々の項目を追加することだって可能です。ただファンアウトキューのポイントは、親に追加した項目がすべての子に自動的に追加されることです。

子キューは、キューの名前に"+"が含まれていると自動的に作成されます。たとえば"orders+audit"という名前のキューを作れば、それは"orders"という名前のキューの子になります。子キューが作られると、すぐに新しいキュー項目を受け取るようになります。ただし、既に親キューが持っているキュー項目がコピーされることはありません。このファンアウト処理は子キューがDELETEされるまで続きます。

各子キューは親キューと同じ設定を持ちます。子キュー独自の設定はありません。
子キューが増えると、ファンアウト処理によってキュー項目がたくさんコピーされるため、余分にメモリとディスクを消費します。

シンプルな実装になっているので、変なトリックを使っているよりもきっと分かりやすいと思います。


以上です。
せっかくだから実際にやってみましょう。まずは以下のコマンドでkestrelを起動しておきます。ビルドとか起動の方法については以前のエントリ id:hito_asa:20101014 を参照してみてください。

java -jar kestrel-1.2.2.jar

別のターミナルを開いて、telnetでkestrelに接続します。

telnet localhost 22133

まずは以下のように"orders"というキューにa,bという2つの値を入れてみます。
その後にdump_statsコマンドで"orders"キューの状態を確認してみます。

set orders 0 0 1  <= 入力
a  <= 入力
STORED
set orders 0 0 1  <= 入力
b  <= 入力
STORED
dump_stats  <= 入力
queue 'orders' {
  items=2
  bytes=2
  total_items=2
  logsize=44
  expired_items=0
  mem_items=2
  mem_bytes=2
  age=0
  discarded=0
  waiters=0
  open_transactions=0
}
END

続けて以下のように"orders+audit"というキューにpeekコマンドを送ります。peekでもgetでもなんでもいいんですが、一度参照されるとキューが作成されます。
再びdump_statsコマンドで"orders+audit"というキューが作成されたことを確認してみます。

get orders+audit/peek  <= 入力
END
dump_stats  <= 入力
queue 'orders+audit' {
  items=0
  bytes=0
  total_items=0
  logsize=0
  expired_items=0
  mem_items=0
  mem_bytes=0
  age=0
  discarded=0
  waiters=0
  open_transactions=0
}
queue 'orders' {
  items=2
  bytes=2
  total_items=2
  logsize=44
  expired_items=0
  mem_items=2
  mem_bytes=2
  age=0
  discarded=0
  waiters=0
  open_transactions=0
  children=orders+audit
}
END

orders+auditという名前の空のキューが出来たのがわかると思います。
また、ordersのほうの一番下に"children=orders+audit"という値が増えています。これでキューの親子関係が作成されていることも分かります。

では、親の"orders"キューに項目を追加してみます。既に"a"と"b"が入っているので、"c"と"d"を入れてみます。

set orders 0 0 1  <= 入力
c  <= 入力
STORED
set orders 0 0 1  <= 入力
d  <= 入力
STORED

ここで、dump_statsコマンドを実行して確認すると。

dump_stats  <= 入力
queue 'orders+audit' {
  items=2
  bytes=2
  total_items=2
  logsize=44
  expired_items=0
  mem_items=2
  mem_bytes=2
  age=0
  discarded=0
  waiters=0
  open_transactions=0
}
queue 'orders' {
  items=4
  bytes=4
  total_items=4
  logsize=88
  expired_items=0
  mem_items=4
  mem_bytes=4
  age=0
  discarded=0
  waiters=0
  open_transactions=0
  children=orders+audit
}
END

"orders"のitemsは4になり、子キューである"orders+audit"のitemsも2になっています。親キューに入れたものが子キューにも入りました。あとは、適当にGETして内容を確認してみましょう。

get orders  <= 入力
VALUE orders 0 1
a
END
get orders  <= 入力
VALUE orders 0 1
b
END
get orders  <= 入力
VALUE orders 0 1
c
END
get orders  <= 入力
VALUE orders 0 1
d
END
get orders+audit
VALUE orders+audit 0 1
c
END
get orders+audit
VALUE orders+audit 0 1
d
END

以上です。

*1:ジャーナルファイルをロードしてキューを再現する処理を、サービスが起動する前にした、と言う意味だと思います。

*2:設定ファイルの話。

*3:今まで出来なかったのか・・・。

「WebプログラマのためのScala入門勉強会@渋谷」始めます

初心者向けのScala入門勉強会を始めますよー。

どんな勉強会??

内容
対象者
目標
  • Scalaを業務に使えるようになる!
備考
  • PC持ってきた方がいいですよ
  • 無線LANはたぶん用意できますよ

第1回

ATNDはこちら。 http://atnd.org/events/9054

日時
11月2日 19:50 〜 22:00
場所
渋谷 セルリアンタワー 11F 中2会議室
内容
    • ScalaでHello, World
    • Scala開発環境の構築
    • わかる!コップ本 その1

19:30開場、19:50開始、22:00終了目処です。
19:30で地上の正面入口がしまってしまうので、地下の駐車場入り口の奥にあるB1オフィス入り口へおまわりください。

入館時に入館証が必要となるため、19:30〜19:50までに来た方はScala Tシャツを来た受付の人から入館証を受け取ってください。
遅れて来た方は Twitterハッシュタグ #wpscala を付けて助けを求めてください。入館証を持ってお出迎えします。

なお、イスはたくさんあるので、みなさんお座りいただけますよ。

詳細はあんまり決まってないです。
参加者の皆さんで意見を出し合いながら、楽しくてためになる勉強会を作っていきましょう!

kestrelのマニュアル和訳

無いとばかり思ってたkestrelのそれなりにちゃんとしたマニュアルがありました。

kestrel/guide.md at master · twitter/kestrel · GitHub

とりあえずまじめに和訳します。

A working guide to kestrel

kestrelはJVM上で動作し、クライアントとmemcacheプロトコルで通信する、シンプルなメッセージキューサービスです。

1つのkestrelサーバーは、名前がついた複数のキューを持ちます。その名前は、それぞれのキューのジャーナルファイルのファイル名にもなります。(通常は/var/spool/kestrelに保存されます)
それぞれのキューは完全に順序付けされたFIFO構造で、キュー項目のバイナリデータを持ちます。これはJSONRubyのMarshalフォーマットようなデータになります。

キューの名前には半角英数字、ハイフン、アンダーラインが使用できます。実際は、"/", "~", "+", "."以外であれば制約はありません。それぞれ、"/"はファイル名に使用できないため、"~"は一時ファイルの名前として使用されるため、"+"はfanout-queuesの名前として利用されるため、"."は将来的な拡張に予約されているため利用できません。
キューの名前は大文字小文字を区別します。ただし、Mac OS XWindowsで実行している場合、ジャーナルファイル名の大文字小文字が区別されないため、キューの名前を大文字小文字だけで区別するのはやめたほうが良いです。

複数のkestrelサーバーを、memcacheクラスタのように利用することが出来ます。
各サーバーは互いに依存せず、互いに通信を行うことはありません。そのため、自由にサーバーを追加することが出来ます。クライアントはクラスタ内のサーバーのリストを持っていて、何か操作をする際はどれか1台をランダムに選んで使用します。つまり、複数のサーバーをまたいで、ゆるい順序付けの上でキューが動作することになります。

kestrelが起動すると、ジャーナルファイルの保存ディレクトリをスキャンし、見つかったすべてのジャーナルファイルを読み込み、前回のシャットダウン時(またはkillされたりダウンした場合も)の状態を復元します。
新しいキューはそれを参照したときに作成されます。(たとえば、そのキューに項目を追加しようとしたり、何か削除しようとしたとき)
キュー自体を削除するときは、DELETEコマンドを使用します。

設定

各キューの設定のデフォルトは、production.confファイルで指定できます。各キュー個別の設定は、デフォルトをオーバーライドして変更することも出来ます。production.confの中に例があるので参照してください。

キューごとの現在の設定を確認したい場合は、"dump_config"コマンドを実行してください。

実行中のサーバーで設定をリロードしたい場合は、"reload"コマンドを実行してください。変更された内容はすぐに反映され、"dump_config"コマンドで確認できます。

  • max_items (無限)

キューが保持できる項目数の最大値。キューがいっぱいだった場合の処理は discard_old_when_full の設定に応じて異なります。

  • max_size (無限)

キューが保持できる項目の合計最大バイト数。キューがいっぱいだった場合の処理は discard_old_when_full の設定に応じて異なります。

  • max_item_size (無限)

キューに入る各項目の1件あたりの最大バイト数。この設定値を超えるサイズの項目を追加しようとした場合は拒否されます。

  • discard_old_when_full (false)

この設定がfalseの場合、キューがいっぱいの状態で項目を追加しようとするとエラーとなります。trueの場合、古い項目を破棄して新しい項目を受け入れます。この設定はmax_itemsとmax_sizeのどちらも未設定の場合は無効になります。

  • journal (true)

falseに設定するとジャーナルファイルを保持しません。kestrelが終了した時点で、残っていたキューは削除されてしまいます。

  • sync_journal (false)

trueに設定すると、ジャーナルに書き込むたびにディスクと同期します。この処理は通常不要ですが、パラノイド検査のために用意されています。この設定によりサーバーの最大スループットは劣化する可能性があります。

  • max_journal_size (16MB)

ジャーナルファイルがこのサイズを超えた場合、キューが空になった時点で新しいファイルを作成します。値はバイト数で指定。

  • max_journal_overflow (10)

ジャーナルファイルがここで設定された回数大きくなって、キューの内容サイズが最大値未満の場合、ディスク容量を節約するために、ジャーナルファイルはゼロから書きなおされます。たとえばデフォルトの設定でmax_journal_sizeが16MBで、max_journal_overflowが10だった場合、ジャーナルファイルは160MB(キューの内容自体は16MB以下)になると書き直されます。

  • max_memory_size (128MB)

キューの内容サイズがこの設定を超えた場合、この設定サイズ分のみがメモリに保持されます。新たに追加された項目は直接ジャーナルファイルに書き込まれ、キューが消費される際に読み戻されます。この設定によりメモリ消費量を調整します。値はバイト数で指定。

  • max_age (0 = off)

このキューの項目の有効期限をミリ病で指定します。この設定値を超えてキューに存在する項目はすべて破棄されます。クライアントは項目を追加するときに有効期限を指定できます。しかし、この設定値を超える値を指定した場合は、この設定値が優先されます。

  • move_expired_to (none)

有効期限が切れた項目を移動する先のキュー名。これが設定されていると、有効期限が切れた項目はここで設定された名前のキューにSETコマンドを通じて追加されます。これにより、簡単な遅延処理キューなど、有効期限が切れた項目に対して特別な処理をする場合に利用できます。

ジャーナルファイル

ジャーナルファイルはキュー内容のディスクストレージです。また、キューに対する追加や削除処理のシーケンシャルな記録にもなります。kestrelが起動した際、各キューのジャーナルがクライアントからのクエリと同様に読み込まれ、メモリ上のキューとして再構築されます。

ジャーナルファイルは以下の2つの状態間を遷移します。

  1. キューが空で、ジャーナルがmax_journal_sizeより大きい状態
  2. キューがmax_journal_sizeより小さいサイズだが、ジャーナルがmax_journal_overflow * max_journal_sizeより大きい状態

たとえば、max_journal_sizeが16MB(デフォルト)で、max_journal_overflowが10(デフォルト)で、キューが空でジャーナルが16MB以上だった場合、ジャーナルは新しいファイルにローテートされます。キューが16MB以だが、ジャーナルが160MB以上だった場合、ジャーナルは現在のキュー内容を元に再作成されます。

ジャーナルを無効にして、キューをメモリだけに保持するようにも出来ます。(journal = false)その場合、サーバーが再起動した時点でキューに残っていた項目は失われます。また、パフォーマンス的にはコストがかかりますが、書き込み処理をする際に毎回ディスクと同期することも出来ます。(sync_journal = true)

キューがmax_memory_size(デフォルト128MB)を超えた場合、先頭の128MBのみがメモリに保持されます。ジャーナルはメモリに保持されている以降の項目を保持します。また、項目が削除された際にメモリ内に保持する項目を128MB分まで読み込む際にもジャーナルが使用されます。これは"read-behind"モードと言われるものですが、Twitterのエンジニアはこの実装を考えるときの図の形から"square snake"と呼ぶことがあります。キューがread-behindモードのとき、キュー項目の削除処理には2回のディスク処理が発生することがあります。1つは削除処理を記録するために、もう1つはメモリに128MB分のキューを読み取るためです。これはメモリがいっぱいになってしまうことや、JVMのクラッシュ時の影響を避けるためのトレードオフです。

有効期限

クライアントから見れば、有効期限はmemcacheと同じように扱われます。100万未満の数字の場合は現在からの秒数として解釈され、それ以上の値の場合は1970/01/01からの経過時間であるUnixタイムとして解釈されます。

有効期限の指定はミリ秒単位での有効期限の絶対値として変換されます。もしその値がキューのmax_ageよりも未来の数字であれば、max_ageが実際の有効期限になります。0を指定した場合(0がデフォルトです)は、有効期限は無いものとして扱われます。

有効期限が切れた項目は、次に新たな項目が追加されたり削除されたりするタイミングで破棄されます。アイドル状態のキューの項目は有効期限が切れても破棄されませんが、"peek"コマンドを実行することで破棄することが出来ます。

expiration_timer_frequency_secondsという設定で、定期的に有効期限が切れた項目を破棄するバックグラウンド処理を動かすことが出来ます。詳細はREADME.mdを参照してください。*1

ファンアウトキュー

<parent>+<child>の形式で、キューの名前に"+"が含まれる場合("orders+audit"のように)、ファンアウトキューとして扱われます。これらのキュー(先ほどの例だと、"orders"というキュー)は親キューとなります。親キューに書き込まれたすべての項目は、その子キューにも書き込まれます。

ファンアウトキューはそれぞれのジャーナルファイルを持ちます(親キューがジャーナルファイルを持っていれば)。その他の特徴も通常のキューと同様です。子キューに対してpeekやaddコマンドを直接実行することも可能です。設定については、子キューは親キューと同じ設定を持ち、独自の設定を持つことは出来ません。

クライアントから最初にファンアウトキューが参照されたとき、ジャーナルファイルが作成されます。(設定に応じて)
そして、親キューに書き込まれる項目を受け取るようになります。既にある項目についてはコピーされません。ファンアウトキューが削除されたとき、新しい項目を受け取る処理が停止します。

Memcacheコマンド
  • SET <# bytes>

項目をキューに追加します。キューの数やサイズに制限があり、いっぱいになっている場合は失敗します。

  • GET [options]

キューから項目を削除して返します。もしキューが空の場合は、すぐに空のレスポンスを返します。queue-nameが"/"で区切られている場合は以下の通りです。

    • /t=

指定された時間、新しい項目が追加されるまで待ちます。時間内に新しい項目が追加された場合、通常通りにレスポンスが帰ります。タイムアウト時間が経過した場合は空のレスポンスを返します。

    • /open

キューから項目を削除して返します。ただし、実際は削除せずに一時的に退避します。クライアントが"/close"リクエストを送る前に切断された場合、退避した項目はキューの先頭に戻されます。(Reliable readsの項を参照)

    • /close

開いた項目を閉じます。(Reliable readsの項を参照)

    • /abort

開いた項目をキャンセルし、キューの先頭に戻します。この項目は次のフェッチ時に取得されます。(Reliable readsの項を参照)

    • /peek

キューの先頭の項目を削除せずに返します。Reliable readsのコマンドと併用できます。

たとえば、新たな項目を500ミリ秒待って開く場合は

GET work/t=500/open

となります。もしくは、すでに開いている項目を閉じてから開く場合は、伊かのようになります。

GET work/close/open
  • DELETE

キューを削除し、すべての項目を破棄し、関連するジャーナルファイルをすべて削除します。

  • FLUSH

指定されたキューに含まれる項目をすべて破棄します。キュー自体は引き続き有効で、新たな項目を追加することも出来ます。FLUSHにかかる時間はキューのサイズに依存します。また処理中はキューに対するその他の操作がブロックされます。

  • FLUSH_ALL

すべてのキューのすべての項目を破棄します。各キューにFLUSHコマンドを実行した場合と同様に、全部のキューをFLUSHします。

  • VERSION

memcacheと同様の方法でkestrelのバージョンを表示します。

  • SHUTDOWN

サーバーをシャットダウンし、プロセスを終了します。

  • RELOAD

すべてのキューの設定をリロードします。サーバーのレスポンスには一切影響しません。

  • DUMP_CONFIG

現在の各キューの設定を以下のフォーマットでダンプします。

 queue 'master' {
    max_items=2147483647
    max_size=9223372036854775807
    max_age=0
    max_journal_size=16277216
    max_memory_size=134217728
    max_journal_overflow=10
    max_journal_size_absolute=9223372036854775807
    discard_old_when_full=false
    journal=true
    sync_journal=false
  }

最後のキューの後に END という行が入ります。

  • STATS

サーバーの状態をmemcacheと同様の形式で表示します。(状態確認の項を参照)

  • DUMP_STATS

サーバーの状態を読みやすい形式で表示します。

Reliable reads

クライアントから普通にキューから項目を削除した場合、kestrelは即座にその項目を削除してクライアントにすべてをゆだねます。これはいつもうまくいくとは限りません。クライアントがクラッシュしたり、データを受け取る前にネットワーク接続がロストしてしまう場合もありえます。そのため、kestrelはGETコマンドで"/open"と"/close"という2つのオプションを取ることにより、"reliable read"をサポートします。

"/open"が使用されると、キューに項目がある場合は、その項目を開き、通常通りキューの項目を削除してクライアントに送ります。しかし、削除した項目は退避させておくだけです。もしクライアントが項目を開いたまま切断された場合は退避しておいた項目をキューに戻し、次回のフェッチ時に再度取得できる状態にします。1つの項目は1つのクライアントからしか開くことは出来ません。

先ほど開いたリクエストは"/close"コマンドで閉じることが出来ます。サーバーは既に開かれている項目がある場合、ほかの読み取りリクエストを拒否します。利便性のため、開かれている項目が1つも無い場合でも"/close"リクエストは常に許可します。

何らかの理由で接続を切らないまま、開いた項目をキャンセルしたい場合は"/abort"オプションが利用できます。しかしキャンセルした項目はキューの先頭に戻されます。これはクライアント側のエラーに対してはあまり良い処理ではありません。エラーがその項目が原因で起きているならば、次にその項目を処理しようとしても同様のエラーが起きる可能性があるからです。他のクライアントに同じ処理を繰り返させてしまわないようにしてください。

キュー項目を失ってしまうことと、同じキューを繰り返し処理してしまうことはトレードオフとなります。Reliable readはキューを失わないことを優先します。この方法で成功するためには、そのキュー項目が何回処理されても同じ結果になるように準備するべきです。外部への影響があるような処理の場合は、何回呼ばれても実際は1回しか実行しないような工夫が必要です。

たとえば以下のように。

GET dirty_jobs/close/open
(receives job 1)
GET dirty_jobs/close/open
(closes job 1, receives job 2)
...etc..
状態確認

statsコマンドで確認できるのは以下の内容です。

uptime
サーバー起動時間
time
現在日時
version
バージョン
curr_items
全キューに残っている項目の合計数
total_itmes
このサーバーが起動してから追加された項目の合計数
bytes
全キューに残っている項目の合計バイト数
curr_connections
現在のクライアント接続数
total_connections
このサーバーが起動してからのクライアント接続数の合計
cmd_get
GETリクエスト数の合計
cmd_set
SETリクエスト数の合計
cmd_peek
GETまたはPEEKリクエスト数の合計
get_hits
ヒットしたGETリクエスト数(空じゃないキューに対するGET)の合計
get_misses
ミスしたGETリクエスト数(空のキューに対するGET)の合計
bytes_read
クライアントから読み込まれたバイト数の合計
bytes_written
クライアントへ書き込んだバイト数の合計

各キューのstatsとして確認できるのは以下の内容です。

items
このキューに残ってる項目数
bytes
このキューに残ってる項目の合計バイト数
total_items
このサーバーが起動してから追加された項目の合計数
logsize
ジャーナルファイルのバイト数
expired_items
このサーバーが起動してから有効期限が切れた項目の合計数
mem_items
このキューのメモリにある項目数
mem_bytes
このキューのメモリにある項目の合計バイト数(常にmax_memory_sizeの設定値以下になります)
age
最後にフェッチされた項目がSETされてからGETされるまでの時間(ミリ秒), キューが空の場合は常に0
discarded
キューがいっぱいで破棄されたキューの数
waiters
このキューの待ち行列にいるクライアントの数
kestrelをライブラリとして利用する

classpathにjarファイルを追加することで、kestrelをライブラリとして利用することも出来ます。これはプロセス間、スレッド間でMQを実現するための手軽な方法です。各キューはPersistentQueueオブジェクトとして利用できます。

class PersistentQueue(persistencePath: String, val name: String, val config: ConfigMap)

また、以下のように初期化する必要があります。

def setuo(): Unit

ジャーナルファイルのパス(ジャーナルを作成するかどうか)や、キューの名前、設定値を指定することも出来ます。

キューに項目を追加する際は

def add(value: Array[Byte], expiry: Long): Boolean

これはキューがいっぱいの場合はfalseを返します。

キューの各項目は以下のcase classで表現されます。

case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int)

また、キューの先頭項目に対するpeekやremove操作は以下のように実行できます。

def peek(): Option[QItem]
def remove(): Option[QItem]

Reliable readで項目を開く場合は、transactionの値をtrueに設定してremoveし、confirmやunremoveにQItemのxidを渡します。

def remove(transaction: Boolean): Option[QItem]
def unremove(xid: Int)
def confirmRemove(xid: Int)

非同期でactorを利用してremoveやpeekを実行し、receiveかreactでコールバックを受け取るためには以下のようにします。

def removeReact(timeoutAbsolute: Long, transaction: Boolean)(f: Option[QItem] => Unit): Unit
def removeReceive(timeoutAbsolute: Long, transaction: Boolean): Option[QItem]
def peekReact(timeoutAbsolute: Long)(f: Option[QItem] => Unit): Unit
def peekReceive(timeoutAbsolute: Long): Option[QItem]

キューの捜査が終了したら、キューを閉じる必要があります。

def close(): Unit
def isClosed: Boolean

以下に短い例を示します。

var queue = new PersistentQueue("/var/spool/kestrel", "work", config)
queue.setup()

// add an item with no expiration:
queue.add("hello".getBytes, 0)

// start to remove it, then back out:
val item = queue.remove(true)
queue.unremove(item.xid)

// remove an item with a 500msec timeout, and confirm it:
queue.removeReact(System.currentTimeMillis + 500, true) { x =>
  x match {
    case None =>
      println("nothing. :(")
    case Some(item) =>
      println("got: " + new String(item.data))
      queue.confirmRemove(item.xid)
  }
}

queue.close()


以上です。

*1:上記の設定のところに書いてない・・・デフォルトは0 = offで、この処理の起動感覚を秒単位で指定できるようです。

kestrelの作者さんによるkestrelの紹介

古い記事ですが、kestrelの作者さんによるkestrelの紹介。

http://robey.lag.net/2008/11/27/scarling-to-kestrel.html

生い立ち

もともとScalingという名前だったそうです。
というのも、RubyにあるStarlingというMQサーバーをScalaに移植しようとして作り始めたそうで、memcacheプロトコルを使うという仕様もここからきています。
Starlingについてはhttp://www.rubyinside.com/starling-and-rudeq-persistent-ruby-queues-958.html
だけど色々機能を追加したからちゃんと名前つけようか、ということでkestrelになったとのこと。

kestrelで実装した3つの機能について。以下、簡単に和訳します。
(Fanoutについては別のエントリで書いているみたいです。このときは未だ無かったのかも。)

Big Queues

まともに動いてるMQサーバーなら、producerよりもconsumerのほうが多いべきで、キューは空っぽのはず。リクエストが来たら速攻処理するのが当然だよね。キューは全部メモリに置けるもので、キューするのはいざって時のため。

だけど、大統領選挙みたいなイベントでTwitterに負荷がかかると、kestrelを手動で調整しないとメモリが足らなくなることがあるよね。

今のkestrelは、メモリ消費が設定値(デフォルト128MB)の限界を超えると、メモリ内のキューには入れずにジャーナルにだけ書き込むようになってるんだ。(read-behind mode)メモリ内にはキューの変わりにジャーナルファイルへのポインタを持っておいて、その後にキューが消費されると、ジャーナルファイルから読み込んでメモリ上のキューに詰めなおすよ、というわけ。*1

こんな感じで、キューの先頭部分だけメモリに持って、残りをジャーナルファイルとしてディスクに持つことで、メモリ消費を抑え、高いパフォーマンスを実現し、突発的なトラフィックにも備えてるんだ。
ディスクがいついっぱいになるか分からないのが不安かもしれないけど、一時的な高負荷のときは、そんなことより別のこと心配したほうがいいよ。 少なくとも一時的な高負荷には対応できるよ。(このアプローチで全てを解決するわけではないけどね)*2

Blocking fetches

memcacheプロトコルを使うと、キューをフェッチするときにクライアントの処理をブロックする手段がないってよく言われるね。kestrelはキューに項目があれば、すぐにそれを返し、無ければ無いとすぐに応答しちゃうから。さっき書いたように、処理すべきキュー項目よりもconsumerが多いべきで、consumerは常に仕事を求めてフラフラしてるはず。
そうすると、以下のようなクライアントコードを書くことになるね(Ruby

while !(response = QUEUE.get(queue_name))
  sleep 0.25
end


そこで、ちょっとイマイチだけどmemcacheのgetコマンドに"/"区切りでオプションを指定できるようにしてみた。"/"ならファイル名に利用できないのでキューの名前としても使われることはないよね。*3
さらに以下のようなタイムアウトのオプションを追加した。これでクライアントが一定時間処理をブロックできるようになるわけさ。*4

while !(response = QUEUE.get(#{queue_name}/t=250")); end

"t=250"は、「今キューに何も無かったら、250ミリ秒は何か来るまで待ってろ」という意味。タイムアウトするまで何も無かったら、kestrelは空の応答を返す。ここで大事なのは、memcacheクライアントがこのタイムアウト時間より長い時間を設定していないといけない、という点だね。

いろいろ試したけど、これが一番簡単だった。
各キューは待ち行列を持ってて、そこにクライアント(consumer/worker)が連なる仕組みになってる。クライアントがタイムアウト指定付きのgetリクエストを送ってきて、その時点でキューが空だったら、クライアントを待ち行列に追加するんだ。このとき、クライアントのActorがreceiveWithin(timeout)メソッドで新しいメッセージが届くのを待つ。キューに項目が届いたら、クライアントはキューの待ち行列から削除され、メッセージが通知される、という仕組みだ。

1つのproducerにゆっくりキューイングさせて、100とか500クライアントからタイムアウト付きのgetリクエストを送ってみたところ、いい感じに動いたよ。

Reliable Fetch

キューは信頼してもらってOK。set操作が正しく動けば、kestrelは"STORED"を返すんだけど、その時点で既にジャーナルファイルに書き込まれていて、"STORED"を返したからにはkestrelはそのキュー項目に対して責任を持つ。

でもキューからフェッチする処理はそんな簡単じゃないんだ。
kestrelがキュー項目をクライアントに送っても、クライアントはackも返さないし確認メッセージも送ってこない。すべてちゃんと受け取れたと仮定することしか出来ないんだ。クライアントがデータ転送中にダウンしたり、受け取ってからクラッシュしてみろよ、そしたらキュー項目はどっかいっちゃって、二度と手に入らない。

そこで、getリクエストに"open"というオプションを追加したんだ。
もしキュー項目があれば、kestrelは通常通りその項目をキューから削除してクライアントに送るんだけど、もしそこでクライアントが確認メッセージを送らずに接続を切った場合は、kestrelはキュー項目をロールバックしちゃう、という仕組みさ。"open"オプションによるフェッチは以下のようにやってくれ。

1."open"オプションを付けてgetリクエストを送る。

QUEUE.get("#{queue_name}/open")
and confirmed with:

2.続いて"close"オプションを付けてgetリクエストを送る。

QUEUE.get("#{queue_name}/close")

これは何も応答を返さない。余分な通信を避けるために、以下のようにopenとcloseを一度に行うことも出来るぞ

QUEUE.get("#{queue_name}/close/open")

クライアントが"open"オプションつきでフェッチしていれば、もし接続をロストした場合でも、フェッチされたキューはロールバックされ、次のクライアントが処理してくれるよ。

まとめ

ここで紹介した3つの機能によって、

  1. 突発的なトラフィックの増大に対処できるよ(Big Queues)
  2. リクエストに即応できるよ(Blocking Fetches)
  3. 信頼性も提供できるよ(Reliable Fetch)

RubyのStarlingを拡張して、コードサイズを50%増しにしたけど、それだけの価値のある機能を実現できたと思うよ。


以上です。
昔の記事ですが、現在のソースコードと見比べても内容はそんなに変わっていないようです。

*1:read-behind modeについてはソースコード見ると今も書いてありました

*2:英語表現が良くわからない。。。and give you one less thing to worry about when the snake is trying to swallow the pig. => 鹿島さんが教えてくれた!thx!!

*3:キューの名前がジャーナルファイルの名前になります

*4:ドキュメント化しろよ ⇒ ありましたすみません。http://github.com/robey/kestrel/blob/master/docs/guide.md

kestrelのソースコード

kestrelについては前回のエントリ
kestrelを試してみたよ - na
と、githubを参照のこと。

twitter/kestrel · GitHub

ソースコードは↑から見れますが、

/src/main/scala/net/lag/kestrel/
  809  BrokenItemException.scala
 1043  Counter.scala
10126  Journal.scala
 5900  Kestrel.scala
12064  KestrelHandler.scala
18436  PersistentQueue.scala
 1732  QItem.scala
 6765  QueueCollection.scala

memcache/
 1921  ASCIICodec.scala
  406  BinaryCodec.scala
 1666  Codec.scala

tools/
 4475  QDumper.scala
 1616  Util.scala

ファイルサイズこんだけ。

$ find . -name "*.scala" | xargs wc -l
  351 ./KestrelHandler.scala
  159 ./Kestrel.scala
  238 ./QueueCollection.scala
   55 ./QItem.scala
  565 ./PersistentQueue.scala
  350 ./Journal.scala
   32 ./Counter.scala
   22 ./BrokenItemException.scala
   14 ./memcache/BinaryCodec.scala
   57 ./memcache/ASCIICodec.scala
   56 ./memcache/Codec.scala
  150 ./tools/QDumper.scala
   53 ./tools/Util.scala
 2102 合計

行数こんだけ。

というわけで、1個ずつ見ていきましょう。長くなりそうなので忙しい人は【忙しい人向け】って書いてあるとこだけ読むといいと思います。

KestrelHandler.scala

プロトコルの実装。メッセージハンドラ。

Actor継承してますが、これはcom.twitter.actor.Actorで、twitterActorというライブラリです。2.7.7の標準Actorライブラリをベースに、内部の実装だけを変えてあるらしいです。標準の実装は had a bug that caused very high GC pressure だそうな。

http://github.com/robey/twitterActors

actメソッドの中身は以下の通り。

def act = {
  loop {
    react {
      case MinaMessage.MessageReceived(msg) =>
        handle(msg.asInstanceOf[memcache.Request])

      case MinaMessage.ExceptionCaught(cause) => {
        cause.getCause match {
          case _: ProtocolError => writeResponse("CLIENT_ERROR\r\n")
          case _: IOException =>
            log.debug("I/O Exception on session %d: %s", sessionID, cause.toString)
          case _ =>
            log.error(cause, "Exception caught on session %d: %s", sessionID, cause.toString)
            writeResponse("ERROR\r\n")
        }
        session.close(false)
      }

      case MinaMessage.SessionClosed =>
        log.debug("End of session %d", sessionID)
        abortAnyTransaction
        KestrelStats.sessions.decr
        exit()

      case MinaMessage.SessionIdle(status) =>
        log.debug("Idle timeout on session %s", session)
        session.close(false)
    }
  }
}

このKestrelHandlerにメッセージを渡す部分は、naggatiというライブラリで実装されています。コードの中に出てくるMinaMessageはnaggatiで定義されています。

robey/naggati · GitHub

naggatiはApache Minaのプロトコルハンドラとして実装されていて、受け取った値をActor(KestrelHandler)へのメッセージとして送信してきます。naggatiについての詳細はこちら。

Actors, Mina, and Naggati

Apache MinaはNIOベースの軽量な汎用プロトコルを実装するためのライブラリみたい。詳しくはこちら。

Apache MINA — Apache MINA

getとかsetとかを受け取ったときの処理はhandleメソッドに書かれています。

private def handle(request: memcache.Request) = {
  request.line(0) match {
    case "GET" => get(request.line(1))
    case "SET" =>
      try {
        set(request.line(1), request.line(2).toInt, request.line(3).toInt, request.data.get)
      } catch {
        case e: NumberFormatException =>
          throw new ProtocolError("bad request: " + request)
      }
    case "STATS" => stats
    case "SHUTDOWN" => shutdown
    case "RELOAD" =>
      Configgy.reload
      writeResponse("Reloaded config.\r\n")
    case "FLUSH" =>
      flush(request.line(1))
    case "FLUSH_ALL" =>
      for (qName <- Kestrel.queues.queueNames) {
        Kestrel.queues.flush(qName)
      }
      writeResponse("Flushed all queues.\r\n")
    case "DUMP_CONFIG" =>
      dumpConfig()
    case "DUMP_STATS" =>
      dumpStats()
    case "DELETE" =>
      delete(request.line(1))
    case "FLUSH_EXPIRED" =>
      flushExpired(request.line(1))
    case "FLUSH_ALL_EXPIRED" =>
      val flushed = Kestrel.queues.flushAllExpired()
      writeResponse("%d\r\n".format(flushed))
    case "VERSION" =>
      version()
    case "QUIT" =>
      quit()
  }
}

あとはgetとsetの実装がメインです。長いので抜粋しますがコードを読まないとわかんない仕様が満載です。*1
たとえば、def get(name: String)では、

if (name contains '/') {
  val options = name.split("/")
  key = options(0)
  for (i <- 1 until options.length) {
    val opt = options(i)
    if (opt startsWith "t=") {
      timeout = opt.substring(2).toInt
    }
    if (opt == "close") closing = true
    if (opt == "open") opening = true
    if (opt == "abort") aborting = true
    if (opt == "peek") peeking = true
  }
}

こんな感じに"/"区切りでオプションが指定できたり、t=でタイムアウト指定ができたりします。

それから、def set(name: String, flags: Int, expiry: Int, data: Array[Byte])では、

KestrelStats.setRequests.incr
if (Kestrel.queues.add(name, data, expiry)) {
  writeResponse("STORED\r\n")
} else {
  writeResponse("NOT_STORED\r\n")
}

と、さらっとflagsを無視してたりします。

Kestrel.scala

【忙しい人向け】
KestrelHandlerのget/setメソッドで出てくる、KestrelとKestrelStatsというオブジェクトが定義されています。
まさにKestrel本体で、mainメソッドが定義されています。やってることは、初期化とMinaの起動。

def main(args: Array[String]): Unit = {
  runtime.load(args)
  startup(Configgy.config)
}

KestrelHandlerから参照される、Kestrel.queuesはQueueCollectionというクラスのインスタンスで、startupメソッドの中で以下のように初期化されてます。QueueCollectionについては後で見ていきます。

  queues = new QueueCollection(config.getString("queue_path", "/tmp"), config.configMap("queues"))
  ...
  queues.loadQueues()

startupメソッドの最後に以下のような記述がありますが、これはjava.util.concurrent.CountDownLatchを使っていて、shutdownメソッドが呼ばれるまでスレッドをブロックし続けるActorを起動しています。deathSwitchって名前がカコイイ。

def startup(config: Config): Unit = {
  ...
  actor {
    deathSwitch.await
  }
}

def shutdown(): Unit = {
  ...
  deathSwitch.countDown
}

QueueCollection.scala

【忙しい人向け】
mutable.HashMap[String, PersistentQueue]で内部表現された複数のキューを保持するクラスです。

Kestrel.startupで呼ばれるloadQueuesは以下の通り。
コンストラクタに渡されたパスにあるファイルからキューを読み込みます。変数pathはjava.io.Fileオブジェクト。
このコードによると、キューのファイル名には"~~"が含まれるらしい。1ファイルが1つのキューのようです。

// preload any queues
def loadQueues() {
  path.list() filter { name => !(name contains "~~") } map { queue(_) }
}

Javaでこれをやろうとすると、↓のような感じでFilenameFilterクラスを継承した匿名クラス作って云々とめんどくさいところですが、Scalaらくちんですね。

public List<String> loadQueues() {
  File[] files = path.list(new FilenameFilter() {
    public boolean accept(File file, String name) {
      return name.contains("~~");
    }
  })
  for (File file : files)
    queue(file)
}

で、queueメソッド
これは「キューする」メソッドじゃなくて、キューのコレクションから、キューを1つ取得するメソッドです。
private[kestrel]はnet.lag.kestrelパッケージのみから見えるってやつですね、package private。
queuesはmutable.HashMap[String, PersistentQueue]オブジェクトです。

/**
 * Get a named queue, creating it if necessary.
 * Exposed only to unit tests.
 */
private[kestrel] def queue(name: String): Option[PersistentQueue] = synchronized {
  if (shuttingDown) {
    None
  } else {
    Some(queues.get(name) getOrElse {
      // only happens when creating a queue for the first time.
      val q = if (name contains '+') {
        val master = name.split('+')(0)
        fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name
        log.info("Fanout queue %s added to %s", name, master)
        new PersistentQueue(path.getPath, name, queueConfigs.configMap(master))
      } else {
        new PersistentQueue(path.getPath, name, queueConfigs.configMap(name))
      }
      q.setup
      queues(name) = q
      q
    })
  }
}

メソッド全体がsynchronizedです。Javaと違ってsynchronizedは構文じゃなくてAnyRefのメソッドなので、メソッド全体を同期しようと思うと、こういう書き方になっちゃいます。Javaだとprivate synchronized Option queue...ですね。

Some(queues.get(name) getOrElse {...} のところはmapから同名のキューを取得しようとして、無かったら {...} のところをやる、という感じです。mapはApplyだと値をそのまま返しますが、getだとOptionでラップして返します。

{...} のところをあらためて見ると。

    Some(queues.get(name) getOrElse {
      // only happens when creating a queue for the first time.
      val q = if (name contains '+') {
        val master = name.split('+')(0)
        fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name
        log.info("Fanout queue %s added to %s", name, master)
        new PersistentQueue(path.getPath, name, queueConfigs.configMap(master))
      } else {
        new PersistentQueue(path.getPath, name, queueConfigs.configMap(name))
      }
      q.setup
      queues(name) = q
      q
    })

PersistentQueueを作って、queuesに入れてます。
名前(ファイル名)に'+'があったら、'+'の前の文字列をキーとしてfanout_queues(mutable.HashMap[String, mutable.HashSet[String]])にも「名前だけを」追加します。

このfanout_queuesってのがどう使われているかというと、addメソッドの先頭で以下のように使ってます。

def add(key: String, item: Array[Byte], expiry: Int): Boolean = {
  for (fanouts <- fanout_queues.get(key); name <- fanouts) {
    add(name, item, expiry)
  }
  ...
}

追加しようとしたメッセージを、fanout_queuesの中のすべてのキューに再帰的に追加しています。
これでtwitterのファンアウトの仕組みが実現できてる。。。のか??よくわかんないや。

PersistentQueue.scala

【忙しい人向け】
Kestrel.queuesの中にあるキューの実装です。長いので詳細はコード読むといいです。

主なメソッドとしては、以下の辺りを見るといいです。

def add(Array[Byte], Long)
キューにメッセージを追加します。
def peek()
キューの先頭からメッセージを1件取得します。
def remove(Boolean)
peekと同様ですが、取得したメッセージはキューから削除します。
def operateReact(=> Option[QItem], Long)(Option[QItem] => Unit)
キューのメッセージを1件取得し、Actor.self.reactWithinで処理します。removeReactまたはpeekReactから呼ばれます。
def operateReceive(=> Option[QItem], Long)
キューのメッセージを1件取得し、Actor.self.receiveWithinで処理します。removeReceiveまたはpeekReceiveから呼ばれます。
def operateOrWait(=> Option[QItem], Long)
QItemを返す関数から謎のTupleを返します。キューが顕在ならWaiterというcase classでラップして、キューがとまってたりしたらそのままで返します。
def replayJournal()
ジャーナルを読み込んで、中のメッセージを処理します。メッセージを残してシャットダウン&再起動したときとかの処理ですね。
def setup()
キューが作られたときに呼ばれて、replayJournalを呼びます。

読んでて思ったのが、このtupleを返すメソッドがとても分かりにくいこと。

private def operateOrWait(op: => Option[QItem], timeoutAbsolute: Long): (Option[QItem], Option[Waiter]) = synchronized {
  val item = op
  if (!item.isDefined && !closed && !paused && timeoutAbsolute > 0) {
    val w = Waiter(Actor.self)
    waiters += w
    (None, Some(w))
  } else {
    (item, None)
  }
}

型は(Option[QItem], Option[Waiter])で、条件に応じてどっちかがNoneになって返るという仕様なわけだけど、どっちがどっちなのかが分かりにくい。以下のコードが呼び出し元。

def operateReact(op: => Option[QItem], timeoutAbsolute: Long)(f: Option[QItem] => Unit): Unit = {
  operateOrWait(op, timeoutAbsolute) match {
    case (item, None) =>
      f(item)
    case (None, Some(w)) =>
      Actor.self.reactWithin((timeoutAbsolute - Time.now.inMilliseconds) max 0) {
        case ItemArrived => operateReact(op, timeoutAbsolute)(f)
        case TIMEOUT => synchronized {
          waiters -= w
          // race: someone could have done an add() between the timeout and grabbing the lock.
          Actor.self.reactWithin(0) {
            case ItemArrived => f(op)
            case TIMEOUT => f(op)
          }
        }
      }
    case _ => throw new RuntimeException()
  }
}

tupleをパターンマッチさせてるわけですが、型が書いていないんでどっちがどっちだか分かりにくい。
tupleはこういう戻り値を複数にしたいときとかパターンマッチに使いたいときとかに便利だと思いますが、値の意味が明確じゃないんで読みにくいです。

QItem.scala

【忙しい人向け】
PersistentQueueに入るメッセージをあらわすクラス。PersistentQueue#addの中でパラメータからこのQItemを作ります。

ジャーナルファイルに書くときとかのデータ表現はこんなふうになってるみたいですね。packを読んだ時にbyte配列化されます。
| 8bitのタイムスタンプ | 8byteの有効期限 | データ本文のリトルエンディアン表現 |

case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int) {
  def pack(): Array[Byte] = {
    val bytes = new Array[Byte](data.length + 16)
    val buffer = ByteBuffer.wrap(bytes)
    buffer.order(ByteOrder.LITTLE_ENDIAN)
    buffer.putLong(addTime)
    buffer.putLong(expiry)
    buffer.put(data)
    bytes
  }
}

unpackについては、コンパニオンオブジェクトに定義されてます。
unpackOldAddとかありますが、昔の仕様かなんかでしょう。

object QItem {
  def unpack(data: Array[Byte]): QItem 
  def unpackOldAdd(data: Array[Byte]): QItem
}

Journal.scala

キュー操作を記録しているジャーナルのクラスです。

ジャーナルに記録されたメッセージを再処理するためのreplayメソッドは以下の通り。
readJournalEntryでファイルの中からメッセージを1件Tuple(JournalItem,Int)で読み込んで、2つ目の引数で受け取ってる関数fを適用してます。

def replay(name: String)(f: JournalItem => Unit): Unit = {
  size = 0
  var lastUpdate = 0L
  val TEN_MB = 10L * 1024 * 1024
  try {
    val in = new FileInputStream(queueFile).getChannel
    try {
      replayer = Some(in)
      var done = false
      do {
        readJournalEntry(in) match {
          case (JournalItem.EndOfFile, _) => done = true
          case (x, itemsize) =>
            size += itemsize
            f(x)
            if (size / TEN_MB > lastUpdate) {
              lastUpdate = size / TEN_MB
              log.info("Continuing to read '%s' journal; %d MB so far...", name, lastUpdate * 10)
            }
        }
      } while (!done)
    } catch {
      case e: BrokenItemException =>
        log.error(e, "Exception replaying journal for '%s'", name)
        log.error("DATA MAY HAVE BEEN LOST! Truncated entry will be deleted.")
        truncateJournal(e.lastValidPosition)
    }
  } catch {
    case e: FileNotFoundException =>
      log.info("No transaction journal for '%s'; starting with empty queue.", name)
    case e: IOException =>
      log.error(e, "Exception replaying journal for '%s'", name)
      log.error("DATA MAY HAVE BEEN LOST!")
      // this can happen if the server hardware died abruptly in the middle
      // of writing a journal. not awesome but we should recover.
  }
  replayer = None
}

このメソッドは、PersistentQueueの初期化時(replayJournalメソッド)に以下のようにPartialFunctionを引数として呼ばれていて、ジャーナルファイルからキューが再現されていることが分かります。

journal.replay(name) {
  case JournalItem.Add(item) =>
    _add(item)
    // when processing the journal, this has to happen after:
    if (!journal.inReadBehind && queueSize >= maxMemorySize()) {
      log.info("Dropping to read-behind for queue '%s' (%d bytes)", name, queueSize)
      journal.startReadBehind
    }
  case JournalItem.Remove => _remove(false)
  case JournalItem.RemoveTentative => _remove(true)
  case JournalItem.SavedXid(xid) => xidCounter = xid
  case JournalItem.Unremove(xid) => _unremove(xid)
  case JournalItem.ConfirmRemove(xid) => openTransactions.removeKey(xid)
  case x => log.error("Unexpected item in journal: %s", x)
}

Counter.scala

ただのAtomicLongのラッパークラスです。
引数なしapplyがgetterってのが気に入りませんが、そんだけです。
KestrelとかPersistentQueue、QueueCollectionあたりで使ってますが、PersistentQueueあたりでは@volatileなvarだったり、synchronizedなgetterを定義していたりします。同期処理の妙がいろいろとあるようですがわかりません。。。

class Counter {
  private val value = new AtomicLong(0)

  def apply() = value.get
  def set(n: Long) = value.set(n)
  def incr() = value.addAndGet(1)
  def incr(n: Long) = value.addAndGet(n)
  def decr() = value.addAndGet(-1)
  def decr(n: Long) = value.addAndGet(-n)
  override def toString = value.get.toString
}

BrokenItemException.scala

ジャーナルを処理する際の例外クラス。Journal.scalaで使われてて、そのままthrowされたりもします。
こういうException書くときはcase classでやっちゃえばいいんですねー。

case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause)

memcache/*

Apache Minaでmemcacheプロトコルを実装するためのCodecです。

tools/*

QDumperはジャーナルファイルをパースして標準出力にダンプするクラスです。
Utilオブジェクトはbyte配列のサイズを見やすく表示するためのメソッドが定義されてます。


以上。
コード量はたいしたことないから一度みんな読んでみるといいと思います。

*1:すみません、ドキュメントありました。 http://github.com/robey/kestrel/blob/master/docs/guide.md

kestrelを試してみたよ

メッセージキューイングシステムが必要でActiveMQとか調べてたけど、やっぱなんかめんどくさそうだし、必要以上にエンタープライズ臭がするのでkestrelを試してみました。

twitter/kestrel · GitHub

kestrelはTwitterのバッググラウンドで使われている(らしい)、メッセージキューイングシステムです。
特徴は、ゆろよろさんのブログにも和訳(githubにおいてあるREADMEの和訳)がありますが、以下の通りです。

  • 速くて、
  • 小さくて、
  • 障害に強くて、
  • 信頼できる。

今のコード量はコメント入れて2000行ほどで、たしかに小さいです。
あと、大きな特徴として、memcacheプロトコルでキューへのput/getを行います。
また、ActiveMQと比べたらライトウェイトで速そうな雰囲気です。

必要なもの

  • JDK6
  • Scala2.7.7 final
  • sbt
  • git

ダウンロードとインストール

まずはgithubからソースをダウンロードします。
http://github.com/robey/kestrel

sbtを使っているので、適当に展開して以下のコマンドでビルドします。

sbt clean update package-dist

※sbtをScala2.8.0で動かすとビルドできません

正常にビルドできると、dist/kestrel-x.x.xというディレクトリができます。(x.x.xはバージョン番号)
このディレクトリをどっかよさげなところにコピーして使います。以降$KESTREL_HOMEと呼びます。

起動してみる

起動するためには、以下の2つのディレクトリが必要なので、作成&書き込めるようにしておきます。

  • ジャーナル保存用ディレクトリ /var/spool/kestrel
  • 動作ログ用ディレクトリ /var/log/kestrel

このへんの設定は、$KESTREL_HOME/config/production.conf に書かれてます。

  3 # where to listen for connections:
  4 port = 22133
  5 host = "0.0.0.0"
  6
  7 log {
  8   filename = "/var/log/kestrel/kestrel.log"
  9   roll = "daily"
 10   level = "info"
 11 }
 12
 13 queue_path = "/var/spool/kestrel"

で、起動するためには $KESTREL_HOME/scripts/kestrel.sh というdaemonizeして動かすためのシェルスクリプトがありますが、オレオレスクリプトな感じなので無視して、以下のコマンドで普通に起動できます。

java -jar $KESTREL_HOME/kestrel-x.x.x.jar &

入れてみる

さっきの設定ファイルに書いてある通り、デフォルトではポート22133で待ち受けてます。

というわけで、telnetでつないでmemcacheプロトコルっぽくいじっています。

$ telnet localhost 22133
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
stats <= 入力
STAT uptime 15
STAT time 1287045949
STAT version 1.2.2
STAT curr_items 0
STAT total_items 0
STAT bytes 0
...
END
set keyname 0 0 1 <= 入力
a <= 入力
STORED
set keyname 0 0 1 <= 入力
b <= 入力
STORED
set keyname 0 0 1 <= 入力
c <= 入力
STORED
get keyname <= 入力
VALUE keyname 0 1
1
END
get keyname <= 入力
VALUE keyname 0 1
b
END
get keyname <= 入力
VALUE keyname 0 1
c
END

という感じで、キューとして機能します。

と、ここまで書いてみたけど、まんまyuroyoroさんのエントリと一緒だ。
引き続き、kestrelをちゃんとしたスケーラブルなMQとして使うまでがんばります。