Asynchronous boundaries in Monix Task

Hiroki Fujino
Published in Level Up Coding
5 min read · Dec 28, 2020

Recently, I have been playing with Monix. In this article, I will focus on Monix Task, especially on how execution switches between threads when Task’s methods are used.

Monix Task

Task is part of Monix, a library for asynchronous and reactive programming in Scala and Scala.js.

Task is a data type for controlling possibly lazy & asynchronous computations, useful for controlling side-effects, avoiding nondeterminism and callback-hell.

Monix also provides streaming data types and purely functional utilities for managing concurrency, in modules such as monix-reactive, monix-catnap, and monix-tail.

At the time of writing, the latest version is 3.3.0, released in November 2020.

Asynchronous Boundaries

An asynchronous boundary is a context switch between threads. For efficient asynchronous programming, each asynchronous boundary should be introduced together with an appropriate thread pool. For example, blocking operations should be shifted onto an unbounded thread pool, as Daniel’s document and Monix’s documentation recommend.

Monix Task has useful methods to introduce an asynchronous boundary.

Asynchronous boundaries in Monix Task

Now, let’s have a look at how to introduce an asynchronous boundary in Monix Task.

Scheduler

A Scheduler is needed when a Task is run. The relationship is similar to the one between ExecutionContext and Future. However, an ExecutionContext is required not only when a Future runs, but also when a Future is defined and when almost any of its methods is called.
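To make the difference concrete, here is a minimal sketch. The object and value names are hypothetical, chosen only for illustration. Building and transforming the Future needs an ExecutionContext in scope, while the Task only needs a Scheduler at the point where it is run.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import monix.eval.Task
import monix.execution.Scheduler

object FutureVsTask {
  // Future: an ExecutionContext is required to create it and to call map
  def futureValue(implicit ec: ExecutionContext): Future[Int] =
    Future(1).map(_ + 1)

  // Task: defining and transforming it needs no Scheduler at all
  val taskValue: Task[Int] = Task.eval(1).map(_ + 1)

  def main(args: Array[String]): Unit = {
    // The Scheduler is only needed here, where the Task is actually run
    implicit val scheduler: Scheduler = Scheduler.global
    println(taskValue.runSyncUnsafe())
    // A Scheduler is also an ExecutionContext, so it can drive the Future
    println(Await.result(futureValue, 5.seconds))
  }
}
```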

Scheduler is a subclass of ExecutionContext, so a Scheduler is backed by a thread pool such as a CachedThreadPool or a ForkJoinPool. The Scheduler companion object provides useful methods for creating a Scheduler on top of each kind of thread pool. The code below is an example of defining Scheduler objects with these methods. You can also specify ExecutionModel and ReportFailure, as described at this link.

Although I won’t explain Scheduler in detail here, it is also a replacement for Java’s ScheduledExecutorService.
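A minimal sketch of such definitions, using builder methods from the Scheduler companion object. The object name, value names, pool names, and pool sizes are arbitrary choices for illustration.

```scala
import monix.execution.{ExecutionModel, Scheduler}

object Schedulers {
  // Unbounded cached thread pool, meant for blocking I/O
  val blockingScheduler: Scheduler =
    Scheduler.io(name = "blocking-io")

  // Fixed-size thread pool
  val fixedScheduler: Scheduler =
    Scheduler.fixedPool(name = "fixed", poolSize = 4)

  // ForkJoinPool-based scheduler for CPU-bound work
  val computationScheduler: Scheduler =
    Scheduler.computation(parallelism = 4, name = "computation")

  // The same builder with an explicit ExecutionModel
  val alwaysAsyncScheduler: Scheduler =
    Scheduler.io(
      name = "always-async-io",
      executionModel = ExecutionModel.AlwaysAsyncExecution
    )
}
```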

The Monix Scheduler is inspired by ReactiveX, being an enhanced Scala ExecutionContext and also a replacement for Java’s ScheduledExecutorService, but also for Javascript’s setTimeout.

Task shift

Task.shift is the basic method for switching thread pools, that is to say, for introducing an asynchronous boundary.

Without an argument, the computation is shifted to the default thread pool. With a Scheduler as the argument, the computation is shifted to the thread pool of that Scheduler.

Most of the asynchronous methods of Task call the shift method internally.
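As a rough illustration of both forms, the sketch below shifts onto a dedicated Scheduler and then back onto the default one. The names ShiftExample, blockingScheduler, and currentThread are hypothetical, and the global Scheduler is assumed as the default.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object ShiftExample {
  val blockingScheduler: Scheduler = Scheduler.io(name = "blocking-io")

  // Hypothetical helper: captures the name of the evaluating thread
  val currentThread: Task[String] =
    Task.eval(Thread.currentThread().getName)

  val program: Task[(String, String)] =
    for {
      _         <- Task.shift(blockingScheduler) // hop onto the given Scheduler
      onIo      <- currentThread
      _         <- Task.shift                    // hop onto the default Scheduler
      onDefault <- currentThread
    } yield (onIo, onDefault)

  def run(): (String, String) = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = println(run())
}
```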

Thread shift with asynchronous methods of Monix

Let’s have a look at the thread shifting of each asynchronous method of Monix.

The code below is a simple program that executes three tasks in a Task context. In production, errors should be handled properly rather than simply calling the runSyncUnsafe method.

As you can see, these three tasks are executed on the main thread because no asynchronous boundary is introduced. Future, on the other hand, always introduces an asynchronous boundary with a thread pool, which leads to unnecessary context-switch overhead.
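A minimal sketch of such a program, assuming a hypothetical task helper that prints and returns the name of the thread it runs on, with the global Scheduler as the default:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

object NoBoundaryExample {
  // Hypothetical stand-in for a task: prints and returns the evaluating thread
  def task(label: String): Task[String] =
    Task.eval {
      val thread = Thread.currentThread().getName
      println(s"$label: $thread")
      thread
    }

  val program: Task[List[String]] =
    for {
      a <- task("task1")
      b <- task("task2")
      c <- task("task3")
    } yield List(a, b, c)

  // No boundary is introduced, so the tasks run on the calling thread
  def run(): List[String] = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = {
    run()
    ()
  }
}
```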

  • executeAsync

The executeAsync method introduces an asynchronous boundary before the task executes, shifting to the default thread pool, that is, the Scheduler passed to the runSyncUnsafe method. As the definition below shows, the shift method is called inside this method.

final def executeAsync: Task[A] =
  Task.shift.flatMap(_ => this)

Moreover, Task.eval followed by executeAsync is equivalent to Task.evalAsync, as shown below.

Task.evalAsync(a) <-> Task.eval(a).executeAsync

The code below uses the executeAsync method. The heavyTask method runs on the default thread pool, not on the main thread.
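A minimal sketch of this usage. Here heavyTask is a hypothetical stand-in that sleeps briefly and returns the name of the thread it ran on, and the global Scheduler is assumed as the default.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

object ExecuteAsyncExample {
  // Hypothetical stand-in for an expensive computation
  def heavyTask: Task[String] =
    Task.eval {
      Thread.sleep(100)
      Thread.currentThread().getName
    }

  // The boundary is introduced before heavyTask is evaluated
  def run(): String = heavyTask.executeAsync.runSyncUnsafe()

  def main(args: Array[String]): Unit =
    println(s"heavyTask ran on ${run()}")
}
```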

  • asyncBoundary

The asyncBoundary method introduces an asynchronous boundary after the task executes. As the definitions below show, the asyncBoundary method can also take a Scheduler.

final def asyncBoundary: Task[A] =
  flatMap(a => Task.shift.map(_ => a))

final def asyncBoundary(s: Scheduler): Task[A] =
  flatMap(a => Task.shift(s).map(_ => a))

The code below uses the asyncBoundary method. The heavyTask function runs on the thread pool passed to asyncBoundary, not on the main thread, because asyncBoundary is called before it.

As the examples with executeAsync and asyncBoundary above show, once an asynchronous boundary is introduced, subsequent tasks keep running on the same thread pool. This can lead to unintended usage of thread pools. For example, in the code above, the lightTask function should run on the default thread pool, not on the one for blocking operations. In this case, the thread pool should be switched back to the default one.
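A minimal sketch of this usage, assuming hypothetical lightTask and heavyTask helpers that return the name of the thread they ran on, and a hypothetical blockingScheduler created with Scheduler.io:

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object AsyncBoundaryExample {
  val blockingScheduler: Scheduler = Scheduler.io(name = "blocking-io")

  // Hypothetical stand-ins that report the evaluating thread
  def lightTask: Task[String] = Task.eval(Thread.currentThread().getName)
  def heavyTask: Task[String] =
    Task.eval {
      Thread.sleep(100)
      Thread.currentThread().getName
    }

  val program: Task[(String, String, String)] =
    for {
      // Evaluated first; the boundary comes after it
      first <- lightTask.asyncBoundary(blockingScheduler)
      heavy <- heavyTask // evaluated on blockingScheduler
      light <- lightTask // no further boundary: stays on blockingScheduler
    } yield (first, heavy, light)

  def run(): (String, String, String) = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = println(run())
}
```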

In the code below, by calling the shift and asyncBoundary methods, the thread pool is switched back to the default one.
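One way to sketch this, under the same assumptions as before (hypothetical heavyTask, lightTask, and blockingScheduler; the global Scheduler as the default): Task.shift moves the computation onto the blocking pool, and asyncBoundary without an argument moves it back once heavyTask has finished.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object SwitchBackExample {
  val blockingScheduler: Scheduler = Scheduler.io(name = "blocking-io")

  // Hypothetical stand-ins that report the evaluating thread
  def lightTask: Task[String] = Task.eval(Thread.currentThread().getName)
  def heavyTask: Task[String] =
    Task.eval {
      Thread.sleep(100)
      Thread.currentThread().getName
    }

  val program: Task[(String, String)] =
    for {
      _     <- Task.shift(blockingScheduler) // move onto the blocking pool
      heavy <- heavyTask.asyncBoundary       // after heavyTask, shift to the default pool
      light <- lightTask                     // evaluated on the default pool
    } yield (heavy, light)

  def run(): (String, String) = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = println(run())
}
```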

  • executeOn

The executeOn method specifies the thread pool on which the computation runs. With the forceAsync parameter set to true, which is the default, an asynchronous boundary onto that thread pool is always introduced before the computation.

final def executeOn(s: Scheduler, forceAsync: Boolean = true): Task[A] =
  TaskExecuteOn(this, s, forceAsync)

Since version 3.0, once the task passed through executeOn has finished, execution is shifted back to the default thread pool, that is, the Scheduler passed to the runSyncUnsafe method. This prevents unintended usage of thread pools.

The code below uses the executeOn method. The heavyTask function runs on the thread pool for blocking operations, and then the lightTask function runs on the default thread pool.
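A minimal sketch of this usage, again with hypothetical heavyTask, lightTask, and blockingScheduler definitions and the global Scheduler as the default:

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object ExecuteOnExample {
  val blockingScheduler: Scheduler = Scheduler.io(name = "blocking-io")

  // Hypothetical stand-ins that report the evaluating thread
  def lightTask: Task[String] = Task.eval(Thread.currentThread().getName)
  def heavyTask: Task[String] =
    Task.eval {
      Thread.sleep(100)
      Thread.currentThread().getName
    }

  val program: Task[(String, String)] =
    for {
      heavy <- heavyTask.executeOn(blockingScheduler) // only this task uses the blocking pool
      light <- lightTask                              // back on the default pool
    } yield (heavy, light)

  def run(): (String, String) = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = println(run())
}
```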

To look a little deeper into the behavior of the forceAsync parameter, I wrote the code below.

1. Task.eval with executeOn and forceAsync set to true

The task runs on the thread pool for blocking that the executeOn method specifies. After the task, execution is switched back to the default thread pool.

2. Task.eval with executeOn and forceAsync set to false

The task and the subsequent task run on the main thread.

3. Task.evalAsync with executeOn and forceAsync set to true

The task runs on the thread pool for blocking that the executeOn method specifies. After the task, execution is switched back to the default thread pool.

4. Task.evalAsync with executeOn and forceAsync set to false

The task and the subsequent task run on the thread pool for blocking that the executeOn method takes.
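The four cases above can be reproduced with a sketch along these lines. The helper names (threadName, observe, case1 to case4) and blockingScheduler are hypothetical. Each case returns the thread that ran the task and the thread that ran the task following it, and the comments restate the behavior described above for Monix 3.3.0 rather than a guarantee for other versions.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object ForceAsyncExample {
  val blockingScheduler: Scheduler = Scheduler.io(name = "blocking-io")

  // Hypothetical helper: name of the thread evaluating the expression
  def threadName(): String = Thread.currentThread().getName

  // Runs `task`, then one more Task.eval, and returns both thread names
  def observe(task: Task[String]): (String, String) =
    task
      .flatMap(first => Task.eval(threadName()).map(next => (first, next)))
      .runSyncUnsafe()

  // 1. blocking pool, then back to the default pool
  def case1(): (String, String) =
    observe(Task.eval(threadName()).executeOn(blockingScheduler, forceAsync = true))

  // 2. both on the calling thread
  def case2(): (String, String) =
    observe(Task.eval(threadName()).executeOn(blockingScheduler, forceAsync = false))

  // 3. blocking pool, then back to the default pool
  def case3(): (String, String) =
    observe(Task.evalAsync(threadName()).executeOn(blockingScheduler, forceAsync = true))

  // 4. both on the blocking pool
  def case4(): (String, String) =
    observe(Task.evalAsync(threadName()).executeOn(blockingScheduler, forceAsync = false))

  def main(args: Array[String]): Unit =
    List(case1(), case2(), case3(), case4()).zipWithIndex.foreach {
      case ((task, next), i) =>
        println(s"case ${i + 1}: task on $task, next task on $next")
    }
}
```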

  • parMap2

As the final method, let’s look at the thread shifting of the parMap2 method. The parMap2 method is the recommended way to execute two tasks in parallel. It also handles errors and cancellation well.

In case one of the tasks fails, then all other tasks get cancelled and the final result will be a failure.

In the code below, the parMap2 method takes two tasks. The tasks run in parallel, and the inner task runs on the thread pool specified for it.
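A minimal sketch of this usage. The helper names and blockingScheduler are hypothetical. The first task is pinned to the blocking pool with executeOn, while the second is left on whatever thread parMap2 runs it on.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global

object ParMap2Example {
  val blockingScheduler: Scheduler = Scheduler.io(name = "blocking-io")

  // Hypothetical stand-ins that report the evaluating thread
  def lightTask: Task[String] = Task.eval(Thread.currentThread().getName)
  def heavyTask: Task[String] =
    Task.eval {
      Thread.sleep(100)
      Thread.currentThread().getName
    }

  val program: Task[(String, String)] =
    Task.parMap2(
      heavyTask.executeOn(blockingScheduler), // inner task pinned to the blocking pool
      lightTask
    )((heavy, light) => (heavy, light))

  def run(): (String, String) = program.runSyncUnsafe()

  def main(args: Array[String]): Unit = println(run())
}
```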

Conclusion

Monix Task has useful asynchronous methods, but each one behaves differently, so we have to choose among them according to the situation.

Thank you for reading!
