Before explaining how Kafka can achieve exactly-once semantics, let’s first enumerate the different types of messaging semantics one can find in a distributed pub-sub system.

  • At-least-once: the most commonly used. If the producer receives an ACK from the broker, then the message has been written to the topic. However, either the ACK can fail to reach the producer or the broker itself can fail before sending it. In both scenarios the producer times out and retries sending the same message, on the assumption that the first attempt was not written to the broker, which leads to duplicates.

  • At-most-once: the producer never retries sending a message. In this scenario, a message might never be received by the broker (and thus never written), but it is never duplicated.

  • Exactly-once: even if the message is sent twice, it is only delivered once (i.e. duplicates are ignored). This heavily depends on the consumer application as well: it’s not only a matter of ensuring that the same message doesn’t appear twice within the Kafka topic, but also that consumers won’t process the same message twice.
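To make the difference between the first two semantics concrete, here is a minimal sketch with a toy in-memory broker. The `Broker` class, its `first_failure` parameter and the two send functions are hypothetical names invented for this illustration; none of this is Kafka client code.

```python
class Broker:
    """Toy broker that can fail once, either before or after writing."""

    def __init__(self, first_failure=None):
        self.log = []
        # Hypothetical knob: "request" drops the first message before it is
        # written, "ack" writes it but loses the ACK, None never fails.
        self.pending_failure = first_failure

    def send(self, message):
        """Return True only if the producer receives an ACK."""
        failure, self.pending_failure = self.pending_failure, None
        if failure == "request":
            return False              # message never reached the broker
        self.log.append(message)
        if failure == "ack":
            return False              # written, but the producer cannot know
        return True


def send_at_most_once(broker, message):
    """Send once and never retry, whether or not an ACK arrives."""
    broker.send(message)


def send_at_least_once(broker, message, max_attempts=5):
    """Retry until an ACK arrives or the attempts run out."""
    for _ in range(max_attempts):
        if broker.send(message):
            return True
    return False
```

With a lost request, `send_at_most_once` leaves the log empty; with a lost ACK, `send_at_least_once` leaves the same message in the log twice.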

Challenges

Let’s consider a simple example: a producer app that writes to a Kafka topic, and a consumer app that reads from that topic and writes to both another Kafka topic and a database. We also want this pipeline to have exactly-once semantics (as described above).

  1. The first challenge is on the producer app: it writes the message but fails to get the ACK. If we retry, the message can end up duplicated (at-least-once delivery); if we don’t, the message may never be written (at-most-once delivery).

  2. The second challenge is on the consumer side. The consumer app can read the messages from the broker and write their contents to both the Kafka topic and the database, but fail before committing the offset back to Kafka. This means that after a restart the consumer would consume the same messages again and write data that was already written. If it were to commit the offset first and only then write, the problem would be the opposite: a failure between the two steps would mark the message as processed without its contents ever being written to the output topic and database, thus losing data.
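The consumer-side trade-off can be sketched with a toy loop that crashes between its two steps. The function names, the `state` dictionary and the `crash_on` parameter are all assumptions made for this illustration, not part of any Kafka API.

```python
def run_consumer(log, state, commit_first, crash_on=None):
    """Process messages starting at the committed offset in `state`.

    crash_on is the log index at which the consumer dies between
    its two steps (writing the output and committing the offset).
    """
    while state["offset"] < len(log):
        i = state["offset"]
        if commit_first:
            state["offset"] = i + 1           # step 1: commit the offset
            if i == crash_on:
                return                        # crash before writing
            state["output"].append(log[i])    # step 2: write the output
        else:
            state["output"].append(log[i])    # step 1: write the output
            if i == crash_on:
                return                        # crash before committing
            state["offset"] = i + 1           # step 2: commit the offset


def crash_and_restart(commit_first):
    """Crash while handling the second message, then restart and finish."""
    log = ["a", "b", "c"]
    state = {"offset": 0, "output": []}
    run_consumer(log, state, commit_first, crash_on=1)
    run_consumer(log, state, commit_first)    # restart from committed offset
    return state["output"]
```

Writing first and committing second duplicates the message that was in flight during the crash; committing first loses it.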

Producer

The first challenge can be solved by making the producer idempotent. Idempotence is the ability to apply the same operation multiple times without changing the result beyond the first application. This allows the producer to always retry until it receives the ACK, while Kafka transparently detects and ignores duplicates.

When an idempotent producer retries a send, the message is still persisted in the log exactly once. This is done by assigning each producer a producer ID (PID) during initialization and giving each message a sequence number per topic partition: if the broker has already written a message with that PID and sequence number, the retried copy is discarded. It’s important to note, though, that this is only guaranteed within a single producer session, meaning that if a new producer is created (or the existing one is restarted) a new PID will be used.
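The deduplication described above can be sketched as a toy broker that remembers the highest sequence number written per (PID, partition) pair. `IdempotentBroker` and its methods are hypothetical names for this illustration. It also simplifies the real broker, which additionally rejects out-of-order sequence numbers. In the real clients, idempotence is switched on through the `enable.idempotence` producer setting rather than implemented by hand.

```python
import itertools


class IdempotentBroker:
    """Toy broker that drops retried messages it has already written."""

    def __init__(self):
        self.log = {}         # partition -> list of payloads
        self.last_seq = {}    # (pid, partition) -> highest sequence written
        self._pids = itertools.count(1)

    def init_producer(self):
        """Hand out a fresh PID, as happens for every new producer session."""
        return next(self._pids)

    def append(self, pid, partition, seq, payload):
        key = (pid, partition)
        if seq <= self.last_seq.get(key, -1):
            return "duplicate"    # already written: ACK again, do not append
        self.log.setdefault(partition, []).append(payload)
        self.last_seq[key] = seq
        return "ok"
```

A retry with the same PID and sequence number is discarded, but a restarted producer gets a new PID, so a message it re-sends is written again. That is the single-session limit mentioned above.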

Consumer

When it comes to the second challenge, the consumer has to do a few things:

  1. consume the message
  2. write the data into a database
  3. write another message on a Kafka topic
  4. commit the offset back

In order to be able to do this, Kafka introduced Transactions.

At the core, transactional guarantees enable applications to produce to multiple TopicPartitions atomically, i.e. all writes to these TopicPartitions will succeed or fail as a unit.

This means that we can create a transaction, perform a set of operations (e.g. produce to multiple topics), and then commit or roll back the transaction. To commit the transaction, a two-phase commit protocol is used. Consumers can then choose to read only committed messages (those that resulted from a committed transaction). This also means that messages from transactions that were not committed are not removed from the Kafka log: they stay there, and consumers can still read them if they choose to, as there’s an option to read uncommitted messages.
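A rough way to picture this is a log in which every record is tagged with its transaction and readers filter by isolation level. The `TransactionalLog` class below is a hypothetical toy model, not the Kafka implementation. The level names mirror the consumer’s `isolation.level` setting (`read_committed` and `read_uncommitted`). Stopping a `read_committed` reader at the first record of a still-open transaction is a simplification of how the real consumer waits for open transactions to finish.

```python
class TransactionalLog:
    """Toy log shared by several topics; records carry a transaction id."""

    def __init__(self):
        self.records = []    # (txn_id, topic, payload), in append order
        self.outcome = {}    # txn_id -> "committed" or "aborted"; absent while open
        self._next_txn = 0

    def begin(self):
        self._next_txn += 1
        return self._next_txn

    def produce(self, txn_id, topic, payload):
        # Records are appended right away, before the outcome is known.
        self.records.append((txn_id, topic, payload))

    def commit(self, txn_id):
        self.outcome[txn_id] = "committed"

    def abort(self, txn_id):
        self.outcome[txn_id] = "aborted"    # records stay in the log

    def read(self, topic, isolation_level="read_uncommitted"):
        visible = []
        for txn_id, record_topic, payload in self.records:
            if record_topic != topic:
                continue
            if isolation_level == "read_committed":
                state = self.outcome.get(txn_id)
                if state is None:
                    break        # open transaction: do not read past it
                if state == "aborted":
                    continue     # skip records of aborted transactions
            visible.append(payload)
        return visible
```

After one committed and one aborted transaction that each wrote to two topics, a `read_committed` reader sees only the committed records on both topics, while a `read_uncommitted` reader sees everything that was appended.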

When it comes to writing data into a database, there are a few possible options developers can use. One of them is storing the consumer offset along with the data written into the database; another is to use Kafka itself to store the changes (or a Kafka Connector). Keep in mind, however, that exactly-once semantics only ensures exactly-once processing at the Kafka level. If your system has side effects outside Kafka, you will be back to the same challenge, since after a failure messages will still be processed multiple times internally.
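The first option, storing the offset together with the data, could look roughly like the sketch below. It uses SQLite from the Python standard library purely as a stand-in database. The table layout and the function names are assumptions for illustration. The key point is that the result and the next offset are written in one database transaction, so a redelivered message can be recognized and skipped.

```python
import sqlite3


def open_store():
    """Create an in-memory database holding both results and offsets."""
    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE results (value TEXT)")
    db.execute(
        "CREATE TABLE offsets ("
        "topic_partition TEXT PRIMARY KEY, next_offset INTEGER)"
    )
    return db


def committed_offset(db, topic_partition):
    """Return the next offset to process, or 0 if nothing was stored yet."""
    row = db.execute(
        "SELECT next_offset FROM offsets WHERE topic_partition = ?",
        (topic_partition,),
    ).fetchone()
    return row[0] if row else 0


def process(db, topic_partition, offset, value):
    """Store the value and the new offset atomically; skip redeliveries."""
    if offset < committed_offset(db, topic_partition):
        return False    # already stored before a restart
    with db:            # commits on success, rolls back on an exception
        db.execute("INSERT INTO results VALUES (?)", (value,))
        db.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?)",
            (topic_partition, offset + 1),
        )
    return True
```

On restart, the consumer would read `committed_offset` from the database and resume from there instead of relying on the offset committed to Kafka.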

Last but not least, the consumer has to commit the offset, and this should also be part of the transaction. Funnily enough, since Kafka 0.8.2 committing the offset amounts to a write to Kafka, meaning that this is no different from producing a message to a Kafka topic.
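Putting the pieces together, the consume-transform-produce step can be sketched as a transaction in which the offset commit is just one more write. `Cluster`, `Transaction` and `process_one` are hypothetical names for this toy model. Staging writes in memory until commit is a simplification for brevity, since real Kafka appends to the log immediately. Only the `__consumer_offsets` topic name is taken from Kafka itself.

```python
class Cluster:
    """Toy cluster in which topics and committed offsets are plain logs."""

    def __init__(self):
        self.topics = {"input": [], "output": [], "__consumer_offsets": []}

    def committed_offset(self, group):
        """The latest offset record for the group wins; default is 0."""
        offsets = [o for g, o in self.topics["__consumer_offsets"] if g == group]
        return offsets[-1] if offsets else 0


class Transaction:
    def __init__(self, cluster):
        self.cluster = cluster
        self.staged = []    # (topic, record) pairs, invisible until commit

    def produce(self, topic, record):
        self.staged.append((topic, record))

    def send_offsets(self, group, next_offset):
        # Committing an offset is just another write inside the transaction.
        self.staged.append(("__consumer_offsets", (group, next_offset)))

    def commit(self):
        for topic, record in self.staged:
            self.cluster.topics[topic].append(record)
        self.staged = []

    def abort(self):
        self.staged = []


def process_one(cluster, group, crash_before_commit=False):
    """Consume one message, transform it, and commit output plus offset."""
    offset = cluster.committed_offset(group)
    message = cluster.topics["input"][offset]
    txn = Transaction(cluster)
    txn.produce("output", message.upper())
    txn.send_offsets(group, offset + 1)
    if crash_before_commit:
        txn.abort()    # neither the output nor the offset becomes visible
        return
    txn.commit()
```

Because the output record and the offset succeed or fail together, a crash before the commit leaves both untouched and the message is simply processed again, without duplicates in the output topic.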

References