kestrelのソースコード
kestrelについては前回のエントリ
kestrelを試してみたよ - na
と、githubを参照のこと。
ソースコードは↑から見れますが、
/src/main/scala/net/lag/kestrel/ 809 BrokenItemException.scala 1043 Counter.scala 10126 Journal.scala 5900 Kestrel.scala 12064 KestrelHandler.scala 18436 PersistentQueue.scala 1732 QItem.scala 6765 QueueCollection.scala memcache/ 1921 ASCIICodec.scala 406 BinaryCodec.scala 1666 Codec.scala tools/ 4475 QDumper.scala 1616 Util.scala
ファイルサイズこんだけ。
$ find . -name "*.scala" | xargs wc -l 351 ./KestrelHandler.scala 159 ./Kestrel.scala 238 ./QueueCollection.scala 55 ./QItem.scala 565 ./PersistentQueue.scala 350 ./Journal.scala 32 ./Counter.scala 22 ./BrokenItemException.scala 14 ./memcache/BinaryCodec.scala 57 ./memcache/ASCIICodec.scala 56 ./memcache/Codec.scala 150 ./tools/QDumper.scala 53 ./tools/Util.scala 2102 合計
行数こんだけ。
というわけで、1個ずつ見ていきましょう。長くなりそうなので忙しい人は【忙しい人向け】って書いてあるとこだけ読むといいと思います。
KestrelHandler.scala
プロトコルの実装。メッセージハンドラ。
Actor継承してますが、これはcom.twitter.actor.Actorで、twitterActorというライブラリです。2.7.7の標準Actorライブラリをベースに、内部の実装だけを変えてあるらしいです。標準の実装は had a bug that caused very high GC pressure だそうな。
http://github.com/robey/twitterActors
actメソッドの中身は以下の通り。
def act = { loop { react { case MinaMessage.MessageReceived(msg) => handle(msg.asInstanceOf[memcache.Request]) case MinaMessage.ExceptionCaught(cause) => { cause.getCause match { case _: ProtocolError => writeResponse("CLIENT_ERROR\r\n") case _: IOException => log.debug("I/O Exception on session %d: %s", sessionID, cause.toString) case _ => log.error(cause, "Exception caught on session %d: %s", sessionID, cause.toString) writeResponse("ERROR\r\n") } session.close(false) } case MinaMessage.SessionClosed => log.debug("End of session %d", sessionID) abortAnyTransaction KestrelStats.sessions.decr exit() case MinaMessage.SessionIdle(status) => log.debug("Idle timeout on session %s", session) session.close(false) } } }
このKestrelHandlerにメッセージを渡す部分は、naggatiというライブラリで実装されています。コードの中に出てくるMinaMessageはnaggatiで定義されています。
naggatiはApache Minaのプロトコルハンドラとして実装されていて、受け取った値をActor(KestrelHandler)へのメッセージとして送信してきます。naggatiについての詳細はこちら。
Apache MinaはNIOベースの軽量な汎用プロトコルを実装するためのライブラリみたい。詳しくはこちら。
getとかsetとかを受け取ったときの処理はhandleメソッドに書かれています。
private def handle(request: memcache.Request) = { request.line(0) match { case "GET" => get(request.line(1)) case "SET" => try { set(request.line(1), request.line(2).toInt, request.line(3).toInt, request.data.get) } catch { case e: NumberFormatException => throw new ProtocolError("bad request: " + request) } case "STATS" => stats case "SHUTDOWN" => shutdown case "RELOAD" => Configgy.reload writeResponse("Reloaded config.\r\n") case "FLUSH" => flush(request.line(1)) case "FLUSH_ALL" => for (qName <- Kestrel.queues.queueNames) { Kestrel.queues.flush(qName) } writeResponse("Flushed all queues.\r\n") case "DUMP_CONFIG" => dumpConfig() case "DUMP_STATS" => dumpStats() case "DELETE" => delete(request.line(1)) case "FLUSH_EXPIRED" => flushExpired(request.line(1)) case "FLUSH_ALL_EXPIRED" => val flushed = Kestrel.queues.flushAllExpired() writeResponse("%d\r\n".format(flushed)) case "VERSION" => version() case "QUIT" => quit() } }
あとはgetとsetの実装がメインです。長いので抜粋しますがコードを読まないとわかんない仕様が満載です。*1
たとえば、def get(name: String)では、
if (name contains '/') { val options = name.split("/") key = options(0) for (i <- 1 until options.length) { val opt = options(i) if (opt startsWith "t=") { timeout = opt.substring(2).toInt } if (opt == "close") closing = true if (opt == "open") opening = true if (opt == "abort") aborting = true if (opt == "peek") peeking = true } }
こんな感じに"/"区切りでオプションが指定できたり、t=でタイムアウト指定ができたりします。
それから、def set(name: String, flags: Int, expiry: Int, data: Array[Byte])では、
KestrelStats.setRequests.incr if (Kestrel.queues.add(name, data, expiry)) { writeResponse("STORED\r\n") } else { writeResponse("NOT_STORED\r\n") }
と、さらっとflagsを無視してたりします。
Kestrel.scala
【忙しい人向け】
KestrelHandlerのget/setメソッドで出てくる、KestrelとKestrelStatsというオブジェクトが定義されています。
まさにKestrel本体で、mainメソッドが定義されています。やってることは、初期化とMinaの起動。
def main(args: Array[String]): Unit = { runtime.load(args) startup(Configgy.config) }
KestrelHandlerから参照される、Kestrel.queuesはQueueCollectionというクラスのインスタンスで、startupメソッドの中で以下のように初期化されてます。QueueCollectionについては後で見ていきます。
queues = new QueueCollection(config.getString("queue_path", "/tmp"), config.configMap("queues")) ... queues.loadQueues()
startupメソッドの最後に以下のような記述がありますが、これはjava.util.concurrent.CountDownLatchを使っていて、shutdownメソッドが呼ばれるまでスレッドをブロックし続けるActorを起動しています。deathSwitchって名前がカコイイ。
def startup(config: Config): Unit = { ... actor { deathSwitch.await } } def shutdown(): Unit = { ... deathSwitch.countDown }
QueueCollection.scala
【忙しい人向け】
mutable.HashMap[String, PersistentQueue]で内部表現された複数のキューを保持するクラスです。
Kestrel.startupで呼ばれるloadQueuesは以下の通り。
コンストラクタに渡されたパスにあるファイルからキューを読み込みます。変数pathはjava.io.Fileオブジェクト。
このコードによると、キューのファイル名には"~~"が含まれるらしい。1ファイルが1つのキューのようです。
// preload any queues def loadQueues() { path.list() filter { name => !(name contains "~~") } map { queue(_) } }
Javaでこれをやろうとすると、↓のような感じでFilenameFilterクラスを継承した匿名クラス作って云々とめんどくさいところですが、Scalaらくちんですね。
public List<String> loadQueues() { File[] files = path.list(new FilenameFilter() { public boolean accept(File file, String name) { return name.contains("~~"); } }) for (File file : files) queue(file) }
で、queueメソッド。
これは「キューする」メソッドじゃなくて、キューのコレクションから、キューを1つ取得するメソッドです。
private[kestrel]はnet.lag.kestrelパッケージのみから見えるってやつですね、package private。
queuesはmutable.HashMap[String, PersistentQueue]オブジェクトです。
/** * Get a named queue, creating it if necessary. * Exposed only to unit tests. */ private[kestrel] def queue(name: String): Option[PersistentQueue] = synchronized { if (shuttingDown) { None } else { Some(queues.get(name) getOrElse { // only happens when creating a queue for the first time. val q = if (name contains '+') { val master = name.split('+')(0) fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name log.info("Fanout queue %s added to %s", name, master) new PersistentQueue(path.getPath, name, queueConfigs.configMap(master)) } else { new PersistentQueue(path.getPath, name, queueConfigs.configMap(name)) } q.setup queues(name) = q q }) } }
メソッド全体がsynchronizedです。Javaと違ってsynchronizedは構文じゃなくてAnyRefのメソッドなので、メソッド全体を同期しようと思うと、こういう書き方になっちゃいます。Javaだとprivate synchronized Option
Some(queues.get(name) getOrElse {...} のところはmapから同名のキューを取得しようとして、無かったら {...} のところをやる、という感じです。mapはApplyだと値をそのまま返しますが、getだとOptionでラップして返します。
{...} のところをあらためて見ると。
Some(queues.get(name) getOrElse { // only happens when creating a queue for the first time. val q = if (name contains '+') { val master = name.split('+')(0) fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name log.info("Fanout queue %s added to %s", name, master) new PersistentQueue(path.getPath, name, queueConfigs.configMap(master)) } else { new PersistentQueue(path.getPath, name, queueConfigs.configMap(name)) } q.setup queues(name) = q q })
PersistentQueueを作って、queuesに入れてます。
名前(ファイル名)に'+'があったら、'+'の前の文字列をキーとしてfanout_queues(mutable.HashMap[String, mutable.HashSet[String]])にも「名前だけを」追加します。
このfanout_queuesってのがどう使われているかというと、addメソッドの先頭で以下のように使ってます。
def add(key: String, item: Array[Byte], expiry: Int): Boolean = { for (fanouts <- fanout_queues.get(key); name <- fanouts) { add(name, item, expiry) } ... }
追加しようとしたメッセージを、fanout_queuesの中のすべてのキューに再帰的に追加しています。
これでtwitterのファンアウトの仕組みが実現できてる。。。のか??よくわかんないや。
PersistentQueue.scala
【忙しい人向け】
Kestrel.queuesの中にあるキューの実装です。長いので詳細はコード読むといいです。
主なメソッドとしては、以下の辺りを見るといいです。
- def add(Array[Byte], Long)
- キューにメッセージを追加します。
- def peek()
- キューの先頭からメッセージを1件取得します。
- def remove(Boolean)
- peekと同様ですが、取得したメッセージはキューから削除します。
- def operateReact(=> Option[QItem], Long)(Option[QItem] => Unit)
- キューのメッセージを1件取得し、Actor.self.reactWithinで処理します。removeReactまたはpeekReactから呼ばれます。
- def operateReceive(=> Option[QItem], Long)
- キューのメッセージを1件取得し、Actor.self.receiveWithinで処理します。removeReceiveまたはpeekReceiveから呼ばれます。
- def operateOrWait(=> Option[QItem], Long)
- QItemを返す関数から謎のTupleを返します。キューが顕在ならWaiterというcase classでラップして、キューがとまってたりしたらそのままで返します。
- def replayJournal()
- ジャーナルを読み込んで、中のメッセージを処理します。メッセージを残してシャットダウン&再起動したときとかの処理ですね。
- def setup()
- キューが作られたときに呼ばれて、replayJournalを呼びます。
読んでて思ったのが、このtupleを返すメソッドがとても分かりにくいこと。
private def operateOrWait(op: => Option[QItem], timeoutAbsolute: Long): (Option[QItem], Option[Waiter]) = synchronized { val item = op if (!item.isDefined && !closed && !paused && timeoutAbsolute > 0) { val w = Waiter(Actor.self) waiters += w (None, Some(w)) } else { (item, None) } }
型は(Option[QItem], Option[Waiter])で、条件に応じてどっちかがNoneになって返るという仕様なわけだけど、どっちがどっちなのかが分かりにくい。以下のコードが呼び出し元。
def operateReact(op: => Option[QItem], timeoutAbsolute: Long)(f: Option[QItem] => Unit): Unit = { operateOrWait(op, timeoutAbsolute) match { case (item, None) => f(item) case (None, Some(w)) => Actor.self.reactWithin((timeoutAbsolute - Time.now.inMilliseconds) max 0) { case ItemArrived => operateReact(op, timeoutAbsolute)(f) case TIMEOUT => synchronized { waiters -= w // race: someone could have done an add() between the timeout and grabbing the lock. Actor.self.reactWithin(0) { case ItemArrived => f(op) case TIMEOUT => f(op) } } } case _ => throw new RuntimeException() } }
tupleをパターンマッチさせてるわけですが、型が書いていないんでどっちがどっちだか分かりにくい。
tupleはこういう戻り値を複数にしたいときとかパターンマッチに使いたいときとかに便利だと思いますが、値の意味が明確じゃないんで読みにくいです。
QItem.scala
【忙しい人向け】
PersistentQueueに入るメッセージをあらわすクラス。PersistentQueue#addの中でパラメータからこのQItemを作ります。
ジャーナルファイルに書くときとかのデータ表現はこんなふうになってるみたいですね。packを読んだ時にbyte配列化されます。
| 8bitのタイムスタンプ | 8byteの有効期限 | データ本文のリトルエンディアン表現 |
case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int) { def pack(): Array[Byte] = { val bytes = new Array[Byte](data.length + 16) val buffer = ByteBuffer.wrap(bytes) buffer.order(ByteOrder.LITTLE_ENDIAN) buffer.putLong(addTime) buffer.putLong(expiry) buffer.put(data) bytes } }
unpackについては、コンパニオンオブジェクトに定義されてます。
unpackOldAddとかありますが、昔の仕様かなんかでしょう。
object QItem { def unpack(data: Array[Byte]): QItem def unpackOldAdd(data: Array[Byte]): QItem }
Journal.scala
キュー操作を記録しているジャーナルのクラスです。
ジャーナルに記録されたメッセージを再処理するためのreplayメソッドは以下の通り。
readJournalEntryでファイルの中からメッセージを1件Tuple(JournalItem,Int)で読み込んで、2つ目の引数で受け取ってる関数fを適用してます。
def replay(name: String)(f: JournalItem => Unit): Unit = { size = 0 var lastUpdate = 0L val TEN_MB = 10L * 1024 * 1024 try { val in = new FileInputStream(queueFile).getChannel try { replayer = Some(in) var done = false do { readJournalEntry(in) match { case (JournalItem.EndOfFile, _) => done = true case (x, itemsize) => size += itemsize f(x) if (size / TEN_MB > lastUpdate) { lastUpdate = size / TEN_MB log.info("Continuing to read '%s' journal; %d MB so far...", name, lastUpdate * 10) } } } while (!done) } catch { case e: BrokenItemException => log.error(e, "Exception replaying journal for '%s'", name) log.error("DATA MAY HAVE BEEN LOST! Truncated entry will be deleted.") truncateJournal(e.lastValidPosition) } } catch { case e: FileNotFoundException => log.info("No transaction journal for '%s'; starting with empty queue.", name) case e: IOException => log.error(e, "Exception replaying journal for '%s'", name) log.error("DATA MAY HAVE BEEN LOST!") // this can happen if the server hardware died abruptly in the middle // of writing a journal. not awesome but we should recover. } replayer = None }
このメソッドは、PersistentQueueの初期化時(replayJournalメソッド)に以下のようにPartialFunctionを引数として呼ばれていて、ジャーナルファイルからキューが再現されていることが分かります。
journal.replay(name) { case JournalItem.Add(item) => _add(item) // when processing the journal, this has to happen after: if (!journal.inReadBehind && queueSize >= maxMemorySize()) { log.info("Dropping to read-behind for queue '%s' (%d bytes)", name, queueSize) journal.startReadBehind } case JournalItem.Remove => _remove(false) case JournalItem.RemoveTentative => _remove(true) case JournalItem.SavedXid(xid) => xidCounter = xid case JournalItem.Unremove(xid) => _unremove(xid) case JournalItem.ConfirmRemove(xid) => openTransactions.removeKey(xid) case x => log.error("Unexpected item in journal: %s", x) }
Counter.scala
ただのAtomicLongのラッパークラスです。
引数なしapplyがgetterってのが気に入りませんが、そんだけです。
KestrelとかPersistentQueue、QueueCollectionあたりで使ってますが、PersistentQueueあたりでは@volatileなvarだったり、synchronizedなgetterを定義していたりします。同期処理の妙がいろいろとあるようですがわかりません。。。
class Counter { private val value = new AtomicLong(0) def apply() = value.get def set(n: Long) = value.set(n) def incr() = value.addAndGet(1) def incr(n: Long) = value.addAndGet(n) def decr() = value.addAndGet(-1) def decr(n: Long) = value.addAndGet(-n) override def toString = value.get.toString }
BrokenItemException.scala
ジャーナルを処理する際の例外クラス。Journal.scalaで使われてて、そのままthrowされたりもします。
こういうException書くときはcase classでやっちゃえばいいんですねー。
case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause)
tools/*
QDumperはジャーナルファイルをパースして標準出力にダンプするクラスです。
Utilオブジェクトはbyte配列のサイズを見やすく表示するためのメソッドが定義されてます。
以上。
コード量はたいしたことないから一度みんな読んでみるといいと思います。
*1:すみません、ドキュメントありました。 http://github.com/robey/kestrel/blob/master/docs/guide.md