Skip to Content

Queue

The queue delivery method lets you consume a topic as an unordered set of messages.

Messages are delivered in parallel across all consumers of a consumer group, and any given message is delivered to only one consumer.

Publishing messages

In the msgs module, publishing is done to topics that are shared across all delivery methods, with the different delivery methods only applying at consumption time. Please refer to the topic publishing API for information on how to publish messages to a topic.

Receiving messages

Receiving messages is done using the coyote.msgs.queue.receive API as follows:

coyote.msgs.queue.receive( topic="topic-123", // Topic to consume from consumer_group: "group1", // The consumer group to use // Wait until batch_size messages are available, // or batch_wait_millis has passed (whichever comes first). batch_size: 100, batch_wait_ms: 10_000, // How long a consumer owns a message before it goes back to the queue if unacknowledged. lease_duration_ms: 30_000 }

Which returns:

{ msgs: [ { "msg_id": "msg1", "topic": "topic-123", "value": b"msg-payload-1", "timestamp": Date(...), }, { "msg_id": "msg2", "topic": "topic-123", "value": b"msg-payload-2", "headers": {}, "timestamp": Date(...), }, { "msg_id": "msg3", "topic": "topic-123", "value": b"msg-payload-3", "headers": { "h1": "v1" }, "timestamp": Date(...), } ] }

Processing messages

When calling receive, the consumers own a message for the lease duration within which they need to finish processing the message and either acknowledge its processing (ack), or report an error (nack). If the lease expires before either is done, the message is treated as if there was an explicit nack.

Acknowledging messages (ack)

Acknowledging a message is how you mark a message processing as successful which means it won’t be returned again using the receive API.

coyote.msgs.queue.ack( topic="topic-123", consumer_group="group1", msg_id="msg2" )

Rejecting messages (nack)

Rejecting a message is how you report an error in the processing of the message. Rejecting a message immediately releases the lease and the message will become available for consumption again based on the configured retry schedule.

coyote.msgs.queue.ack( topic="topic-123", consumer_group="group1", msg_id="msg2", // Setting terminal=true skips the retry_schedule and sends a message directly to the DLQ terminal=false )

Extending message lease

If you need more time when processing a message, you can extend the message lease like so:

coyote.msgs.queue.extend-lease( topic="topic-123", consumer_group="group1", msg_id="msg3", // New lease duration starting from this moment lease_duration=10_000 )

Dead letter queue (DLQ)

A dead letter queue (DLQ) is a holding queue where messages that fail to be processed successfully (after retries) are redirected for later inspection and debugging.

Message retry schedule

You can configure a retry schedule for a specific topic + consumer group combination, letting you manage how many times a message to retry, and whether to add a delay between retries.

Once a retry schedule is exhausted messages are sent to the DLQ if one is configured, otherwise they are just marked as processed and skipped. If no retry schedule is configured, messages are sent directly to the DLQ on failure.

Redriving the DLQ

You can consume the DLQ like any other topic, or alternatively you can automatically redrive all the messages back to the original queue:

coyote.msgs.queue.redrive-dlq( topic="topic-123", consumer_group="group1", )

Configuration

You can configure the behavior of consumer groups for a specific topic using the coyote.msgs.queue.configure API as follows:

coyote.msgs.queue.configure( topic="topic-123", consumer_group="group1", retry_schedule_ms=[0, 5_000, 10_000, 10_000], dlq="dlq-for-topic-123", )

Advanced functionality

Time travel (reset)

Thanks to how topics and consumer groups work, Coyote supports resetting a consumer group so that it behaves as if it started consuming a topic at a previous point in time.

coyote.msgs.queue.reset( topic="topic-123", consumer_group="group1", // Can only set either timestamp or position, not both. "timestamp": Date(...), "position": "earliest" | "latest", }
Last updated on