Scala notes – Futures – 2 (Promises)
In the last post, we saw how to extract values from a Future using onComplete and its counterparts, onSuccess and onFailure. We also saw how to use Await.result in Testcases to block and get the value from a Future. In this post, we'll briefly discuss the relationship between a Promise and a Future.
Promise
The concepts of Promise and Future go hand in hand. A scala.concurrent.Promise is what sets the value of a Future. In other words, the Promise is the write side, through which the code running the asynchronous computation delivers its result, and the Future is just a handle for reading that result when it becomes available. Crudely put, the Promise is the setter and the Future is the getter.
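The setter/getter relationship can be sketched in a few lines. This is only an illustration: the object name SetterGetterSketch and the method demo are made up for this example and are not part of the post's code.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical names, used only for illustration.
object SetterGetterSketch {
  // Returns (completed before the write?, completed after the write?, value read)
  def demo(): (Boolean, Boolean, Int) = {
    val promise: Promise[Int] = Promise[Int]() // write side
    val future: Future[Int] = promise.future   // read side

    val before = future.isCompleted            // nothing has been written yet
    promise.success(42)                        // the "setter"
    val after = future.isCompleted             // the Future now holds the value

    val value = Await.result(future, 1.second) // the "getter"
    (before, after, value)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The caller only ever sees the Future, so it can read the value but cannot set it.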
Most often, we won't need to create a Promise explicitly. However, we need to understand what a Promise is in order to truly understand how a Future works.
Let's use the following examples to understand how to create a Promise.
1. Completing a Promise
In the following piece of code, we will see how a value is set in a promise and how it is read on the other side.
We
1. create a Promise
2. complete the Promise by setting a successful value
3. return the read side of the Promise, the Future, to the caller using promise.future
There is no time-consuming process happening behind the scenes. The value is set on the Promise immediately and is therefore immediately available via the Future.
Code
class PromiseInternals {
  ...
  def aCompletedPromiseUsingSuccess(num: Int): Future[Int] = {
    val promise = Promise[Int]()
    promise.success(num)
    promise.future
  }
  ...
  ...
Testcase
When we run the Testcase code, the onComplete callback gets called right after promise.success(100) is called.
class PromiseInternalsTest extends FunSpec with Matchers {
  describe("A Future") {
    it("gives out the correct value when a Promise is completed") {
      val promiseInternals = new PromiseInternals
      val aCompletedPromise = promiseInternals.aCompletedPromiseUsingSuccess(100)
      assertValue(aCompletedPromise, 100)
    }
    ...
    ...
    def assertValueUsingOnComplete(future: Future[Int], expectedValue: Int): Unit = {
      future.onComplete {
        case Success(result) => {
          println(s"Result is $result and expectedValue is $expectedValue")
          result shouldBe expectedValue
        }
        case Failure(msg) => fail(msg)
      }
    }
promise.success is just a shortcut for promise.complete, which accepts a Try[T] as an argument. So, we could have written the above function as:
def aCompletedPromiseUsingComplete(num: Int): Future[Int] = {
  val promise = Promise[Int]()
  promise.complete(Success(num))
  promise.future
}
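As a quick check of that equivalence, here is a minimal sketch that completes one Promise with success and another with complete, then compares what their Futures hold. The object and method names are hypothetical. Future.value exposes the stored result as an Option[Try[T]].

```scala
import scala.concurrent.{Future, Promise}
import scala.util.Success

// Hypothetical names, used only for illustration.
object SuccessVsCompleteSketch {
  def viaSuccess(num: Int): Future[Int] = {
    val promise = Promise[Int]()
    promise.success(num)
    promise.future
  }

  def viaComplete(num: Int): Future[Int] = {
    val promise = Promise[Int]()
    promise.complete(Success(num)) // complete takes a Try[Int]
    promise.future
  }

  // Both Futures are already completed, so value is defined for each of them.
  def sameOutcome(num: Int): Boolean =
    viaSuccess(num).value == viaComplete(num).value

  def main(args: Array[String]): Unit = println(sameOutcome(100))
}
```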
Alternatively, if we would like to indicate a failure in the computation, we could either use promise.complete(Failure(throwable)) or promise.failure(throwable):
def aCompletedPromiseUsingFailure(num: Int): Future[Int] = {
  val promise = Promise[Int]()
  promise.failure(new RuntimeException("Evil Exception"))
  promise.future
}
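To see what the reading side observes when a Promise is failed, here is a small sketch. The names FailedPromiseSketch, failedFuture and messageSeenByCaller are made up for illustration. Await.result rethrows the exception stored in the Future, so the sketch wraps the call in a Try.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical names, used only for illustration.
object FailedPromiseSketch {
  def failedFuture(): Future[Int] = {
    val promise = Promise[Int]()
    promise.failure(new RuntimeException("Evil Exception"))
    promise.future
  }

  // Await.result throws the exception the Promise was failed with.
  def messageSeenByCaller(): String =
    Try(Await.result(failedFuture(), 1.second)) match {
      case Failure(e) => e.getMessage
      case Success(v) => s"unexpected value $v"
    }

  def main(args: Array[String]): Unit = println(messageSeenByCaller())
}
```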
Let’s summarize the above in a picture :
2. Running a block asynchronously
Now that we have seen how to complete a Promise by setting a successful value or an exception, let's see how to execute a block of code asynchronously.
In the following Testcase, we pass a block of code to someExternalDelayedCalculation to be executed asynchronously.
Testcase
Let’s look at the testcase first.
- We pass a block as argument. The block of code simply sleeps for 2 seconds and then returns a 100.
- We assert the value, waiting at most 3 seconds for it to arrive.
Simple enough.
it("gives out the correct value when an asynchronous block is submitted and is completed through a Promise") {
  val promiseInternals = new PromiseInternals
  val longCalculationFuture = promiseInternals.someExternalDelayedCalculation { () =>
    Thread.sleep(2000)
    100
  }
  println(s"We have submitted a block to be executed asynchronously ${longCalculationFuture.isCompleted}") // false at this point
  assertValue(longCalculationFuture, 100)
}

def assertValue(future: Future[Int], expectedValue: Int): Unit = {
  // Block for at most 3 seconds (needs scala.concurrent.duration._ in scope)
  val resultVal = Await.result(future, 3.seconds)
  resultVal shouldBe expectedValue
}
Code
The implementation of someExternalDelayedCalculation is interesting:
We
1. create a FixedThreadPool to execute our asynchronous code
2. create a Promise
3. create a Runnable and wrap the block to be run asynchronously in its run method
4. complete the promise inside run, using the result of the block (a Success, or a Failure if the block throws)
5. execute the Runnable in the somePool thread pool
6. return promise.future, from which the caller can read the value
val somePool = Executors.newFixedThreadPool(2)

def someExternalDelayedCalculation(f: () => Int): Future[Int] = {
  val promise = Promise[Int]()
  val thisIsWhereWeCallSomeExternalComputation = new Runnable {
    override def run(): Unit = {
      promise.complete {
        try Success(f())
        catch { case NonFatal(msg) => Failure(msg) }
      }
    }
  }
  somePool.execute(thisIsWhereWeCallSomeExternalComputation)
  promise.future
}
That’s it !!
3. How is Future.apply() actually implemented?
Well, I cheated. The code in section 2 is actually stolen from the implementation of Future.apply itself.
Remember that in the previous post, we saw that when a block of code is passed into the Future's apply function, it gets executed asynchronously.
Now, compare the code above in someExternalDelayedCalculation with the actual implementation of Future.apply and the Runnable that it wraps.
def apply[T](body: => T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
  val runnable = new PromiseCompletingRunnable(body)
  executor.prepare.execute(runnable)
  runnable.promise.future
}

class PromiseCompletingRunnable[T](body: => T) extends Runnable {
  val promise = new Promise.DefaultPromise[T]()

  override def run() = {
    promise complete {
      try Success(body) catch { case NonFatal(e) => Failure(e) }
    }
  }
}
To repeat the same steps as above, the apply function
- holds the ThreadPool that we provide as the implicit ExecutionContext
- creates a Promise by creating a PromiseCompletingRunnable, which is a Runnable
- wraps the block to be run asynchronously in the run method
- completes the promise using the result of the run
- executes the Runnable using the ExecutionContext
- returns the promise.future from which the caller can read the value.
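From the caller's side, the same flow looks roughly like the sketch below. It assumes we wrap a fixed thread pool in an ExecutionContext ourselves through ExecutionContext.fromExecutorService. The object name FutureApplySketch and the method delayedHundred are made up for this example.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Hypothetical names, used only for illustration.
object FutureApplySketch {
  def delayedHundred(): Int = {
    // The thread pool we hand to Future.apply as the implicit ExecutionContext
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
    try {
      // Future.apply runs the block on the pool and completes an internal Promise with its result
      val future: Future[Int] = Future {
        Thread.sleep(200)
        100
      }
      Await.result(future, 3.seconds)
    } finally ec.shutdown() // release the pool's threads once we are done
  }

  def main(args: Array[String]): Unit = println(delayedHundred())
}
```

Compared with someExternalDelayedCalculation, the Promise and the Runnable are no longer visible. Future.apply creates both for us.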
4. Once written, twice error
Once the promise gets completed with either a Success or a Failure, all we can do after that is extract the value from its Future. Also, the onComplete callback of the Future gets called. The value wrapped inside the Future of the Promise is set in stone and cannot be changed.
If we attempt to set a new value by completing an already completed Promise, an IllegalStateException is thrown.
Code
Let’s look at this using a snippet. In the following code, we create a Promise and complete it with a value of 100. We then attempt to complete it with a failure.
def alreadyCompletedPromise(): Future[Int] = {
  val promise = Promise[Int]()
  promise.success(100) // completed
  promise.failure(new RuntimeException("Will never be set because an IllegalStateException will be thrown beforehand"))
  promise.future
}
Testcase
The testcase just asserts that an IllegalStateException gets thrown when we attempt to complete the already completed Promise with a Failure.
it("should throw an error if a Promise is attempted to be completed more than once") {
  val promiseInternals = new PromiseInternals
  intercept[IllegalStateException] {
    promiseInternals.alreadyCompletedPromise()
  }
}
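If an exception on the second write is not what we want, Promise also offers trySuccess, tryFailure and tryComplete. These return a Boolean saying whether the write took effect, instead of throwing. The sketch below is an illustration only, with made-up names, and is not part of the post's code.

```scala
import scala.concurrent.Promise
import scala.util.Try

// Hypothetical names, used only for illustration.
object TryCompleteSketch {
  // Returns (first write accepted?, second write accepted?, value held by the Future)
  def demo(): (Boolean, Boolean, Option[Try[Int]]) = {
    val promise = Promise[Int]()
    val first = promise.trySuccess(100)                               // true: the Promise was still empty
    val second = promise.tryFailure(new RuntimeException("ignored")) // false: already completed, nothing thrown
    (first, second, promise.future.value)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The first value still wins. The try variants only change how the rejected write is reported.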
Code
The complete code backing this blog is available on GitHub.
Reference: Scala notes – Futures – 2 (Promises) from our JCG partner Arun Manivannan at the Rerun.me blog.