Designing Loggi’s event-driven architecture

ernestocid · Published in Loggi · Mar 7, 2023

A new chapter for Loggi’s service communication


Distributed systems are a complex topic, and communication between them is especially tricky. Notions of clocks and ordering are fuzzy, and it is not always clear how we should deal with communication failures. For example: what happens if a message is sent twice? Or worse, what if some message “gets stuck” in the communication pipeline and we end up sending the same message hundreds of times in a short interval?

In the past few years, Loggi’s business has grown a lot, and as a result our systems became more distributed, pushing our architecture to a new level so we could handle that growth. In this blog post, we will present how Loggi tackles microservice communication, with a deeper focus on how we have been doing asynchronous communication between microservices.

We always try to do synchronous first

At Loggi, we’ve settled on REST for external APIs and for frontend-to-backend communication, while for service-to-service (horizontal) communication we mostly use gRPC.

We try to avoid complexity in our systems as much as possible, and developing simple solutions is one of our core values. So, if something can be done in a synchronous request, we do it. Imagine a scenario where a user is trying to perform some action in an interface (web or mobile): we encourage people to make a synchronous request in this case, mainly because failures are easier to handle. Instead of building a very complex architecture to deal with that failure, or being exposed to a silent data bug, the user can simply retry by hitting a button again.

Synchronous requests are simple but we still need to be careful:

  • We favor user-initiated requests as much as possible. This has strong implications for the user experience and helps us avoid being overwhelmed by automatic retries. It also keeps the correlation between system behavior and what the user sees much tighter;
  • If it is a request that hits several services, we don’t mix synchronous requests with asynchronous requests;
  • Also, we avoid requests that are chained by several microservices (e.g. Service A calls Service B, Service B calls Service C, and so on) because they can result in cascade failures hurting reliability. As a rule of thumb, we try to avoid call chains deeper than 2 services;
  • If we need to make requests that change data in several services, we should be able to roll back the changes, otherwise we will leave inconsistent data in one or more services;
  • The order in our code is important. If the code must make a request to an external service and save something in the database in a transaction, we don’t save the data in the database before we are sure the request returned a successful response;
  • Always apply backpressure techniques (timeouts, circuit breakers, etc.). It is important to avoid overloading a system (see the sketch after this list).
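To make the last point concrete, here is a minimal sketch of a synchronous gRPC call guarded by a deadline. The stub, request type and service address are hypothetical, not our actual API:

import grpc

# Hypothetical generated stub and request type, for illustration only.
from package_service_pb2 import DeliverPackageRequest
from package_service_pb2_grpc import PackageServiceStub

def deliver_package_sync(package_id: str) -> bool:
    channel = grpc.insecure_channel("package-service:50051")
    stub = PackageServiceStub(channel)
    try:
        # The deadline keeps a slow downstream service from holding the
        # user's request hostage; the caller can simply show a retry button.
        stub.DeliverPackage(DeliverPackageRequest(package_id=package_id), timeout=2.0)
        return True
    except grpc.RpcError as error:
        if error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
            return False
        raise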

What about asynchronous communication?

When we evaluated event streaming platforms to serve as the backbone of our event-driven architecture, Kafka was the most obvious choice: it is used by thousands of companies, including over 80% of the Fortune 100, in their data streaming platforms and event-driven designs.

However, operating a Kafka deployment is a cumbersome job, mainly because Kafka is a large, complex system on its own. On top of that, it brings additional complexity when integrating with client systems, which can mess up a lot of things. To avoid that, we decided to use the Confluent Cloud platform and leverage their products and expertise; we also did not want to have a dedicated, specialized team to manage Kafka clusters.

Since Kafka is a low-level piece of infrastructure, we can configure it in many ways and it requires some effort to understand which configurations we should use for a particular use case (e.g. should we use idempotent producers or transactional producers?). Instead of letting every team code their own Kafka producers and consumers and deal with the complexity, we developed a thin layer of infrastructure to abstract some of it. The main idea here was that Kafka should be as invisible as possible to everyone in our engineering team.
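As a small illustration of the kind of decision this layer hides, here is a minimal sketch of an idempotent producer using the confluent-kafka Python client; the broker address and topic name are placeholders:

from confluent_kafka import Producer

# enable.idempotence makes the broker deduplicate retried sends, so a
# network hiccup does not turn into duplicate messages on the topic.
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "enable.idempotence": True,
    "acks": "all",
})

producer.produce("package_events", value=b"serialized-event-bytes")
producer.flush()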

Besides simplicity, these were the requirements when we were designing this new architecture:

  • It had to work across all of our technology stacks;
  • It should be able to abstract most of the complexities of Kafka;
  • The events produced should be made available for consumption in our data lake for historical analysis;
  • It had to provide strong transactional guarantees.

What the final architecture design looks like

There is a lot going on in this image, so let’s break it down and explain how each part works.

Producing events

An event is created in a producer service. We provide a simple API (which is actually just a single function) to produce events whose schemas are defined as Protocol Buffers in our shared protocol buffer repository. By using protocol buffers, we buy ourselves schema evolution and backward and forward compatibility for our events. Since these objects are shared across different microservices, we use Uber’s prototool as a linter in the shared repository to ensure that all of our protocol buffers follow the same standards.

To avoid making unnecessary requests to other services to enrich event data, we also adopted the Event-Carried State Transfer concept in this design, which in practice means that we put all the information a consumer service requires inside the event itself. In the past, we suffered a lot with these enrichment requests overloading other services while accumulated message queues were drained during incident recovery.
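To illustrate the difference (the field names below are hypothetical), an event that carries only a package id forces every consumer to call the producer back, while an event that carries the state does not:

# Thin event: consumers have to call the producer service back to learn
# anything about the package, which piles extra requests onto it.
thin_event = PackageDeliveredEvent(package_id="pkg-123")

# Event-Carried State Transfer: the event embeds the state consumers need,
# so no extra synchronous request is required.
fat_event = PackageDeliveredEvent(
    package=Package(
        id="pkg-123",
        status="delivered",
        recipient_name="Jane Doe",
        delivered_at="2023-03-07T12:00:00Z",
    )
)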

Here is a snippet showing how an event is created using Python:

event = PackageDeliveredEvent(package=package.to_proto())
build_event(event).save()

This piece of code instantiates the PackageDeliveredEvent protocol buffer and saves the event in the producer service’s database. In the past, we saw widespread use of Django's on_commit hooks, and that created a big mess of inconsistency. With all the learnings we accumulated over time, we built an alternative to on_commit that actually works the way we expect.

As we mentioned before, we wanted strong transactional guarantees in this design because we did not want to worry about distributed transactions. Suppose we have a method in our producer service that saves some relational data in the database and also sends an event to Kafka. All of this should happen in the same transaction: if we fail to send the event to Kafka, we should roll back the changes we made in the relational database. Handling that by hand increases complexity, so we wanted it to work like the simple database transaction most people already know.

To reach this goal, we implemented the transactional Outbox Pattern in the build_event function. This function also adds some metadata to the event, such as its creation timestamp, origin service, uuid, etc. Once the event is saved in the database, we use Debezium, a Kafka Connect-based change data capture connector, to pick the event up from the database log and send it to a Kafka topic.
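We won’t go into the internals of build_event here, but a minimal sketch of the idea, with hypothetical model and field names, looks roughly like this:

import uuid
from django.db import models
from django.utils import timezone
from google.protobuf.message import Message

class EventOutbox(models.Model):
    # Hypothetical outbox table; Debezium picks rows up from the database log.
    event_uuid = models.UUIDField(default=uuid.uuid4)
    event_type = models.CharField(max_length=255)
    origin_service = models.CharField(max_length=255)
    created_at = models.DateTimeField(default=timezone.now)
    payload = models.BinaryField()

def build_event(event: Message) -> EventOutbox:
    # Wraps the protobuf event with metadata; the caller decides when to
    # .save(), so the row joins whatever database transaction is open.
    return EventOutbox(
        event_type=event.DESCRIPTOR.full_name,
        origin_service="producer-service",
        payload=event.SerializeToString(),
    )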

For more background on the pattern, see Debezium’s article “Reliable Microservices Data Exchange With the Outbox Pattern”.

Here is how it is done in code:

from django.db.transaction import atomic

@atomic
def deliver_package(package):
    package.status = 'delivered'
    package.save()
    event = PackageDeliveredEvent(package=package.to_proto())
    build_event(event).save()

With the implementation above, the event is first saved in the producer’s database before it is sent to Kafka. Everything works like a regular transaction: if the build_event call fails to save the event, the transaction rolls back and the package status is not updated.
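On the capture side, the Debezium connector watches the outbox table and publishes each new row to Kafka. A rough sketch of what such a Kafka Connect configuration can look like (hostnames, table and topic names are placeholders, and the exact properties depend on the Debezium version):

{
  "name": "producer-service-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "producer-service-db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "<secret>",
    "database.dbname": "producer_service",
    "topic.prefix": "producer_service",
    "table.include.list": "public.event_outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"
  }
}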

Consuming Events

Once an event is produced and it reaches its designated Kafka topic, it will be consumed by a microservice called Event Broker as shown in the following image:

The Event Broker is responsible for reading events from Kafka and sending them to the consumers that need to receive them. The events are sent to the consumers via gRPC requests. We do not give any guarantees on the order of event deliveries: since we do automatic retries, in-order delivery would be very difficult.
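To make the flow concrete, here is a simplified sketch of what a consume-and-forward loop like the Event Broker’s could look like; this is not our actual implementation, and the generated stub names are illustrative:

import grpc
from confluent_kafka import Consumer

# Hypothetical generated modules from the shared protocol buffer repository.
from events_pb2 import PackageDeliveredEvent
from events_api_pb2_grpc import EventsAPIStub

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    # One consumer group per registered consumer service keeps their
    # consumption progress independent of each other.
    "group.id": "event-broker-service-a",
    "enable.auto.commit": False,
})
consumer.subscribe(["event_y"])

channel = grpc.insecure_channel("service-a-hostname:12345")
stub = EventsAPIStub(channel)

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    event = PackageDeliveredEvent()
    event.ParseFromString(msg.value())
    # Forward the event over gRPC; retries and DLQ handling are omitted
    # here (see "Dealing with failures" below).
    stub.PackageDelivered(event, timeout=5.0)
    consumer.commit(message=msg)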

One of our goals was to make it as effortless as possible to add new events. In the end we reached a point where an engineer only needs to do two things:

  1. Add a configuration in the Event Broker telling it which consumer services need what events;
  2. Implement a gRPC server to receive the event in the consumer service.

The configuration in the Event Broker looks like this:

consumers:
  service-a:
    grpc-server-hostname: "service-a-hostname"
    grpc-server-port: "12345"
    max-retry-attempts: 15
    batch-size: 10
    retry-topic: "service_a_retry"
    dlq-topic: "service_a_dlq"
    distribution:
      - topic: "event_y"
        services:
          - "EndpointA"
          - "EndpointB"
  service-b:
    grpc-server-hostname: "service-b-hostname"
    grpc-server-port: "54321"
    max-retry-attempts: 5
    batch-size: 5
    retry-topic: "service_b_retry"
    dlq-topic: "service_b_dlq"
    distribution:
      - topic: "event_x"
        services:
          - "EndpointC"

We have a list of configurations, one per consumer service, that tells which events should be sent to which consumer and which gRPC endpoints are going to receive them. The Event Broker instantiates one Kafka consumer for each consumer service, each with its own consumer group name, which makes event consumption completely independent for every registered consumer service.

We also keep some information about the consumer’s gRPC server, the number of retry attempts before the event is sent to a DLQ topic (more on that later), and the names of the consumer’s retry and DLQ topics.

As for the gRPC server in the consumer service, its interface is also defined in our shared protocol buffer repository, and each service that wants to listen to an event has to implement this interface in its gRPC server. Here is what it looks like:

class EventsAPI(EventsAPIServicer):
    def PackageDelivered(self, request: PackageDeliveredEvent, context) -> Empty:
        # do something with the event received
        return Empty()

Dealing with failures

We have a retry mechanism in place which resends the event to the consumer service N times where N is the value set in the Event Broker configuration as shown above. We also have in-memory retries that use exponential backoffs and circuit breakers. We learned a lesson after past retry implementations DoS’ed some of our services to death :-).
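A minimal sketch of the in-memory part, assuming a hypothetical send_to_consumer function, could look like this:

import random
import time

import grpc

def send_with_backoff(send_to_consumer, event, max_attempts=5):
    # Retries a gRPC delivery with exponential backoff and jitter so a
    # struggling consumer is not hammered by a wall of retries.
    for attempt in range(max_attempts):
        try:
            return send_to_consumer(event)
        except grpc.RpcError:
            if attempt == max_attempts - 1:
                raise  # give up; the event goes to the retry/DLQ flow
            time.sleep((2 ** attempt) + random.random())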

It is important to mention that we require all consumer services to have idempotent endpoints: since we have at-least-once semantics, we might send the same event more than once in case of failures.
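One common way to get idempotency, sketched below with a hypothetical ProcessedEvent model, is to record the event uuid added by build_event and skip anything we have already seen:

from django.db import IntegrityError, models, transaction

class ProcessedEvent(models.Model):
    # Hypothetical dedupe table keyed by the event uuid.
    event_uuid = models.UUIDField(unique=True)

def handle_package_delivered(event_uuid, event):
    try:
        with transaction.atomic():
            ProcessedEvent.objects.create(event_uuid=event_uuid)
            # ...apply the side effects of the event here...
    except IntegrityError:
        # The event was already processed; receiving it again is a no-op.
        pass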

If the Event Broker fails to deliver the event to the consumer service N times, it will give up and send the event to a DLQ topic. The teams are responsible for monitoring their services’ DLQ topics and when necessary they can use a DLQ Processor job that resends the messages to a retry topic so we start processing them all over again.
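As a rough sketch (not the actual implementation), the DLQ Processor essentially moves messages from a consumer’s DLQ topic back to its retry topic:

from confluent_kafka import Consumer, Producer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "service-a-dlq-processor",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["service_a_dlq"])
producer = Producer({"bootstrap.servers": "localhost:9092"})

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    # Re-publish the failed event so delivery starts all over again.
    producer.produce("service_a_retry", value=msg.value(), key=msg.key())
    producer.flush()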

Notice that we have separate retry and DLQ topics for each consumer service. This is necessary because we may successfully deliver Event X to Service A but fail to deliver it to Service B; when this happens, we don’t want to send the message to Service A again, because it was already delivered.

Final remarks

This architecture has been running in production for more than two years now and we have dozens of types of events created, from package events to cargo transfer and accounting events. The Event Broker delivers millions of events each day to several consumer services.

In the end, the design was practical enough for our engineering team to engage with and be productive: creating and consuming new events was easy.

We also spent a lot of energy on observability and implemented logs and custom metrics so people can easily see what is happening under the hood with their events.

We did not experience any major issues with any of the components of this architecture. When it comes to performance, we had to do some fine-tuning for a few high-volume events: originally we had one Kafka consumer per consumer service, and for high-volume events this could result in some competition for resources. Luckily, it was an easy fix, and we just had to configure dedicated Kafka consumers for these events.

One of the main lessons learned was that we could have implemented the Event Broker microservice differently, given that in the design we presented here, it ends up being a single point of failure. Even though we never had any major incident with it, this is still a concern that floats around our heads from time to time. We could have implemented it in a more distributed way, for example, as a sidecar for each consumer service.

We also experienced some organizational trade-offs. Given that the Event Broker microservice was maintained by a single platform team, we ended up creating a strong dependency on this team for a lot of other engineering groups. If a team needed a new feature in this architecture, they had to wait until someone from the platform team became available to help them.

Nowadays, we understand that we should have empowered our teams more and avoided this dependency, and this is the direction we are going now. Instead of providing a single solution for everyone, we are concentrating our efforts on providing paved roads with a few paths that people can choose.
