
Conversation

marcosschroh (Collaborator)

… Related to #265

github-actions bot (Contributor) commented Mar 6, 2025

PR Preview Action v1.6.0

🚀 View preview at
https://kpn.github.io/kstreams/pr-preview/pr-297/

Built to branch gh-pages at 2025-03-06 13:31 UTC.
Preview will be ready when the GitHub Pages deployment is complete.

@marcosschroh requested a review from woile March 6, 2025 13:36
@@ -0,0 +1,241 @@
`Kafka 0.11.0` includes support for `idempotent` and `transactional` capabilities in the `producer`. Idempotent delivery ensures that messages are delivered `exactly once`
to a particular topic partition during the lifetime of a `single producer`. Transactional delivery allows producers to send data to multiple partitions such that either
all messages are successfully delivered, or none of them are. Together, these capabilities enable `*exactly once semantics*` in Kafka.
Member:
I think transactions lack some explanation here. I would add that "exactly once semantics" apply to only one scenario:

  • consume-process-produce under the same Kafka cluster.
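To make that scenario concrete, a minimal sketch of a consume-process-produce loop written directly against aiokafka (the client kstreams builds on); the topic names, group id, and transactional id are invented for illustration:

```python
import asyncio

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer, TopicPartition


async def consume_process_produce() -> None:
    consumer = AIOKafkaConsumer(
        "input-topic",
        bootstrap_servers="localhost:9092",
        group_id="my-group",
        enable_auto_commit=False,          # offsets are committed inside the transaction
        isolation_level="read_committed",  # skip events from aborted transactions
    )
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        transactional_id="my-transactional-id",
    )
    await consumer.start()
    await producer.start()
    try:
        async for record in consumer:
            # begin_transaction / commit / abort are handled by the context manager
            async with producer.transaction():
                await producer.send("output-topic", record.value)
                tp = TopicPartition(record.topic, record.partition)
                # commit the consumed offset as part of the same transaction
                await producer.send_offsets_to_transaction(
                    {tp: record.offset + 1}, "my-group"
                )
    finally:
        await producer.stop()
        await consumer.stop()


asyncio.run(consume_process_produce())
```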

) -> typing.Awaitable[RecordMetadata]: ...


class Transaction(typing.Protocol):
Member:
Add a docstring to this class, please. It's not clear why we have a protocol Transaction and then transaction.Transaction; an explanation in the docstring would clarify.

Consider also renaming this to TransactionProtocol.
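For instance, the requested docstring could look something like this (the wording and the `TransactionProtocol` name are only suggestions, not the final API):

```python
import typing


class TransactionProtocol(typing.Protocol):
    """Structural type for objects usable as a transaction.

    This Protocol only declares the interface that the engine depends on;
    `transaction.Transaction` is the concrete implementation created at
    runtime. Keeping them separate lets modules type-check against the
    interface without importing the concrete class.
    """
```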

from .clients import Producer

if typing.TYPE_CHECKING:
from . import transaction # pragma: no cover
Member:
Is there a way to avoid this circular import? Could transaction.Transaction also be a protocol?
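A sketch of that alternative, with an invented `send` signature: depending on a Protocol means this module never imports `transaction`, neither at runtime nor under `TYPE_CHECKING`, so the cycle disappears.

```python
import typing


class TransactionProtocol(typing.Protocol):
    # invented signature; the real Transaction methods may differ
    async def send(self, topic: str, value: bytes) -> None: ...


def handle(transaction: TransactionProtocol) -> None:
    # transaction.Transaction satisfies this structurally, so no import
    # of the concrete class is needed here
    ...
```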

- Transaction state is stored in a new internal topic `__transaction_state`. This topic is not created until the first attempt to use a transactional request API. There are several settings to control the topic's configuration.
- `Topics` which are included in transactions should be configured for durability. In particular, the `replication.factor` should be at least `3`, and the `min.insync.replicas` for these topics should be set to `2`
- `Transactions` always add overhead, meaning that more effort is needed to produce events and the consumers have to apply filters
For example, `transaction.state.log.min.isr` controls the minimum ISR for this topic.
Member:
For discoverability I would add links to the Kafka settings.

Suggested change
For example, `transaction.state.log.min.isr` controls the minimum ISR for this topic.
For example, [`transaction.state.log.min.isr`](https://kafka.apache.org/documentation/#brokerconfigs_transaction.state.log.min.isr) controls the minimum ISR for this topic.


## Usage

From the `kstreams` point of view, the `transaction pattern` is a `context manager` that will `start` a transaction and then `commit` or `aboort` it. Always a `transaction` starts when an event is send. Once that we have the `context` we can send events in two ways:
Member:
Suggested change
From the `kstreams` point of view, the `transaction pattern` is a `context manager` that will `start` a transaction and then `commit` or `aboort` it. Always a `transaction` starts when an event is send. Once that we have the `context` we can send events in two ways:
From the `kstreams` point of view, the "transaction pattern" is a `context manager` that will `start` a transaction and then `commit` or `abort` it. A `transaction` always starts when an event is sent. Once we have the `context` we can send events in two ways:


From the `kstreams` point of view, the `transaction pattern` is a `context manager` that will `start` a transaction and then `commit` or `aboort` it. Always a `transaction` starts when an event is send. Once that we have the `context` we can send events in two ways:

Using the `StreamEngine` directly:
Member:
This should be a side-note; the recommended way should be with the type annotation, IMO.
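A hypothetical sketch of the two styles, based only on the surrounding docs; `StreamEngine.transaction(...)`, the `transaction_id` parameter, and the `Transaction` annotation are assumed names, not verified against the final PR API:

```python
import kstreams
from kstreams import Transaction  # assumed export added by this PR

stream_engine = kstreams.create_engine(title="transactions-example")


# Recommended style per this comment: request the transaction through the
# type annotation on the stream function (names assumed, not final).
@stream_engine.stream("local--source")
async def consume(cr: kstreams.ConsumerRecord, transaction: Transaction) -> None:
    async with transaction(transaction_id="my-tx-id") as t:
        await t.send("local--sink", value=cr.value)


# Side-note style: via the StreamEngine directly (assumed API):
#
#     async with stream_engine.transaction(transaction_id="my-tx-id") as t:
#         await t.send("local--sink", value=b"event inside a transaction")
```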

woile (Member) commented Mar 10, 2025

Excellent work! 💪🏻

marcosschroh (Collaborator, Author) commented Mar 11, 2025

I have hit an issue in aiokafka. Currently I am blocked by aio-libs/aiokafka#1098

@Masqueey left a comment:

What an insane commit! Incredible that you managed to implement such a large feature and adapt the testing client to work and test your changes accordingly.

I left some spelling corrections and some questions, but mostly very impressed with the work you did.

@@ -0,0 +1,241 @@
`Kafka 0.11.0` includes support for `idempotent` and `transactional` capabilities in the `producer`. Idempotent delivery ensures that messages are delivered `exactly once`


Although we call it Kafka, formally it is called Apache Kafka. I think it would be good for this first mention to use the full name. Further references can stay Kafka.

It is important to notice that:

- `Transaction` always starts from a `send` (producer)
- Events sent to one or more topics will only be visible on consumers after the transaction is committed.


"Events or (offset) commits"

- To use transactions, a `transaction id` (unique id per transaction) must be set before an event is sent. If you do not provide one, `kstreams` will auto-generate one for you.
- Transaction state is stored in a new internal topic `__transaction_state`. This topic is not created until the first attempt to use a transactional request API. There are several settings to control the topic's configuration.
- `Topics` which are included in transactions should be configured for durability. In particular, the `replication.factor` should be at least `3`, and the `min.insync.replicas` for these topics should be set to `2`
- `Transactions` always add overhead, meaning that more effort is needed to produce events and the consumers have to apply filters


Not all sentences in this itemisation end in a period (.); I would change them to make sure all of them do. (Sorry for nitpicking.)

- `Topics` which are included in transactions should be configured for durability. In particular, the `replication.factor` should be at least `3`, and the `min.insync.replicas` for these topics should be set to `2`
- `Transactions` always add overhead, meaning that more effort is needed to produce events and the consumers have to apply filters
For example, `transaction.state.log.min.isr` controls the minimum ISR for this topic.
- `Streams` (consumers) will have to filter or not transactional events


Maybe more: "Consumer will have to set per Stream whether to filter out events based on their transaction header." (Or something like that, I might not understand the process well enough yet.)
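For what it's worth, in plain aiokafka this filtering maps to the consumer's isolation level; a minimal sketch (topic and group names invented):

```python
# With read_committed, the consumer only sees events from committed
# transactions; events from aborted transactions are filtered out.
# With the default read_uncommitted, everything is delivered.
from aiokafka import AIOKafkaConsumer

consumer = AIOKafkaConsumer(
    "local--kstreams",                 # invented topic name
    bootstrap_servers="localhost:9092",
    group_id="example-group",
    isolation_level="read_committed",  # filter out aborted transactional events
)
```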

queue "Topic A" as topic_a
producer -> topic_a: "produce with a transaction"
topic_a --> stream_a: "consume only transactional events"


It's more about the success of the transaction than about whether or not it was part of a transaction, right? So more "filter out aborted transactional events" than "only transactional"?

self._producer: typing.Optional[typing.Type[Producer]] = None
self._producer: typing.Optional[Producer] = None
self._streams: typing.List[Stream] = []
self._transaction_manager = TransactionManager(


How come this TransactionManager no longer needs to be imported?

self._assignment: List[TopicPartition] = []
self.partitions_committed: Dict[TopicPartition, int] = {}

# Called to make sure that has all the kafka attributes like _coordinator


"that it has"

self.partitions_committed: Dict[TopicPartition, int] = {}

# Called to make sure that has all the kafka attributes like _coordinator
# so it will behave like an real Kafka Consumer


"a" real Kafka



@pytest.mark.asyncio
async def test_producer_with_transaction_no_consumer(


I do not entirely get what this test is supposed to test. Not sure what this scenario is representing.

await asyncio.wait_for(stream.start(), timeout=0.1)

await stream.stop()


The only test I'm still missing here is the produce-then-commit transaction test. (Although I'm aware that that one doesn't work because of the bug in aiokafka.)
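For the record, a hypothetical shape of that missing test; `TestStreamClient` exists in kstreams, but the `transaction(...)` call on it and the `stream_engine` fixture are assumed from this PR and, per the thread above, would currently hit aio-libs/aiokafka#1098:

```python
import pytest

from kstreams.test_utils import TestStreamClient


@pytest.mark.asyncio
async def test_produce_then_commit(stream_engine):  # fixture assumed
    client = TestStreamClient(stream_engine)
    async with client:
        # assumed API: open a transaction, send, commit on context exit
        async with client.transaction(transaction_id="test-tx") as t:
            await t.send("local--kstreams", value=b"payload")
        # after the context exits, the transaction should be committed and
        # the event visible to read_committed consumers
```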
