Documentation ¶
Index ¶
Constants ¶
View Source
const ( ContextKeyLogger string = "context_key_logger" ContextKeyKafkaConsumer = "router_context_key_kafka_consumer" ContextKeyEventHandlers = "router_context_key_event_handlers" )
Variables ¶
View Source
var ErrUnsupportedEventVersion = errors.New("unsupported event version")
ErrUnsupportedEventVersion is returned when the detected version of the event is not supported by any of the existing handlers.
Functions ¶
func ConsumeMessages ¶
ConsumeMessages fetches messages from a Kafka topic, detects events, and routes the events to the appropriate handler. The handlers are expected to continue retrying on temporary errors, while permanent errors are expected to be written to a dead letter queue.
Types ¶
type EventHandlers ¶
type EventHandlers interface { PermanentErrorHandler(ctx context.Context, event kafkamsgs.Event, err error) CapabilityCreatedHandler(ctx context.Context, event kafkamsgs.Event) }
func GetEventHandlers ¶
func GetEventHandlers(ctx context.Context) EventHandlers
type KafkaConsumer ¶
type KafkaConsumer interface { FetchMessage(ctx context.Context) (kafka.Message, error) CommitMessages(ctx context.Context, msgs ...kafka.Message) error }
func GetKafkaConsumer ¶
func GetKafkaConsumer(ctx context.Context) KafkaConsumer
Click to show internal directories.
Click to hide internal directories.