Trustbit

Event Sourcing with Apache Kafka

Let us talk about what it takes to build an event-sourced system on top of Apache Kafka.

Disclosure: it is possible to do event sourcing without aggregates, when using the Transactions API from Kafka for OCC (optimistic concurrency control).

For a long time, there was a consensus that Kafka and Event Sourcing are not compatible with each other. This came from a few limitations:

  • Apache Kafka is a highly-available message bus that favours availability over consistency

  • Kafka is not designed to work with many topics. Thousands are ok, but millions would not work. So it is not possible to address millions of individual aggregate streams for load and store operations.

  • Kafka is an eventually-consistent system, so you cannot implement strong consistency guarantees on top of it.

  • Default schema management in the Kafka ecosystem was designed to have a message type per topic. Work-arounds with unions and custom envelopes were always possible, but this goes against the grain of Kafka Streams.

So it might look like there is no way of working with Event Sourcing.

Before we dive into why this is not the case, we need to take a closer look at the history: There are actually two flavours of Event Sourcing: "Plain" and "with Aggregates". They have different implementation constraints.

History of Event Sourcing

Plain Event Sourcing was described by Martin Fowler in 2005, along with Domain Events.

It describes systems which capture all changes to their application state as a sequence of events. Such systems have many benefits:

  • replicate databases between different locations

  • replay events to reconstruct database state at any moment in time

  • change database schema at will

  • introduce reactive integration patterns, where different components react to specific events of interest.

In essence, one could implement this flavour of event sourcing by:

  • maintaining a database (SQL or NoSQL);

  • for each change in the database - generating an event that could be used to reproduce that change (it is easier to achieve, when changes are driven by events from the start);

  • storing all events by appending them to an event log;

  • optionally using that log to rebuild (source) database state from scratch.
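The four steps above can be sketched in a few lines. This is only an illustration under simple assumptions: the "database" is a Python dict of stock levels, the event log is a list, and the event types `ItemAdded` and `ItemRemoved` are hypothetical names made up for this example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ItemAdded:
    sku: str
    quantity: int


@dataclass(frozen=True)
class ItemRemoved:
    sku: str
    quantity: int


def apply(db: dict, event) -> None:
    """Project a single event into the database (here: a dict of stock levels)."""
    if isinstance(event, ItemAdded):
        db[event.sku] = db.get(event.sku, 0) + event.quantity
    elif isinstance(event, ItemRemoved):
        db[event.sku] = db.get(event.sku, 0) - event.quantity


def record(db: dict, log: list, event) -> None:
    """Every change is expressed as an event: apply it, then append it to the log."""
    apply(db, event)
    log.append(event)


def rebuild(log: list) -> dict:
    """Source the database state from scratch by replaying the log."""
    db: dict = {}
    for event in log:
        apply(db, event)
    return db


db, log = {}, []
record(db, log, ItemAdded("chair", 5))
record(db, log, ItemRemoved("chair", 2))
assert rebuild(log) == db  # the log alone is enough to reproduce the state
```

Replaying only a prefix of the log, for example `rebuild(log[:1])`, gives the state as it was at that earlier moment.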

However, plain Event Sourcing did not provide a mental model for structuring applications in a complex domain. This came from a different area - Domain-Driven Design (DDD).

In August 2003, Eric Evans published his fantastic blue book about tackling complexity in the heart of software. It was called "Domain-Driven Design". The book explained how to make the design of an application match the mental model of the domain you are dealing with.

Domain-Driven Design by Eric Evans

Among its patterns, DDD included "Aggregates" - "a collection of related objects that we want to treat as a single unit". Aggregates were pictured as a tree that has a root and a boundary. The root would be used to find and address the aggregate in a database, while the boundary also prescribed how far the database transaction would span.

"Domain-Driven Design" did not cover domain events or event sourcing at this point.

Later, Eric Evans interviewed Greg Young about Event Sourcing at QCon SF 2007. This was one of Greg’s earliest explanations of Event Sourcing. A lot of materials and a great course followed later.

Having two visionaries discuss these topics together cemented the relation between Domain-Driven Design and Event Sourcing through the Aggregate pattern. This approach, frequently served together with CQRS, explained how to design, implement and evolve a system with event sourcing.

The solution was a very logical one: we take a classical aggregate that was originally stored in the database. However, instead of storing it as a tree of linked database records, we store it as a sequence of domain events that capture all past changes. On loading, we replay these events to project them into the state needed for making new decisions. Changes are appended in the form of new events.

This approach:

  • made it possible to gain all the benefits of plain event sourcing;

  • provided practical guidance on implementing event sourcing: start with the DDD book, but instead of doing plain Aggregates, do "Aggregates with Event Sourcing".
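A minimal sketch of such an event-sourced aggregate might look like this. The `Screening` aggregate and the `SeatReserved` event are hypothetical names chosen for the example, and the stream is a plain Python list standing in for real storage.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SeatReserved:
    seat: str
    customer: str


class Screening:
    """Aggregate root: its state is never stored, only derived from events."""

    def __init__(self, history=()):
        self.reserved = {}  # seat -> customer, projected from past events
        self.version = 0    # number of events applied so far
        for event in history:
            self._apply(event)

    def _apply(self, event) -> None:
        if isinstance(event, SeatReserved):
            self.reserved[event.seat] = event.customer
        self.version += 1

    def reserve(self, seat: str, customer: str) -> list:
        """Decide based on the current state; return the new events to append."""
        if seat in self.reserved:
            raise ValueError(f"seat {seat} is already reserved")
        event = SeatReserved(seat, customer)
        self._apply(event)
        return [event]


stream = [SeatReserved("A1", "alice")]          # what is stored for this aggregate
screening = Screening(stream)                    # load: replay past events
stream.extend(screening.reserve("A2", "bob"))    # save: append the new events
```

Note that the decision in `reserve` reads only the projected state, and the only thing that is ever persisted is the list of events.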

Later on, Greg delivered numerous courses on event sourcing and also started Event Store - a company and a dedicated database perfectly suited for storing aggregates as a sequence of past events. It could handle millions of individual aggregate streams without problems.

Event Sourcing with Aggregates

Let us get back to the original question - implementing Event Sourcing with Kafka.

If we are talking about Event Sourcing with Aggregates, Apache Kafka alone (!) is not a very good fit.

The number of distinct aggregates could easily reach millions in a system. If we were to store them as event streams, that would mean millions of individually addressable event streams.

However, the number of topics in Kafka is somewhat limited, so we cannot use it as a naive event store. If we really want to use ES with Kafka, we'll need to somehow mix (multiplex) aggregate event streams. Things can get messy from here on.
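To get a feeling for why this gets messy, here is a small in-memory sketch of multiplexing. It assumes a hypothetical setup with four partitions and uses CRC32 as an arbitrary stable hash (this is not Kafka's own partitioner). No Kafka client is involved.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical: far fewer partitions than aggregates

# partition number -> ordered list of (aggregate_id, event)
partitions = defaultdict(list)


def partition_for(aggregate_id: str) -> int:
    """Stable hash of the aggregate id, so its events always land together."""
    return zlib.crc32(aggregate_id.encode("utf-8")) % NUM_PARTITIONS


def append(aggregate_id: str, event: str) -> None:
    partitions[partition_for(aggregate_id)].append((aggregate_id, event))


def load(aggregate_id: str) -> list:
    """Loading one aggregate means scanning everything that shares its partition."""
    shared = partitions[partition_for(aggregate_id)]
    return [event for key, event in shared if key == aggregate_id]


for n in range(1000):
    append(f"order-{n}", "OrderPlaced")
append("order-7", "OrderShipped")

print(load("order-7"))
```

The events of one aggregate stay in order, but there is no way to address them directly: every load is a scan and filter over events that belong to other aggregates.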

Default schema management in the Kafka ecosystem favours the "one message schema per topic" approach. Chances are that different message types will go to different topics. This makes it easier to build streaming solutions on top of that, but causes:

  • message bloat - since events are almost always processed out of their context, they need to carry a lot of context with them

  • message fragility - the larger the event schema is, the higher the chance that it will have to evolve to accommodate even more changes

  • overall system complexity - whenever one needs to make a decision based on multiple different events, different Kafka topics would need to be joined together.

  • race conditions - events on different topics will be delivered and processed at different speeds, smearing initial consistency.

In essence, we end up with "Kafka as a document database with strongly-typed streams", instead of the event log.

Long story short: Apache Kafka is not a good fit for event sourcing, as long as we are talking about "Event Sourcing with Aggregates".

Let us take a look at "Event Sourcing without Aggregates" or just plain "Event Sourcing".

Event Sourcing

Plain event sourcing has a long history in databases. Most databases maintain a transaction log (or write-ahead log). This log is used to replicate changes from one database to its replicas, where it is materialized to create exactly the same state.

Kafka is a good fit for log shipping and replication. We can still use domain events as a foundational element in our design, just making sure that all events (for the same app/shard/tenant) go to the same topic and partition.

Whenever there is a change:

  • generate domain event;

  • apply event to the DB (project/materialize);

  • write to the event log.

The DB is always driven only by events and can always be rebuilt by replaying them. This is how new versions are deployed: launch a new replica, let it replay events, then switch traffic at the load balancer.

There is no need to have individually addressable aggregates in this approach, because our database is the unit of work. This is similar to how database engines themselves have a single transaction log per database.
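The deployment flow can be sketched as follows. This is a simplified model: the log is a Python list standing in for a single topic partition, and `project_v1` and `project_v2` are hypothetical projections for the current and the next application version.

```python
def project_v1(db: dict, event: dict) -> None:
    """Current version: keeps only the running balance per account."""
    db[event["account"]] = db.get(event["account"], 0) + event["amount"]


def project_v2(db: dict, event: dict) -> None:
    """Next version: a different schema that also counts transactions."""
    row = db.setdefault(event["account"], {"balance": 0, "transactions": 0})
    row["balance"] += event["amount"]
    row["transactions"] += 1


def launch_replica(log: list, project) -> dict:
    """Start with an empty database and replay the whole log through a projection."""
    db: dict = {}
    for event in log:
        project(db, event)
    return db


log = [
    {"account": "acc-1", "amount": 100},
    {"account": "acc-1", "amount": -30},
    {"account": "acc-2", "amount": 50},
]

live = launch_replica(log, project_v1)       # serves traffic today
candidate = launch_replica(log, project_v2)  # new version catches up from the log
# Once the candidate has caught up, the load balancer can be pointed at it.
```

The schema of the new replica does not have to be migrated from the old one, since both are derived from the same events.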

Using Kafka for event sourcing would also require the capability to "enforce invariants". In other words, this is the ability to provide certain guarantees about the state that goes into the event stream. For example:

  • banking: a person can make many withdrawals, but they can't withdraw more than 1500 EUR per day;

  • warehouse: items can be taken from the inventory, but inventory can never go negative;

  • reservations: seats could be reserved, but a single seat will never be reserved by more than one person.
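As an illustration, the banking invariant can be written as a decision function that only produces an event when the invariant still holds afterwards. The event shape (a dict with `type`, `account`, `amount` and `day`) and the function names are assumptions made for this sketch.

```python
from datetime import date

DAILY_LIMIT_EUR = 1500  # the limit from the banking example


class InvariantViolation(Exception):
    pass


def withdrawn_on(events: list, account: str, day: date) -> int:
    """Project the event log into the one number this decision needs."""
    return sum(
        e["amount"]
        for e in events
        if e["type"] == "Withdrawn" and e["account"] == account and e["day"] == day
    )


def withdraw(events: list, account: str, amount: int, day: date) -> dict:
    """Return a new event only if recording it keeps the invariant intact."""
    if withdrawn_on(events, account, day) + amount > DAILY_LIMIT_EUR:
        raise InvariantViolation(f"daily limit of {DAILY_LIMIT_EUR} EUR exceeded")
    return {"type": "Withdrawn", "account": account, "amount": amount, "day": day}


today = date(2024, 1, 15)
log = [withdraw([], "acc-1", 1000, today)]
log.append(withdraw(log, "acc-1", 500, today))  # exactly at the limit: allowed
```

This check is only safe as long as nothing else is appended between reading `events` and writing the new event.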

Our application needs to enforce these invariants even in cases when there are multiple application instances that are trying to write events at the same time - concurrently (this happens frequently enough in deployments with high availability).

We can do that either by preventing conflicts from being recorded in the event log or by resolving them afterwards.

Here are some ways:

  1. Optimistic Concurrency Control, enforced by the event store. Conflicting writes will not happen, because only one writer will be able to append an event at a time. The other will get a concurrency error and will have to read the latest events and retry. Kafka doesn't support this, but many event store implementations provide this out-of-the-box.

  2. Using Kafka Transactions API to turn weak leadership into strong leadership. This will ensure that only one application instance (leader) can write to a specific partition at any given moment. Invariants are preserved when publishing. Conflicts don't happen, because there is only one event published at any point. Check out Kafka Transactions API or this insightful blog post for more details.

  3. The way of CRDT, described in more detail below.
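Before going into the CRDT approach, the first two options from the list can be sketched side by side. Both classes are in-memory stand-ins written for this illustration and do not use any Kafka or event store API. `OccStream` models an expected-version check. `FencedPartition` models the idea of a single leader with an epoch number, loosely inspired by how Kafka fences an older producer that shares a `transactional.id` with a newer one.

```python
class ConcurrencyError(Exception):
    """Raised when a writer decided on a stale version of the stream."""


class FencedError(Exception):
    """Raised when a writer that lost leadership tries to append."""


class OccStream:
    """Option 1: the store accepts a write only at the expected version."""

    def __init__(self):
        self.events = []

    def append(self, event, expected_version: int) -> None:
        if expected_version != len(self.events):
            raise ConcurrencyError(
                f"expected version {expected_version}, stream is at {len(self.events)}"
            )
        self.events.append(event)


class FencedPartition:
    """Option 2: only the writer holding the latest epoch may append."""

    def __init__(self):
        self.events = []
        self.epoch = 0

    def become_leader(self) -> int:
        """A new leader bumps the epoch, which fences every earlier writer."""
        self.epoch += 1
        return self.epoch

    def append(self, event, epoch: int) -> None:
        if epoch != self.epoch:
            raise FencedError(f"writer epoch {epoch} is stale, current is {self.epoch}")
        self.events.append(event)


# Option 1: both writers read version 0, only the first append succeeds.
stream = OccStream()
stream.append("SeatReserved(A1, alice)", expected_version=0)
try:
    stream.append("SeatReserved(A1, bob)", expected_version=0)
except ConcurrencyError as error:
    print("bob must re-read and retry:", error)

# Option 2: the old leader is fenced as soon as a new one takes over.
partition = FencedPartition()
old_leader = partition.become_leader()
new_leader = partition.become_leader()
partition.append("SeatReserved(A1, alice)", epoch=new_leader)
try:
    partition.append("SeatReserved(A1, bob)", epoch=old_leader)
except FencedError as error:
    print("old leader is fenced:", error)
```

In the first case the conflict is detected per write. In the second case it cannot occur, because at most one writer is allowed to publish at any moment.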

It is also possible to use the CRDT (conflict-free replicated data type) approach to resolve conflicts after they were recorded in the event log. It relies on the fact that Kafka will order messages within a partition anyway, even if both writes come almost at the same time.

We could pick a simple resolution strategy: the first event to advance the event log to the next version wins, and the other one is rejected. All writers are reading their topic anyway (because they do not know if they are a replica), so they can wait a little bit to see if their write made it through. Conceptually this is the same as waiting for an ACK about a write from the cluster, just over a different channel.
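A minimal sketch of this "first write wins" rule, assuming that every event carries a hypothetical `based_on` field with the version its writer had seen when making the decision:

```python
def resolve(log: list) -> tuple:
    """Walk the ordered log and split it into (accepted, rejected) events."""
    accepted, rejected = [], []
    for event in log:
        if event["based_on"] == len(accepted):
            accepted.append(event)  # first to advance this version wins
        else:
            rejected.append(event)  # decided on stale state, ignored by all readers
    return accepted, rejected


# Both writers saw version 0 and wrote at almost the same time.
# The partition still puts the two records into one definite order.
log = [
    {"writer": "a", "based_on": 0, "seat": "A1"},
    {"writer": "b", "based_on": 0, "seat": "A1"},
]
accepted, rejected = resolve(log)

# Writer "b" reads the topic, sees that its write lost, and retries on version 1.
log.append({"writer": "b", "based_on": 1, "seat": "A2"})
accepted_after_retry, _ = resolve(log)
```

Because `resolve` is deterministic and every reader sees the same order, all replicas agree on which events count without any extra coordination.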

More involved conflict resolution could extend this approach, increasing system throughput at the cost of complexity: a late event is rejected, "except if events change different entities and have no chance of breaking invariants".
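One possible shape of such a relaxed rule is sketched below. It assumes that every event names the single entity it changes (a hypothetical `entity` field), that `based_on` is the number of accepted events the writer had seen, and that no invariant spans more than one entity. If invariants do span entities, this rule is not sufficient.

```python
def resolve_relaxed(log: list) -> tuple:
    """Reject a stale event only if something it had not seen touched the same entity."""
    accepted, rejected = [], []
    for event in log:
        unseen = accepted[event["based_on"]:]  # accepted after the writer's snapshot
        if any(other["entity"] == event["entity"] for other in unseen):
            rejected.append(event)
        else:
            accepted.append(event)
    return accepted, rejected


# Three writers decided on version 0 at almost the same time.
log = [
    {"writer": "a", "based_on": 0, "entity": "seat-A1"},
    {"writer": "b", "based_on": 0, "entity": "seat-A2"},  # different seat: kept
    {"writer": "c", "based_on": 0, "entity": "seat-A1"},  # same seat as "a": rejected
]
accepted, rejected = resolve_relaxed(log)
```

Compared to the strict rule, writer "b" no longer has to retry, which is where the extra throughput comes from.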

 

Summary

To summarise: it is possible to implement event sourcing using Apache Kafka as a transport if the following requirements are met:

  • All events for a single aggregate are published to a single topic and partition.

  • We use the Kafka Transactions API or the CRDT approach to enforce aggregate invariants.

  • We aim for large aggregate streams that are partitioned by tenant.

 

While this approach is feasible in theory (and was implemented in practice), there are inherent roadblocks in reality, especially in larger organisations.

The Kafka software ecosystem favours having one message schema per topic. While it is possible to use unions or schema references in Kafka topics, this is a recent development with limited support.

Historically Kafka is favoured by large enterprises. Such companies tend to have strong departmental division with different teams being responsible for different modules, micro-services and corresponding message types. It will be unusual to have multiple teams develop a product that puts different message schemas in a single aggregate and topic, while treating it as a source of truth.

The last point is the important one - it is trivial to join multiple messages when they are in a single topic.