Versions in this module Expand all Collapse all v0 v0.0.0 Apr 9, 2020 Changes in this version + const AtsuUnsetGroupId + const DefaultFlushInterval + const SessionTimeoutDefault + type KafkaStreamConfig interface + ChannelProduce func(topic *string, value []byte) + Close func() error + Flush func(ms int) int + FullTopic func(t string) string + GetBrokers func() string + GetConsumer func() *kafka.Consumer + GetPrefix func() string + GetProducer func() *kafka.Producer + NewConsumer func(km *kafka.ConfigMap) (*kafka.Consumer, error) + NewProducer func(km *kafka.ConfigMap) (*kafka.Producer, error) + Produce func(topic *string, value []byte) error + ProducerDefaults func() *kafka.ConfigMap + SetBrokers func(brokers string) + SetDeliveryError func(f func(*kafka.Message)) + SetFlags func() + SetPrefix func(prefix string) + SetTopic func(topic string) + type StreamConfig struct + Brokers string + Bytes int + Codec string + DeliveryReports bool + Glob bool + GroupId string + Interval time.Duration + Messages int + Offset string + Prefix string + Timeout time.Duration + Topic string + func (sc *StreamConfig) Close() error + func (sc *StreamConfig) Consume(consumer StreamConsumer, config interface{}) error + func (sc *StreamConfig) NewConsumer(km *kafka.ConfigMap) (*kafka.Consumer, error) + func (sc *StreamConfig) NewProducer(km *kafka.ConfigMap) (*kafka.Producer, error) + func (sc *StreamConfig) Produce(topic *string, value []byte) error + func (sc *StreamConfig) SetBrokers(brokers string) + func (sc *StreamConfig) SetDeliveryError(f func(*kafka.Message)) + func (sc *StreamConfig) SetFlags() + func (sc *StreamConfig) SetPrefix(prefix string) + func (sc *StreamConfig) SetTopic(topic string) + func (sc StreamConfig) ChannelProduce(topic *string, value []byte) + func (sc StreamConfig) Flush(ms int) int + func (sc StreamConfig) FullTopic(t string) string + func (sc StreamConfig) GetBrokers() string + func (sc StreamConfig) GetConsumer() *kafka.Consumer + func (sc StreamConfig) GetPrefix() string + func (sc StreamConfig) GetProducer() *kafka.Producer + func (sc StreamConfig) ProducerDefaults() *kafka.ConfigMap + func (sc StreamConfig) String() string + type StreamConsumer interface + DoneCh func() <-chan bool + Error func(kafka.Error) bool + Finish func() error + Interval func(time.Time) error + Message func(*kafka.Message) error + Process func() (bool, error) + Start func(*StreamConfig, interface{}) error + Timeout func(time.Time, bool) bool