Atomic Scala

A while ago I had to write a test tool that needed to do performance tests.
It needed to be able to open up a whole bunch of connections, send requests, then gather statistics on the responses.
I used Java and the java.util.concurrent libraries, and they worked great. I didn't have to worry about synchronized blocks or waiting, no deadlocks or starvation.

The thing is, I haven't even heard of anyone using these libraries since then.

Most of the Java code I see still does concurrency the way it's been done since the old days, and completely ignores the newer Atomic*, Executors, ThreadPools and Futures.

Maybe people aren't aware of these libraries?
Maybe the syntax seems too complicated?

I wondered how much nicer these libraries could be if they were used from Scala, so I wrote a few little wrapper classes that give the java.util.concurrent libraries a more Scala-styled interface.

I started out with AtomicReference, which is a way of sharing mutable state between threads. The beauty of this class is that it uses an optimistic synchronisation mechanism: reads don't block at all, and a write only succeeds if nobody else has changed the value in the meantime, so a collision just means trying again. Underneath it relies on an atomic compare-and-set (CAS) operation rather than locks, which is a much simpler mechanism than Software Transactional Memory.
It has a different performance profile to lock-based synchronisation, as detailed in http://www.ibm.com/developerworks/java/library/j-jtp11234/.
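
To make the compare-and-set idea concrete, here is a minimal sketch using AtomicReference directly. The object name CasSketch and the string values are made up for illustration. It shows that a write based on a stale read is simply rejected, which is what makes a retry necessary.

```scala
import java.util.concurrent.atomic.AtomicReference

object CasSketch {
  // Returns (first attempt succeeded, stale attempt succeeded, final value).
  def run(): (Boolean, Boolean, String) = {
    val ref = new AtomicReference[String]("a")
    val seen = ref.get()                            // a plain read, no locking
    val first = ref.compareAndSet(seen, seen + "b") // succeeds: ref still holds `seen`
    val stale = ref.compareAndSet(seen, seen + "c") // rejected: ref now holds "ab", not `seen`
    (first, stale, ref.get())
  }
}
```

Note that compareAndSet compares references, not values, so the expected value has to be the very object that was read earlier.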

Here's my Atom wrapper, full source code available at http://code.google.com/p/naedyrscala/

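import java.util.concurrent.atomic.AtomicReference

case class Atom[T](private val value: T) {
  private val ref = new AtomicReference[T](value)
  def get(): T = ref.get()
  @annotation.tailrec
  final def set(f: T => T): T = {
    val previous = get()
    val update = f(previous)
    if (ref.compareAndSet(previous, update)) {
      update
    } else {
      set(f) // collision: rerun f against the new current value
    }
  }
  def set(f: => T): T = set(previous => f)
}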

get() just returns the current value (and is NOT blocked at all by other threads).
There are two versions of set. The by-name version sets a new value, ignoring the previous one. It takes a block of code, which it will rerun if there are any collisions (i.e. if another thread is simultaneously running set on the same atom).
The function version takes a block which receives the previous value, allowing you to, for example, increment the current value.
Both set methods return the value that was actually set. This is different from calling get after the set, as another thread may have already changed the value by then. It lets you build a unique key generator, for instance, by incrementing the current value and using the returned result as your key.
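
As a rough sketch of that key generator idea: the names KeyGeneratorSketch and distinctKeys are made up for illustration, and the small Atom class inside is only a compact stand-in for the wrapper, included so the snippet compiles on its own. Several threads each take keys from the value returned by set, and every key should turn out to be distinct.

```scala
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object KeyGeneratorSketch {
  // Compact stand-in for the Atom wrapper (function version of set only).
  final class Atom[T](initial: T) {
    private val ref = new AtomicReference[T](initial)
    def get(): T = ref.get()
    @annotation.tailrec
    def set(f: T => T): T = {
      val previous = ref.get()
      val update = f(previous)
      if (ref.compareAndSet(previous, update)) update else set(f)
    }
  }

  // Hypothetical driver: hands out threads * perThread keys and
  // returns how many distinct keys were seen.
  def distinctKeys(threads: Int, perThread: Int): Int = {
    val counter = new Atom[Int](0)
    val seen = ConcurrentHashMap.newKeySet[Int]()
    val pool = Executors.newFixedThreadPool(threads)
    for (_ <- 1 to threads) pool.execute(new Runnable {
      // The key is the value returned by set, not a later get().
      def run(): Unit = for (_ <- 1 to perThread) seen.add(counter.set(_ + 1))
    })
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    seen.size()
  }
}
```

If the keys were read with get() after the set instead, two threads could end up seeing the same value.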

Here are some tests to show the usage, with a couple of variations on syntax:
val myAtom = Atom(5)
assertEquals(5, myAtom.get)
myAtom set 5
assertEquals(5, myAtom.get)
myAtom.set(_ + 3)
assertEquals(8, myAtom.get)
myAtom set { x =>
  x + 3
}
assertEquals(11, myAtom.get)

One catch with the atomic reference is that any code used to generate the new value must be inside the block passed to the set method.
In this example the new value is computed outside the set method, which means it doesn't take into account any concurrent changes to myAtom.
val myAtom = Atom(5)
// intended to be 1
val newValue = if (myAtom.get == 5) 1 else -1
// newValue has already been computed, therefore can't be retried
// imagine some other thread then set the value
myAtom.set(8)
// this set is setting the 1 we got earlier, even though the old value isn't 5 anymore
myAtom.set(newValue)
assertEquals(1, myAtom.get)
This example shows how the above should have been done. It now responds to the (concurrent) setting of myAtom2 to 8:
val myAtom2 = Atom(5)
// imagine some other thread set the value
myAtom2.set(8)
// now this set is taking into account the 8 set earlier, and we get -1 as expected
myAtom2.set(x => if (x == 5) 1 else -1)
assertEquals(-1, myAtom2.get)
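
The retry itself can be made visible without any real threads. In this sketch (RetrySketch and the calls counter are made up for illustration, and the inner Atom is a compact stand-in for the wrapper), the block simulates a competing writer on its first run, so the compare-and-set fails once and the block is run a second time against the new value.

```scala
import java.util.concurrent.atomic.AtomicReference

object RetrySketch {
  // Compact stand-in for the Atom wrapper (function version of set only).
  final class Atom[T](initial: T) {
    private val ref = new AtomicReference[T](initial)
    def get(): T = ref.get()
    @annotation.tailrec
    def set(f: T => T): T = {
      val previous = ref.get()
      val update = f(previous)
      if (ref.compareAndSet(previous, update)) update else set(f)
    }
  }

  // Returns (value that was finally set, number of times the block ran).
  def run(): (Int, Int) = {
    val atom = new Atom[Int](5)
    var calls = 0
    val result = atom.set { x =>
      calls += 1
      if (calls == 1) atom.set(_ => 8) // simulate another thread winning the race
      if (x == 5) 1 else -1
    }
    (result, calls)
  }
}
```

The first run sees 5 and computes 1, but that write is rejected because the value is now 8. The second run sees 8 and sets -1.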

Another limitation is that the set methods don't nest.
val atom1 = Atom(1)
val atom2 = Atom(2)
// the transactions don't nest properly,
atom1.set { x =>
  val value = atom2.set { y =>
    x + y
  }
  // if there is a collision with atom1 here,
  // atom2 will have an inconsistent value, until atom1's set retry succeeds
  value * 2
}
assertEquals(6, atom1.get)
assertEquals(3, atom2.get)
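
One possible way around this, which is only a sketch and not part of the wrapper, is to keep both values in a single immutable pair so that one compare-and-set replaces them together. PairSketch is a made-up name, and the code uses AtomicReference.updateAndGet from Java 8, which runs the same retry loop internally.

```scala
import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator

object PairSketch {
  // Both values live in one pair: (atom1's value, atom2's value).
  def run(): (Int, Int) = {
    val pair = new AtomicReference[(Int, Int)]((1, 2))
    pair.updateAndGet(new UnaryOperator[(Int, Int)] {
      override def apply(p: (Int, Int)): (Int, Int) = {
        val sum = p._1 + p._2 // what the inner set computed for atom2
        (sum * 2, sum)        // new first value, new second value
      }
    })
  }
}
```

Because there is only one reference, a collision retries the whole update, and no other thread ever sees a half-finished state.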

I originally played around with using the apply method, but the syntax became a bit too minimalistic, i.e.
val myAtom = Atom(5)
myAtom(_ + 1)
assertEquals(6, myAtom())
Another variation which I liked, but which was also a bit too confusing, was using a member variable, i.e.
val myAtom = Atom(5)
myAtom.value = _ + 1
assertEquals(6, myAtom.value)

Let's have a look at how this stuff is normally used from Java.
AtomicReference<Integer> atom = new AtomicReference<Integer>(1);
while (true) {
  Integer previous = atom.get();
  Integer update = previous + 2;
  if (atom.compareAndSet(previous, update)) {
    break;
  }
}
assertEquals(3, atom.get().intValue());
And here is the same thing using the Scala wrapper:
val atom = Atom(1)
atom.set(_ + 2)
assertEquals(3, atom.get)
No wonder no one uses AtomicReference in Java! Just setting a value correctly is a real pain.
