読者です 読者をやめる 読者になる 読者になる

kestrelのソースコード

Scala Kestrel

kestrelについては前回のエントリ
kestrelを試してみたよ - na
と、githubを参照のこと。

twitter/kestrel · GitHub

ソースコードは↑から見れますが、

/src/main/scala/net/lag/kestrel/
  809  BrokenItemException.scala
 1043  Counter.scala
10126  Journal.scala
 5900  Kestrel.scala
12064  KestrelHandler.scala
18436  PersistentQueue.scala
 1732  QItem.scala
 6765  QueueCollection.scala

memcache/
 1921  ASCIICodec.scala
  406  BinaryCodec.scala
 1666  Codec.scala

tools/
 4475  QDumper.scala
 1616  Util.scala

ファイルサイズこんだけ。

$ find . -name "*.scala" | xargs wc -l
  351 ./KestrelHandler.scala
  159 ./Kestrel.scala
  238 ./QueueCollection.scala
   55 ./QItem.scala
  565 ./PersistentQueue.scala
  350 ./Journal.scala
   32 ./Counter.scala
   22 ./BrokenItemException.scala
   14 ./memcache/BinaryCodec.scala
   57 ./memcache/ASCIICodec.scala
   56 ./memcache/Codec.scala
  150 ./tools/QDumper.scala
   53 ./tools/Util.scala
 2102 合計

行数こんだけ。

というわけで、1個ずつ見ていきましょう。長くなりそうなので忙しい人は【忙しい人向け】って書いてあるとこだけ読むといいと思います。

KestrelHandler.scala

プロトコルの実装。メッセージハンドラ。

Actor継承してますが、これはcom.twitter.actor.Actorで、twitterActorというライブラリです。2.7.7の標準Actorライブラリをベースに、内部の実装だけを変えてあるらしいです。標準の実装は had a bug that caused very high GC pressure だそうな。

http://github.com/robey/twitterActors

actメソッドの中身は以下の通り。

def act = {
  loop {
    react {
      case MinaMessage.MessageReceived(msg) =>
        handle(msg.asInstanceOf[memcache.Request])

      case MinaMessage.ExceptionCaught(cause) => {
        cause.getCause match {
          case _: ProtocolError => writeResponse("CLIENT_ERROR\r\n")
          case _: IOException =>
            log.debug("I/O Exception on session %d: %s", sessionID, cause.toString)
          case _ =>
            log.error(cause, "Exception caught on session %d: %s", sessionID, cause.toString)
            writeResponse("ERROR\r\n")
        }
        session.close(false)
      }

      case MinaMessage.SessionClosed =>
        log.debug("End of session %d", sessionID)
        abortAnyTransaction
        KestrelStats.sessions.decr
        exit()

      case MinaMessage.SessionIdle(status) =>
        log.debug("Idle timeout on session %s", session)
        session.close(false)
    }
  }
}

このKestrelHandlerにメッセージを渡す部分は、naggatiというライブラリで実装されています。コードの中に出てくるMinaMessageはnaggatiで定義されています。

robey/naggati · GitHub

naggatiはApache Minaのプロトコルハンドラとして実装されていて、受け取った値をActor(KestrelHandler)へのメッセージとして送信してきます。naggatiについての詳細はこちら。

Actors, Mina, and Naggati

Apache MinaはNIOベースの軽量な汎用プロトコルを実装するためのライブラリみたい。詳しくはこちら。

Apache MINA — Apache MINA

getとかsetとかを受け取ったときの処理はhandleメソッドに書かれています。

private def handle(request: memcache.Request) = {
  request.line(0) match {
    case "GET" => get(request.line(1))
    case "SET" =>
      try {
        set(request.line(1), request.line(2).toInt, request.line(3).toInt, request.data.get)
      } catch {
        case e: NumberFormatException =>
          throw new ProtocolError("bad request: " + request)
      }
    case "STATS" => stats
    case "SHUTDOWN" => shutdown
    case "RELOAD" =>
      Configgy.reload
      writeResponse("Reloaded config.\r\n")
    case "FLUSH" =>
      flush(request.line(1))
    case "FLUSH_ALL" =>
      for (qName <- Kestrel.queues.queueNames) {
        Kestrel.queues.flush(qName)
      }
      writeResponse("Flushed all queues.\r\n")
    case "DUMP_CONFIG" =>
      dumpConfig()
    case "DUMP_STATS" =>
      dumpStats()
    case "DELETE" =>
      delete(request.line(1))
    case "FLUSH_EXPIRED" =>
      flushExpired(request.line(1))
    case "FLUSH_ALL_EXPIRED" =>
      val flushed = Kestrel.queues.flushAllExpired()
      writeResponse("%d\r\n".format(flushed))
    case "VERSION" =>
      version()
    case "QUIT" =>
      quit()
  }
}

