Asynchronous boundaries in Monix Task
Recently, I have been playing with Monix. In this article, I will focus on Monix Task, especially on how execution switches between threads when using Monix Task’s methods.
Monix Task
Monix Task is part of Monix, a library for asynchronous and reactive programming in Scala and Scala.js.
Task is a data type for controlling possibly lazy & asynchronous computations, useful for controlling side-effects, avoiding nondeterminism and callback-hell.
Monix also provides a streaming data type and purely functional utilities for managing concurrency, in modules such as monix-reactive, monix-catnap, and monix-tail.
The latest version at the time of writing is 3.3.0, released in November 2020.
Asynchronous Boundaries
An asynchronous boundary is a context switch between threads. For efficient asynchronous programming, we have to introduce asynchronous boundaries with an appropriate thread pool. For example, a blocking operation should be shifted to an unbounded thread pool, as Daniel’s document and Monix’s document recommend.
Monix Task has useful methods to introduce an asynchronous boundary.
Asynchronous boundaries in Monix Task
Now, let’s have a look at how to introduce an asynchronous boundary in Monix Task.
Scheduler
A Scheduler is needed when a Task is run. The relationship is similar to the one between ExecutionContext and Future. However, Future requires an ExecutionContext not only when it runs, but also when it is defined and when almost any of its methods is called.
Scheduler is a subclass of ExecutionContext, so a Scheduler is backed by a thread pool such as a CachedThreadPool or a ForkJoinPool. The Scheduler companion object provides useful methods to create a Scheduler with each kind of thread pool. The code below is an example of defining Scheduler objects with these methods. You can also specify ExecutionModel and ReportFailure, as described in this link.
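A minimal sketch of such definitions, assuming Monix 3.x on the classpath. The pool names (`my-io`, `my-cpu`, `my-fixed`) and the sizes are hypothetical values chosen for illustration.

```scala
import monix.execution.{ExecutionModel, Scheduler}

object Schedulers {
  // Default scheduler, backed by Scala's global ExecutionContext.
  val default: Scheduler = Scheduler.global

  // Unbounded cached thread pool, intended for blocking I/O.
  val io: Scheduler = Scheduler.io(name = "my-io")

  // Fork-join pool intended for CPU-bound work.
  val cpu: Scheduler = Scheduler.computation(parallelism = 4, name = "my-cpu")

  // Fixed-size pool with an explicit ExecutionModel.
  val fixed: Scheduler = Scheduler.fixedPool(
    name = "my-fixed",
    poolSize = 2,
    executionModel = ExecutionModel.AlwaysAsyncExecution
  )
}
```

Giving each pool a distinct name makes it easy to see in logs which pool a task actually ran on.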
Although I don’t explain Scheduler in detail here, it is also a replacement for Java’s ScheduledExecutorService.
The Monix Scheduler is inspired by ReactiveX, being an enhanced Scala ExecutionContext and also a replacement for Java’s ScheduledExecutorService, but also for Javascript’s setTimeout.
Task shift
This is the base method for switching thread pools, that is, for introducing an asynchronous boundary.
Without an argument, the computation is shifted to the default thread pool. With a Scheduler as the argument, the computation is shifted to that Scheduler’s thread pool.
Most of the asynchronous methods of Task call the shift method internally.
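The two forms of shift can be sketched as follows, assuming Monix 3.x. The `my-io` pool and the `threadName` helper are hypothetical and only serve to make the current thread visible.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object ShiftExample {
  // Hypothetical pool for blocking work.
  val io: Scheduler = Scheduler.io(name = "my-io")

  // Captures the name of the thread that evaluates this step.
  def threadName: Task[String] = Task.eval(Thread.currentThread().getName)

  val program: Task[(String, String)] =
    for {
      _ <- Task.shift      // fork onto the default Scheduler
      a <- threadName
      _ <- Task.shift(io)  // fork onto the given Scheduler
      b <- threadName
    } yield (a, b)

  // The implicit global Scheduler acts as the default one here.
  def run(): (String, String) = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = println(run())
}
```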
Thread shift with asynchronous methods of Monix
Let’s have a look at the thread shifting of each asynchronous method of Monix.
The code below is a simple program that executes three tasks in a Task context. In production, errors should be handled properly instead of simply calling the runSyncUnsafe method.
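A minimal sketch of such a program, assuming Monix 3.x. The bodies of `lightTask` and `heavyTask` are placeholders that only report the thread they run on.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

object NoBoundary {
  // Placeholder for cheap work: prints and returns the current thread name.
  def lightTask(label: String): Task[String] = Task.eval {
    val thread = Thread.currentThread().getName
    println(s"$label runs on $thread")
    thread
  }

  // Placeholder for expensive or blocking work.
  def heavyTask(label: String): Task[String] = lightTask(label)

  val program: Task[List[String]] =
    for {
      a <- lightTask("first")
      b <- heavyTask("second")
      c <- lightTask("third")
    } yield List(a, b, c)

  // No boundary is introduced, so everything runs on the calling thread.
  def run(): List[String] = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = run()
}
```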
As you can see, these three tasks are executed on the main thread because no asynchronous boundary is introduced. Future, on the other hand, always introduces an asynchronous boundary with a thread pool, which leads to unnecessary context-switch overhead.
- executeAsync
The executeAsync method introduces an asynchronous boundary before the task executes, shifting it to the default thread pool, that is, the one passed to the runSyncUnsafe method. As you can see in the code below, the shift method is called in this method.
final def executeAsync: Task[A] =
Task.shift.flatMap(_ => this)
Moreover, calling executeAsync is the same as calling evalAsync, as the code below shows.
Task.evalAsync(a) <-> Task.eval(a).executeAsync
The code below uses the executeAsync method. The heavyTask method runs on the default thread pool, not the main thread.
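A minimal sketch of this usage, assuming Monix 3.x and the same placeholder `lightTask` and `heavyTask` helpers that only report their thread.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

object ExecuteAsyncExample {
  def lightTask(label: String): Task[String] = Task.eval {
    val thread = Thread.currentThread().getName
    println(s"$label runs on $thread")
    thread
  }

  // Placeholder for expensive work.
  def heavyTask(label: String): Task[String] = lightTask(label)

  val program: Task[(String, String, String)] =
    for {
      a <- lightTask("first")                // calling thread
      b <- heavyTask("second").executeAsync  // forked onto the default pool
      c <- lightTask("third")                // no further shift happens here
    } yield (a, b, c)

  def run(): (String, String, String) = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = run()
}
```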
- asyncBoundary
The asyncBoundary method introduces an asynchronous boundary after the task executes. As you can see in the code below, the asyncBoundary method can also take a Scheduler.
final def asyncBoundary: Task[A] =
  flatMap(a => Task.shift.map(_ => a))

final def asyncBoundary(s: Scheduler): Task[A] =
  flatMap(a => Task.shift(s).map(_ => a))
The code below uses the asyncBoundary method. The heavyTask function runs on the thread pool passed to the asyncBoundary method, not on the main thread, because asyncBoundary is called before it.
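A minimal sketch of this usage, assuming Monix 3.x. The `my-io` pool and the helper bodies are hypothetical.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object AsyncBoundaryExample {
  // Hypothetical pool for blocking work.
  val io: Scheduler = Scheduler.io(name = "my-io")

  def lightTask(label: String): Task[String] = Task.eval {
    val thread = Thread.currentThread().getName
    println(s"$label runs on $thread")
    thread
  }

  // Placeholder for blocking work.
  def heavyTask(label: String): Task[String] = lightTask(label)

  val program: Task[(String, String, String)] =
    for {
      a <- lightTask("first").asyncBoundary(io) // boundary after "first"
      b <- heavyTask("second")                  // runs on the io pool
      c <- lightTask("third")                   // still on the io pool
    } yield (a, b, c)

  def run(): (String, String, String) = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = run()
}
```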
As the code with the executeAsync and asyncBoundary methods above shows, once an asynchronous boundary is introduced, subsequent tasks keep running on the same thread pool. This can lead to unintended usage of thread pools. For example, in the code above, the lightTask function should run on the default thread pool, not the one for blocking. In this case, the thread pool should be switched back to the default one.
In the code below, the thread pool is switched back to the default one by calling the shift and asyncBoundary methods.
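Both ways of switching back can be sketched like this, assuming Monix 3.x. The `my-io` pool and the helper bodies are hypothetical.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object ShiftBackExample {
  // Hypothetical pool for blocking work.
  val io: Scheduler = Scheduler.io(name = "my-io")

  def lightTask(label: String): Task[String] = Task.eval {
    val thread = Thread.currentThread().getName
    println(s"$label runs on $thread")
    thread
  }

  // Placeholder for blocking work.
  def heavyTask(label: String): Task[String] = lightTask(label)

  val program: Task[List[String]] =
    for {
      a <- lightTask("first").asyncBoundary(io) // later steps run on io
      b <- heavyTask("second").asyncBoundary    // later steps run on the default pool
      c <- lightTask("third")
      _ <- Task.shift(io)                       // explicit shift to io
      d <- heavyTask("fourth")
      _ <- Task.shift                           // explicit shift back to the default pool
      e <- lightTask("fifth")
    } yield List(a, b, c, d, e)

  def run(): List[String] = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = run()
}
```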
- executeOn
The executeOn method runs the task on the specified Scheduler. When the forceAsync parameter is set to true, which is the default, an asynchronous boundary is forced before the task starts.
final def executeOn(s: Scheduler, forceAsync: Boolean = true): Task[A] =
TaskExecuteOn(this, s, forceAsync)
Since version 3.0, execution is shifted back to the default thread pool, the one passed to the runSyncUnsafe method, after the task given to executeOn completes. This prevents unintended usage of thread pools.
The code below uses the executeOn method. The heavyTask function runs on the thread pool for blocking, and then the lightTask function runs on the default thread pool.
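A minimal sketch of this usage, assuming Monix 3.x. The `my-io` pool and the helper bodies are hypothetical.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object ExecuteOnExample {
  // Hypothetical pool for blocking work.
  val io: Scheduler = Scheduler.io(name = "my-io")

  def lightTask(label: String): Task[String] = Task.eval {
    val thread = Thread.currentThread().getName
    println(s"$label runs on $thread")
    thread
  }

  // Placeholder for blocking work.
  def heavyTask(label: String): Task[String] = lightTask(label)

  val program: Task[(String, String, String)] =
    for {
      a <- lightTask("first")
      b <- heavyTask("second").executeOn(io) // only this task runs on io
      c <- lightTask("third")                // back on the default pool
    } yield (a, b, c)

  def run(): (String, String, String) = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = run()
}
```

Compared with asyncBoundary, no explicit shift back is needed after the heavy task.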
To gain a little deeper insight into the behavior of the forceAsync parameter, I wrote the code below.
1. Task.eval with executeOn and forceAsync set to true
The task runs on the thread pool for blocking that the executeOn method specifies. After the task, execution is shifted back to the default thread pool.
2. Task.eval with executeOn and forceAsync set to false
The task and the subsequent task run on the main thread.
3. Task.evalAsync with executeOn and forceAsync set to true
The task runs on the thread pool for blocking that the executeOn method specifies. After the task, execution is shifted back to the default thread pool.
4. Task.evalAsync with executeOn and forceAsync set to false
The task and the subsequent task run on the thread pool for blocking that the executeOn method takes.
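The four cases above could be observed with a sketch like the following, assuming Monix 3.x. The `observe` helper and the `my-io` pool are hypothetical. The program prints the thread names rather than hard-coding them, since the exact names depend on the runtime.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object ForceAsyncCases {
  // Hypothetical pool for blocking work.
  val io: Scheduler = Scheduler.io(name = "my-io")

  private def currentThread: String = Thread.currentThread().getName

  // Runs `source` through executeOn, then records where the next step runs.
  def observe(source: Task[String], forceAsync: Boolean): (String, String) = {
    val program = for {
      during <- source.executeOn(io, forceAsync)
      after  <- Task.eval(currentThread)
    } yield (during, after)
    program.runSyncUnsafe()
  }

  def main(args: Array[String]): Unit = {
    println("1. eval, forceAsync = true:       " + observe(Task.eval(currentThread), forceAsync = true))
    println("2. eval, forceAsync = false:      " + observe(Task.eval(currentThread), forceAsync = false))
    println("3. evalAsync, forceAsync = true:  " + observe(Task.evalAsync(currentThread), forceAsync = true))
    println("4. evalAsync, forceAsync = false: " + observe(Task.evalAsync(currentThread), forceAsync = false))
  }
}
```

Each printed pair is the thread that ran the task followed by the thread that ran the subsequent step.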
- parMap2
As the final method, let’s see the thread shifting of the parMap2 method. The parMap2 method is recommended for executing two tasks in parallel. This method is also suitable for error handling and cancellation.
In case one of the tasks fails, then all other tasks get cancelled and the final result will be a failure.
In the code below, the parMap2 method takes two tasks. The tasks run in parallel, and the inner task runs on the specified thread pool.
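A minimal sketch of this usage, assuming Monix 3.x. The `my-io` pool and the helper bodies are hypothetical, and the left task is pinned to the blocking pool with executeOn.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object ParMap2Example {
  // Hypothetical pool for blocking work.
  val io: Scheduler = Scheduler.io(name = "my-io")

  def lightTask(label: String): Task[String] = Task.eval {
    val thread = Thread.currentThread().getName
    println(s"$label runs on $thread")
    thread
  }

  // Placeholder for blocking work.
  def heavyTask(label: String): Task[String] = lightTask(label)

  // Both tasks are started in parallel; the results are combined when both finish.
  val program: Task[(String, String)] =
    Task.parMap2(
      heavyTask("left").executeOn(io), // pinned to the io pool
      lightTask("right")
    )((left, right) => (left, right))

  def run(): (String, String) = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = println(run())
}
```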
Conclusion
Monix Task has useful asynchronous methods, but each behaves differently, so we have to choose the right one for each situation.
Thank you for reading!