kafka

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 28, 2021 License: Apache-2.0 Imports: 8 Imported by: 0

README

Kafka

Broker: Kafka

You can run examples by replace configs use your kafka instance configs.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(config *ConsumerConfig) *Consumer

func (*Consumer) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Consumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

func (*Consumer) Start

func (c *Consumer) Start()

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(handler ConsumerHandler)

type ConsumerConfig

type ConsumerConfig struct {
	Addr  []string
	Topic []string
	Gid   string

	EnableSASLAuth bool
	SASLMechanism  string
	SASLUser       string
	SASLPassword   string
	SASLHandshake  bool

	DialTimeout time.Duration

	ConsumeOldest     bool
	EnableReturnError bool

	ClientID string
}

type ConsumerHandler

type ConsumerHandler func(context.Context, *sarama.ConsumerMessage) error

type ProducerConfig

type ProducerConfig struct {
	Addr  []string
	Topic []string

	EnableSASLAuth bool
	SASLMechanism  string
	SASLUser       string
	SASLPassword   string
	SASLHandshake  bool

	DialTimeout      time.Duration
	SlowSendDuration time.Duration

	EnableReturnSuccess bool

	ClientID string
}

type SyncProducer

type SyncProducer struct {
	// contains filtered or unexported fields
}

func NewSyncProducer

func NewSyncProducer(config *ProducerConfig) *SyncProducer

func (*SyncProducer) Close

func (sp *SyncProducer) Close() error

func (*SyncProducer) SendSyncMsg

func (sp *SyncProducer) SendSyncMsg(ctx context.Context, content string) error

type TraceInterceptor

type TraceInterceptor struct {
	TraceID string
}

func (*TraceInterceptor) OnSend

func (ti *TraceInterceptor) OnSend(message *sarama.ProducerMessage)

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL