Documentation ¶
Index ¶
- func CreateOutbox(ctx context.Context, db *sql.DB) error
- func SendAsync(ctx context.Context, event Event)
- func SendTx(ctx context.Context, tx sqlx.ExecerContext, event Event) error
- func TimeToEventTimestamp(ts time.Time) int64
- func WriteToOutbox(ctx context.Context, tx sqlx.ExecerContext, metadata EventMetadata, ...) error
- type Event
- type EventHeader
- type EventHeaders
- type EventMetadata
- type EventSender
- type EventSenderInitializer
- type EventTopics
- type KafkaEvent
- type NoopEventSender
- type NormalizedEventTypes
- type TopicsFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SendAsync ¶ added in v2.2.58
SendAsync sends the event without blocking and with no error reporting. If an error happens or the event queue is full, the event might be dropped.
func SendTx ¶ added in v2.2.58
SendTx ensures that the event is sent if the transaction is committed successfully. If the transaction rollbacks, the event will be discarded.
func TimeToEventTimestamp ¶ added in v2.2.18
func WriteToOutbox ¶ added in v2.2.58
func WriteToOutbox(ctx context.Context, tx sqlx.ExecerContext, metadata EventMetadata, payload []byte) error
Types ¶
type EventHeader ¶ added in v2.2.58
type EventHeaders ¶ added in v2.2.58
type EventHeaders []EventHeader
func (EventHeaders) ToKafka ¶ added in v2.2.58
func (headers EventHeaders) ToKafka() []kafka.Header
type EventMetadata ¶ added in v2.2.58
type EventMetadata struct { Type reflect.Type Topic string Key *string Headers EventHeaders }
type EventSender ¶
type EventSender interface { // SendAsync sends the given event. This method should be non blocking and // must never fail. If sending would block, you are allowed to discard the event. // Errors will be logged to the terminal but otherwise ignored. SendAsync(ctx context.Context, event Event) // SendAsyncCh returns a channel you can use to enqueue events to the sender. Sending events on this // channel will give you no delivery guarantees for those events. SendAsyncCh() chan<- Event // SendInTx sends the message in the transaction. // Returns an error, if sending fails. SendInTx(ctx context.Context, tx sqlx.ExecerContext, event Event) error // Close the event sender and flush all pending events. // Waits for all events to be send out. Close() error }
var Sender EventSender = &NoopEventSender{}
type EventSenderInitializer ¶ added in v2.2.58
type EventSenderInitializer interface { Close() error Initialize() (EventSender, error) }
func NewInitializer ¶ added in v2.2.58
func NewInitializer( confluentClient *confluent.Client, kafkaSender *kafka.Producer, fileSender io.WriteCloser, eventTopics EventTopics, bufferSize uint, ) (EventSenderInitializer, error)
type EventTopics ¶
EventTopics contains a mapping from event struct type to a kafka topic. This map must contain all event types that are going to be send. If this misses an event, sending that event will fail.
func (*EventTopics) Normalized ¶ added in v2.2.58
func (topics *EventTopics) Normalized() (*NormalizedEventTypes, error)
Normalized returns a Normalized EventTopics instance where the event types point directly to the events struct type and not to any kind of pointer. This ways we prevent issues with pointers to event types not getting matched to a topic.
func (*EventTopics) Topics ¶
func (topics *EventTopics) Topics() kafka.Topics
type KafkaEvent ¶ added in v2.2.58
type KafkaEvent struct { Event Key string Headers []EventHeader }
func ToKafkaEvent ¶ added in v2.1.14
func ToKafkaEvent(key string, ev Event) *KafkaEvent
func WithKey ¶ added in v2.2.58
func WithKey(ev Event, key string, headers ...EventHeader) *KafkaEvent
type NoopEventSender ¶
type NoopEventSender struct {
// contains filtered or unexported fields
}
func (*NoopEventSender) Close ¶
func (n *NoopEventSender) Close() error
func (*NoopEventSender) SendAsync ¶ added in v2.2.58
func (n *NoopEventSender) SendAsync(ctx context.Context, event Event)
func (*NoopEventSender) SendAsyncCh ¶ added in v2.2.88
func (n *NoopEventSender) SendAsyncCh() chan<- Event
func (*NoopEventSender) SendInTx ¶ added in v2.2.58
func (n *NoopEventSender) SendInTx(ctx context.Context, tx sqlx.ExecerContext, event Event) error
type NormalizedEventTypes ¶ added in v2.2.58
type NormalizedEventTypes struct {
EventTopics
}
func (*NormalizedEventTypes) MetadataOf ¶ added in v2.2.58
func (topics *NormalizedEventTypes) MetadataOf(event Event) (*EventMetadata, error)
func (*NormalizedEventTypes) TopicForType ¶ added in v2.2.58
func (topics *NormalizedEventTypes) TopicForType(eventType reflect.Type) (string, error)
type TopicsFunc ¶
type TopicsFunc func(replicationFactor int16) EventTopics
TopicsFunc builds an EventTopics instance for the given kafka replication factor.