Documentation
¶
Index ¶
- Constants
- type KafkaStreamConfig
- type StreamConfig
- func (sc StreamConfig) ChannelProduce(topic *string, value []byte)
- func (sc *StreamConfig) Close() error
- func (sc *StreamConfig) Consume(consumer StreamConsumer, config interface{}) error
- func (sc StreamConfig) Flush(ms int) int
- func (sc StreamConfig) FullTopic(t string) string
- func (sc StreamConfig) GetBrokers() string
- func (sc StreamConfig) GetConsumer() *kafka.Consumer
- func (sc StreamConfig) GetPrefix() string
- func (sc StreamConfig) GetProducer() *kafka.Producer
- func (sc *StreamConfig) NewConsumer(km *kafka.ConfigMap) (*kafka.Consumer, error)
- func (sc *StreamConfig) NewProducer(km *kafka.ConfigMap) (*kafka.Producer, error)
- func (sc *StreamConfig) Produce(topic *string, value []byte) error
- func (sc StreamConfig) ProducerDefaults() *kafka.ConfigMap
- func (sc *StreamConfig) SetBrokers(brokers string)
- func (sc *StreamConfig) SetDeliveryError(f func(*kafka.Message))
- func (sc *StreamConfig) SetFlags()
- func (sc *StreamConfig) SetPrefix(prefix string)
- func (sc *StreamConfig) SetTopic(topic string)
- func (sc StreamConfig) String() string
- type StreamConsumer
Constants ¶
View Source
const AtsuUnsetGroupId = "atsu-unset-group-id"
AtsuUnsetGroupId is used by library callers, but also manually set as default below.
View Source
const DefaultFlushInterval = 100
View Source
const SessionTimeoutDefault = 6000 // ms
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type KafkaStreamConfig ¶
type KafkaStreamConfig interface { NewProducer(km *kafka.ConfigMap) (*kafka.Producer, error) NewConsumer(km *kafka.ConfigMap) (*kafka.Consumer, error) SetDeliveryError(f func(*kafka.Message)) SetTopic(topic string) SetPrefix(prefix string) SetBrokers(brokers string) SetFlags() ProducerDefaults() *kafka.ConfigMap Produce(topic *string, value []byte) error Flush(ms int) int FullTopic(t string) string ChannelProduce(topic *string, value []byte) GetConsumer() *kafka.Consumer GetProducer() *kafka.Producer GetBrokers() string GetPrefix() string Close() error }
type StreamConfig ¶
type StreamConfig struct { Brokers string `json:"brokers" yaml:"brokers" default:"kafka-atsu-prod-01:9092,kafka-atsu-prod-02:9092,kafka-atsu-prod-03:9092"` Prefix string `json:"prefix" yaml:"prefix" default:"atsu"` // defaults to atsu Topic string `json:"topic" yaml:"topic" default:"unset"` // defaults to unset Messages int `json:"messages" yaml:"messages"` Bytes int `json:"bytes" yaml:"bytes"` Timeout time.Duration `ignored:"true" json:"-"` // Must be set explicitly Interval time.Duration `ignored:"true" json:"-"` // Must be set explicitly Offset string `default:"latest" json:"offset" yaml:"offset"` GroupId string `default:"atsu-unset-group-id" split_words:"true" json:"group_id" yaml:"group_id"` Glob bool `default:"false" json:"glob" yaml:"glob"` DeliveryReports bool `default:"false" json:"reports" yaml:"reports"` Codec string `default:"none" json:"codec" yaml:"codec"` // contains filtered or unexported fields }
StreamConfig provides Kafka-related configuration
func (StreamConfig) ChannelProduce ¶
func (sc StreamConfig) ChannelProduce(topic *string, value []byte)
func (*StreamConfig) Close ¶
func (sc *StreamConfig) Close() error
func (*StreamConfig) Consume ¶
func (sc *StreamConfig) Consume(consumer StreamConsumer, config interface{}) error
func (StreamConfig) Flush ¶
func (sc StreamConfig) Flush(ms int) int
func (StreamConfig) FullTopic ¶
func (sc StreamConfig) FullTopic(t string) string
FullTopic returns prefix.topic (XXX don't like this naming yet). If t == "", sc.Topic is used instead.
func (StreamConfig) GetBrokers ¶
func (sc StreamConfig) GetBrokers() string
func (StreamConfig) GetConsumer ¶
func (sc StreamConfig) GetConsumer() *kafka.Consumer
XXX Temporary functions to allow more advanced usage
func (StreamConfig) GetPrefix ¶
func (sc StreamConfig) GetPrefix() string
func (StreamConfig) GetProducer ¶
func (sc StreamConfig) GetProducer() *kafka.Producer
func (*StreamConfig) NewConsumer ¶
NewConsumer creates a new Kafka consumer and subscribes to the underlying topic.
func (*StreamConfig) NewProducer ¶
NewProducer creates a new Kafka producer.
func (StreamConfig) ProducerDefaults ¶
func (sc StreamConfig) ProducerDefaults() *kafka.ConfigMap
ProducerDefaults returns a *kafka.ConfigMap with sane defaults.
func (*StreamConfig) SetBrokers ¶
func (sc *StreamConfig) SetBrokers(brokers string)
func (*StreamConfig) SetDeliveryError ¶
func (sc *StreamConfig) SetDeliveryError(f func(*kafka.Message))
func (*StreamConfig) SetFlags ¶
func (sc *StreamConfig) SetFlags()
SetFlags installs various command-line flag(s).
func (*StreamConfig) SetPrefix ¶
func (sc *StreamConfig) SetPrefix(prefix string)
func (*StreamConfig) SetTopic ¶
func (sc *StreamConfig) SetTopic(topic string)
func (StreamConfig) String ¶
func (sc StreamConfig) String() string
String returns the JSON representation.
type StreamConsumer ¶
type StreamConsumer interface { Start(*StreamConfig, interface{}) error Message(*kafka.Message) error // error != nil, stop consumer Interval(time.Time) error // error != nil, stop consumer Timeout(time.Time, bool) bool // bool != false, stop consumer Error(kafka.Error) bool // bool != false, stop consumer Process() (bool, error) Finish() error // This value will be returned to the caller DoneCh() <-chan bool }
Click to show internal directories.
Click to hide internal directories.