Documentation ¶
Index ¶
- Constants
- Variables
- func IsMessageGzipCompressed(headers map[string]string) bool
- func NewConfigMap(config Config) *ck.ConfigMap
- type AdminClient
- type AdminClientImpl
- type Config
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) Commit(topic string, partition int32, offset int64) (err error)
- func (c *Consumer) Consume(callback eventing.ConsumerCallback)
- func (c *Consumer) Pause() error
- func (c *Consumer) Ping() bool
- func (c *Consumer) Resume() error
- func (c *Consumer) WaitForAssignments()
- type ConsumerEOFCallback
- type ConsumerStatsCallback
- type EOFCallback
- type JobKey
- type Producer
- type ShouldProcessEventMessage
- type ShouldProcessKafkaMessage
- type TopicConfig
- type TrackingConsumer
- func (tc *TrackingConsumer) Assignments() []eventing.TopicPartition
- func (tc *TrackingConsumer) AtEOF() bool
- func (tc *TrackingConsumer) Close() error
- func (tc *TrackingConsumer) DataReceived(msg eventing.Message) error
- func (tc *TrackingConsumer) EOF(topic string, partition int32, offset int64)
- func (tc *TrackingConsumer) ErrorReceived(err error)
- func (tc *TrackingConsumer) OffsetsCommitted(offsets []eventing.TopicPartition)
- func (tc *TrackingConsumer) PartitionAssignment(partitions []eventing.TopicPartition)
- func (tc *TrackingConsumer) PartitionRevocation(partitions []eventing.TopicPartition)
- func (tc *TrackingConsumer) Positions() map[int32]int64
- func (tc *TrackingConsumer) RecordCount() int64
- func (tc *TrackingConsumer) ShouldFilter(m *eventing.Message) bool
- func (tc *TrackingConsumer) ShouldProcess(o interface{}) bool
- func (tc *TrackingConsumer) Stats(stats map[string]interface{})
Constants ¶
const (
	// CompressionHeader is the name of the header used to indicate a compressed value
	CompressionHeader = "pinpt-compression"
	// CompressionGzip is the value to indicate the compression type
	CompressionGzip = "gzip"
)
const DefaultIdleDuration = time.Second
DefaultIdleDuration is the default duration to wait after receiving EOF on all partitions before considering the consumer group idle
const DefaultMinGzipBytes = 1024
DefaultMinGzipBytes is the minimum size of data before we compress (assuming config.Gzip = true)
Variables ¶
var ErrMissingTopic = errors.New("error: missing topic in message")
ErrMissingTopic is an error that is returned if the topic is missing in the Message
var ErrMissingTopics = errors.New("error: missing at least one topic for consumer")
ErrMissingTopics is returned if no topics are passed
Functions ¶
func IsMessageGzipCompressed ¶
IsMessageGzipCompressed returns true if the headers contain the compression header indicating that the value is gzip-compressed bytes
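As a hedged sketch (assuming this package is imported as kafka; the helper name and gzip handling are illustrative and use only the standard library's bytes, compress/gzip, and io packages), a callback might decompress a message value like this:

func decodeValue(headers map[string]string, value []byte) ([]byte, error) {
	if !kafka.IsMessageGzipCompressed(headers) {
		return value, nil // value was not compressed
	}
	r, err := gzip.NewReader(bytes.NewReader(value))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r) // decompressed bytes
}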
func NewConfigMap ¶
NewConfigMap returns a ConfigMap from a Config
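A minimal sketch of building a Config and converting it (the broker address, credentials, and client id are placeholders; the package is assumed to be imported as kafka):

cfg := kafka.Config{
	Brokers:  []string{"localhost:9092"}, // placeholder broker
	Username: "user",                     // placeholder credentials
	Password: "secret",
	ClientID: "my-service",
}
cm := kafka.NewConfigMap(cfg) // *ck.ConfigMap for use with the underlying confluent client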
Types ¶
type AdminClient ¶
type AdminClient interface {
	// NewTopic will create a new topic
	NewTopic(name string, config TopicConfig) error
	// DeleteTopic will delete a topic
	DeleteTopic(name string) error
	// GetTopic returns the details for a topic
	GetTopic(name string) (*ck.TopicMetadata, error)
	// ListTopics will return all topics
	ListTopics() ([]*ck.TopicMetadata, error)
}
AdminClient provides an interface for talking with the Kafka admin
type AdminClientImpl ¶
type AdminClientImpl struct {
// contains filtered or unexported fields
}
func NewAdminClientUsingConsumer ¶
func NewAdminClientUsingConsumer(c *Consumer) (*AdminClientImpl, error)
NewAdminClientUsingConsumer will create a new AdminClient from a Consumer
func NewAdminClientUsingProducer ¶
func NewAdminClientUsingProducer(p *Producer) (*AdminClientImpl, error)
NewAdminClientUsingProducer will create a new AdminClient from a Producer
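A hedged sketch of creating a topic through the admin client, assuming p is an already-constructed *Producer and the package is imported as kafka (the topic name and settings are illustrative):

admin, err := kafka.NewAdminClientUsingProducer(p)
if err != nil {
	return err
}
err = admin.NewTopic("example-topic", kafka.TopicConfig{
	NumPartitions:     6,
	ReplicationFactor: 3,
	RetentionPeriod:   24 * time.Hour,
	CleanupPolicy:     "delete",
})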
func (*AdminClientImpl) DeleteTopic ¶
func (c *AdminClientImpl) DeleteTopic(name string) error
DeleteTopic will delete a topic
func (*AdminClientImpl) GetTopic ¶
func (c *AdminClientImpl) GetTopic(name string) (*ck.TopicMetadata, error)
GetTopic will return the metadata for a given topic
func (*AdminClientImpl) ListTopics ¶
func (c *AdminClientImpl) ListTopics() ([]*ck.TopicMetadata, error)
ListTopics will return all topics
func (*AdminClientImpl) NewTopic ¶
func (c *AdminClientImpl) NewTopic(name string, config TopicConfig) error
type Config ¶
type Config struct {
	Brokers                   []string
	Username                  string
	Password                  string
	Extra                     map[string]interface{}
	Offset                    string
	DisableAutoCommit         bool
	ResetOffset               bool
	ShouldProcessKafkaMessage ShouldProcessKafkaMessage
	ShouldProcessEventMessage ShouldProcessEventMessage
	ClientID                  string
	DefaultPollTime           time.Duration // only for consumers
	Context                   context.Context
	Logger                    log.Logger
	Gzip                      bool
	GzipMinBytes              int // if not set, defaults to DefaultMinGzipBytes
}
Config holds the configuration for connecting to the broker
type Consumer ¶
Consumer is a Kafka consumer
func NewConsumer ¶
NewConsumer returns a new Consumer instance
func NewPingConsumer ¶
NewPingConsumer returns a new Consumer instance that supports only pings
func (*Consumer) Consume ¶
func (c *Consumer) Consume(callback eventing.ConsumerCallback)
Consume will start consuming from the consumer using the callback
func (*Consumer) Pause ¶
Pause will allow the consumer to be stopped temporarily from processing further messages
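For example (a sketch, assuming c is an already-consuming *Consumer):

if err := c.Pause(); err != nil {
	return err
}
// ... no further messages are processed while paused ...
if err := c.Resume(); err != nil {
	return err
}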
func (*Consumer) Ping ¶
Ping will cause a ping against the broker by way of fetching metadata from the ping topic
func (*Consumer) WaitForAssignments ¶
func (c *Consumer) WaitForAssignments()
WaitForAssignments will wait for the initial assignments to arrive. If they have already arrived before calling this function, it will not block and will return immediately. If the assignments have not arrived, it will block until they arrive.
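Putting the consumer pieces together, a hedged startup sketch (assuming c is a newly created *Consumer, cb is an eventing.ConsumerCallback implementation, and that Consume returns once consumption has started):

c.Consume(cb)          // start delivering messages to cb
c.WaitForAssignments() // block until the initial partition assignments arrive
// ... run until shutdown ...
if err := c.Close(); err != nil {
	log.Printf("error closing consumer: %v", err)
}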
type ConsumerEOFCallback ¶
ConsumerEOFCallback is an interface for handling topic EOF events
type ConsumerStatsCallback ¶
type ConsumerStatsCallback interface {
Stats(stats map[string]interface{})
}
ConsumerStatsCallback is an interface for handling stats events
type EOFCallback ¶
type EOFCallback interface {
	eventing.ConsumerCallback
	// GroupEOF is called when the consumer group reaches EOF on all partitions
	GroupEOF(count int64, jobcounts map[JobKey]int64)
}
EOFCallback is a handler for receiving the EOF event for the consumer group
func NewConsumerCallbackWithGroupEOF ¶
func NewConsumerCallbackWithGroupEOF(callback *eventing.ConsumerCallbackAdapter, h func(total int64, jobcounts map[JobKey]int64)) EOFCallback
NewConsumerCallbackWithGroupEOF will create a delegate for handling a ConsumerCallbackAdapter and adding a GroupEOF event as a func handler
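A sketch of wiring up a group-EOF handler, assuming adapter is an existing *eventing.ConsumerCallbackAdapter and this package is imported as kafka:

cb := kafka.NewConsumerCallbackWithGroupEOF(adapter, func(total int64, jobcounts map[kafka.JobKey]int64) {
	log.Printf("consumer group reached EOF after %d records across %d jobs", total, len(jobcounts))
})
// cb satisfies EOFCallback and can be passed to NewTrackingConsumer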
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer will emit events to kafka
func NewProducer ¶
NewProducer returns a new Producer instance
type ShouldProcessEventMessage ¶
ShouldProcessEventMessage is a handler for deciding whether an event should be processed after deserialization but before it is delivered to the consumer handler
type ShouldProcessKafkaMessage ¶
ShouldProcessKafkaMessage is a handler for deciding whether an incoming Kafka message should be processed before it is deserialized
type TopicConfig ¶
type TopicConfig struct {
	NumPartitions     int
	ReplicationFactor int
	RetentionPeriod   time.Duration
	MaxMessageSize    int64
	Config            map[string]string
	CleanupPolicy     string
}
TopicConfig is the configuration for the topic
type TrackingConsumer ¶
type TrackingConsumer struct {
// contains filtered or unexported fields
}
TrackingConsumer is a utility that tracks a consumer group and detects when the group has hit EOF across all of its partitions
func NewTrackingConsumer ¶
func NewTrackingConsumer(topic string, groupID string, config Config, redisClient *redisdb.Client, callback EOFCallback) (*TrackingConsumer, error)
NewTrackingConsumer returns a consumer callback adapter which tracks EOF
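A hedged construction sketch, assuming cfg is a populated Config, redisClient is an existing *redisdb.Client, cb is an EOFCallback, and the package is imported as kafka (the topic and group id are placeholders):

tc, err := kafka.NewTrackingConsumer("example-topic", "example-group", cfg, redisClient, cb)
if err != nil {
	return err
}
defer tc.Close()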
func (*TrackingConsumer) Assignments ¶
func (tc *TrackingConsumer) Assignments() []eventing.TopicPartition
Assignments returns the current assignments for this consumer
func (*TrackingConsumer) AtEOF ¶
func (tc *TrackingConsumer) AtEOF() bool
AtEOF returns true if the consumer is currently at EOF
func (*TrackingConsumer) Close ¶
func (tc *TrackingConsumer) Close() error
func (*TrackingConsumer) DataReceived ¶
func (tc *TrackingConsumer) DataReceived(msg eventing.Message) error
func (*TrackingConsumer) EOF ¶
func (tc *TrackingConsumer) EOF(topic string, partition int32, offset int64)
func (*TrackingConsumer) ErrorReceived ¶
func (tc *TrackingConsumer) ErrorReceived(err error)
func (*TrackingConsumer) OffsetsCommitted ¶
func (tc *TrackingConsumer) OffsetsCommitted(offsets []eventing.TopicPartition)
func (*TrackingConsumer) PartitionAssignment ¶
func (tc *TrackingConsumer) PartitionAssignment(partitions []eventing.TopicPartition)
func (*TrackingConsumer) PartitionRevocation ¶
func (tc *TrackingConsumer) PartitionRevocation(partitions []eventing.TopicPartition)
func (*TrackingConsumer) Positions ¶
func (tc *TrackingConsumer) Positions() map[int32]int64
Positions returns the per partition consumer offset positions
func (*TrackingConsumer) RecordCount ¶
func (tc *TrackingConsumer) RecordCount() int64
RecordCount returns the current number of records that have been processed while not at EOF
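For example (a sketch, assuming tc is a running *TrackingConsumer), progress can be polled until the group reaches EOF:

for !tc.AtEOF() {
	time.Sleep(time.Second)
}
log.Printf("processed %d records", tc.RecordCount())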
func (*TrackingConsumer) ShouldFilter ¶
func (tc *TrackingConsumer) ShouldFilter(m *eventing.Message) bool
func (*TrackingConsumer) ShouldProcess ¶
func (tc *TrackingConsumer) ShouldProcess(o interface{}) bool
func (*TrackingConsumer) Stats ¶
func (tc *TrackingConsumer) Stats(stats map[string]interface{})