Creating an RxJava Observable from an Akka Actor
Recently I found myself wondering how to combine two awesome libraries: Akka and RxJava. The most straightforward use case for me was converting the stream of messages received by an Actor into an Observable.
I’ll quickly write down how that works, and maybe this will be the start of a new little series of blog posts. And maybe I’ll learn something along the way about how to best create an Akka module for RxJava, something I’d love to do…
But I’ll start very, very small. This post is not going to go into any kind of error handling, or explore the Akka Event Bus, for example. All of that will have to wait for later.
First, I’ll need a small build.sbt that’s mostly there to import the two dependencies for me:
// sbt versions before 0.13.7 require the blank lines between settings
organization := "de.johoop"

name := "akka-rxjava"

version := "1.0.0-SNAPSHOT"

scalaVersion := "2.10.3"

scalacOptions ++= Seq("-deprecation", "-language:_")

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.2.1",
  "com.netflix.rxjava" % "rxjava-core" % "0.14.5",
  "com.netflix.rxjava" % "rxjava-scala" % "0.14.5")
Next, I’ll need to define a simple message…
sealed trait Message
case class Hello(text: String) extends Message
…and a basic actor that receives them:
import akka.actor._
class ObservableActor extends Actor with ActorLogging {
  def receive = {
    case message: Message =>
      log debug s"incoming: $message"
  }
}
Now let’s add a simple Main and try it out:
object Main extends App {
  val system = ActorSystem("client")
  val receiver = system actorOf (Props[ObservableActor], "rcv")
  Seq("hello", "world", "again", "not anymore") foreach { msg =>
    receiver ! Hello(msg)
  }
  system.shutdown
}
Up to now, this is pure Akka, and of course it works as expected. To make an Observable out of this, I’ll need a mechanism for subscribing and unsubscribing. For subscribing, I want to be able to specify a callback onNext that will be called each time a message is received.
This means that I need a few more messages:
sealed trait SubUnsub extends Message
case class Subscribe(onNext: Message => Unit) extends SubUnsub
case object Unsubscribe extends SubUnsub
My actor has to handle these new messages, of course. I’ll use the Become mechanism for this in order to avoid any kind of mutable state. And, very important: I want to call the new callback upon receiving messages instead of just logging them. All this means that the new version of the actor looks like this:
class ObservableActor extends Actor with ActorLogging {
  def receive = {
    case Subscribe(onNext) =>
      log debug "subscribe"
      context become subscribed(onNext)
  }
  def subscribed(onNext: Message => Unit): Actor.Receive = {
    case Unsubscribe =>
      log debug "unsubscribe"
      context become receive
    case message: Message =>
      log debug s"incoming: $message"
      onNext(message)
  }
}
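To make the two behaviors easier to follow, here is a minimal, dependency-free sketch of the same state machine. It is not Akka code: Behavior, Idle, Subscribed and BecomeSketch are hypothetical names made up for illustration, and returning the next behavior stands in for context become. The message types are repeated so that the block compiles on its own.

```scala
import scala.collection.mutable.ListBuffer

sealed trait Message
case class Hello(text: String) extends Message
sealed trait SubUnsub extends Message
case class Subscribe(onNext: Message => Unit) extends SubUnsub
case object Unsubscribe extends SubUnsub

// Hypothetical stand-in for Actor.Receive plus `context become`:
// handling a message yields the behavior to use for the next message.
sealed trait Behavior { def handle(msg: Message): Behavior }

// Mirrors `receive`: only Subscribe switches the behavior.
case object Idle extends Behavior {
  def handle(msg: Message): Behavior = msg match {
    case Subscribe(onNext) => Subscribed(onNext)
    case _                 => Idle // the real actor leaves these unhandled
  }
}

// Mirrors `subscribed(onNext)`: the callback lives in the behavior itself.
case class Subscribed(onNext: Message => Unit) extends Behavior {
  def handle(msg: Message): Behavior = msg match {
    case Unsubscribe => Idle
    case other       => onNext(other); this // also matches a second Subscribe
  }
}

object BecomeSketch {
  // Subscribes a collecting callback, feeds the messages through the
  // state machine and returns what the callback has seen.
  def run(messages: Seq[Message]): List[Message] = {
    val seen = ListBuffer.empty[Message]
    val subscribe = Subscribe(m => { seen += m; () })
    (subscribe +: messages).foldLeft(Idle: Behavior)((b, m) => b.handle(m))
    seen.toList
  }

  def main(args: Array[String]): Unit =
    println(run(Seq(Hello("a"), Unsubscribe, Hello("b"))))
}
```

Only Hello("a") reaches the callback here, because Hello("b") arrives after the Unsubscribe. The sketch also makes one detail of the pattern match visible: since Subscribe is itself a Message, a second Subscribe sent while already subscribed is forwarded to the callback rather than replacing it.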
At last, I’m prepared to create an Observable. This is the step where the “magic” happens. I’ll add a small utility function for this:
import rx.lang.scala._ // the additional import for RxJava
def observableFromActor(actor: ActorRef): Observable[Message] =
  Observable { observer =>
    actor ! Subscribe(observer onNext)
    new Subscription {
      override def unsubscribe: Unit = actor ! Unsubscribe
    }
  }
What happens here is that in order to create an Observable, I need a function that takes an Observer (which provides the required callback(s) and wants to get notified about messages) and returns a Subscription (for unsubscribing).
On subscription, we send the Subscribe message to the actor, telling it about the callback. And on unsubscription, we send the Unsubscribe message. That’s all there is to it.
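The "function from observer to subscription" idea can also be sketched without any library at all. The following is only a simplified model, not the RxJava API: MiniObserver, MiniSubscription, MiniObservable and Source are hypothetical names, and Source plays the role of the actor (using a mutable field for brevity, which the actor itself avoids).

```scala
import scala.collection.mutable.ListBuffer

// Simplified stand-ins for the RxJava types; hypothetical, not the real API.
trait MiniObserver[T] { def onNext(value: T): Unit }
trait MiniSubscription { def unsubscribe(): Unit }

// An observable is little more than a wrapper around the
// "observer => subscription" function it was created from.
class MiniObservable[T](onSubscribe: MiniObserver[T] => MiniSubscription) {
  def subscribe(callback: T => Unit): MiniSubscription =
    onSubscribe(new MiniObserver[T] {
      def onNext(value: T): Unit = callback(value)
    })
}

// Hypothetical event source that plays the role of the actor.
class Source[T] {
  private var listener: Option[T => Unit] = None
  def register(f: T => Unit): Unit = listener = Some(f)
  def unregister(): Unit = listener = None
  def emit(value: T): Unit = listener.foreach(f => f(value))
}

object CreateSketch {
  def observableFrom[T](source: Source[T]): MiniObservable[T] =
    new MiniObservable[T](observer => {
      source.register(observer.onNext) // corresponds to sending Subscribe
      new MiniSubscription {
        // corresponds to sending Unsubscribe
        def unsubscribe(): Unit = source.unregister()
      }
    })

  // Emits one value before subscribing, one while subscribed and one
  // after unsubscribing, and returns what the subscriber received.
  def demo(): List[String] = {
    val source = new Source[String]
    val received = ListBuffer.empty[String]
    source.emit("too early")
    val subscription =
      observableFrom(source).subscribe(s => { received += s; () })
    source.emit("hello")
    subscription.unsubscribe()
    source.emit("too late")
    received.toList
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

In this model nothing is registered with the source until someone subscribes, and only "hello" is delivered. That mirrors the actor version, where the Subscribe message is only sent once an observer shows up.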
Let’s extend our Main object to try it out:
val subscription =
  observableFromActor(receiver)
    .take(3)
    .subscribe(msg => println(s"received: $msg"))
Seq("hello", "world", "again", "not anymore") foreach { msg =>
  receiver ! Hello(msg)
}
subscription.unsubscribe
Aaaand it still does its thing. I’m good. For now.
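Because of take(3), only the first three of the four messages end up being printed. As far as I understand the operator, it also unsubscribes from its source once the third value has been delivered. Here is a rough, library-free sketch of that idea; taking and TakeSketch are hypothetical helpers of my own, not how RxJava actually implements take.

```scala
import scala.collection.mutable.ListBuffer

object TakeSketch {
  // Hypothetical helper: wraps a callback so that only the first n values
  // pass through, and runs `unsubscribe` once the n-th value was delivered.
  def taking[T](n: Int, unsubscribe: () => Unit)(callback: T => Unit): T => Unit = {
    var remaining = n
    (value: T) => {
      if (remaining > 0) {
        remaining -= 1
        callback(value)
        if (remaining == 0) unsubscribe()
      }
    }
  }

  // Pushes the four example messages through a take-3 wrapper and returns
  // the delivered values plus the number of unsubscribe calls.
  def demo(): (List[String], Int) = {
    val received = ListBuffer.empty[String]
    var unsubscribed = 0
    val onNext =
      taking[String](3, () => unsubscribed += 1)(s => { received += s; () })
    Seq("hello", "world", "again", "not anymore").foreach(onNext)
    (received.toList, unsubscribed)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The demo delivers "hello", "world" and "again", calls the unsubscribe function exactly once, and silently drops "not anymore".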
In conclusion, I have to say that this was surprisingly easy. Of course, maybe that’s just because I left out all the more interesting things…
Thanks for reading! :-)
For reference, here’s the complete code of the example:
import akka.actor._
import rx.lang.scala._
sealed trait Message
case class Hello(text: String) extends Message
sealed trait SubUnsub extends Message
case class Subscribe(onNext: Message => Unit) extends SubUnsub
case object Unsubscribe extends SubUnsub
object Main extends App {
  val system = ActorSystem("client")
  val receiver = system actorOf (Props[ObservableActor], "rcv")
  val subscription =
    observableFromActor(receiver)
      .take(3)
      .subscribe(msg => println(s"received: $msg"))
  Seq("hello", "world", "again", "not anymore") foreach { msg =>
    receiver ! Hello(msg)
  }
  subscription.unsubscribe
  system.shutdown
  def observableFromActor(actor: ActorRef): Observable[Message] =
    Observable { observer =>
      actor ! Subscribe(observer onNext)
      new Subscription {
        override def unsubscribe: Unit = actor ! Unsubscribe
      }
    }
}
class ObservableActor extends Actor with ActorLogging {
  def receive = {
    case Subscribe(onNext) =>
      log debug "subscribe"
      context become subscribed(onNext)
  }
  def subscribed(onNext: Message => Unit): Actor.Receive = {
    case Unsubscribe =>
      log debug "unsubscribe"
      context become receive
    case message: Message =>
      log debug s"incoming: $message"
      onNext(message)
  }
}