Documentation ¶
Types ¶
type Config ¶
type Config struct {
	BootstrapServers              string
	MessageTopic                  string        // topic for delivering messages
	MarkerTopic                   string        // topic for markers used for message redelivery
	RedeliveryTimeout             time.Duration // time after which messages will be redelivered (maximum 12 hours)
	ConsumerReceiveBufferCapacity uint          // capacity of the receive buffer for queue consumers
	ConsumerNextMessageTimeout    time.Duration // timeout used in the consumer loop to read messages off of Kafka
	ConsumerMaxMessagesPerCommit  uint          // the maximum number of messages consumed before offsets are committed
	RedeliveryTracker             RedeliveryTrackerConfig
}
Config contains the configuration for a Kafka-based queue implementation for task distribution
func NewConfigFromCLI ¶
NewConfigFromCLI returns a Config populated from the specified CLI command.
func NewConfigFromFile ¶
NewConfigFromFile returns the Config read from the file at the specified path.
type Consumer ¶
type Consumer interface {
	// Close all resources needed to receive messages.
	io.Closer

	// Receive a message from the queue.
	// Returns `nil, nil` if the specified context completes before a message is received from the queue.
	Receive(ctx context.Context) (Message, error)
}
Consumer provides a means to receive messages from a Queue.
type ErrorEvent ¶
type ErrorEvent = kafka.ErrorEvent
ErrorEvent signifies that delivery of a message has failed
type Event ¶
Event is a delivery notification event that is returned on the `deliveryCh` provided to Producer.SendAsync (see below)
type Message ¶
type Message interface {
	// Payload returns the message payload
	Payload() []byte

	// Ack notifies that message processing has been completed.
	// This message will never be delivered again.
	Ack() error

	// Nack notifies that message processing has failed.
	// This message will never be delivered again.
	Nack() error

	// Interrupt notifies that message processing has been interrupted.
	// The message will be made visible to other Queue Consumers.
	Interrupt() error
}
Message represents a message received by a Queue Consumer
type MessageDeliveredEvent ¶
type MessageDeliveredEvent = kafka.MessageDeliveredEvent
MessageDeliveredEvent signifies that a message has been delivered
type Producer ¶
type Producer interface {
	// Close all resources needed to send messages.
	io.Closer

	// Send a message to the queue.
	// This is a synchronous, blocking call.
	Send(payload []byte) error

	// Send a message asynchronously to the queue with delivery notifications
	// returned on the specified channel
	SendAsync(payload []byte, deliveryCh chan Event)

	// Flush all messages queued for send and block until either all have been sent
	// or the specified context completes, whichever comes first.
	Flush(ctx context.Context)
}
Producer provides a means of sending messages to a Queue.
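The sketch below illustrates the SendAsync pattern: send several payloads, flush, then read the delivery notifications and distinguish success from failure with a type switch. Everything here other than the Producer method set is an assumption. `Event`, `MessageDeliveredEvent`, and `ErrorEvent` are simplified local stand-ins, because the real types alias types in a kafka package whose fields are not shown here. `memProducer` and `sendAll` are hypothetical, and the sketch assumes exactly one event arrives per SendAsync call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"
)

// Stand-ins for the documented event types (fields are assumptions).
type Event interface{}
type MessageDeliveredEvent struct{ Payload []byte }
type ErrorEvent struct{ Err error }

// Local copy of the documented interface.
type Producer interface {
	io.Closer
	Send(payload []byte) error
	SendAsync(payload []byte, deliveryCh chan Event)
	Flush(ctx context.Context)
}

// memProducer is a hypothetical Producer that rejects empty payloads.
type memProducer struct{}

func (p *memProducer) Close() error { return nil }

func (p *memProducer) Send(payload []byte) error {
	if len(payload) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

func (p *memProducer) SendAsync(payload []byte, deliveryCh chan Event) {
	go func() {
		if err := p.Send(payload); err != nil {
			deliveryCh <- ErrorEvent{Err: err}
			return
		}
		deliveryCh <- MessageDeliveredEvent{Payload: payload}
	}()
}

// Flush is a no-op here; a real producer would block until its queue drains.
func (p *memProducer) Flush(ctx context.Context) {}

// sendAll sends every payload asynchronously and counts the outcomes.
func sendAll(p Producer, payloads [][]byte) (delivered, failed int) {
	// Buffer one slot per message so the producer never blocks on notification.
	deliveryCh := make(chan Event, len(payloads))
	for _, payload := range payloads {
		p.SendAsync(payload, deliveryCh)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	p.Flush(ctx)

	for range payloads {
		ev := <-deliveryCh
		switch ev.(type) {
		case MessageDeliveredEvent:
			delivered++
		case ErrorEvent:
			failed++
		}
	}
	return delivered, failed
}

func main() {
	p := &memProducer{}
	defer p.Close()
	delivered, failed := sendAll(p, [][]byte{[]byte("a"), nil, []byte("b")})
	fmt.Println("delivered:", delivered, "failed:", failed)
}
```

Sizing the channel to the number of in-flight messages keeps a slow reader from stalling delivery reporting; whether the real producer blocks on a full channel is not stated in this documentation.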
type Queue ¶
type Queue interface {
	// Producer returns a new Producer instance that is capable of sending messages to this Queue.
	// The returned Producer must be closed to release resources after it is used.
	Producer() (Producer, error)

	// Consumer returns a new Consumer instance that is capable of receiving messages from this Queue.
	// The returned Consumer must be closed to release resources after it is used.
	Consumer() (Consumer, error)
}
Queue encapsulates the transport of messages sent by Producers and received by Consumers
type RedeliveryTrackerConfig ¶
type RedeliveryTrackerConfig struct {
	UseNowIfNoMarkerSeen time.Duration // controls when to use the local time of the redelivery tracker to determine what messages require redelivery
	NumOffsetsPerCommit  uint          // the number of markers that can be consumed between offset commits
}
RedeliveryTrackerConfig contains the configuration options for the Kafka-based queue redelivery tracker service
type Service ¶
type Service interface {
	// GetOrCreateQueue returns a `Queue` identified by the specified id.
	GetOrCreateQueue(id string) Queue
}
Service provides a means to create or get a `Queue`
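To illustrate the get-or-create semantics the name suggests, here is a minimal in-memory sketch. It is not the Kafka-backed implementation: `memService`, `memQueue`, and `newMemService` are hypothetical, the Queue method set is omitted for brevity, and returning the same instance for a repeated id is an assumption drawn from the method name.

```go
package main

import (
	"fmt"
	"sync"
)

// Queue has its method set omitted here; the documented Queue exposes
// Producer() and Consumer().
type Queue interface{}

// Local copy of the documented interface.
type Service interface {
	GetOrCreateQueue(id string) Queue
}

// memQueue is a hypothetical placeholder queue.
type memQueue struct{ id string }

// memService is a hypothetical Service that caches queues by id.
type memService struct {
	mu     sync.Mutex
	queues map[string]*memQueue
}

func newMemService() *memService {
	return &memService{queues: make(map[string]*memQueue)}
}

// GetOrCreateQueue returns the cached queue for id, creating it on first use.
// The mutex makes concurrent calls for the same id safe.
func (s *memService) GetOrCreateQueue(id string) Queue {
	s.mu.Lock()
	defer s.mu.Unlock()
	q, ok := s.queues[id]
	if !ok {
		q = &memQueue{id: id}
		s.queues[id] = q
	}
	return q
}

func main() {
	var svc Service = newMemService()
	tasks1 := svc.GetOrCreateQueue("tasks")
	tasks2 := svc.GetOrCreateQueue("tasks")
	other := svc.GetOrCreateQueue("other")
	fmt.Println(tasks1 == tasks2, tasks1 == other) // prints "true false"
}
```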
func NewService ¶
func NewService(kafkaClientFactory kafka.ClientFactory, config Config) (Service, error)
NewService returns a Service instance backed by Kafka