Compositional streaming of services with FS2
May 23, 2020
The intention of this post is to demonstrate a blueprint for composing multiple services as streams within the scope of one process with FS2. The idea is that when dealing with unbounded I/O streams, modeling everything as streams leads to better composition.
The libraries that will be used for all the examples:
libraryDependencies ++= Seq(
  "co.fs2" %% "fs2-core" % "2.3.0",
  // Configuration and validation
  "is.cir" %% "ciris-refined" % "1.0.4"
)
And the imports:
import cats.effect._
import cats.implicits._
// Configuration and validation
import ciris._
import ciris.refined._
import eu.timepit.refined.W
import eu.timepit.refined.api.Refined
import eu.timepit.refined.string.MatchesRegex
import eu.timepit.refined.types.net.PortNumber
import eu.timepit.refined.types.string.NonEmptyString
Configuration
Let’s start with configuration. Unlike in the rest of the examples, I’d like to pause here and show off the awesomeness of the Ciris configuration library. The docs cover most of its features, so let’s jump to defining a sample config for Elasticsearch. A basic config would contain at least the host and port number. If the cluster is deployed on AWS, the config could require the AWS region as well (along with many other parameters skipped for brevity). Region names follow a particular pattern, so let’s start by validating them:
object AWS {
  type Region = String Refined MatchesRegex[W.`"([a-z]{2,}-)+[0-9]"`.T]
}
We’re using the refined library here to define the Region type: a String refined with a predicate that checks whether it matches the region name regex.
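To see what the Region type accepts, here is a small sketch that validates strings at runtime with refined’s refineV, the runtime counterpart of the compile-time check. The RegionCheck object and its parse helper are hypothetical. The refined type is the same one as AWS.Region above.

```scala
import eu.timepit.refined.W
import eu.timepit.refined.api.Refined
import eu.timepit.refined.refineV
import eu.timepit.refined.string.MatchesRegex

object RegionCheck {
  // Same predicate as AWS.Region: one or more "xx-" groups followed by a digit
  type RegionPattern = MatchesRegex[W.`"([a-z]{2,}-)+[0-9]"`.T]
  type Region = String Refined RegionPattern

  // Runtime validation: Right for a matching string, Left with an error message otherwise
  def parse(s: String): Either[String, Region] = refineV[RegionPattern](s)
}
```

For example, "eu-west-1" and "us-gov-west-1" are accepted. "eu-west" (no trailing digit) and "EU-west-1" (uppercase) are rejected, because MatchesRegex requires the whole string to match.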
The full config will look like this:
final case class ElasticsearchConfig(
  host: NonEmptyString,
  port: PortNumber,
  region: AWS.Region
)
Notice how every field is validated using special types from refined. The host field is a string that cannot be empty, while the port number is guaranteed to be in the correct range by the PortNumber type. As a result, there are no Strings and Ints all over the place with custom validation on top. OK, it’s just a model; time to specify how to populate it with values.
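The same guarantees are available at runtime. Here is a sketch that assumes refined’s companion objects for PortNumber and NonEmptyString, which provide a from method returning an Either. The RefinedFields object and its sample values are purely illustrative.

```scala
import eu.timepit.refined.types.net.PortNumber
import eu.timepit.refined.types.string.NonEmptyString

object RefinedFields {
  // A port inside 0..65535 is accepted
  val goodPort: Either[String, PortNumber] = PortNumber.from(9200)
  // A port outside the range becomes a Left instead of a silently invalid Int
  val badPort: Either[String, PortNumber] = PortNumber.from(70000)
  // An empty host is rejected before it can reach any client code
  val emptyHost: Either[String, NonEmptyString] = NonEmptyString.from("")
}
```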
object ElasticsearchConfig {
  val config: ConfigValue[ElasticsearchConfig] =
    (
      env("ELASTICSEARCH_HOST").default("localhost").as[NonEmptyString],
      env("ELASTICSEARCH_PORT").as[PortNumber].default(PortNumber(9200)),
      env("ELASTICSEARCH_REGION").as[AWS.Region]
    ).parMapN(apply)
}
The values are loaded from environment variables (the host and port fall back to defaults), validated, and combined in parallel with parMapN, which accumulates validation errors instead of stopping at the first one. Automatic support for refined types is provided by the ciris-refined module. The same approach is used to compose multiple configurations into larger ones. For example, given a separate Kafka config
final case class KafkaConfig(bootstrapServers: NonEmptyString, ...)
both configs can be accumulated into one main config:
final case class Config(es: ElasticsearchConfig, kafka: KafkaConfig)

object Config {
  val config: ConfigValue[Config] = (
    ElasticsearchConfig.config,
    KafkaConfig.config
  ).parMapN(apply)
}
To load the configuration, we have to specify an effect type, which here is cats-effect’s IO type:
val config: IO[Config] = Config.config.load[IO]
Dependencies
An initial stream to work with can be built by evaluating the IO effect to emit the config value (effectively going from IO[A] to Stream[IO, A]):
val stream: fs2.Stream[IO, Config] = fs2.Stream.eval(config)
We can go back at any time with stream.compile.lastOrError: IO[Config]. The main reason not to is that modeling an effectful computation as a stream lets it compose with a streaming service later on. In other words, instead of “wrapping” a dependency inside a stream, this modeling approach represents the dependency as a stream itself.
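Here is a minimal sketch of the round trip between IO and Stream, assuming fs2 2.x and cats-effect 2.x. RoundTrip and the literal 42 are stand-ins for the config loading above.

```scala
import cats.effect.IO
import fs2.Stream

object RoundTrip {
  // Stand-in for Config.config.load[IO]; the value is illustrative only
  val loadValue: IO[Int] = IO(42)

  // IO[A] => Stream[IO, A]: a single-element stream that runs the effect when pulled
  val asStream: Stream[IO, Int] = Stream.eval(loadValue)

  // Stream[IO, A] => IO[A]: run the stream and keep its last element (fails if it is empty)
  val backToIO: IO[Int] = asStream.compile.lastOrError
}
```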
Another dependency is an Elasticsearch client. Here’s a complete example of the client interface with a dummy implementation:
trait Event

trait ElasticsearchClient[F[_]] {
  def index(event: Event): F[Unit]
}

object ElasticsearchClient {
  def create[F[_]](config: ElasticsearchConfig)(
      implicit F: Sync[F]
  ): ElasticsearchClient[F] =
    new ElasticsearchClient[F] {
      def index(event: Event): F[Unit] =
        F.delay(println(s"Indexing $event to ${config.host}:${config.port}"))
    }
}
The config is required to create a new client, and there is already a stream that emits the config by evaluating the effect. The basic composition should be familiar:
stream.map(config => ElasticsearchClient.create[IO](config.es))
// fs2.Stream[IO, ElasticsearchClient[IO]] = Stream(..)
Although this is completely valid, the idea is to stick with modeling everything as streams. Plus, the config could be needed downstream, which makes both the config and the client dependencies.
val stream: fs2.Stream[IO, ElasticsearchClient[IO]] = for {
  config <- fs2.Stream.eval(Config.config.load[IO])
  client <- fs2.Stream.emit(ElasticsearchClient.create[IO](config.es))
} yield client
At this step, configuration loading and client creation each emit a single value, so they could stay in IO without a problem, and the composition would look similar. Where FS2 shines, though, is in dealing with I/O computations in constant memory.
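To illustrate the constant-memory point, here is a sketch (fs2 2.x assumed) that folds over a prefix of an unbounded stream. ConstantMemory and sumFirst are hypothetical names, and Stream.iterate stands in for a real I/O source such as a Kafka topic.

```scala
import cats.effect.IO
import fs2.Stream

object ConstantMemory {
  // An unbounded source: elements are only produced when downstream pulls them
  val events: Stream[IO, Long] = Stream.iterate(0L)(_ + 1).covary[IO]

  // Only the running sum is retained. Elements are produced and discarded one by one,
  // so memory use does not grow with n
  def sumFirst(n: Long): IO[Long] =
    events.take(n).compile.fold(0L)(_ + _)
}
```

For instance, ConstantMemory.sumFirst(100000L).unsafeRunSync() yields 4999950000 without ever materializing the 100000 elements as a collection.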
Composition
It’s common to run both an HTTP server and a Kafka (or any other) consumer concurrently within the scope of one microservice. Continuing this example, an HTTP server could serve data from an Elasticsearch index while a consumer indexes data from one or more Kafka topics. The code here stays abstract, but two good libraries with FS2 support for these roles are http4s and fs2-kafka, respectively.
A hypothetical API for a consumer that requires its own config and an Elasticsearch client, and produces a stream:
def eventStream[F[_]](config: KafkaConfig, client: ElasticsearchClient[F])(
    implicit F: ConcurrentEffect[F]
): fs2.Stream[F, Unit] = ???
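To make the shape of such a consumer more concrete, here is a sketch under stated assumptions. The events come from a hard-coded list instead of fs2-kafka. Event and ElasticsearchClient are redefined locally so the block compiles on its own. ConsumerSketch, recordingClient and indexed are hypothetical names. This toy eventStream needs no typeclass constraint at all; only running it with compile requires Sync.

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref
import fs2.Stream

object ConsumerSketch {
  // Hypothetical local stand-ins for the post's Event and ElasticsearchClient
  final case class Event(id: Int)
  trait ElasticsearchClient[F[_]] {
    def index(event: Event): F[Unit]
  }

  // Toy consumer: a fixed list plays the role of a Kafka topic; each event is indexed in order
  def eventStream[F[_]](events: List[Event], client: ElasticsearchClient[F]): Stream[F, Unit] =
    Stream.emits(events).covary[F].evalMap(event => client.index(event))

  // A client that records what it indexed, usable in tests instead of a mock
  def recordingClient(ref: Ref[IO, List[Event]]): ElasticsearchClient[IO] =
    new ElasticsearchClient[IO] {
      def index(event: Event): IO[Unit] = ref.update(_ :+ event)
    }

  // Runs the consumer to completion and returns the indexed events
  val indexed: IO[List[Event]] = for {
    ref <- Ref.of[IO, List[Event]](Nil)
    _   <- eventStream(List(Event(1), Event(2)), recordingClient(ref)).compile.drain
    out <- ref.get
  } yield out
}
```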
And an HTTP server API that is modeled as cats-effect’s Resource:
trait Server[F[_]]
def server[F[_]](implicit F: ConcurrentEffect[F]): Resource[F, Server[F]] = ???
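The resource safety that Resource provides carries over to streams. The following sketch uses a fake server and assumes fs2 2.x and cats-effect 2.x; Server, ResourceSketch and the log entries are hypothetical. It shows that Stream.resource acquires the resource, emits it once, and releases it when the stream finishes.

```scala
import cats.effect.{IO, Resource}
import cats.effect.concurrent.Ref
import fs2.Stream

object ResourceSketch {
  // Hypothetical server handle; a real one would come from e.g. http4s
  final case class Server(port: Int)

  // Acquire "starts" the server, release "stops" it; both steps are recorded in the log
  def server(log: Ref[IO, List[String]]): Resource[IO, Server] =
    Resource.make(log.update(_ :+ "start").map(_ => Server(8080)))(_ => log.update(_ :+ "stop"))

  // Stream.resource emits the acquired value once and runs the release when the stream ends
  val run: IO[List[String]] = for {
    log <- Ref.of[IO, List[String]](Nil)
    _   <- Stream.resource(server(log))
             .evalMap(s => log.update(_ :+ s"serving on ${s.port}"))
             .compile
             .drain
    out <- log.get
  } yield out
}
```

The release runs as soon as the stream completes. A stream that wraps a real server therefore has to be kept alive (for example with Stream.never) for as long as the application should keep serving.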
Abstracting over the effect type (all these F[_]) while following the rule of least power, that is, requiring the least powerful typeclass that does the job, is crucial for stream composition and testing. There is no need to constrain an effect with ConcurrentEffect when Sync or Applicative would be enough.
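Here is a sketch of the least-power idea, with Event and ElasticsearchClient redefined locally and noop, indexAll and countWithId as hypothetical helpers. A no-op client only needs Applicative, so the same logic can be exercised with cats.Id in a test and with IO in production.

```scala
import cats._
import cats.implicits._

object LeastPower {
  // Hypothetical local copies of the post's types, so the block compiles on its own
  trait Event
  trait ElasticsearchClient[F[_]] {
    def index(event: Event): F[Unit]
  }

  // A no-op client needs nothing stronger than Applicative
  def noop[F[_]: Applicative]: ElasticsearchClient[F] =
    new ElasticsearchClient[F] {
      def index(event: Event): F[Unit] = Applicative[F].unit
    }

  // Indexes every event and returns how many were indexed; only Applicative is required
  def indexAll[F[_]: Applicative](client: ElasticsearchClient[F], events: List[Event]): F[Int] =
    events.traverse(e => client.index(e)).map(_.size)

  // With F = cats.Id there is no effect to run: the result is a plain Int
  val countWithId: Int = indexAll[Id](noop[Id], List(new Event {}, new Event {}))
}
```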
Because the consumer and the server are independent services that are supposed to run concurrently, they cannot be composed like dependencies. Instead of the “sequential” flatMap, we can use parJoin, which nondeterministically merges a stream of streams into a single stream. To run multiple streams concurrently with FS2:
val stream: fs2.Stream[IO, Unit] = for {
  config <- fs2.Stream.eval(Config.config.load[IO])
  client <- fs2.Stream.emit(ElasticsearchClient.create[IO](config.es))
  app <- fs2.Stream(
    eventStream(config.kafka, client),
    // Resource[F, Server[F]] → Stream[F, Server[F]]; Stream.never keeps the server
    // acquired until the stream is interrupted instead of releasing it right after emitting
    fs2.Stream.resource(server[IO]).flatMap(_ => fs2.Stream.never[IO])
  ).parJoinUnbounded
} yield app
And that’s it! More information about parJoin and parJoinUnbounded, as well as many other composition primitives such as concurrently (when the services depend on each other), can be found here. For this example, parJoin(2) and parJoinUnbounded are interchangeable, but when adding one more service it’s easy to forget to bump the maxOpen argument of parJoin. The complete snippet of the main entry point with basic error handling would look like this:
object Main extends IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    val stream: fs2.Stream[IO, Unit] = for {
      config <- fs2.Stream.eval(Config.config.load[IO])
      client <- fs2.Stream.emit(ElasticsearchClient.create[IO](config.es))
      app <- fs2.Stream(
        eventStream(config.kafka, client),
        // Keep the server acquired until the whole stream is interrupted
        fs2.Stream.resource(server[IO]).flatMap(_ => fs2.Stream.never[IO])
      ).parJoinUnbounded
    } yield app

    stream
      .compile
      .drain
      .onError {
        // Log the failure; onError re-raises it, so IOApp still reports the error
        case e: Throwable => IO(println(e.getMessage))
      }
      .as(ExitCode.Success)
  }
  // ...
}
Not only is the flow concise, but every line packs a punch. The part with eval brings type-safe configuration loading, with values validated and combined in parallel, and evaluates the loading effect. Then, any value can be represented as a stream, be it effectful (with eval) or not (with emit, or emits for a sequence of values). The main part comprises an unbounded stream of messages and a server with built-in resource safety (resource) that run concurrently (parJoin, where service independence can be controlled by changing the concurrency operation). Adding a third service to the example (let’s say a message producer alongside the consumer) won’t change the structure at all. To finish things off, there is a centralized place for error handling and straightforward support for testing. Because the services are abstract over the effect type, mocking every dependency and service in unit tests can be avoided by replacing the IO effect with another one. Streaming semantics are already tested by the FS2 library itself, which lets tests focus on service logic.
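Here is a sketch of the third-service scenario under the same assumptions: fs2 2.x, cats-effect 2.x, hypothetical service names, and services that finish immediately instead of running forever. Outside IOApp, a ContextShift[IO] has to be provided by hand so that parJoin can find Concurrent[IO].

```scala
import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.Ref
import fs2.Stream
import scala.concurrent.ExecutionContext

object ConcurrencySketch {
  // IOApp would normally provide this; parJoin needs it to derive Concurrent[IO]
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // A hypothetical finite "service" that just records its name once
  def service(name: String, log: Ref[IO, List[String]]): Stream[IO, Unit] =
    Stream.eval(log.update(name :: _))

  // maxOpen = 3 matches the number of services, so all of them run concurrently
  val joined: IO[Set[String]] = for {
    log <- Ref.of[IO, List[String]](Nil)
    _   <- Stream(service("consumer", log), service("server", log), service("producer", log))
             .parJoin(3)
             .compile
             .drain
    out <- log.get
  } yield out.toSet // a Set, because the completion order is nondeterministic
}
```

With long-running services, maxOpen must be at least the number of services. Otherwise the extra services wait for a slot that never frees up.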
Why not use compositional streaming modeling for applications with a streaming core?