kafka

package
v9.1.53+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2020 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// CompressionHeader is the name of the header used to indicate compressed value
	CompressionHeader = "pinpt-compression"
	// CompressionGzip is the value to indicate the compression type
	CompressionGzip = "gzip"
)
View Source
const DefaultIdleDuration = time.Second

DefaultIdleDuration is the default duration once we receive EOF for all partitions to determine if the consumer group is idle

View Source
const DefaultMinGzipBytes = 1024

DefaultMinGzipBytes is the minimum size of data before we compress (assuming config.Gzip = true)

Variables

View Source
var ErrMissingTopic = errors.New("error: missing topic in message")

ErrMissingTopic is an error that is returned if the topic is missing in the Message

View Source
var ErrMissingTopics = errors.New("error: missing at least one topic for consumer")

ErrMissingTopics is returned if no topics are passed

Functions

func IsMessageGzipCompressed

func IsMessageGzipCompressed(headers map[string]string) bool

IsMessageGzipCompressed returns true if the header contains a gzip compressed header indicating that the value is gzip bytes

func NewConfigMap

func NewConfigMap(config Config) *ck.ConfigMap

NewConfigMap returns a ConfigMap from a Config

Types

type AdminClient

type AdminClient interface {
	// NewTopic will create a new topic
	NewTopic(name string, config TopicConfig) error
	// DeleteTopic will delete a topic
	DeleteTopic(name string) error
	// GetTopic details
	GetTopic(name string) (*ck.TopicMetadata, error)
	// ListTopics will return all topics
	ListTopics() ([]*ck.TopicMetadata, error)
}

AdminClient provides an interface for talking with the Kafka admin

type AdminClientImpl

type AdminClientImpl struct {
	// contains filtered or unexported fields
}

func NewAdminClientUsingConsumer

func NewAdminClientUsingConsumer(c *Consumer) (*AdminClientImpl, error)

NewAdminClientUsingConsumer will create a new AdminClient from a Consumer

func NewAdminClientUsingProducer

func NewAdminClientUsingProducer(p *Producer) (*AdminClientImpl, error)

NewAdminClientUsingProducer will create a new AdminClient from a Producer

func (*AdminClientImpl) DeleteTopic

func (c *AdminClientImpl) DeleteTopic(name string) error

DeleteTopic will delete a topic

func (*AdminClientImpl) GetTopic

func (c *AdminClientImpl) GetTopic(name string) (*ck.TopicMetadata, error)

GetTopic will return the metadata for a given topic

func (*AdminClientImpl) ListTopics

func (c *AdminClientImpl) ListTopics() ([]*ck.TopicMetadata, error)

ListTopics will return all topics

func (*AdminClientImpl) NewTopic

func (c *AdminClientImpl) NewTopic(name string, config TopicConfig) error

type Config

type Config struct {
	Brokers                   []string
	Username                  string
	Password                  string
	Extra                     map[string]interface{}
	Offset                    string
	DisableAutoCommit         bool
	ResetOffset               bool
	ShouldProcessKafkaMessage ShouldProcessKafkaMessage
	ShouldProcessEventMessage ShouldProcessEventMessage
	ClientID                  string
	DefaultPollTime           time.Duration // only for consumers
	Context                   context.Context
	Logger                    log.Logger
	Gzip                      bool
	GzipMinBytes              int // if not set, defaults to DefaultMinGzipBytes
}

Config holds the configuration for connection to the broker

type Consumer

type Consumer struct {
	DefaultPollTime time.Duration
	// contains filtered or unexported fields
}

Consumer is a kafka consumer

func NewConsumer

func NewConsumer(config Config, groupid string, topics ...string) (*Consumer, error)

NewConsumer returns a new Consumer instance

func NewPingConsumer

func NewPingConsumer(config Config, topic string) (*Consumer, error)

NewPingConsumer returns a new Consumer instance that supports only pings

func (*Consumer) Close

func (c *Consumer) Close() error

Close will stop listening for events

func (*Consumer) Commit

func (c *Consumer) Commit(topic string, partition int32, offset int64) (err error)

Commit will commit to a specific topic for a given partition and offset

func (*Consumer) Consume

func (c *Consumer) Consume(callback eventing.ConsumerCallback)

Consume will start consuming from the consumer using the callback

func (*Consumer) Pause

func (c *Consumer) Pause() error

Pause will allow the consumer to be stopped temporarily from processing further messages

func (*Consumer) Ping

func (c *Consumer) Ping() bool

Ping will cause a ping against the broker by way of fetching metadata from the ping topic

func (*Consumer) Resume

func (c *Consumer) Resume() error

Resume will allow the paused consumer to be resumed

func (*Consumer) WaitForAssignments

func (c *Consumer) WaitForAssignments()

