Creating an RxJava Observable from an Akka Actor

Recently I found myself wondering about the ways of combining two awesome libraries: Akka and RxJava. The most straightforward use case for me was about converting a stream of messages received by an Actor into an Observable.

I’ll quickly write down how that works, and maybe this will be the start of a new little series of blogs. And maybe, I’ll learn something along the way about how to best create an Akka module for RxJava, something I’d love to do…

But I’ll start very very small. This is not going to go into any kind of error handling, or explore the Akka Event Bus, for example. — All this will have to wait for later.

First, I’ll need a small build.sbt that’s mostly there to import the two dependencies for me:

organization := "de.johoop",
name := "akka-rxjava",
version := "1.0.0-SNAPSHOT",
scalaVersion := "2.10.3",
scalacOptions ++= Seq("-deprecation", "-language:_"))
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.2.1",
  "com.netflix.rxjava" % "rxjava-core" % "0.14.5",
  "com.netflix.rxjava" % "rxjava-scala" % "0.14.5"))

Next, I’ll need to define a simple message…

sealed trait Message
case class Hello(text: String) extends Message

…and a basic actor that receives them:

import akka.actor._

class ObservableActor extends Actor with ActorLogging {
  def receive = {
    case message: Message =>
      log debug s"incoming: $message"
  }
}

Now let’s add a simple Main and try it out:

object Main extends App {
  val system = ActorSystem("client")

  val receiver = system actorOf (Props[ObservableActor], "rcv")
  Seq("hello", "world", "again", "not anymore") foreach { msg => 
    receiver ! Hello(msg)
  }

  system.shutdown
}

Up to now, this is pure Akka, and of course it works as expected. Now, for making an Observable out of this, I’ll need a mechanism for subscribing and unsubscribing. For subscribing, I want to be able to specify a callback onNext that will be called each time a message is received.

This means that I need a few more messages:

sealed trait SubUnsub extends Message
case class Subscribe(onNext: Message => Unit) extends SubUnsub
case object Unsubscribe extends SubUnsub

My actor has to handle these new messages, of course. I’ll use the Become mechanism for this in order to avoid any kind of mutable state. And very important: I want to call the new callback upon receiving messages instead of just logging them. All this means that the new version of the actor looks like this:

class ObservableActor extends Actor with ActorLogging {
  def receive = {
    case Subscribe(onNext) =>
      log debug "subscribe"
      context become subscribed(onNext)
  }

  def subscribed(onNext: Message => Unit): Actor.Receive = {
    case Unsubscribe =>
      log debug "unsubscribe"
      context become receive

    case message: Message =>
      log debug s"incoming: $message"
      onNext(message)
  }
}

At last, I’m prepared to create an Observable. This is the step where the “magic” happens. I’ll add a small utility function for this:

import rx.lang.scala._ // the additional import for RxJava

def observableFromActor(actor: ActorRef): Observable[Message] = 
  Observable { observer =>
    actor ! Subscribe(observer onNext)
    new Subscription { 
      override def unsubscribe: Unit = actor ! Unsubscribe 
    }
  }

What happens here is that in order to create an Observable, I need a function that takes an Observer (which provides the required callback(s) and wants to get notified about messages) and returns a Subscription (for unsubscribing).

On subscription, we send the Subscribe message to the actor, telling it about the callback. And on unsubscription, we send the Unsubscribe message. That’s all there is to it.

Let’s extend our Main object to try it out:

val subscription = 
  observableFromActor(receiver) 
    .take(3)
    .subscribe(msg => println(s"received: $msg"))

Seq("hello", "world", "again", "not anymore") foreach { msg => 
  receiver ! Hello(msg)
}

subscription.unsubscribe

Aaaand it still does it’s thing. I’m good. For now.

In conclusion, I have to say that this was surprisingly easy. Of course, maybe that’s just because I left out all the more interesting things…

Thanks for reading! :-)

For reference, here’s the complete code of the example:

import akka.actor._
import rx.lang.scala._

sealed trait Message
case class Hello(text: String) extends Message

sealed trait SubUnsub extends Message
case class Subscribe(onNext: Message => Unit) extends SubUnsub
case object Unsubscribe extends SubUnsub

object Main extends App {
  val system = ActorSystem("client")
  val receiver = system actorOf (Props[ObservableActor], "rcv")

  val subscription =
    observableFromActor(receiver)
      .take(3)
      .subscribe(msg => println(s"received: $msg"))

  Seq("hello", "world", "again", "not anymore") foreach { msg =>
    receiver ! Hello(msg)
  }
  subscription.unsubscribe
  system.shutdown

  def observableFromActor(actor: ActorRef): Observable[Message] =
    Observable { observer =>
      actor ! Subscribe(observer onNext)
      new Subscription {
        override def unsubscribe: Unit = actor ! Unsubscribe
      }
    }
}

class ObservableActor extends Actor with ActorLogging {
  def receive = {
    case Subscribe(onNext) =>
      log debug "subscribe"
      context become subscribed(onNext)
  }

  def subscribed(onNext: Message => Unit): Actor.Receive = {
    case Unsubscribe =>
      log debug "unsubscribe"
      context become receive

    case message: Message =>
      log debug s"incoming: $message"
      onNext(message)
  }
}
Posted in Scala by jmhofer at October 22nd, 2013.
Tags: , ,

One Response to “Creating an RxJava Observable from an Akka Actor”

  1. Gavin Sallery says:

    This is very interesting, I’ve been looking at Scala reactive implementations for some time. I’m not too familiar with Akka just yet, but isn’t there a problem here with using the “become” mechanism to handle subscription? The way I’m reading this suggests that we can have at most one subscriber to the observable, which doesn’t really fit well with the reactive model.

    Am I missing something obvious?

Leave a Reply