Documentation ¶
Index ¶
- Variables
- type CommitOverride
- type Consumer
- type ConsumerCallback
- type ConsumerCallbackAdapter
- func (cb *ConsumerCallbackAdapter) Close() error
- func (cb *ConsumerCallbackAdapter) DataReceived(msg Message) (err error)
- func (cb *ConsumerCallbackAdapter) EOF(topic string, partition int32, offset int64)
- func (cb *ConsumerCallbackAdapter) ErrorReceived(err error)
- func (cb *ConsumerCallbackAdapter) OffsetsCommitted(offsets []TopicPartition)
- func (cb *ConsumerCallbackAdapter) PartitionAssignment(partitions []TopicPartition)
- func (cb *ConsumerCallbackAdapter) PartitionRevocation(partitions []TopicPartition)
- func (cb *ConsumerCallbackAdapter) ShouldFilter(m *Message) bool
- func (cb *ConsumerCallbackAdapter) ShouldProcess(o interface{}) bool
- func (cb *ConsumerCallbackAdapter) Stats(stats map[string]interface{})
- type ConsumerCallbackEventFilter
- type ConsumerCallbackMessageFilter
- type ConsumerCallbackPartitionLifecycle
- type Message
- type Producer
- type TopicPartition
- type ValueEncodingType
Constants ¶
This section is empty.
Variables ¶
var ErrMessageNotAutoCommit = errors.New("message isn't auto commit")
ErrMessageNotAutoCommit is returned if you call commit on a message that isn't in auto commit mode
Functions ¶
This section is empty.
Types ¶
type CommitOverride ¶
type Consumer ¶
type Consumer interface { // Close will stop listening for events Close() error // Consume will start consuming from the consumer using the callback Consume(callback ConsumerCallback) // Pause will allow the consumer to be stopped temporarily from processing further messages Pause() error // Resume will allow the paused consumer to be resumed Resume() error }
Consumer will create a consumer for receiving events
type ConsumerCallback ¶
type ConsumerCallback interface { // OnDataReceived is called when an event is received DataReceived(msg Message) error // OnErrorReceived is called when an error is received ErrorReceived(err error) }
ConsumerCallback will receive events from producers
type ConsumerCallbackAdapter ¶
type ConsumerCallbackAdapter struct { // OnDataReceived is called when an event is received OnDataReceived func(msg Message) error // OnErrorReceived is called when an error is received OnErrorReceived func(err error) // OnEOF is called when a topic partition EOF is received OnEOF func(topic string, partition int32, offset int64) // OnStats is called when topic stats is generated OnStats func(stats map[string]interface{}) // OnShouldProcess is called before unmarshalling to allow the // consumer control over forwarding decisions OnShouldProcess func(o interface{}) bool // OnShouldFilter is called before forwarding to the consumer // to give the consumer control over filtering messages OnShouldFilter func(m *Message) bool // OnPartitionAssignment is called when partitions are assigned to the consumer OnPartitionAssignment func(partitions []TopicPartition) // OnPartitionRevocation is called when partitions are unassigned to the consumer OnPartitionRevocation func(partitions []TopicPartition) // OnOffsetsCommitted is called when offsets are committed OnOffsetsCommitted func(offsets []TopicPartition) // contains filtered or unexported fields }
ConsumerCallbackAdapter will receive events from producers
func (*ConsumerCallbackAdapter) Close ¶
func (cb *ConsumerCallbackAdapter) Close() error
func (*ConsumerCallbackAdapter) DataReceived ¶
func (cb *ConsumerCallbackAdapter) DataReceived(msg Message) (err error)
func (*ConsumerCallbackAdapter) EOF ¶
func (cb *ConsumerCallbackAdapter) EOF(topic string, partition int32, offset int64)
func (*ConsumerCallbackAdapter) ErrorReceived ¶
func (cb *ConsumerCallbackAdapter) ErrorReceived(err error)
func (*ConsumerCallbackAdapter) OffsetsCommitted ¶
func (cb *ConsumerCallbackAdapter) OffsetsCommitted(offsets []TopicPartition)
OnOffsetsCommitted is called when offsets are committed
func (*ConsumerCallbackAdapter) PartitionAssignment ¶
func (cb *ConsumerCallbackAdapter) PartitionAssignment(partitions []TopicPartition)
OnPartitionAssignment is called when partitions are assigned to the consumer
func (*ConsumerCallbackAdapter) PartitionRevocation ¶
func (cb *ConsumerCallbackAdapter) PartitionRevocation(partitions []TopicPartition)
OnPartitionRevocation is called when partitions are unassigned to the consumer
func (*ConsumerCallbackAdapter) ShouldFilter ¶
func (cb *ConsumerCallbackAdapter) ShouldFilter(m *Message) bool
func (*ConsumerCallbackAdapter) ShouldProcess ¶
func (cb *ConsumerCallbackAdapter) ShouldProcess(o interface{}) bool
func (*ConsumerCallbackAdapter) Stats ¶
func (cb *ConsumerCallbackAdapter) Stats(stats map[string]interface{})
type ConsumerCallbackEventFilter ¶
ConsumerCallbackEventFilter is a filter for handling forwarding decisions after deserialization from kafka before deliver to the consumer callback. return true to forward or false to stop processing
type ConsumerCallbackMessageFilter ¶
type ConsumerCallbackMessageFilter interface {
ShouldProcess(o interface{}) bool
}
ConsumerCallbackMessageFilter is a filter for handling forwarding decisions after receiving from kafka before before unmarshalling NOTE: we explicitly use an object here which the implementer needs to cast as a *kafka.Message so as not to create an explicit dependency on the go confluent kafka library for this package where you don't need it (and that library is a bit of a pain because its Cgo and requires third-party library to compile)
type ConsumerCallbackPartitionLifecycle ¶
type ConsumerCallbackPartitionLifecycle interface { // OnPartitionAssignment is called when partitions are assigned to the consumer PartitionAssignment(partitions []TopicPartition) // OnPartitionRevocation is called when partitions are unassigned to the consumer PartitionRevocation(partitions []TopicPartition) // OnOffsetsCommitted is called when offsets are committed OffsetsCommitted(offsets []TopicPartition) }
ConsumerCallbackPartitionLifecycle will receive events for partition lifecycle changes
type Message ¶
type Message struct { Encoding ValueEncodingType Key string Value []byte Headers map[string]string Timestamp time.Time Extra map[string]interface{} Topic string Partition int32 Offset int64 // internal, do not set CommitOverride CommitOverride `json:"-"` AutoCommit bool `json:"-"` }
Message encapsulates data for the event system
func (Message) IsAutoCommit ¶
IsAutoCommit returns true if the message is automatically commit or false if you must call Commit when completed
type Producer ¶
type Producer interface { // Send will send the event Send(ctx context.Context, msg Message) error // Close will close the producer Close() error }
Producer will emit events to consumers
type TopicPartition ¶
TopicPartition has information about the partition
type ValueEncodingType ¶
type ValueEncodingType string
ValueEncodingType is the type of encoding for the Value payload
const ( // JSONEncoding is json encoding for the Value payload JSONEncoding ValueEncodingType = "json" )