Stream
The stream delivery method lets you consume a topic as an ordered log, where consumer groups track their progress by offset in the log.
The stream delivery method is more complex than the others, but it’s the best fit for Kafka-like stream processing that requires exactly-once processing and head-of-line blocking. If you need ordered delivery but don’t need exactly-once processing (e.g. CDC), consider using fifo instead, as it provides more flexible ordered consumption.
Publishing messages
In the msgs module, publishing is done to topics that are shared across all delivery methods, with the different delivery methods only applying at consumption time. Please refer to the topic publishing API for information on how to publish messages to a topic.
Partitions
Partitions are subdivisions of a topic that enable parallel processing and scalability. Because of how stream processing works, only one consumer can consume a given partition at a time, so dividing a topic into multiple partitions makes it possible to process the topic using multiple consumers in parallel.
Many use-cases don’t require parallel stream processing, so if you don’t think you need it, just stick to the default 1 partition per topic. With that being said, partitions are cheap in Coyote, so you can create a high number of partitions from the get go and not worry about it.
Partitions are named using the topic~N convention: a topic foo has partitions foo~0, foo~1, etc. You can read from a topic (Coyote picks the partition for you) or target a specific partition directly.
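The naming convention above can be handled with two small helpers. This is a minimal sketch: partition_name and split_partition are hypothetical names (not part of Coyote), and it assumes the partition index is always the digits after the last ~ in the name.

```python
from typing import Optional, Tuple


def partition_name(topic: str, index: int) -> str:
    """Build a partition name such as 'foo~0' from a topic and an index."""
    if index < 0:
        raise ValueError("partition index must be non-negative")
    return f"{topic}~{index}"


def split_partition(name: str) -> Tuple[str, Optional[int]]:
    """Split 'foo~1' into ('foo', 1); a bare topic 'foo' yields ('foo', None)."""
    topic, sep, suffix = name.rpartition("~")
    if sep and suffix.isdigit():
        return topic, int(suffix)
    return name, None  # no '~N' suffix: treat the whole string as a topic
```

A helper like split_partition can be handy for telling apart a topic-level name and a partition-level name before calling an API that only accepts one of them.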
Partition counts can be increased at any time, but never decreased.
The concept of partitions technically applies to all delivery methods, but it’s completely abstracted away in all but stream.
Ordering and keys
Coyote uses the group_key property to decide which partition each message goes to, which also means that stream retains the relative ordering of messages that share the same group_key.
Topics are divided into one or more partitions. Messages with the same key are always routed to the same partition, preserving their relative order. Messages without a key are distributed round-robin across partitions.
If ordering matters, always set group_key when publishing.
Increasing the partition count may break ordering between messages published before and after the change. This limitation will be lifted soon, once we implement support for increasing the partition count while maintaining ordering.
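To make the routing rules concrete, here is a rough sketch of key-based routing. Coyote's actual routing function is not described in this document, so the hashing scheme below (SHA-256 of the key, modulo the partition count) and the name pick_partition are assumptions made purely for illustration.

```python
import hashlib
from itertools import count
from typing import Optional

_round_robin = count()  # shared counter for messages published without a key


def pick_partition(group_key: Optional[str], partitions: int) -> int:
    """Return a partition index for a message (illustrative, not Coyote's real function)."""
    if group_key is None:
        # No key: spread messages evenly, with no ordering guarantee.
        return next(_round_robin) % partitions
    # Same key and same partition count always give the same index.
    digest = hashlib.sha256(group_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % partitions


# Keys whose partition changes when the topic grows from 4 to 5 partitions.
keys = [f"user-{i}" for i in range(100)]
moved = [k for k in keys if pick_partition(k, 4) != pick_partition(k, 5)]
```

Under this assumed scheme, the moved list is non-empty: a key that lands on a new partition after the count changes can have its newer messages consumed independently of its older ones, which is why ordering across the change is not guaranteed.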
Receiving messages
Coyote automatically assigns partitions to consumers within a consumer group. Each consumer only receives messages from its assigned partitions, ensuring that multiple consumers can work in parallel without blocking each other.
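The property that matters here is that every partition has exactly one owner within a consumer group. The sketch below illustrates that property with a simple round-robin assignment; how Coyote actually distributes partitions is not specified in this document, and assign_partitions is a hypothetical helper.

```python
from typing import Dict, List


def assign_partitions(partitions: List[str], consumers: List[str]) -> Dict[str, List[str]]:
    """Spread partitions over consumers so that each partition has exactly one owner."""
    if not consumers:
        raise ValueError("need at least one consumer")
    assignment: Dict[str, List[str]] = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        owner = consumers[i % len(consumers)]  # round-robin over the group
        assignment[owner].append(partition)
    return assignment


plan = assign_partitions(
    ["foo~0", "foo~1", "foo~2", "foo~3"],
    ["consumer-a", "consumer-b"],
)
# consumer-a owns foo~0 and foo~2; consumer-b owns foo~1 and foo~3
```

With more consumers than partitions, some consumers end up with an empty list, which matches the rule that a partition is never shared between consumers.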
Receiving messages is done using the coyote.msgs.stream.receive API as follows:
coyote.msgs.stream.receive(
    topic="topic-123", // Topic (or specific partition) to consume from
    consumer_group="group1", // The consumer group to use
    // Wait until batch_size messages are available,
    // or batch_wait_millis has passed (whichever comes first).
    batch_size=100,
    batch_wait_millis=10_000,
    // How long a consumer holds a batch before it is considered abandoned.
    lease_duration_millis=500_000
)
Which returns:
{
    "msgs": [
        {
            "offset": 0,
            "topic": "topic-123~0",
            "value": b"msg-payload-1",
            "timestamp": Date(...),
        },
        {
            "offset": 1,
            "topic": "topic-123~0",
            "value": b"msg-payload-2",
            "headers": {},
            "timestamp": Date(...),
        },
        {
            "offset": 0,
            "topic": "topic-123~1",
            "value": b"msg-payload-3",
            "headers": { "h1": "v1" },
            "timestamp": Date(...),
        }
    ]
}
As you can see, each message has an associated offset, along with the topic and partition it was received from. Messages from the same partition are always sorted from the lowest to the highest offset.
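Because a single batch can contain messages from several partitions, it is often useful to work out the highest offset seen per partition once the batch has been processed. The sketch below assumes the response is available as a plain Python dict shaped like the example above; highest_offsets is a hypothetical helper, not a Coyote API.

```python
from typing import Any, Dict, List


def highest_offsets(msgs: List[Dict[str, Any]]) -> Dict[str, int]:
    """Map each partition (the message's "topic" field) to the highest offset seen."""
    highest: Dict[str, int] = {}
    for msg in msgs:
        partition = msg["topic"]
        highest[partition] = max(highest.get(partition, -1), msg["offset"])
    return highest


batch = {
    "msgs": [
        {"offset": 0, "topic": "topic-123~0", "value": b"msg-payload-1"},
        {"offset": 1, "topic": "topic-123~0", "value": b"msg-payload-2"},
        {"offset": 0, "topic": "topic-123~1", "value": b"msg-payload-3"},
    ]
}
print(highest_offsets(batch["msgs"]))
# {'topic-123~0': 1, 'topic-123~1': 0}
```

Each entry in the result corresponds to one partition and one offset, which is the granularity at which offsets are tracked.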
Committing offsets
After processing messages, commit the highest offset you have successfully handled. Coyote will resume from the next offset on the next receive call.
coyote.msgs.stream.commit(
    topic="topic-123~0", // Must be a specific partition
    consumer_group="group1",
    offset=3 // Commits all messages up to and including this offset
)
Seeking
You can manually move a consumer group’s read position using coyote.msgs.stream.seek. This is useful for replaying messages or skipping ahead.
coyote.msgs.stream.seek(
    topic="topic-123~0", // Topic or specific partition
    consumer_group="group1",
    // Provide exactly one of: timestamp or position
    timestamp=Date(...),
    position="earliest" | "latest"
)
Exactly-once delivery
For exactly-once semantics, track offsets per partition in your own storage and commit them server-side after each successful write to your destination. If additional partitions are added to the topic later, begin tracking their offsets as well.
Even when managing offsets yourself, you are still expected to commit offsets server-side using coyote.msgs.stream.commit.
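One common way to track offsets in your own storage is to write the result and the partition's offset in the same transaction, and to skip any message whose offset has already been recorded. The sketch below uses an in-memory SQLite database as a stand-in destination; the table layout and the apply_once helper are assumptions for illustration, and the server-side commit call is only indicated in a comment.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE results (part TEXT, msg_offset INTEGER, value BLOB)")
db.execute("CREATE TABLE offsets (part TEXT PRIMARY KEY, last_offset INTEGER)")


def apply_once(db: sqlite3.Connection, msg: dict) -> bool:
    """Write msg to the destination unless its offset was already applied.

    Returns True if the message was written, False if it was a duplicate.
    """
    part, offset = msg["topic"], msg["offset"]
    with db:  # one transaction: the result row and the offset move together
        row = db.execute(
            "SELECT last_offset FROM offsets WHERE part = ?", (part,)
        ).fetchone()
        if row is not None and offset <= row[0]:
            return False  # redelivered message, already applied
        db.execute(
            "INSERT INTO results (part, msg_offset, value) VALUES (?, ?, ?)",
            (part, offset, msg["value"]),
        )
        db.execute(
            "INSERT OR REPLACE INTO offsets (part, last_offset) VALUES (?, ?)",
            (part, offset),
        )
    # After a successful write, commit the same offset server-side with
    # coyote.msgs.stream.commit for this partition and consumer group.
    return True


msg = {"offset": 0, "topic": "topic-123~0", "value": b"msg-payload-1"}
first = apply_once(db, msg)   # True: written to the destination
second = apply_once(db, msg)  # False: the redelivery is skipped
```

Keying the offsets table by partition name means that partitions added later simply get their own row the first time a message from them is applied.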
Advanced offset management
The following APIs operate on individual partitions only (not on a topic as a whole):
coyote.msgs.stream.offset_get: returns the current committed offset for a partition and consumer group.
coyote.msgs.stream.offset_seek: directly sets the committed offset for a partition and consumer group.
Configuration
Partition count is configured per topic using coyote.msgs.topic.configure:
coyote.msgs.topic.configure(
    topic="topic-123",
    partitions=4
)
Please refer to the Partitions section above for more information on setting partition counts.