2011-03-15

Async Scala

In my last blog post http://naedyr.blogspot.com/2011/03/atomic-scala.html I described a scala wrapper for java.util.concurrent.AtomicReference, which made shared mutable state easy to use from multiple threads.

The next step is actually creating separate threads.

The old java.lang.Thread class is how most people would run multiple threads in Java, but it means starting and stopping each thread yourself and managing your own thread pools.
The java.util.concurrent.Executors class and its related libraries were a big step forward. But maybe, just like AtomicReference, their use wasn't exactly obvious. My favorite part is the java.util.concurrent.Future interface, which lets a thread return a value, and lets an exception thrown in that thread be caught by the calling thread.
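
To make the Future behaviour concrete, here is a minimal sketch that uses the executor API directly, without any wrapper. The names FutureDemo and run are made up for this illustration; the pool size of 2 is an arbitrary choice.

```scala
import java.util.concurrent.{Callable, ExecutionException, Executors}

object FutureDemo {
  // Hypothetical helper: runs two tasks and reports what the calling thread sees
  def run(): (Int, String) = {
    val pool = Executors.newFixedThreadPool(2)
    try {
      // A task that returns a value from the worker thread
      val ok = pool.submit(new Callable[Int] {
        def call(): Int = 6 * 7
      })
      // A task that throws inside the worker thread
      val failing = pool.submit(new Callable[Int] {
        def call(): Int = throw new IllegalStateException("boom")
      })
      val value = ok.get() // blocks until the result is ready
      val message =
        try { failing.get(); "no error" }
        catch {
          // get() rethrows the task's exception wrapped in an ExecutionException
          case e: ExecutionException => e.getCause.getMessage
        }
      (value, message)
    } finally {
      pool.shutdown()
    }
  }
}
```

Calling FutureDemo.run() should give back the computed value together with the message of the exception that was thrown on the worker thread.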

What I really wanted though, was to be able to run any block of code in another thread, and not have to think about the specifics. So I put together this wrapper class in scala:

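import java.util.concurrent.{Callable, Executors}

// Wraps a Future: the first await() blocks, later calls reuse the stored value
case class Result[T](private val future: java.util.concurrent.Future[T]) {
  private lazy val value = future.get()
  def await(): T = value
}
object Async {
  // Cached pool: threads are created on demand and reused when idle
  val threadPool = Executors.newCachedThreadPool()
  // Runs the by-name block on a pool thread and returns a handle to its result
  def async[T](func: => T): Result[T] = {
    Result(Async.threadPool.submit(new Callable[T]() {
      def call(): T = func
    }))
  }
  def await[T](result: Result[T]): T = result.await()
}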

Full source available at: http://code.google.com/p/naedyrscala/

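Here's some example usage (with import Async._ in scope), with a couple of variations on syntax. Each block returns its sum as the last expression, and the sum is accumulated as a Long because the total doesn't fit in an Int:
val sum = async { val s = (1 to 100000000).foldLeft(0L)(_ + _); println("finished1"); s }
val sum2 = async { val s = (1 to 100000000).foldLeft(0L)(_ + _); println("finished2"); s }
val sum3 = async { val s = (1 to 100000000).foldLeft(0L)(_ + _); println("finished3"); s }
println("do something else")
println(sum.await)
println(await { sum2 })
println(await(sum3))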

With this wrapper, the entire usage boils down to the async and await functions.

  • Pass any block of code to async, and it will run in another thread.
  • To get the result of that thread, call await.

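I was inspired by the C# version of async/await, but this Scala version is simpler to use: async takes any block of code, and await is an ordinary method that blocks the calling thread until the result is ready.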

If you use await directly on an async call, the effect is to run the block in another thread and then wait on that thread in the current thread, which is pretty pointless:
val sum = await { async {
  val s = (1 to 100000000).foldLeft(0L)(_ + _)
  println("finished3")
  s
} }
println("do something else")
println(sum)
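You can have other threads await on each other. I don't think it's possible to create a deadlock with these semantics, since the cached thread pool starts a new thread whenever none is idle, so a waiting task never holds up the task it is waiting on. A fixed-size pool would not give that guarantee.
val sum = async { (1 to 100000000).foldLeft(0L)(_ + _) }
val result = async {
  val x = await(sum)
  println("callback " + x)
  x
}
println(await(result) + await(sum))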
If you run the code below several times, you'll see differing numbers and orderings of the "+/- amount" messages, because the block passed to set is rerun after a collision. The final amount, after the awaits, should always be the same.
case class Account(private val initialAmount: Int) {
  val balance = Atom(initialAmount)
  def withdraw(amount: Int) = {
    balance.set { x => println("-" + amount); x - amount }
  }
  def deposit(amount: Int) = {
    balance.set { x => println("+" + amount); x + amount }
  }
}
val account = Account(500)
val results1 = async {
  account.deposit(100)
  account.withdraw(100)
}
val results2 = async {
  account.withdraw(10)
  account.withdraw(10)
}
val results3 = async { account.withdraw(10) }
await(results1)
await(results2)
await(results3)
assertEquals((500 + 100 - 100 - 10 - 10 - 10), account.balance.get)

When combined, async/await and atom make shared-state multithreaded programming easy, with a minimum of effort.

Both atom and async/await are made possible by closures. I can't wait for Java 8.

2011-03-11

Atomic Scala

A while ago I had to write a test tool that needed to do performance tests.
It needed to be able to open up a whole bunch of connections, send requests, then gather statistics on the responses.
I used Java and the java.util.concurrent libraries, and they worked great. I didn't have to worry about synchronized blocks or waiting, no deadlocks or starvation.

The thing is, I haven't even heard of anyone using these libraries since then.

Most of the Java code I see still does concurrency the way it's been done since the old days, and completely ignores the newer Atomic*, Executors, ThreadPools and Futures.

Maybe people aren't aware of these libraries?
Maybe the syntax seems too complicated?

I wondered how much nicer these libraries could be if they were used from scala; so I wrote a few little wrapper classes that gave the java.util.concurrent libraries a more scala styled interface.

I started out with AtomicReference, which is a way of having shared mutable state between threads. The beauty of this class is that it uses an optimistic synchronisation mechanism: reads don't block at all, and a write that collides with another thread's write is rejected by compareAndSet so that it can be retried. Underneath it relies on an atomic compare-and-swap operation rather than locks. That is not Software Transactional Memory, although the read, compute, retry pattern feels similar for a single reference.
It has a different performance profile to lock-based synchronisation, as detailed in http://www.ibm.com/developerworks/java/library/j-jtp11234/.
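
As a small illustration of that optimistic behaviour, the sketch below uses AtomicReference directly from a single thread. The plain set("b") call stands in for a write by another thread, and the names CasDemo and run are made up for this example.

```scala
import java.util.concurrent.atomic.AtomicReference

object CasDemo {
  // Returns (first attempt succeeded, second attempt succeeded, final value)
  def run(): (Boolean, Boolean, String) = {
    val ref = new AtomicReference[String]("a")
    val seen = ref.get() // reading never blocks
    ref.set("b")         // stands in for a write made by another thread
    // The value changed after we read it, so this write is rejected
    val first = ref.compareAndSet(seen, "c")
    // Re-read the current value and try again: this is the retry step
    val second = ref.compareAndSet(ref.get(), "c")
    (first, second, ref.get())
  }
}
```

The first compareAndSet fails because the reference no longer holds the value that was read, and the second succeeds because it starts from a fresh read.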

Here's my Atom wrapper, full source code available at http://code.google.com/p/naedyrscala/

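import java.util.concurrent.atomic.AtomicReference

case class Atom[T](private val value: T) {
  private val ref = new AtomicReference[T]()
  ref.set(value)
  // Non-blocking read of the current value
  def get(): T = ref.get()
  // Applies f to the current value; reruns f if another thread got in first
  def set(f: T => T): T = {
    val previous = get()
    val update = f(previous)
    if (ref.compareAndSet(previous, update)) {
      update
    } else {
      set(f)
    }
  }
  // Sets a value that ignores the previous one; the block is rerun on a collision
  def set(f: => T): T = set(previous => f)
}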

get() just gets the current value (and is NOT blocked at all by other threads).
There are two versions of set. The first takes a block that receives the previous value, allowing you to, for example, increment the current value. It will rerun the block if there is a collision (i.e. if another thread is simultaneously running set on the same atom).
The second version sets a new value, ignoring the previous value. Its block is also rerun on a collision.
Both of the set methods return the value that you have set, which is different from calling get after the set; as another thread may have already changed the value by then. This allows you to have a unique key generator for instance, by incrementing the current value and using the returned result as your unique key.
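
The unique key generator idea could be sketched like this. The Atom here is a trimmed copy of the wrapper above (function version of set only) so that the example stands alone; KeyDemo, generate, workers and perWorker are names made up for the illustration.

```scala
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{Callable, Executors}

object KeyDemo {
  // Trimmed copy of the Atom wrapper from this post
  case class Atom[T](private val value: T) {
    private val ref = new AtomicReference[T](value)
    def get(): T = ref.get()
    def set(f: T => T): T = {
      val previous = get()
      val update = f(previous)
      if (ref.compareAndSet(previous, update)) update else set(f)
    }
  }

  // Hypothetical helper: `workers` threads each take `perWorker` keys
  def generate(workers: Int, perWorker: Int): Seq[Int] = {
    val counter = Atom(0)
    val pool = Executors.newFixedThreadPool(workers)
    try {
      val futures = (1 to workers).map { _ =>
        pool.submit(new Callable[Seq[Int]] {
          // Use the value returned by set, never a later get
          def call(): Seq[Int] = (1 to perWorker).map(_ => counter.set(_ + 1))
        })
      }
      futures.flatMap(_.get())
    } finally {
      pool.shutdown()
    }
  }
}
```

Because each key is the value returned by the set call that produced it, no two callers should ever receive the same key, however the threads interleave.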

Here are some tests to show the usage, with a couple of variations on syntax:
val myAtom = Atom(5)
myAtom.set(5)
assertEquals(5, myAtom.get)
myAtom set 5
assertEquals(5, myAtom.get)
myAtom.set(_ + 3)
assertEquals(8, myAtom.get)
myAtom set { x =>
  x + 3
}
assertEquals(11, myAtom.get)

One catch with the atomic reference is that any code used to generate a new value must be inside the block passed to the set method.
In this example, the new value is not computed inside the set method, which means that it doesn't take into account any concurrent changes to the myAtom value.
val myAtom = Atom(5)
// intended to be 1
val newValue = if (myAtom.get == 5) 1 else -1
// newValue has already been set, therefore can't be retried
// imagine if some other thread then set the value
myAtom.set(8)
// this set is setting the 1 we got earlier, even though the old value isn't 5 anymore
myAtom.set(newValue)
assertEquals(1, myAtom.get) 
This example shows how the above should have been done. It now responds to the (concurrent) setting of myAtom2 to '8':
val myAtom2 = Atom(5)
myAtom2.set(8)
// now this set takes into account the 8 set earlier, and we get -1 as expected
myAtom2.set(x => if (x == 5) 1 else -1)
assertEquals(-1, myAtom2.get)

Another limitation is that the set methods don't nest.
val atom1 = Atom(1)
val atom2 = Atom(2)
// the transactions don't nest properly,
atom1.set { x =>
  val value = atom2.set { y =>
    x + y
  }
  // if there is a collision with atom1 here, 
  // atom2 will have an inconsistent value, until atom1's set retry succeeds
  value * 2
}
assertEquals(6, atom1.get)
assertEquals(3, atom2.get)
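
The collision case can be forced deterministically in a single thread, which makes the problem easier to see. In this sketch the Atom is a trimmed copy of the wrapper above, and the extra atom1.set call inside the block is a stand-in for a concurrent write from another thread. NestingDemo, run and attempts are names made up for the illustration.

```scala
import java.util.concurrent.atomic.AtomicReference

object NestingDemo {
  // Trimmed copy of the Atom wrapper from this post
  case class Atom[T](private val value: T) {
    private val ref = new AtomicReference[T](value)
    def get(): T = ref.get()
    def set(f: T => T): T = {
      val previous = get()
      val update = f(previous)
      if (ref.compareAndSet(previous, update)) update else set(f)
    }
  }

  // Returns (times the outer block ran, final atom1, final atom2)
  def run(): (Int, Int, Int) = {
    val atom1 = Atom(1)
    val atom2 = Atom(2)
    var attempts = 0
    atom1.set { x =>
      attempts += 1
      val value = atom2.set(y => x + y)
      // Simulated collision: change atom1 behind the outer set's back, once
      if (attempts == 1) atom1.set(_ => 10)
      value * 2
    }
    (attempts, atom1.get(), atom2.get())
  }
}
```

The outer block runs twice, and so does the inner update of atom2: it is first set to 3 and then, on the retry, to 13. The inner set is not rolled back when the outer one has to retry.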

I originally played around with using the apply method, but the syntax became a bit too minimalistic, i.e.
val myAtom = Atom(5)
myAtom(_+1)
assertEquals(6, myAtom())
Another variation which I liked, but which was also a bit too confusing, was using a member variable, i.e.
val myAtom = Atom(5)
myAtom.value = _ + 1
assertEquals(6, myAtom.value)

Let's have a look at how this stuff is normally used from Java.
AtomicReference<Integer> atom = new AtomicReference<Integer>(1);
while (true) {
  Integer previous = atom.get();
  Integer update = previous + 2;
  if (atom.compareAndSet(previous, update)) {
    break;
  }
}
assertEquals(3, atom.get().intValue());
versus the Scala wrapper:
val atom = Atom(1)
atom.set(_+2)
assertEquals(3, atom.get)
No wonder no one uses AtomicReference in Java! Just setting a value correctly is a real pain.