Documentation ¶
Index ¶
- Constants
- func NewBatcher(input chan kafka.Message, size int, timeout time.Duration) *batcher
- func NewOffsetManager(logger log.Logger, reader Reader, batchSize int, batchTimeout time.Duration) *offsetManager
- func NewReader(logger log.Logger, dialer *kafka.Dialer, conf *Settings, opts ...ReaderOption) (*kafka.Reader, error)
- type Batcher
- type Consumer
- type Offset
- type OffsetManager
- type Reader
- type ReaderOption
- type Settings
Constants ¶
View Source
const (
	// MaxBatchBytes is the Kafka default max batch size in bytes.
	MaxBatchBytes = 1000000
	// MaxBatchSize is the maximum batch size in number of messages.
	MaxBatchSize = 1024
)
View Source
const (
	// DefaultMaxRetryAttempts is how many times to retry a failed operation.
	DefaultMaxRetryAttempts = 3
	// DefaultConsumerGroupRetentionTime is the retention period of current offsets.
	DefaultConsumerGroupRetentionTime = 7 * 24 * time.Hour
	// DefaultMaxWait is a reasonable minimum for MaxWait.
	DefaultMaxWait = time.Second
	// CommitOffsetsSync == 0 means that auto-commit is disabled.
	CommitOffsetsSync = time.Duration(0)
)
Variables ¶
This section is empty.
Functions ¶
func NewBatcher ¶
func NewOffsetManager ¶
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
type OffsetManager ¶
type ReaderOption ¶
type ReaderOption func(*kafka.ReaderConfig)
type Settings ¶
type Settings struct {
	ConnectionName string `cfg:"connection" validate:"required"`
	Topic          string `cfg:"topic" validate:"required"`
	// FQTopic is the fully-qualified topic name (with prefixes).
	FQTopic      string
	GroupID      string
	BatchSize    int           `cfg:"batch_size" default:"1"`
	BatchTimeout time.Duration `cfg:"idle_timeout" default:"1s"`
	// contains filtered or unexported fields
}
func (*Settings) Connection ¶
func (s *Settings) Connection() *connection.Settings
func (*Settings) WithConnection ¶
func (s *Settings) WithConnection(conn *connection.Settings) *Settings
Source Files ¶
Click to show internal directories.
Click to hide internal directories.