Scala 2.8のActorで使われるスレッド数
scala.actors.scheduler.ThreadPoolConfigの中に定義されていますが、CPUコア数×2スレッドです。
val corePoolSize = getIntegerProp("actors.corePoolSize") match { case Some(i) if i > 0 => i case _ => { val byCores = rt.availableProcessors() * 2 if (byCores > minNumThreads) byCores else minNumThreads } } val maxPoolSize = { val preMaxSize = getIntegerProp("actors.maxPoolSize") getOrElse 256 if (preMaxSize >= corePoolSize) preMaxSize else corePoolSize }
厳密には、corePoolSizeがCPUコア数×2で、maxPoolSizeが256です。それぞれ、VM引数のactors.corePoolSizeとactors.maxPoolSizeで変更が可能です。が、後述するとおりスレッドプールの挙動的には、よっぽどのことが無い限りcorePoolSizeを超えるスレッドは生成されません。
というわけで、IO待ち(通信待ちとか)をするActorをたくさん動かしたい場合は、actors.corePoolSizeをでかくしたほうが効果的です。
内部的には、java.util.concurrent.ThreadPoolExecutorか、scala.actors.schedule.ForkJoinSchedulerが使われており、ThreadPoolConfigにて定義されているcorePoolSizeとmaxPoolSizeの値に基づいてスレッドプールが作成されます。
このExecutor/Schedulerはscala.actors.Reactorに定義されています。
val sched = if (!ThreadPoolConfig.useForkJoin) { // default is non-daemon val workQueue = new LinkedBlockingQueue[Runnable] ExecutorScheduler( new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, 60000L, TimeUnit.MILLISECONDS, workQueue, new ThreadPoolExecutor.CallerRunsPolicy)) } else { // default is non-daemon, non-fair val s = new ForkJoinScheduler(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, false, false) s.start() s }
このReactorはReactor traitのcompanion objectなので、Actorが使うスレッドプールは、すべてのActorを通じて1つだけになります。
ThreadPoolExecutorを使うのかForkJoinScheudlerを使うのかは、ThreadPoolConfig.useForkJoin で定義されていて、以下のような条件になります。
・VM引数 actors.enableForkJoin がfalseならThreadPoolExecutor
・VM引数 actors.enableForkJoin がtrueならForkJoinPool
・VM引数が無くて Sun/Apple Java 1.6以上ならForkJoinPool
・VM引数が無くて それ以外のJVMならThreadPoolExecutor
!propIsSetTo("actors.enableForkJoin", "false") && (propIsSetTo("actors.enableForkJoin", "true") || { Debug.info(this+": java.version = "+javaVersion) Debug.info(this+": java.vm.vendor = "+javaVmVendor) // on IBM J9 1.6 do not use ForkJoinPool // XXX this all needs to go into Properties. isJavaAtLeast("1.6") && ((javaVmVendor contains "Sun") || (javaVmVendor contains "Apple")) })
ThreadPoolExecutorのほうは、コンストラクタで指定されているworkQueueのサイズに制限がないため、実行待ちのタスクの数(=Actorの数)がInteger.MAX _VALUEを超えるまでは全部キューに入って実行待ちとなります。つまり、corePoolSizeを超えるタスクを実行しようとしてもキューに入るだけで、このキューがあふれるほどのタスクを投入して初めてスレッドプールが拡張されます。
ForkJoinSchedulerのほうは、まだ見れてませんが、どうやら同じ挙動となるようです。
ソースコードはJavaで書かれていて、↓から見れます。
http://lampsvn.epfl.ch/svn-repos/scala/scala/tags/R_2_8_0_final/src/forkjoin/scala/concurrent/forkjoin/
なんかActorモデルの並列処理って、スレッドを明示的にコントロールできないんで、Javaと比べて使いにくい気がします。退化したようなイメージ。
気軽に使えるのかもしれないけど、何が良いのか良くわからない。