Task support
Parallel collections are modular in the way operations are scheduled. Each parallel collection is parametrized with a task support object which is responsible for scheduling and load-balancing tasks to processors.
The task support object internally keeps a reference to a thread pool implementation and decides how and when tasks are split into smaller tasks. To learn more about the internals of how exactly this is done, see the tech report [1].
There are currently a few task support implementations available for parallel collections. The ForkJoinTaskSupport uses a fork-join pool internally and is used by default on JVM 1.6 or greater. The less efficient ThreadPoolTaskSupport is a fallback for JVM 1.5 and JVMs that do not support fork-join pools. The ExecutionContextTaskSupport uses the default execution context implementation found in scala.concurrent, and it reuses the thread pool used in scala.concurrent (this is either a fork-join pool or a thread pool executor, depending on the JVM version). The execution context task support is set for each parallel collection by default, so parallel collections reuse the same fork-join pool as the futures API.
Here is a way to change the task support of a parallel collection:
scala> import scala.collection.parallel._
import scala.collection.parallel._
scala> val pc = mutable.ParArray(1, 2, 3)
pc: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3)
scala> val forkJoinPool = new java.util.concurrent.ForkJoinPool(2)
forkJoinPool: java.util.concurrent.ForkJoinPool = java.util.concurrent.ForkJoinPool@6436e181[Running, parallelism = 2, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
scala> pc.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
pc.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ForkJoinTaskSupport@4a5d484a
scala> pc map { _ + 1 }
res0: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)
The above sets the parallel collection to use a fork-join pool with parallelism level 2. To set the parallel collection to use a thread pool executor:
scala> pc.tasksupport = new ThreadPoolTaskSupport()
pc.tasksupport: scala.collection.parallel.TaskSupport = scala.collection.parallel.ThreadPoolTaskSupport@1d914a39
scala> pc map { _ + 1 }
res1: scala.collection.parallel.mutable.ParArray[Int] = ParArray(2, 3, 4)
scala> forkJoinPool.shutdown()
Note that if you are making your own ForkJoinPool instance, you should call ForkJoinPool.shutdown() when you no longer need the thread pool. If you do not call ForkJoinPool.shutdown() and continue to create new ForkJoinPool instances, the JVM may eventually run out of available threads and throw a java.lang.OutOfMemoryError.
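One way to keep a manually created pool scoped to a single computation is to shut it down in a finally block. This is only a sketch, assuming the pool is not shared elsewhere; the names pool and pc are illustrative:

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel._

val pool = new ForkJoinPool(4)
try {
  val pc = mutable.ParArray(1, 2, 3, 4, 5)
  pc.tasksupport = new ForkJoinTaskSupport(pool)
  println(pc.map(_ * 2).sum)  // 30
} finally {
  pool.shutdown()  // release the pool's threads once the work is done
}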
When a parallel collection is serialized, the task support field is omitted from serialization. When deserializing a parallel collection, the task support field is set to the default value, the execution context task support.
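For illustration, here is a minimal sketch of this behavior, assuming plain Java serialization of a ParArray; the names fjPool, pc, and copy are illustrative:

import java.io._
import scala.collection.parallel._

val fjPool = new java.util.concurrent.ForkJoinPool(2)
val pc = mutable.ParArray(1, 2, 3)
pc.tasksupport = new ForkJoinTaskSupport(fjPool)

// Round-trip through Java serialization: the task support field is not written.
val bytes = new ByteArrayOutputStream()
val out = new ObjectOutputStream(bytes)
out.writeObject(pc)
out.close()

val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
val copy = in.readObject().asInstanceOf[mutable.ParArray[Int]]

// Per the behavior described above, the copy falls back to the default
// execution context task support rather than the fork-join task support.
println(copy.tasksupport)

fjPool.shutdown()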
To implement a custom task support, extend the TaskSupport trait and implement the following methods:
def execute[R, Tp](task: Task[R, Tp]): () => R
def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R
def parallelismLevel: Int
The execute method schedules a task asynchronously and returns a future to wait on the result of the computation. The executeAndWaitResult method does the same, but only returns once the task is completed. The parallelismLevel method simply returns the targeted number of cores that the task support uses to schedule tasks.
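As a sketch only, the wrapper below delegates to an existing task support and logs what gets scheduled, just to show where the three methods fit in. Depending on the Scala version, the TaskSupport trait may also require an environment member inherited from its parent trait, which is forwarded here as an assumption; the class name LoggingTaskSupport is illustrative, not part of the library:

import scala.collection.parallel.{ExecutionContextTaskSupport, Task, TaskSupport}

// A custom task support that forwards all work to an underlying
// implementation and logs each scheduled task.
class LoggingTaskSupport(underlying: TaskSupport = new ExecutionContextTaskSupport())
extends TaskSupport {

  // Environment reference possibly required by the parent trait (assumption);
  // simply reuse the one from the underlying task support.
  val environment: AnyRef = underlying.environment

  // Schedule a task asynchronously; the returned function waits for its result.
  def execute[R, Tp](task: Task[R, Tp]): () => R = {
    println(s"scheduling task asynchronously on up to $parallelismLevel workers")
    underlying.execute(task)
  }

  // Schedule a task and return only once it has completed.
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
    println("scheduling task and waiting for its result")
    underlying.executeAndWaitResult(task)
  }

  // The targeted number of cores used to schedule tasks.
  def parallelismLevel: Int = underlying.parallelismLevel
}

A collection could then use it in the same way as the built-in task supports, for example pc.tasksupport = new LoggingTaskSupport().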