Documentation ¶
Index ¶
- Constants
- Variables
- type Broker
- type BrokerMetadata
- type Callback
- type Checkpoint
- type Consumer
- type ConsumerGroup
- type ConsumerGroups
- type Event
- type GroupMember
- type LocalOffsetManager
- func (l *LocalOffsetManager) GetOffsetFiles(environment string, topicFilter *regexp.Regexp) ([]string, error)
- func (l *LocalOffsetManager) ListLocalOffsets(topicFilter *regexp.Regexp, envFilter *regexp.Regexp) (map[string]TopicPartitionOffset, error)
- func (l *LocalOffsetManager) ReadLocalTopicOffsets(topic string, environment string) (PartitionOffset, error)
- type Manager
- func (m *Manager) Close()
- func (m *Manager) DeleteConsumerGroup(group string) error
- func (m *Manager) DeleteTopic(topic string) error
- func (m *Manager) GetBrokers(ctx context.Context, includeMetadata bool) ([]Broker, error)
- func (m *Manager) GetConsumerGroups(ctx context.Context, includeMembers bool, ...) (ConsumerGroups, error)
- func (m *Manager) GetGroupTopics(ctx context.Context, group string, includeOffsets bool, ...) (TopicPartitionOffset, error)
- func (m *Manager) GetTopics(ctx context.Context, filter *regexp.Regexp, includeOffsets bool, ...) (TopicPartitionOffset, error)
- type Offset
- type OffsetMode
- type Option
- type Options
- type PartitionOffset
- type Topic
- type TopicPartitionOffset
- type TopicsByName
Constants ¶
const (
	SASLMechanismNone     = "none"
	SASLMechanismPlain    = "plain"
	SASLMechanismSCRAM256 = "scram-sha-256"
	SASLMechanismSCRAM512 = "scram-sha-512"
)
Variables ¶
var (
	// ErrEmptyEnvironment occurs when the provided environment is empty.
	ErrEmptyEnvironment = errors.New("The environment cannot be empty")
	// ErrEmptyTopic occurs when the provided topic is empty.
	ErrEmptyTopic = errors.New("The topic cannot be empty")
)
var (
DefaultClusterVersion = sarama.MaxVersion.String()
)
Functions ¶
This section is empty.
Types ¶
type Broker ¶ added in v1.0.0
type Broker struct {
	Address string
	ID      int
	Meta    *BrokerMetadata
}
type BrokerMetadata ¶ added in v1.0.0
type Callback ¶
type Callback func(topic string, partition int32, offset int64, time time.Time, key, value []byte) error
Callback is the function that gets called for each message received from Kafka.
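As a minimal sketch of what a function matching this signature could look like, the example below declares a local copy of the Callback type rather than importing the package. The `describe` helper, the topic name, and the sample key and value are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Callback is a local copy of the documented signature, declared here so the
// sketch compiles without importing the package.
type Callback func(topic string, partition int32, offset int64, time time.Time, key, value []byte) error

// describe is a hypothetical helper that renders a message as one line of text.
func describe(topic string, partition int32, offset int64, key, value []byte) string {
	return fmt.Sprintf("%s[%d]@%d %s=%s", topic, partition, offset, key, value)
}

func main() {
	// A callback that prints every message and never fails.
	var cb Callback = func(topic string, partition int32, offset int64, _ time.Time, key, value []byte) error {
		fmt.Println(describe(topic, partition, offset, key, value))
		return nil
	}

	// Invoke the callback by hand with made-up values.
	if err := cb("orders", 0, 42, time.Now(), []byte("id-1"), []byte("created")); err != nil {
		fmt.Println("callback failed:", err)
	}
}
```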
type Checkpoint ¶ added in v0.0.7
type Checkpoint struct {
// contains filtered or unexported fields
}
Checkpoint represents a point in time or offset, from which the consumer has to start consuming from the specified topic.
func NewCheckpoint ¶ added in v0.0.7
func NewCheckpoint(rewind bool) *Checkpoint
NewCheckpoint creates a new checkpoint instance.
In rewind mode, the consumer starts from the oldest available offset, which means it consumes all existing messages from the beginning of the stream.
func (*Checkpoint) Mode ¶ added in v0.0.7
func (c *Checkpoint) Mode() OffsetMode
Mode returns the current mode of the checkpoint.
func (*Checkpoint) Offset ¶ added in v0.0.7
func (c *Checkpoint) Offset() int64
Offset returns the final offset value from which consuming will be started.
In MillisecondsOffsetMode, the offset will be the milliseconds of the specified time. This is what Kafka needs to figure out the closest available offset at the given time.
func (*Checkpoint) OffsetString ¶ added in v0.0.7
func (c *Checkpoint) OffsetString() string
OffsetString returns the string representation of the time offset in `02-01-2006T15:04:05.999999999` format when in MillisecondsOffsetMode; otherwise it returns the string representation of the offset value.
func (*Checkpoint) SetOffset ¶ added in v0.0.7
func (c *Checkpoint) SetOffset(offset int64)
SetOffset sets the offset of the checkpoint and switches the mode to ExplicitOffsetMode.
func (*Checkpoint) SetTimeOffset ¶ added in v0.0.7
func (c *Checkpoint) SetTimeOffset(at time.Time)
SetTimeOffset sets the offset to the milliseconds of the given time and sets the mode to MillisecondsOffsetMode.
func (*Checkpoint) TimeOffset ¶ added in v0.0.7
func (c *Checkpoint) TimeOffset() time.Time
TimeOffset returns the originally provided time value of the time-based offset when in MillisecondsOffsetMode.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a new Kafka cluster consumer.
func NewConsumer ¶
func NewConsumer(brokers []string, printer internal.Printer, environment string, enableAutoTopicCreation bool, options ...Option) (*Consumer, error)
NewConsumer creates a new instance of Kafka cluster consumer.
func (*Consumer) Events ¶ added in v0.0.11
Events returns the channel to which the Kafka events will be published.
You MUST listen to this channel before you start the consumer to avoid deadlock.
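The deadlock warning follows from how unbuffered channels behave: a send blocks until someone receives. The sketch below illustrates the ordering with a stand-in producer instead of a real consumer. The `event` type, `produce`, and `collect` are hypothetical, and whether the package's channel is actually unbuffered is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a trimmed-down, hypothetical stand-in for the documented Event type.
type event struct {
	Topic  string
	Offset int64
}

// produce stands in for a consumer's Start method: each send blocks until
// a listener receives from the unbuffered channel.
func produce(events chan<- event, n int) {
	for i := 0; i < n; i++ {
		events <- event{Topic: "orders", Offset: int64(i)}
	}
	close(events)
}

// collect starts the listener first and only then runs the producer.
// Reversing the order would block forever on the first send.
func collect(n int) []int64 {
	events := make(chan event)
	var offsets []int64

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for e := range events {
			offsets = append(offsets, e.Offset)
		}
	}()

	produce(events, n)
	wg.Wait()
	return offsets
}

func main() {
	fmt.Println(collect(3))
}
```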
func (*Consumer) Start ¶
Start starts consuming from the specified topics and executes the callback function on each message.
This is a blocking call that terminates when the context parameter is cancelled. The method returns an error if the topic list is empty or the callback function is nil.
func (*Consumer) StoreOffset ¶ added in v0.0.11
StoreOffset stores the offset of the successfully processed message into the offset store.
type ConsumerGroup ¶ added in v1.0.0
type ConsumerGroup struct {
	// Members the clients attached to the consumer group.
	Members []GroupMember
	// TopicOffsets the offsets of each topic belonging to the group.
	TopicOffsets TopicPartitionOffset
}
ConsumerGroup represents a consumer group.
type ConsumerGroups ¶ added in v1.0.0
type ConsumerGroups map[string]*ConsumerGroup
ConsumerGroups is the map of consumer groups keyed by consumer group ID.
func (ConsumerGroups) Names ¶ added in v1.0.0
func (c ConsumerGroups) Names() []string
Names returns the names of the consumer groups.
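Since ConsumerGroups is a map, extracting the names amounts to collecting its keys. The sketch below uses local stand-in types. Sorting the result is an assumption made here for deterministic output, as the documentation does not say whether Names returns the names in any particular order.

```go
package main

import (
	"fmt"
	"sort"
)

// consumerGroup and consumerGroups are hypothetical local stand-ins for the
// documented ConsumerGroup and ConsumerGroups types.
type consumerGroup struct {
	Members []string
}

type consumerGroups map[string]*consumerGroup

// names collects the map keys. Sorting is an assumption of this sketch;
// Go map iteration order is otherwise unspecified.
func (c consumerGroups) names() []string {
	names := make([]string, 0, len(c))
	for name := range c {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	groups := consumerGroups{
		"payments": {},
		"billing":  {},
	}
	fmt.Println(groups.names())
}
```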
type Event ¶ added in v0.0.11
type Event struct {
	// Topic the topic from which the message was consumed.
	Topic string
	// Key partition key.
	Key []byte
	// Value message content.
	Value []byte
	// Timestamp message timestamp.
	Timestamp time.Time
	// Partition the Kafka partition to which the message belongs.
	Partition int32
	// Offset the message offset.
	Offset int64
}
Event represents a Kafka event.
type GroupMember ¶ added in v1.0.0
type GroupMember struct {
	// ID the member identifier.
	ID string
	// ClientID client ID.
	ClientID string
	// Host the host name/IP of the client machine.
	Host string
}
GroupMember represents a consumer group member.
func (GroupMember) String ¶ added in v1.0.0
func (g GroupMember) String() string
type LocalOffsetManager ¶ added in v1.0.0
func NewLocalOffsetManager ¶ added in v1.0.0
func NewLocalOffsetManager(level internal.VerbosityLevel) *LocalOffsetManager
func (*LocalOffsetManager) GetOffsetFiles ¶ added in v1.0.0
func (l *LocalOffsetManager) GetOffsetFiles(environment string, topicFilter *regexp.Regexp) ([]string, error)
GetOffsetFiles returns a list of all the offset files for the given environment.
func (*LocalOffsetManager) ListLocalOffsets ¶ added in v1.0.0
func (l *LocalOffsetManager) ListLocalOffsets(topicFilter *regexp.Regexp, envFilter *regexp.Regexp) (map[string]TopicPartitionOffset, error)
ListLocalOffsets lists the locally stored offsets for the topics of all the available environments.
The returned map is keyed by the environment name.
func (*LocalOffsetManager) ReadLocalTopicOffsets ¶ added in v1.0.0
func (l *LocalOffsetManager) ReadLocalTopicOffsets(topic string, environment string) (PartitionOffset, error)
ReadLocalTopicOffsets returns the locally stored offsets of the given topic for the specified environment, if they exist.
If there are no local offsets, the method returns an empty partition-offset map.
type Manager ¶ added in v1.0.0
Manager is a type used to query Kafka metadata.
func NewManager ¶ added in v1.0.0
func NewManager(brokers []string, verbosity internal.VerbosityLevel, options ...Option) (*Manager, error)
NewManager creates a new instance of Kafka manager.
func (*Manager) Close ¶ added in v1.0.0
func (m *Manager) Close()
Close closes the underlying Kafka connection.
func (*Manager) DeleteConsumerGroup ¶ added in v1.0.0
func (*Manager) DeleteTopic ¶ added in v1.0.0
func (*Manager) GetBrokers ¶ added in v1.0.0
GetBrokers returns the current set of active brokers as retrieved from cluster metadata.
func (*Manager) GetConsumerGroups ¶ added in v1.0.0
func (*Manager) GetGroupTopics ¶ added in v1.0.1
type Offset ¶ added in v1.0.0
type Offset struct {
	// Latest the latest available offset of the partition reported by the server.
	Latest int64
	// Current the current value of the local or consumer group offset. This is where the consumer is up to.
	Current int64
}
Offset represents an offset pair for a given partition.
A pair contains the latest offset of the partition reported by the server and the local or consumer group offset.
type OffsetMode ¶ added in v0.0.7
type OffsetMode int8
OffsetMode represents the offset mode for a checkpoint.
const (
	// UndefinedOffsetMode the user has not requested any specific offset.
	UndefinedOffsetMode OffsetMode = iota
	// MillisecondsOffsetMode the closest available offset at a given time will be fetched from the server
	// before the consumer starts pulling messages from Kafka.
	MillisecondsOffsetMode
	// ExplicitOffsetMode the user has explicitly asked for a specific offset.
	ExplicitOffsetMode
)
type Option ¶
type Option func(options *Options)
Option represents a configuration function.
func WithClusterVersion ¶
WithClusterVersion sets the Kafka cluster version.
func WithLogWriter ¶ added in v1.0.0
WithLogWriter sets the writer to write the internal Sarama logs to.
type Options ¶
type Options struct {
	// DisableErrorReporting disables sending consumer errors to the Errors() channel.
	DisableErrorReporting bool
	// ClusterVersion kafka cluster version.
	ClusterVersion string
	// TLS configuration to connect to Kafka cluster.
	TLS *tls.Config
	// contains filtered or unexported fields
}
Options holds the configuration settings for the Kafka consumer.
func NewOptions ¶
func NewOptions() *Options
NewOptions creates a new Options object with default values.
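Option and Options follow the functional options pattern: each Option is a function that mutates an Options value, and the constructor applies them in order on top of the defaults. The sketch below reproduces the pattern with local stand-in types. The `withClusterVersion` signature (a single string argument), the `apply` helper, and the placeholder default version are assumptions, since this page does not show them.

```go
package main

import "fmt"

// options is a trimmed-down, hypothetical stand-in for the documented Options type.
type options struct {
	DisableErrorReporting bool
	ClusterVersion        string
}

// option matches the shape of the documented Option type.
type option func(o *options)

// defaultClusterVersion is a placeholder value for this sketch only.
const defaultClusterVersion = "0.0.0"

// newOptions returns an options value populated with defaults.
func newOptions() *options {
	return &options{ClusterVersion: defaultClusterVersion}
}

// withClusterVersion is an assumed signature: it returns an option that
// overrides the cluster version.
func withClusterVersion(version string) option {
	return func(o *options) {
		o.ClusterVersion = version
	}
}

// apply is a hypothetical helper showing how a constructor could combine
// the defaults with the caller's options, applied in order.
func apply(opts ...option) *options {
	o := newOptions()
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	fmt.Println(apply().ClusterVersion)
	fmt.Println(apply(withClusterVersion("2.4.0")).ClusterVersion)
}
```

The appeal of this design is that new settings can be added later without changing the constructor's signature.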
type PartitionOffset ¶ added in v1.0.0
PartitionOffset represents a map of partition offset pairs.
func ToPartitionOffset ¶ added in v1.0.0
func ToPartitionOffset(po map[int32]int64, latest bool) PartitionOffset
ToPartitionOffset creates a new PartitionOffset map from a raw map.
Set the latest parameter to true to populate the Latest offset value instead of the Current value.
func (PartitionOffset) SortPartitions ¶ added in v1.0.0
func (p PartitionOffset) SortPartitions() []int
SortPartitions returns a list of sorted partitions.
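As an illustration of the conversion and sorting described above, the sketch below assumes that PartitionOffset is a map from partition number to Offset. The type definition is not shown on this page, so that shape, along with all lower-case names, is a hypothetical stand-in.

```go
package main

import (
	"fmt"
	"sort"
)

// offset mirrors the documented Offset pair.
type offset struct {
	Latest  int64
	Current int64
}

// partitionOffset is an assumed shape for the documented PartitionOffset type.
type partitionOffset map[int32]offset

// toPartitionOffset copies raw offsets into either the Latest or the Current
// field, depending on the latest flag.
func toPartitionOffset(po map[int32]int64, latest bool) partitionOffset {
	result := make(partitionOffset, len(po))
	for partition, value := range po {
		if latest {
			result[partition] = offset{Latest: value}
		} else {
			result[partition] = offset{Current: value}
		}
	}
	return result
}

// sortPartitions returns the partition numbers in ascending order.
func (p partitionOffset) sortPartitions() []int {
	sorted := make([]int, 0, len(p))
	for partition := range p {
		sorted = append(sorted, int(partition))
	}
	sort.Ints(sorted)
	return sorted
}

func main() {
	raw := map[int32]int64{2: 300, 0: 100, 1: 200}
	po := toPartitionOffset(raw, true)
	for _, partition := range po.sortPartitions() {
		fmt.Println(partition, po[int32(partition)].Latest)
	}
}
```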
type TopicPartitionOffset ¶ added in v1.0.0
type TopicPartitionOffset map[string]PartitionOffset
TopicPartitionOffset represents a map of topic offset pairs for all the partitions.
func ToTopicPartitionOffset ¶ added in v1.0.0
func ToTopicPartitionOffset(tpo map[string]map[int32]int64, latest bool) TopicPartitionOffset
ToTopicPartitionOffset creates a new TopicPartitionOffset from a raw map.
Set the latest parameter to true to populate the Latest offset value instead of the Current value.
func (TopicPartitionOffset) SortedTopics ¶ added in v1.0.0
func (t TopicPartitionOffset) SortedTopics() []string
SortedTopics returns a list of sorted topics.
type TopicsByName ¶ added in v1.0.0
type TopicsByName []Topic
func (TopicsByName) Len ¶ added in v1.0.0
func (t TopicsByName) Len() int
func (TopicsByName) Less ¶ added in v1.0.0
func (t TopicsByName) Less(i, j int) bool
func (TopicsByName) Swap ¶ added in v1.0.0
func (t TopicsByName) Swap(i, j int)