Akka Stream

Source ~> Flow ~> Sink

Posted by Fabio Fumarola on May 6, 2016

Akka Stream Concepts: Source ~> Flow ~> Sink

Reactive Stream vs Akka Stream

The first time I approached AkkaStream I found it hard to handle in contrast to Reactive Extensions and other Reactive Stream implementations. On one side I have a simple and ready to use DSL with simple operators like map, filter, ...; on the other side I have several concepts like Flow, GraphDSL, and GraphStage. So it was clear to me to approach with Reactive Extensions, but I was wrong. As Discussed before A first look at Akka streams Reactive Extensions are really good at defining flows, but become hard to manage when you are required to deal with streams coming at different rate or when you have to broadcast or merge streams. Fixing this requires carefully organizing schedulers, and things become quite complex pretty soon.

ConnectableFlux<Double> sharedDoubles = doubles.publish();

// will not start
Flux<Doubles> f1 = sharedDoubles
                      .map(x -> x + 1)
                      .subscribe(Subscribers.consume(System.out::println));
// will not start
Flux<Doubles> f2 = sharedDoubles
                      .map(x -> Math.pow(x,3))
                      .subscribe(Subscribers.consume(System.out::println));

// starts to emit
sharedDoubles.connect();

//Dispatching on a different thread is done using the `dispatchOn()` API in `Reactor` and `subscribeOn()` in ReactiveX. This causes the computation to move on a different *scheduler*.

ConnectableFlux<Double> sharedDoubles = doubles.publish().dispatchOn(SchedulerGroup.single());

Above you can see an example of broadcasting an incoming stream to multiple outputs. Here you need to explicitly setup a Thread to run the computation.

On the contrary, Akka Stream is built with the decision to offer APIs that are minimal and consistent—as opposed to easy or intuitive. There is no magic, all the API features are explicit. You have operators to define Source, Sink, Flow, Graph and Operators. You have the operators to handle back-pressure, buffering, transformations, failure recovery, etc. But on my opinion the features that make Akka Stream implementation interesting are: 1. the idea to use Akka Actors as backend for stream execution. Each transformations in Akka Stream is materialized into an Actor, which is scheduled on a thread pool. 2. the separation between stream materialization and definition. What you define in Akka Stream is a blueprint, that is a set of flow, graph and topologies. These defined partial graph can be reused and composed to create complex graphs.

Note about Latency: the solution to define an actor for each transformation derives in high latency since Akka actors would not be scheduled every time on the same thread of the thread pool. However, with the Fusion Operator it is possible to execute multiple processing steps of the stream graph within the same Actor. By default Akka Stream configuration file exposes an auto-fusing property to reduce actors creation during the materialisation step.

Using Akka Stream we can focus on building libraries of reusable pieces, allowing full compositionality without adding constraint on the actual stream execution. Constraint on the execution can be optionally imposed when composing the graphs or by extending the materializer. For example Gearpump.io supports Akka Stream by providing the GearpumpMaterializer.

Akka Stream concepts

The element in Akka Stream that allows you to define a stream are:

  • Source: something with exactly one output stream
  • Sink: something with exactly one input stream
  • Flow: something with exactly one input and one output stream
  • BidiFlow: something with exactly two input streams and two output streams that conceptually behave like two Flows of opposite direction
  • Graph: a packaged stream processing topology that exposes a certain set of input and output ports, characterized by an object of type Shape.

Source

A Source is a set of stream processing steps that has one open output. It can comprise any number of internal sources and transformations that are wired together, or it can be an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into a Reactive Streams Publisher.

All the code used below can be found at akka_stream_tutorial.

As basis a source can be create from an Iterable.

val firstSource = Source(1 to 1000)

Now that we have our first Source to run our first Akka Stream example let us add a Sink and run the computation.

val firstSource = Source(1 to 1000)
val computation = firstSource.to(Sink.foreach(println))
computation.run()

You can see here that the stream definition is separated from its running. If we extend the same example with types we get:

val firstSource: Source[Int, NotUsed] = Source(1 to 1000)
val computation: RunnableGraph[NotUsed] = firstSource.to(Sink.foreach(println))
computation.run()(materializer)

So we can see that Source is of type Source[Int, NotUsed] and computation is of type RunnableGraph[NotUsed].

  1. The NotUsed can be changed with other Types like Int, Double and so on. It can be used in cases of finite streams where we want to get a result from the stream execution.
  2. The RunnableGraph is the base class for each StreamComputation.
  3. The firstSource.to(Sink.foreach(println)) define that the source is sent to a Sink. The Sink.foreach(println) is a Sink that print each element of the stream.
  4. The second parameter for the method computation.run() is the materializer that take cares of trasforming the computation in Actors.

Let us postpone in the following how to get results from stream execution and concentrate on Sources. Apart an Iterable a source can be started from: Iterator, Future, Graph, Publisher, Promise, single element, Tick and from Resource.

At first let’s define a simple trait to mix with out objects

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

trait AkkaStreamApp extends App {
  implicit val actorSystem = ActorSystem("Tutorial")
  implicit val materializer = ActorMaterializer()
  implicit val executor = actorSystem.dispatcher
}

Iterator

object SourceFromIterator extends AkkaStreamApp {
  Source.fromIterator(() => Iterator.from(0))
    .runWith(Sink.foreach(println))
}

Future

object SourceFromFuture extends AkkaStreamApp {
  Source.fromFuture(Future(1 to 10))
    .runWith(Sink.foreach(println))
}
object SourceFromFuture2 extends AkkaStreamApp {
  Source.fromFuture(Future(1 to 10))
    .mapConcat(identity)
    .runWith(Sink.foreach(println))
}

In the above example we apply the .mapConcat(identity) operator that flat the range of 1 to 10 into a stream of Int. This operator is similar to the scala flatMap method.

Single

object SourceFromSingle extends AkkaStreamApp {
  Source.single(1)
    .runWith(Sink.foreach(println))
}

Tick

object SourceFromTick extends AkkaStreamApp {
  Source.tick(0 second, 1 second, 1)
    .runForeach(println)
}

Resource

Start a new Source from some resource which can be opened, read and closed. Interaction with resource happens in a blocking way, but there is the unfoldResourceAsync that execute the computation into an async fashion. This source is useful, for example if you need to consumer a stream from a file or from a DB. It can be combined with the tick method to schedule a read of an external source.

object FromCloseableResource extends AkkaStreamApp {

  val in = this.getClass.getResourceAsStream("example_resource.txt")

  Source.unfoldResource[String, BufferedReader](
    () => scala.io.Source.fromInputStream(in).bufferedReader(),
    reader => Option(reader.readLine()),
    reader => reader.close())
    .runForeach(println)
}

Sink

A Sink is a destination of a stream process which can be used as a Subscriber. Sinks are then used in Akka Stream to finalise a computation. It can be used for tasks like printing elements, sending data to an external destination (http, tcp, udp, kafka and so on) or materializing the result of a stream computation.

Source.single(1)
.runWith(Sink.foreach(println))

For example:

  • Sink.ignore generates a sink that consume a stream ignoring all the elements.
  • Sink.asPublisher[T](fanout: Boolean) create a publisher to interact with ReactiveStreams.
  • be used as a publisher to another stream: Sink.asPublisher[T](fanout: Boolean)
  • Sink.foreach((t: T) => Unit) is used to apply a function to each element of the stream.

Moreover with Sink we can materialize stream computation with operators like:

  • Sink.head[T] which returns a Future[T] with the first element of the stream.
  • Sink.seq[T] holds a future with a Seq[T].
  • Sink.reduce[T](f: (T, T) => T) returns a Sink that reduce the stream into a single value.

Finally Sinks can be create from graph Sink.fromGraph[T, M](g: Graph[SinkShape[T], M]): Sink[T, M] that we will see in the next post.

Flow

The Flow class allows us to generate reusable piece of stream computation, that can be treated as operator (functions) to be composed with a Source. But let’s start with an example.


val flow = Source(1 to 1000)
	.map(_ + 1)
	.filter(_ % 2 == 0)
	.toMat(Sink.fold(0)(_ + _))(Keep.right)

Here we create a source from an iterable, then we apply in order a map a filter and we fold the stream by summing all the elements.


def myFlow(): Flow[Int, Int, NotUsed] = Flow[Int]
    .map(_ + 1)
    .filter(_ % 2 == 0)

val flow = Source(1 to 1000)
	.via(myFlow)
	.toMat(Sink.fold(0)(_ + _))(Keep.right)

val result = flow.run()

In the above example we transformed part of the stream processing into the method myFlow, that we can combine with a source using the via method. If you pay attention to the signature of myFlow() you can see that the class Flow transform an Int to an Int and does not use materialisation parameter (Flow[Int,Int,Mat]). Source and Flow class and object expose a rich API to work with streams. It allows you to process your stream one element at time, doing transformation, filtering. But it offers to you operators to deal with: timers, schedulers, merge of stream with different rates, apply async operations without blocking, monitor a stream, batch your stream in windows (and sliding windows), throttle the stream to limit its speed. Finally, it gives to you also basic operators to join and merge streams, but in the following we will that using the GraphDSL api we can deal with multiple streams in a more elegant way.

Conclusion

By analysing how to create a Source we just discovered the surface of Akka Stream. In the next post we will dig deeper in the complex part, which is dealing with graph and streaming graph composed by slower and faster nodes.