あとはgetとsetの実装がメインです。長いので抜粋しますがコードを読まないとわかんない仕様が満載です。*1
たとえば、def get(name: String)では、

if (name contains '/') {
  val options = name.split("/")
  key = options(0)
  for (i <- 1 until options.length) {
    val opt = options(i)
    if (opt startsWith "t=") {
      timeout = opt.substring(2).toInt
    }
    if (opt == "close") closing = true
    if (opt == "open") opening = true
    if (opt == "abort") aborting = true
    if (opt == "peek") peeking = true
  }
}

こんな感じに"/"区切りでオプションが指定できたり、t=でタイムアウト指定ができたりします。

それから、def set(name: String, flags: Int, expiry: Int, data: Array[Byte])では、

KestrelStats.setRequests.incr
if (Kestrel.queues.add(name, data, expiry)) {
  writeResponse("STORED\r\n")
} else {
  writeResponse("NOT_STORED\r\n")
}

と、さらっとflagsを無視してたりします。

Kestrel.scala

【忙しい人向け】
KestrelHandlerのget/setメソッドで出てくる、KestrelとKestrelStatsというオブジェクトが定義されています。
まさにKestrel本体で、mainメソッドが定義されています。やってることは、初期化とMinaの起動。

def main(args: Array[String]): Unit = {
  runtime.load(args)
  startup(Configgy.config)
}

KestrelHandlerから参照される、Kestrel.queuesはQueueCollectionというクラスのインスタンスで、startupメソッドの中で以下のように初期化されてます。QueueCollectionについては後で見ていきます。

  queues = new QueueCollection(config.getString("queue_path", "/tmp"), config.configMap("queues"))
  ...
  queues.loadQueues()

startupメソッドの最後に以下のような記述がありますが、これはjava.util.concurrent.CountDownLatchを使っていて、shutdownメソッドが呼ばれるまでスレッドをブロックし続けるActorを起動しています。deathSwitchって名前がカコイイ。

def startup(config: Config): Unit = {
  ...
  actor {
    deathSwitch.await
  }
}

def shutdown(): Unit = {
  ...
  deathSwitch.countDown
}

QueueCollection.scala

【忙しい人向け】
mutable.HashMap[String, PersistentQueue]で内部表現された複数のキューを保持するクラスです。

Kestrel.startupで呼ばれるloadQueuesは以下の通り。
コンストラクタに渡されたパスにあるファイルからキューを読み込みます。変数pathはjava.io.Fileオブジェクト。
このコードによると、キューのファイル名には"~~"が含まれるらしい。1ファイルが1つのキューのようです。

// preload any queues
def loadQueues() {
  path.list() filter { name => !(name contains "~~") } map { queue(_) }
}

Javaでこれをやろうとすると、↓のような感じでFilenameFilterクラスを継承した匿名クラス作って云々とめんどくさいところですが、Scalaらくちんですね。

public List<String> loadQueues() {
  File[] files = path.list(new FilenameFilter() {
    public boolean accept(File file, String name) {
      return name.contains("~~");
    }
  })
  for (File file : files)
    queue(file)
}

で、queueメソッド
これは「キューする」メソッドじゃなくて、キューのコレクションから、キューを1つ取得するメソッドです。
private[kestrel]はnet.lag.kestrelパッケージのみから見えるってやつですね、package private。
queuesはmutable.HashMap[String, PersistentQueue]オブジェクトです。

/**
 * Get a named queue, creating it if necessary.
 * Exposed only to unit tests.
 */
private[kestrel] def queue(name: String): Option[PersistentQueue] = synchronized {
  if (shuttingDown) {
    None
  } else {
    Some(queues.get(name) getOrElse {
      // only happens when creating a queue for the first time.
      val q = if (name contains '+') {
        val master = name.split('+')(0)
        fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name
        log.info("Fanout queue %s added to %s", name, master)
        new PersistentQueue(path.getPath, name, queueConfigs.configMap(master))
      } else {
        new PersistentQueue(path.getPath, name, queueConfigs.configMap(name))
      }
      q.setup
      queues(name) = q
      q
    })
  }
}

メソッド全体がsynchronizedです。Javaと違ってsynchronizedは構文じゃなくてAnyRefのメソッドなので、メソッド全体を同期しようと思うと、こういう書き方になっちゃいます。Javaだとprivate synchronized Option queue...ですね。