WaitForAssignments will wait for initial assignments to arrive. If they have already arrived before calling this function, it will not block and will return immediately. If the assignments have not arrived, it will block until they arrive.

type ConsumerEOFCallback

type ConsumerEOFCallback interface {
	EOF(topic string, partition int32, offset int64)
}

ConsumerEOFCallback is an interface for handling topic EOF events

type ConsumerStatsCallback

type ConsumerStatsCallback interface {
	Stats(stats map[string]interface{})
}

ConsumerStatsCallback is an interface for handling stats events

type EOFCallback

type EOFCallback interface {
	eventing.ConsumerCallback

	// GroupEOF is called when the consumer group reaches EOF on all partitions
	GroupEOF(count int64, jobcounts map[JobKey]int64)
}

EOFCallback is a handler for receiving the EOF for the consumer group

func NewConsumerCallbackWithGroupEOF

func NewConsumerCallbackWithGroupEOF(callback *eventing.ConsumerCallbackAdapter, h func(total int64, jobcounts map[JobKey]int64)) EOFCallback

NewConsumerCallbackWithGroupEOF will create a delegate for handling a ConsumerCallbackAdapter and adding a GroupEOF event as a func handler

type JobKey

type JobKey struct {
	CustomerID string
	JobID      string
	RefType    string
	Topic      string
}

JobKey is information contained in the job header

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer will emit events to kafka

func NewProducer

func NewProducer(config Config) (*Producer, error)

NewProducer returns a new Producer instance

func (*Producer) Close

func (p *Producer) Close() error

Close will close the producer

func (*Producer) Count

func (p *Producer) Count() int64

Count returns the number of records transmitted

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, msg eventing.Message) error

Send will send the event

func (*Producer) Size

func (p *Producer) Size() int64

Size returns the number of bytes transmitted

type ShouldProcessEventMessage

type ShouldProcessEventMessage func(msg *eventing.Message) bool

ShouldProcessEventMessage is a handler for deciding if we should process the event after deserialization but before we deliver to consumer handler

type ShouldProcessKafkaMessage

type ShouldProcessKafkaMessage func(msg *ck.Message) bool

ShouldProcessKafkaMessage is a handler for deciding if we should process the incoming kafka message before it's deserialized

type TopicConfig

type TopicConfig struct {
	NumPartitions     int
	ReplicationFactor int
	RetentionPeriod   time.Duration
	MaxMessageSize    int64
	Config            map[string]string
	CleanupPolicy     string
}

TopicConfig is the configuration for the topic

type TrackingConsumer

type TrackingConsumer struct {
	// contains filtered or unexported fields
}

TrackingConsumer is a utility which will track a consumer group and detect when the consumer group has hit EOF across all the partitions in the consumer group

func NewTrackingConsumer

func NewTrackingConsumer(topic string, groupID string, config Config, redisClient *redisdb.Client, callback EOFCallback) (*TrackingConsumer, error)

NewTrackingConsumer returns a consumer callback adapter which tracks EOF

func (*TrackingConsumer) Assignments

func (tc *TrackingConsumer) Assignments() []eventing.TopicPartition

Assignments returns the current assignments for this consumer

func (*TrackingConsumer) AtEOF

func (tc *TrackingConsumer) AtEOF() bool

AtEOF returns true if the consumer is currently at EOF

func (*TrackingConsumer) Close

func (tc *TrackingConsumer) Close() error

func (*TrackingConsumer) DataReceived

func (tc *TrackingConsumer) DataReceived(msg eventing.Message) error

func (*TrackingConsumer) EOF

func (tc *TrackingConsumer) EOF(topic string, partition int32, offset int64)

func (*TrackingConsumer) ErrorReceived

func (tc *TrackingConsumer) ErrorReceived(err error)

func (*TrackingConsumer) OffsetsCommitted

func (tc *TrackingConsumer) OffsetsCommitted(offsets []eventing.TopicPartition)

func (*TrackingConsumer) PartitionAssignment

func (tc *TrackingConsumer) PartitionAssignment(partitions []eventing.TopicPartition)

func (*TrackingConsumer) PartitionRevocation

func (tc *TrackingConsumer) PartitionRevocation(partitions []eventing.TopicPartition)

func (*TrackingConsumer) Positions

func (tc *TrackingConsumer) Positions() map[int32]int64

Positions returns the per partition consumer offset positions

func (*TrackingConsumer) RecordCount

func (tc *TrackingConsumer) RecordCount() int64

RecordCount returns the current number of records that have been processed assuming not EOF

func (*TrackingConsumer) ShouldFilter

func (tc *TrackingConsumer) ShouldFilter(m *eventing.Message) bool

func (*TrackingConsumer) ShouldProcess

func (tc *TrackingConsumer) ShouldProcess(o interface{}) bool

func (*TrackingConsumer) Stats

func (tc *TrackingConsumer) Stats(stats map[string]interface{})

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL