Documentation ¶
Index ¶
- func RandStringBytes(n int) string
- type Client
- func (k *Client) Close() error
- func (k *Client) CreateTopic(topic string, numPart int) error
- func (k *Client) IsReaderConnected() bool
- func (k *Client) IsWriters() bool
- func (k *Client) ListTopics() []kafka.Topic
- func (k *Client) Listen(ctx context.Context, handler KafkaHandler) error
- func (k *Client) NewConsumer()
- func (k *Client) NewPublisher() error
- func (k *Client) Publish(ctx context.Context, topic string, msg any) error
- type IClient
- type KafkaHandler
- type Message
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RandStringBytes ¶
Types ¶
type Client ¶
type Client struct { Backoff backoff.BackOff // contains filtered or unexported fields }
func NewKafkaClient ¶
func NewKafkaClient(cfg *common_utils.BaseConfig) *Client
func (*Client) IsReaderConnected ¶
func (*Client) ListTopics ¶
func (k *Client) ListTopics() []kafka.Topic
func (*Client) Listen ¶
func (k *Client) Listen(ctx context.Context, handler KafkaHandler) error
Listen consumes messages in manual-commit mode: the handler needs to call msg.Commit() when processing is done. This mode is recommended for this process.
func (*Client) NewConsumer ¶
func (k *Client) NewConsumer()
func (*Client) NewPublisher ¶
type Message ¶
type Message struct { Offset int64 `json:"offset,omitempty"` Partition int `json:"partition,omitempty"` Topic string `json:"topic,omitempty"` Key string `json:"key,omitempty"` Body []byte `json:"body,omitempty"` Timestamp int64 `json:"timestamp,omitempty"` ConsumerGroup string `json:"consumer_group,omitempty"` Commit func() error MoveToDLQ func() error Headers map[string]string }
Message defines the message type used to encode/decode a sarama message.
Click to show internal directories.
Click to hide internal directories.