Some(queues.get(name) getOrElse {...} のところはmapから同名のキューを取得しようとして、無かったら {...} のところをやる、という感じです。mapはApplyだと値をそのまま返しますが、getだとOptionでラップして返します。

{...} のところをあらためて見ると。

    Some(queues.get(name) getOrElse {
      // only happens when creating a queue for the first time.
      val q = if (name contains '+') {
        val master = name.split('+')(0)
        fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name
        log.info("Fanout queue %s added to %s", name, master)
        new PersistentQueue(path.getPath, name, queueConfigs.configMap(master))
      } else {
        new PersistentQueue(path.getPath, name, queueConfigs.configMap(name))
      }
      q.setup
      queues(name) = q
      q
    })

PersistentQueueを作って、queuesに入れてます。
名前(ファイル名)に'+'があったら、'+'の前の文字列をキーとしてfanout_queues(mutable.HashMap[String, mutable.HashSet[String]])にも「名前だけを」追加します。

このfanout_queuesってのがどう使われているかというと、addメソッドの先頭で以下のように使ってます。

def add(key: String, item: Array[Byte], expiry: Int): Boolean = {
  for (fanouts <- fanout_queues.get(key); name <- fanouts) {
    add(name, item, expiry)
  }
  ...
}

追加しようとしたメッセージを、fanout_queuesの中のすべてのキューに再帰的に追加しています。
これでtwitterのファンアウトの仕組みが実現できてる。。。のか??よくわかんないや。

PersistentQueue.scala

【忙しい人向け】
Kestrel.queuesの中にあるキューの実装です。長いので詳細はコード読むといいです。

主なメソッドとしては、以下の辺りを見るといいです。

def add(Array[Byte], Long)
キューにメッセージを追加します。
def peek()
キューの先頭からメッセージを1件取得します。
def remove(Boolean)
peekと同様ですが、取得したメッセージはキューから削除します。
def operateReact(=> Option[QItem], Long)(Option[QItem] => Unit)
キューのメッセージを1件取得し、Actor.self.reactWithinで処理します。removeReactまたはpeekReactから呼ばれます。
def operateReceive(=> Option[QItem], Long)
キューのメッセージを1件取得し、Actor.self.receiveWithinで処理します。removeReceiveまたはpeekReceiveから呼ばれます。
def operateOrWait(=> Option[QItem], Long)
QItemを返す関数から謎のTupleを返します。キューが顕在ならWaiterというcase classでラップして、キューがとまってたりしたらそのままで返します。
def replayJournal()
ジャーナルを読み込んで、中のメッセージを処理します。メッセージを残してシャットダウン&再起動したときとかの処理ですね。
def setup()
キューが作られたときに呼ばれて、replayJournalを呼びます。

読んでて思ったのが、このtupleを返すメソッドがとても分かりにくいこと。

private def operateOrWait(op: => Option[QItem], timeoutAbsolute: Long): (Option[QItem], Option[Waiter]) = synchronized {
  val item = op
  if (!item.isDefined && !closed && !paused && timeoutAbsolute > 0) {
    val w = Waiter(Actor.self)
    waiters += w
    (None, Some(w))
  } else {
    (item, None)
  }
}

型は(Option[QItem], Option[Waiter])で、条件に応じてどっちかがNoneになって返るという仕様なわけだけど、どっちがどっちなのかが分かりにくい。以下のコードが呼び出し元。

def operateReact(op: => Option[QItem], timeoutAbsolute: Long)(f: Option[QItem] => Unit): Unit = {
  operateOrWait(op, timeoutAbsolute) match {
    case (item, None) =>
      f(item)
    case (None, Some(w)) =>
      Actor.self.reactWithin((timeoutAbsolute - Time.now.inMilliseconds) max 0) {
        case ItemArrived => operateReact(op, timeoutAbsolute)(f)
        case TIMEOUT => synchronized {
          waiters -= w
          // race: someone could have done an add() between the timeout and grabbing the lock.
          Actor.self.reactWithin(0) {
            case ItemArrived => f(op)
            case TIMEOUT => f(op)
          }
        }
      }
    case _ => throw new RuntimeException()
  }
}

tupleをパターンマッチさせてるわけですが、型が書いていないんでどっちがどっちだか分かりにくい。
tupleはこういう戻り値を複数にしたいときとかパターンマッチに使いたいときとかに便利だと思いますが、値の意味が明確じゃないんで読みにくいです。

QItem.scala

【忙しい人向け】
PersistentQueueに入るメッセージをあらわすクラス。PersistentQueue#addの中でパラメータからこのQItemを作ります。

ジャーナルファイルに書くときとかのデータ表現はこんなふうになってるみたいですね。packを読んだ時にbyte配列化されます。
| 8bitのタイムスタンプ | 8byteの有効期限 | データ本文のリトルエンディアン表現 |

case class QItem(addTime: Long, expiry: Long, data: Array[Byte], var xid: Int) {
  def pack(): Array[Byte] = {
    val bytes = new Array[Byte](data.length + 16)
    val buffer = ByteBuffer.wrap(bytes)
    buffer.order(ByteOrder.LITTLE_ENDIAN)
    buffer.putLong(addTime)
    buffer.putLong(expiry)
    buffer.put(data)
    bytes
  }
}

unpackについては、コンパニオンオブジェクトに定義されてます。
unpackOldAddとかありますが、昔の仕様かなんかでしょう。

object QItem {
  def unpack(data: Array[Byte]): QItem 
  def unpackOldAdd(data: Array[Byte]): QItem
}

Journal.scala

キュー操作を記録しているジャーナルのクラスです。

ジャーナルに記録されたメッセージを再処理するためのreplayメソッドは以下の通り。
readJournalEntryでファイルの中からメッセージを1件Tuple(JournalItem,Int)で読み込んで、2つ目の引数で受け取ってる関数fを適用してます。

def replay(name: String)(f: JournalItem => Unit): Unit = {
  size = 0
  var lastUpdate = 0L
  val TEN_MB = 10L * 1024 * 1024
  try {
    val in = new FileInputStream(queueFile).getChannel
    try {
      replayer = Some(in)
      var done = false
      do {
        readJournalEntry(in) match {
          case (JournalItem.EndOfFile, _) => done = true
          case (x, itemsize) =>
            size += itemsize
            f(x)
            if (size / TEN_MB > lastUpdate) {
              lastUpdate = size / TEN_MB
              log.info("Continuing to read '%s' journal; %d MB so far...", name, lastUpdate * 10)
            }
        }
      } while (!done)
    } catch {
      case e: BrokenItemException =>
        log.error(e, "Exception replaying journal for '%s'", name)
        log.error("DATA MAY HAVE BEEN LOST! Truncated entry will be deleted.")
        truncateJournal(e.lastValidPosition)
    }
  } catch {
    case e: FileNotFoundException =>
      log.info("No transaction journal for '%s'; starting with empty queue.", name)
    case e: IOException =>
      log.error(e, "Exception replaying journal for '%s'", name)
      log.error("DATA MAY HAVE BEEN LOST!")
      // this can happen if the server hardware died abruptly in the middle
      // of writing a journal. not awesome but we should recover.
  }
  replayer = None
}

このメソッドは、PersistentQueueの初期化時(replayJournalメソッド)に以下のようにPartialFunctionを引数として呼ばれていて、ジャーナルファイルからキューが再現されていることが分かります。

journal.replay(name) {
  case JournalItem.Add(item) =>
    _add(item)
    // when processing the journal, this has to happen after:
    if (!journal.inReadBehind && queueSize >= maxMemorySize()) {
      log.info("Dropping to read-behind for queue '%s' (%d bytes)", name, queueSize)
      journal.startReadBehind
    }
  case JournalItem.Remove => _remove(false)
  case JournalItem.RemoveTentative => _remove(true)
  case JournalItem.SavedXid(xid) => xidCounter = xid
  case JournalItem.Unremove(xid) => _unremove(xid)
  case JournalItem.ConfirmRemove(xid) => openTransactions.removeKey(xid)
  case x => log.error("Unexpected item in journal: %s", x)
}

Counter.scala

ただのAtomicLongのラッパークラスです。
引数なしapplyがgetterってのが気に入りませんが、そんだけです。
KestrelとかPersistentQueue、QueueCollectionあたりで使ってますが、PersistentQueueあたりでは@volatileなvarだったり、synchronizedなgetterを定義していたりします。同期処理の妙がいろいろとあるようですがわかりません。。。

class Counter {
  private val value = new AtomicLong(0)

  def apply() = value.get
  def set(n: Long) = value.set(n)
  def incr() = value.addAndGet(1)
  def incr(n: Long) = value.addAndGet(n)
  def decr() = value.addAndGet(-1)
  def decr(n: Long) = value.addAndGet(-n)
  override def toString = value.get.toString
}

BrokenItemException.scala

ジャーナルを処理する際の例外クラス。Journal.scalaで使われてて、そのままthrowされたりもします。
こういうException書くときはcase classでやっちゃえばいいんですねー。

case class BrokenItemException(lastValidPosition: Long, cause: Throwable) extends IOException(cause)

memcache/*

Apache Minaでmemcacheプロトコルを実装するためのCodecです。

tools/*

QDumperはジャーナルファイルをパースして標準出力にダンプするクラスです。
Utilオブジェクトはbyte配列のサイズを見やすく表示するためのメソッドが定義されてます。


以上。
コード量はたいしたことないから一度みんな読んでみるといいと思います。

*1:すみません、ドキュメントありました。 http://github.com/robey/kestrel/blob/master/docs/guide.md