kafka

package
v1.1.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2023 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RandStringBytes

func RandStringBytes(n int) string

Types

type Client

type Client struct {
	Backoff backoff.BackOff
	// contains filtered or unexported fields
}

func NewKafkaClient

func NewKafkaClient(cfg *common_utils.BaseConfig) *Client

func (*Client) Close

func (k *Client) Close() error

func (*Client) CreateTopic

func (k *Client) CreateTopic(topic string, numPart int) error

func (*Client) IsReaderConnected

func (k *Client) IsReaderConnected() bool

func (*Client) IsWriters

func (k *Client) IsWriters() bool

func (*Client) ListTopics

func (k *Client) ListTopics() []kafka.Topic

func (*Client) Listen

func (k *Client) Listen(ctx context.Context, handler KafkaHandler) error

Listen manual listen need call msg.Commit() when process done recommend for this process

func (*Client) NewConsumer

func (k *Client) NewConsumer()

func (*Client) NewPublisher

func (k *Client) NewPublisher() error

func (*Client) Publish

func (k *Client) Publish(ctx context.Context, topic string, msg any) error

type IClient

type IClient interface {
	Listen(ctx context.Context, handler KafkaHandler) error
	NewConsumer()
	IsWriters() bool
	Close() error

	NewPublisher() error
	Publish(ctx context.Context, topic string, msg any) error

	IsReaderConnected() bool
	// contains filtered or unexported methods
}

type KafkaHandler

type KafkaHandler interface {
	Process(context.Context, *Message) error
}

type Message

type Message struct {
	Offset        int64  `json:"offset,omitempty"`
	Partition     int    `json:"partition,omitempty"`
	Topic         string `json:"topic,omitempty"`
	Key           string `json:"key,omitempty"`
	Body          []byte `json:"body,omitempty"`
	Timestamp     int64  `json:"timestamp,omitempty"`
	ConsumerGroup string `json:"consumer_group,omitempty"`
	Commit        func() error
	MoveToDLQ     func() error
	Headers       map[string]string
}

Message define message encode/decode sarama message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL