Documentation ¶
Index ¶
- Constants
- Variables
- func GetDefaultConfig() (*sarama.Config, *cluster.Config)
- func NewConsumerOption(offset int64) messaging.OptionCreator
- func NewMessage(key string, payload interface{}) (messaging.Messager, error)
- func NewProducer(topic string, config *sarama.Config, brokers ...string) (messaging.Producer, error)
- func NewStreamReader(c messaging.Consumer, options messaging.OptionCreator) (messaging.StreamReader, error)
- func NewStreamWriter(p messaging.Producer, key string) (messaging.StreamWriter, error)
- func Producer(topic string, strategy Strategy, brokers ...string) (messaging.Producer, error)
- func SetLogger(log *log.Logger)
- type ClientOption
- type Consumer
- type CreateTopicOptions
- type GroupInfo
- type KafkaConsumer
- type KafkaManager
- func (m *KafkaManager) AddPartitions(_ context.Context, req TopicPartitionRequest) error
- func (m *KafkaManager) Close() error
- func (m *KafkaManager) CreateTopics(_ context.Context, opts messaging.OptionCreator, topics ...string) error
- func (m *KafkaManager) DeleteConsumerGroups(withRefresh bool, groups ...string) error
- func (m *KafkaManager) DeleteTopics(_ context.Context, topics ...string) error
- func (m *KafkaManager) GetPartitionInfo(topic, consumerGroup string, withRefresh bool) ([]*PartitionInfoContainer, error)
- func (m *KafkaManager) LatestOffset(topic string, partition int32, timeInMs int64) (int64, error)
- func (m *KafkaManager) ListTopics(_ context.Context) (interface{}, error)
- func (m *KafkaManager) ListTopicsLite(_ context.Context) ([]string, []string, error)
- func (m *KafkaManager) Partitions(topic string) ([]int32, error)
- type ListTopicsResponse
- type Message
- type PartitionInfo
- type PartitionInfoContainer
- type Strategy
- type TopicInfo
- type TopicPartitionRequest
Constants ¶
const OffsetNewest = -1
OffsetNewest lets consumers retrieve the newest possible message. It must be the same value as defined in sarama.
const OffsetOldest = -2
OffsetOldest lets consumers retrieve the oldest possible message. It must be the same value as defined in sarama.
Variables ¶
var ( ErrInvalidPartitionCount = errors.New("cannot reduce number of partitions") ErrSamePartitionCount = errors.New("topic has the same partition count") ErrInvalidTopic = errors.New("topic is invalid") )
var ( ErrStreamExitTimedout = errors.New("reader stream is closed based on timeout setting") ErrInvalidMessageType = errors.New("reader stream received incompatible message type") )
var ConsumerGroupOption messaging.OptionCreator = &consumerOptions{}
ConsumerGroupOption is the default for Consumer Group. In this configuration, partition and offset are ignored since they are automatically managed by Kafka.
Functions ¶
func GetDefaultConfig ¶
GetDefaultConfig returns the default sarama and sarama-cluster configurations.
func NewConsumerOption ¶
func NewConsumerOption(offset int64) messaging.OptionCreator
NewConsumerOption specifies consumer policies. Pass in either OffsetOldest, OffsetNewest, or the specific offset that you want to consume from.
func NewMessage ¶
NewMessage creates message that is publishable. Client should pass in a JSON Object that has been marshalled into []byte as payload. Otherwise, any other input types will be converted to binary via gob
func NewProducer ¶
func NewProducer(topic string, config *sarama.Config, brokers ...string) (messaging.Producer, error)
NewProducer initializes a new client for publishing messages
func NewStreamReader ¶
func NewStreamReader(c messaging.Consumer, options messaging.OptionCreator) (messaging.StreamReader, error)
NewStreamReader creates a stream reader by wrapping a kafka consumer
func NewStreamWriter ¶
NewStreamWriter creates a stream writer by wrapping a kafka producer
Types ¶
type ClientOption ¶
type ClientOption func(*KafkaConsumer)
func WithBrokers ¶
func WithBrokers(brokers ...string) ClientOption
WithBrokers specifies brokers for kafka consumer
func WithDisableAutoMark ¶
func WithDisableAutoMark() ClientOption
WithDisableAutoMark gives clients the ability to turn off auto-marking, which means clients are responsible for marking the offset themselves. This is useful when clients want to retry certain messages they fail to process.
func WithInitialOffset ¶
func WithInitialOffset(offset int64) ClientOption
WithInitialOffset specifies the initial offset to use if no offset was previously committed.
type CreateTopicOptions ¶
type CreateTopicOptions struct { NumPartitions int32 ReplicationFactor int16 ReplicaAssignment map[int32][]int32 ConfigEntries map[string]*string Timeout time.Duration }
CreateTopicOptions holds options that will be applied to topic creation. The properties are identical to sarama.TopicDetail.
func (CreateTopicOptions) Options ¶
func (c CreateTopicOptions) Options() interface{}
Options returns the compatible options for creating topics
type KafkaConsumer ¶
func NewConsumer ¶
func NewConsumer(topic, groupID string, opts ...ClientOption) (*KafkaConsumer, error)
NewConsumer initializes a default consumer client for consuming messages. This function uses consumer group and all partitions will be load balanced
func NewConsumerFromPartition ¶
func NewConsumerFromPartition(topic string, partition int, opts ...ClientOption) (*KafkaConsumer, error)
NewConsumerFromPartition initializes a default consumer client for consuming messages
func (*KafkaConsumer) Close ¶
func (c *KafkaConsumer) Close() error
func (*KafkaConsumer) Consume ¶
func (c *KafkaConsumer) Consume(ctx context.Context, opts messaging.OptionCreator) (<-chan messaging.Event, error)
func (*KafkaConsumer) MarkOffset ¶
func (consumer *KafkaConsumer) MarkOffset(msg messaging.Event, metadata string) error
MarkOffset lets clients mark offset manually after they process each message
type KafkaManager ¶
type KafkaManager struct {
// contains filtered or unexported fields
}
func Manager ¶
func Manager(hosts ...string) (*KafkaManager, error)
Manager creates a simple Kafka Manager with default config to perform administrative tasks
func (*KafkaManager) AddPartitions ¶
func (m *KafkaManager) AddPartitions(_ context.Context, req TopicPartitionRequest) error
AddPartitions increases the partition count for a set of topics
func (*KafkaManager) Close ¶
func (m *KafkaManager) Close() error
func (*KafkaManager) CreateTopics ¶
func (m *KafkaManager) CreateTopics(_ context.Context, opts messaging.OptionCreator, topics ...string) error
func (*KafkaManager) DeleteConsumerGroups ¶
func (m *KafkaManager) DeleteConsumerGroups(withRefresh bool, groups ...string) error
DeleteConsumerGroups removes consumer groups from kafka brokers. Error will be thrown if consumer groups have active consumer(s).
func (*KafkaManager) DeleteTopics ¶
func (m *KafkaManager) DeleteTopics(_ context.Context, topics ...string) error
func (*KafkaManager) GetPartitionInfo ¶
func (m *KafkaManager) GetPartitionInfo(topic, consumerGroup string, withRefresh bool) ([]*PartitionInfoContainer, error)
GetPartitionInfo retrieves information for all partitions that are associated with the given consumer_group:topic. A non-existing topic will be created automatically.
func (*KafkaManager) LatestOffset ¶
func (*KafkaManager) ListTopics ¶
func (m *KafkaManager) ListTopics(_ context.Context) (interface{}, error)
func (*KafkaManager) ListTopicsLite ¶
ListTopicsLite is a fast version of ListTopics. It returns a list of topics and a list of consumer groups for all brokers.
func (*KafkaManager) Partitions ¶
func (m *KafkaManager) Partitions(topic string) ([]int32, error)
type ListTopicsResponse ¶
ListTopicsResponse is a map of TopicInfo
type Message ¶
type Message struct { Value []byte Key []byte Offset int64 Partition int32 Time time.Time Topic string }
Message is a data structure representing kafka messages
type PartitionInfo ¶
PartitionInfo contains metadata for a given partition
type PartitionInfoContainer ¶
type PartitionInfoContainer struct { Topic string GroupID string Partition int32 *PartitionInfo }
type Strategy ¶
type Strategy string
Strategy is a type of routing rule
const ( // StrategyRoundRobin distributes writes evenly StrategyRoundRobin Strategy = "RoundRobin" // StrategyLeastBytes distributes writes to nodes with least amount of traffic StrategyLeastBytes Strategy = "LeastBytes" // StrategyHash distributes writes based on 32-bit FNV-1 Hash function. This // guarantees messages with the same key are routed to the same host StrategyHash Strategy = "Hash" // Uses the same strategy for assigning partitions as the java client //https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 StrategyHashMurmur2 Strategy = "HashMurmur2" )
type TopicPartitionRequest ¶
TopicPartitionRequest lets Kafka manager know which topic to modify and the target number of partitions it should have. key = topic name, value = target number of partitions