Documentation ¶
Index ¶
Constants ¶
View Source
const (
	ConsumerIsolationCommitted   = ConsumerIsolationMode(kafka.ReadCommitted)
	ConsumerIsolationUnCommitted = ConsumerIsolationMode(kafka.ReadUncommitted)
)
View Source
const (
	ProducerAckNone       = ProducerAckMode(kafka.RequireNone)
	ProducerAckLeaderOnly = ProducerAckMode(kafka.RequireOne)
	ProducerAckAllInSync  = ProducerAckMode(kafka.RequireAll)
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AutoCommitConsumerGroup ¶
type AutoCommitConsumerGroup struct {
// contains filtered or unexported fields
}
func NewAutoCommitConsumerGroup ¶
func NewAutoCommitConsumerGroup(watcher *watchdog.Client, brokers []string, groupID string, topics []string, handler ConsumeHandler, opts ...ConsumerOption) (*AutoCommitConsumerGroup, error)
type ConsumeHandler ¶
type ConsumeHandler func(ctx context.Context, msg ConsumerMessage) error
type ConsumerIsolationMode ¶
type ConsumerIsolationMode kafka.IsolationLevel
type ConsumerMessage ¶
type ConsumerMessage struct {
// contains filtered or unexported fields
}
func (ConsumerMessage) Key ¶
func (m ConsumerMessage) Key() string
func (ConsumerMessage) Payload ¶
func (m ConsumerMessage) Payload() []byte
func (ConsumerMessage) Time ¶
func (m ConsumerMessage) Time() time.Time
func (ConsumerMessage) Topic ¶
func (m ConsumerMessage) Topic() string
type ConsumerOption ¶
type ConsumerOption func(c *kafka.ReaderConfig)
func ConsumerConsumptionMaxBytes ¶
func ConsumerConsumptionMaxBytes(size int) ConsumerOption
func ConsumerConsumptionMinBytes ¶
func ConsumerConsumptionMinBytes(size int) ConsumerOption
func ConsumerQueueSize ¶
func ConsumerQueueSize(size int) ConsumerOption
func ConsumerTransactionIsolationMode ¶
func ConsumerTransactionIsolationMode(mode ConsumerIsolationMode) ConsumerOption
type ManualCommitConsumerGroup ¶
type ManualCommitConsumerGroup struct {
AutoCommitConsumerGroup
}
func NewManualCommitConsumerGroup ¶
func NewManualCommitConsumerGroup(watcher *watchdog.Client, brokers []string, groupID string, topics []string, handler ConsumeHandler, opts ...ConsumerOption) (*ManualCommitConsumerGroup, error)
func (*ManualCommitConsumerGroup) Commit ¶
func (c *ManualCommitConsumerGroup) Commit(ctx context.Context, msg ConsumerMessage) error
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(watcher *watchdog.Client, brokers []string, opts ...ProducerOption) *Producer
type ProducerAckMode ¶
type ProducerAckMode kafka.RequiredAcks
type ProducerOption ¶
type ProducerOption func(w *kafka.Writer)
func ProducerInAsync ¶
func ProducerInAsync() ProducerOption
func ProducerThroughputSettings ¶
func ProducerThroughputSettings(batchSize int, batchWait time.Duration) ProducerOption
func ProducerWithAckMode ¶
func ProducerWithAckMode(mode ProducerAckMode) ProducerOption
func ProducerWithPartitioningMode ¶
func ProducerWithPartitioningMode(mode ProducerPartitioningModeOpt) ProducerOption
type ProducerPartitioningModeOpt ¶
type ProducerPartitioningModeOpt func(w *kafka.Writer)
func ProducerWithPartitioningCRC32 ¶
func ProducerWithPartitioningCRC32(consistent bool) ProducerPartitioningModeOpt
func ProducerWithPartitioningHash ¶
func ProducerWithPartitioningHash(hashFunc hash.Hash32) ProducerPartitioningModeOpt
func ProducerWithPartitioningMurmur2 ¶
func ProducerWithPartitioningMurmur2(consistent bool) ProducerPartitioningModeOpt
Click to show internal directories.
Click to hide internal directories.