Documentation ¶
Index ¶
- func CommittedOffsets(r BatchReader) ([]kafka.TopicPartition, error)
- type BatchReader
- type BatchReaderConfig
- type BatchReaderOption
- type ConsumerConfig
- type ConsumerFunc
- type ConsumerOption
- type Decoder
- type Interface
- type Message
- type Producer
- type ProducerConfig
- type ProducerOption
- type StringMessage
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CommittedOffsets ¶
func CommittedOffsets(r BatchReader) ([]kafka.TopicPartition, error)
CommittedOffsets .
Types ¶
type BatchReader ¶
type BatchReader interface { ReadN(buf []interface{}, timeout time.Duration) (int, error) Confirm() error Close() error }
BatchReader .
type BatchReaderConfig ¶
type BatchReaderConfig struct { Topics []string `file:"topics" desc:"topics"` Group string `file:"group" desc:"consumer group id"` Options map[string]interface{} `file:"options" desc:"options"` }
BatchReaderConfig .
type ConsumerConfig ¶
type ConsumerConfig struct { Topics []string `file:"topics" desc:"topics"` Group string `file:"group" desc:"consumer group id"` Parallelism uint64 `file:"parallelism" desc:"parallelism"` Options map[string]interface{} `file:"options" desc:"options"` }
ConsumerConfig .
type ConsumerFunc ¶
ConsumerFunc .
type ConsumerOption ¶
type ConsumerOption interface{}
ConsumerOption .
func WithConsumerListener ¶
func WithConsumerListener(fn func(i int, c *kafka.Consumer)) ConsumerOption
WithConsumerListener .
type Interface ¶
type Interface interface { NewBatchReader(c *BatchReaderConfig, options ...BatchReaderOption) (BatchReader, error) NewConsumer(c *ConsumerConfig, handler ConsumerFunc, options ...ConsumerOption) error NewConsumerWitchCreator(c *ConsumerConfig, creator func(i int) (ConsumerFunc, error), options ...ConsumerOption) error NewProducer(c *ProducerConfig, options ...ProducerOption) (writer.Writer, error) Servers() string ProduceChannelSize() int ProduceEventsChannelSize() int NewAdminClient() (*kafka.AdminClient, error) }
Interface .
type ProducerConfig ¶
type ProducerConfig struct { Topic string `file:"topic" env:"KAFKA_P_TOPIC" desc:"topic"` Parallelism uint64 `file:"parallelism" env:"KAFKA_P_PARALLELISM" default:"4" desc:"parallelism"` Batch struct { Size uint64 `file:"size" env:"KAFKA_P_BATCH_SIZE" default:"100" desc:"batch size"` Timeout time.Duration `file:"timeout" env:"KAFKA_P_BUFFER_TIMEOUT" default:"30s" desc:"timeout to flush buffer for batch write"` } `file:"batch"` Options map[string]interface{} `file:"options" desc:"options"` }
ProducerConfig .
type ProducerOption ¶
type ProducerOption interface {
// contains filtered or unexported methods
}
ProducerOption .
func WithAsyncWriteErrorHandler ¶
func WithAsyncWriteErrorHandler(eh func(error) error) ProducerOption
WithAsyncWriteErrorHandler .
Click to show internal directories.
Click to hide internal directories.