eventing

package
v9.1.72+incompatible
Warning

This package is not in the latest version of its module.

Published: Apr 8, 2020 License: MIT Imports: 4 Imported by: 1

Documentation


Constants

This section is empty.

Variables

var ErrMessageNotAutoCommit = errors.New("message isn't auto commit")

ErrMessageNotAutoCommit is returned if you call Commit on a message that isn't in auto-commit mode.

Functions

This section is empty.

Types

type CommitOverride

type CommitOverride func(m Message) error

type Consumer

type Consumer interface {
	// Close will stop listening for events
	Close() error
	// Consume will start consuming from the consumer using the callback
	Consume(callback ConsumerCallback)
	// Pause will allow the consumer to be stopped temporarily from processing further messages
	Pause() error
	// Resume will allow the paused consumer to be resumed
	Resume() error
}

Consumer receives events and passes them to a ConsumerCallback.

type ConsumerCallback

type ConsumerCallback interface {
	// DataReceived is called when an event is received
	DataReceived(msg Message) error
	// ErrorReceived is called when an error is received
	ErrorReceived(err error)
}

ConsumerCallback will receive events from producers
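As an illustration of the interface above, here is a minimal sketch of a type that satisfies ConsumerCallback. The `Message` type is a trimmed local stand-in so the snippet compiles without the module, and `recorder` and `deliver` are hypothetical names. Routing a DataReceived error to ErrorReceived is an assumption of this sketch, not documented behaviour of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a trimmed local stand-in for eventing.Message.
type Message struct {
	Key   string
	Value []byte
	Topic string
}

// ConsumerCallback mirrors the interface documented above.
type ConsumerCallback interface {
	DataReceived(msg Message) error
	ErrorReceived(err error)
}

// recorder is a hypothetical callback that records what it sees.
type recorder struct {
	keys []string
	errs []error
}

// DataReceived rejects empty payloads and records the key otherwise.
func (r *recorder) DataReceived(msg Message) error {
	if len(msg.Value) == 0 {
		return errors.New("empty payload")
	}
	r.keys = append(r.keys, msg.Key)
	return nil
}

// ErrorReceived collects errors for later inspection.
func (r *recorder) ErrorReceived(err error) {
	r.errs = append(r.errs, err)
}

// deliver is a hypothetical driver standing in for a real consumer.
// Assumption: handler errors are reported back through ErrorReceived.
func deliver(cb ConsumerCallback, msgs []Message) {
	for _, m := range msgs {
		if err := cb.DataReceived(m); err != nil {
			cb.ErrorReceived(err)
		}
	}
}

func main() {
	r := &recorder{}
	deliver(r, []Message{{Key: "a", Value: []byte(`{}`)}, {Key: "b"}})
	fmt.Println(len(r.keys), len(r.errs)) // prints "1 1"
}
```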

type ConsumerCallbackAdapter

type ConsumerCallbackAdapter struct {
	// OnDataReceived is called when an event is received
	OnDataReceived func(msg Message) error
	// OnErrorReceived is called when an error is received
	OnErrorReceived func(err error)
	// OnEOF is called when a topic partition EOF is received
	OnEOF func(topic string, partition int32, offset int64)
	// OnStats is called when topic stats is generated
	OnStats func(stats map[string]interface{})
	// OnShouldProcess is called before unmarshalling to allow the
	// consumer control over forwarding decisions
	OnShouldProcess func(o interface{}) bool
	// OnShouldFilter is called before forwarding to the consumer
	// to give the consumer control over filtering messages
	OnShouldFilter func(m *Message) bool
	// OnPartitionAssignment is called when partitions are assigned to the consumer
	OnPartitionAssignment func(partitions []TopicPartition)
	// OnPartitionRevocation is called when partitions are unassigned from the consumer
	OnPartitionRevocation func(partitions []TopicPartition)
	// OnOffsetsCommitted is called when offsets are committed
	OnOffsetsCommitted func(offsets []TopicPartition)
	// contains filtered or unexported fields
}

ConsumerCallbackAdapter will receive events from producers
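The struct above follows the adapter pattern: function fields are set by the caller and the methods forward to them. The sketch below is a reduced local re-implementation with only two fields, written to show the pattern. `callbackAdapter` is a hypothetical name, and skipping a nil field silently is an assumption; the real ConsumerCallbackAdapter may behave differently.

```go
package main

import "fmt"

// Message is a trimmed local stand-in for eventing.Message.
type Message struct {
	Key   string
	Value []byte
}

// callbackAdapter is a reduced, hypothetical stand-in for
// ConsumerCallbackAdapter with only two of its function fields.
type callbackAdapter struct {
	OnDataReceived  func(msg Message) error
	OnErrorReceived func(err error)
}

// DataReceived forwards to OnDataReceived when it is set.
// Treating a nil field as a no-op is an assumption of this sketch.
func (cb *callbackAdapter) DataReceived(msg Message) error {
	if cb.OnDataReceived != nil {
		return cb.OnDataReceived(msg)
	}
	return nil
}

// ErrorReceived forwards to OnErrorReceived when it is set.
func (cb *callbackAdapter) ErrorReceived(err error) {
	if cb.OnErrorReceived != nil {
		cb.OnErrorReceived(err)
	}
}

func main() {
	var seen []string
	cb := &callbackAdapter{
		OnDataReceived: func(msg Message) error {
			seen = append(seen, msg.Key)
			return nil
		},
	}
	_ = cb.DataReceived(Message{Key: "order-1"})
	// No OnErrorReceived was set, so this call does nothing.
	cb.ErrorReceived(fmt.Errorf("dropped"))
	fmt.Println(seen) // prints "[order-1]"
}
```

The appeal of this shape is that a caller only supplies the hooks it cares about instead of implementing every callback interface by hand.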

func (*ConsumerCallbackAdapter) Close

func (cb *ConsumerCallbackAdapter) Close() error

func (*ConsumerCallbackAdapter) DataReceived

func (cb *ConsumerCallbackAdapter) DataReceived(msg Message) (err error)

func (*ConsumerCallbackAdapter) EOF

func (cb *ConsumerCallbackAdapter) EOF(topic string, partition int32, offset int64)

func (*ConsumerCallbackAdapter) ErrorReceived

func (cb *ConsumerCallbackAdapter) ErrorReceived(err error)

func (*ConsumerCallbackAdapter) OffsetsCommitted

func (cb *ConsumerCallbackAdapter) OffsetsCommitted(offsets []TopicPartition)

OffsetsCommitted is called when offsets are committed

func (*ConsumerCallbackAdapter) PartitionAssignment

func (cb *ConsumerCallbackAdapter) PartitionAssignment(partitions []TopicPartition)

PartitionAssignment is called when partitions are assigned to the consumer

func (*ConsumerCallbackAdapter) PartitionRevocation

func (cb *ConsumerCallbackAdapter) PartitionRevocation(partitions []TopicPartition)

PartitionRevocation is called when partitions are unassigned from the consumer

func (*ConsumerCallbackAdapter) ShouldFilter

func (cb *ConsumerCallbackAdapter) ShouldFilter(m *Message) bool

func (*ConsumerCallbackAdapter) ShouldProcess

func (cb *ConsumerCallbackAdapter) ShouldProcess(o interface{}) bool

func (*ConsumerCallbackAdapter) Stats

func (cb *ConsumerCallbackAdapter) Stats(stats map[string]interface{})

type ConsumerCallbackEventFilter

type ConsumerCallbackEventFilter interface {
	ShouldFilter(m *Message) bool
}

ConsumerCallbackEventFilter is a filter for forwarding decisions made after a message has been deserialized from Kafka and before it is delivered to the consumer callback. ShouldFilter returns true to forward the message or false to stop processing it.
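A minimal sketch of an event filter, assuming the true-means-forward convention described above. `headerFilter` is a hypothetical type, and `Message` is a trimmed local stand-in so the snippet compiles on its own.

```go
package main

import "fmt"

// Message is a trimmed local stand-in for eventing.Message.
type Message struct {
	Topic   string
	Headers map[string]string
}

// ConsumerCallbackEventFilter mirrors the interface documented above.
type ConsumerCallbackEventFilter interface {
	ShouldFilter(m *Message) bool
}

// headerFilter is a hypothetical filter that forwards only messages
// whose header `name` equals `want`.
type headerFilter struct {
	name string
	want string
}

// ShouldFilter returns true to forward the message, false to drop it.
// Reading from a nil Headers map is safe and yields the empty string.
func (f headerFilter) ShouldFilter(m *Message) bool {
	return m != nil && m.Headers[f.name] == f.want
}

func main() {
	var f ConsumerCallbackEventFilter = headerFilter{name: "env", want: "prod"}
	msgs := []*Message{
		{Topic: "orders", Headers: map[string]string{"env": "prod"}},
		{Topic: "orders", Headers: map[string]string{"env": "dev"}},
		{Topic: "orders"},
	}
	for _, m := range msgs {
		fmt.Println(m.Topic, f.ShouldFilter(m))
	}
}
```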

type ConsumerCallbackMessageFilter

type ConsumerCallbackMessageFilter interface {
	ShouldProcess(o interface{}) bool
}

ConsumerCallbackMessageFilter is a filter for forwarding decisions made after a message is received from Kafka and before it is unmarshalled. NOTE: the argument is deliberately an untyped object, which the implementer needs to cast to a *kafka.Message. This avoids an explicit dependency on the Confluent Kafka Go library in packages that don't need it (that library is a bit of a pain because it uses cgo and requires a third-party library to compile).
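The type assertion described above might look like the following sketch. To keep the snippet free of cgo, `rawMessage` is a hypothetical stand-in for `*kafka.Message`; in real code the assertion target would be the Confluent type. Two assumptions are made here: returning true means "go ahead and process", and values of an unexpected type are let through.

```go
package main

import (
	"bytes"
	"fmt"
)

// rawMessage is a hypothetical stand-in for *kafka.Message so this
// sketch does not depend on the Confluent library.
type rawMessage struct {
	Key   []byte
	Value []byte
}

// ConsumerCallbackMessageFilter mirrors the interface documented above.
type ConsumerCallbackMessageFilter interface {
	ShouldProcess(o interface{}) bool
}

// keyPrefixFilter is a hypothetical filter that inspects the raw key
// before any unmarshalling work is done.
type keyPrefixFilter struct {
	prefix []byte
}

// ShouldProcess asserts the untyped argument to the raw message type.
// Assumption: true means the message should be unmarshalled and forwarded.
func (f keyPrefixFilter) ShouldProcess(o interface{}) bool {
	raw, ok := o.(*rawMessage)
	if !ok {
		// Unknown type: let it through rather than drop it silently.
		return true
	}
	return bytes.HasPrefix(raw.Key, f.prefix)
}

func main() {
	var f ConsumerCallbackMessageFilter = keyPrefixFilter{prefix: []byte("tenant-a/")}
	fmt.Println(f.ShouldProcess(&rawMessage{Key: []byte("tenant-a/42")}))
	fmt.Println(f.ShouldProcess(&rawMessage{Key: []byte("tenant-b/7")}))
}
```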

type ConsumerCallbackPartitionLifecycle

type ConsumerCallbackPartitionLifecycle interface {
	// PartitionAssignment is called when partitions are assigned to the consumer
	PartitionAssignment(partitions []TopicPartition)
	// PartitionRevocation is called when partitions are unassigned from the consumer
	PartitionRevocation(partitions []TopicPartition)
	// OffsetsCommitted is called when offsets are committed
	OffsetsCommitted(offsets []TopicPartition)
}

ConsumerCallbackPartitionLifecycle will receive events for partition lifecycle changes

type Message

type Message struct {
	Encoding  ValueEncodingType
	Key       string
	Value     []byte
	Headers   map[string]string
	Timestamp time.Time
	Extra     map[string]interface{}
	Topic     string
	Partition int32
	Offset    int64

	// internal, do not set
	CommitOverride CommitOverride `json:"-"`
	AutoCommit     bool           `json:"-"`
}

Message encapsulates data for the event system

func (Message) Commit

func (m Message) Commit() error

Commit is used to commit the processing of this event and store the offset

func (Message) IsAutoCommit

func (m Message) IsAutoCommit() bool

IsAutoCommit returns true if the message is committed automatically, or false if you must call Commit when processing is complete.

type Producer

type Producer interface {
	// Send will send the event
	Send(ctx context.Context, msg Message) error
	// Close will close the producer
	Close() error
}

Producer will emit events to consumers

type TopicPartition

type TopicPartition struct {
	Topic     string
	Partition int32
	Offset    int64
}

TopicPartition has information about the partition

type ValueEncodingType

type ValueEncodingType string

ValueEncodingType is the type of encoding for the Value payload

const (
	// JSONEncoding is json encoding for the Value payload
	JSONEncoding ValueEncodingType = "json"
)
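To show how the Encoding field and JSONEncoding fit together, here is a minimal sketch that builds a JSON-encoded message and decodes it again. The local `Message` type carries only a few of the real fields, and `orderCreated`, `newJSONMessage`, and `decodeValue` are hypothetical helpers that are not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ValueEncodingType and JSONEncoding mirror the declarations above.
type ValueEncodingType string

const JSONEncoding ValueEncodingType = "json"

// Message is a trimmed local stand-in for eventing.Message.
type Message struct {
	Encoding  ValueEncodingType
	Key       string
	Value     []byte
	Timestamp time.Time
}

// orderCreated is a hypothetical payload type.
type orderCreated struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

// newJSONMessage marshals v and tags the message as JSON-encoded.
func newJSONMessage(key string, v interface{}) (Message, error) {
	buf, err := json.Marshal(v)
	if err != nil {
		return Message{}, err
	}
	return Message{Encoding: JSONEncoding, Key: key, Value: buf, Timestamp: time.Now()}, nil
}

// decodeValue checks the declared encoding before unmarshalling.
func decodeValue(m Message, out interface{}) error {
	if m.Encoding != JSONEncoding {
		return fmt.Errorf("unsupported encoding %q", m.Encoding)
	}
	return json.Unmarshal(m.Value, out)
}

func main() {
	msg, err := newJSONMessage("order-1", orderCreated{ID: "order-1", Total: 3})
	if err != nil {
		panic(err)
	}
	var out orderCreated
	if err := decodeValue(msg, &out); err != nil {
		panic(err)
	}
	fmt.Println(string(msg.Value), out.Total) // prints `{"id":"order-1","total":3} 3`
}
```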
