kafka

package
v0.0.6
Warning

This package is not in the latest version of its module.

Published: Mar 6, 2022 License: MIT Imports: 6 Imported by: 0

Documentation


Constants

const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest int64 = -1
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest int64 = -2
)
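
The two sentinel values can be read as placeholders that are resolved to a concrete offset at request time. The following is a minimal sketch of that resolution, not this package's implementation: resolveOffset and its oldest/next parameters are hypothetical names introduced for illustration, and the constants are local copies of the documented ones.

```go
package main

import "fmt"

// Local copies of the documented sentinel values.
const (
	OffsetNewest int64 = -1
	OffsetOldest int64 = -2
)

// resolveOffset is a hypothetical helper (not part of this package). It maps a
// requested offset to a concrete one, given the oldest offset still retained
// on the broker and the offset the next produced message will receive.
func resolveOffset(requested, oldest, next int64) (int64, error) {
	switch {
	case requested == OffsetNewest:
		return next, nil
	case requested == OffsetOldest:
		return oldest, nil
	case requested >= oldest && requested <= next:
		return requested, nil
	default:
		return 0, fmt.Errorf("offset %d outside available range [%d, %d]", requested, oldest, next)
	}
}

func main() {
	// Assume the partition retains offsets 100..249 and the next message gets 250.
	for _, req := range []int64{OffsetNewest, OffsetOldest, 175, 42} {
		off, err := resolveOffset(req, 100, 250)
		fmt.Println(req, off, err)
	}
}
```

Negative sentinels work here because real offsets are never negative, so -1 and -2 cannot collide with a stored message.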

Variables

This section is empty.

Functions

func Client

func Client() sarama.Client

func Consume

func Consume(topic string, partition int, f func(m *Message)) error

func Produce

func Produce(topic string, msgs ...Message) error

func ProduceAsync

func ProduceAsync(topic string, msg Message) error

func RegisterGroupConsumer

func RegisterGroupConsumer(ctx context.Context, h Handler, groupID string, topics ...string)

Types

type Handler

type Handler interface {
	Handle(ctx context.Context, topic string, key, value []byte, partition int32, offset int64) error
}

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

func New

func New(opts ...Option) *Kafka

func (*Kafka) Connect

func (k *Kafka) Connect() error

func (*Kafka) Destory

func (k *Kafka) Destory() error

func (*Kafka) Load

func (k *Kafka) Load(src []byte) error

Load loads a set of configurations represented as a JSON string. However, the call is invalid if options were provided to the constructor, because the configurations passed to New() have the highest priority.

func (*Kafka) Name

func (k *Kafka) Name() string

type Message

type Message struct {
	Topic     string
	Key       []byte
	Value     []byte
	Offset    int64
	Partition int32
	Timestamp time.Time
}

Message represents a Kafka message.

func NewProduceMessage

func NewProduceMessage(value []byte) *Message

type Option

type Option func(*Options)

func WithAcks

func WithAcks(ack RequiredAcks) Option

func WithAutoCommit

func WithAutoCommit() Option

func WithOffsetInitial

func WithOffsetInitial(i int64) Option

func WithSASL

func WithSASL(username, password string) Option
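
The Option type and the With* constructors follow the functional-options pattern. The sketch below shows the general shape of that pattern using lower-case stand-ins (options, option, withAcks, build); these are hypothetical names, the SASL option is left out, and the zero-value defaults are an assumption, since the package's real defaults are not documented here.

```go
package main

import "fmt"

// RequiredAcks and WaitForAll are local copies of the documented declarations.
type RequiredAcks int16

const WaitForAll RequiredAcks = -1

// options is a trimmed, hypothetical stand-in for the package's Options struct.
type options struct {
	Acks          RequiredAcks
	AutoCommit    bool
	OffsetInitial int64
}

// option mutates an options value, like the documented Option type.
type option func(*options)

func withAcks(a RequiredAcks) option   { return func(o *options) { o.Acks = a } }
func withAutoCommit() option           { return func(o *options) { o.AutoCommit = true } }
func withOffsetInitial(i int64) option { return func(o *options) { o.OffsetInitial = i } }

// build applies each option in order; a later option overrides an earlier one.
func build(opts ...option) options {
	var o options // zero values stand in for the unknown real defaults
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := build(withAcks(WaitForAll), withAutoCommit(), withOffsetInitial(-2))
	fmt.Printf("%+v\n", o)
}
```

With the real package, the equivalent call would presumably be New(WithAcks(WaitForAll), WithAutoCommit(), WithOffsetInitial(OffsetOldest)).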

type Options

type Options struct {
	Brokers       string       `json:"brokers"`
	SASL          sasl         `json:"sasl"`
	Acks          RequiredAcks `json:"acks"`
	AutoCommit    bool         `json:"auto_commit"`
	OffsetInitial int64        `json:"offset_initial"`
}
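
The struct tags give the JSON keys that a configuration document would use. As a rough sketch, the snippet below decodes such a document into a local mirror of Options with encoding/json. The SASL field is omitted because its type is unexported and its layout is not shown, the broker address and its format are placeholders, and parseOptions is a hypothetical helper rather than the package's Load method.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RequiredAcks is a local copy of the documented type.
type RequiredAcks int16

// Options mirrors the documented struct, minus the SASL field.
type Options struct {
	Brokers       string       `json:"brokers"`
	Acks          RequiredAcks `json:"acks"`
	AutoCommit    bool         `json:"auto_commit"`
	OffsetInitial int64        `json:"offset_initial"`
}

// parseOptions is a hypothetical helper that decodes a JSON configuration.
func parseOptions(src []byte) (Options, error) {
	var o Options
	if err := json.Unmarshal(src, &o); err != nil {
		return Options{}, fmt.Errorf("decode options: %w", err)
	}
	return o, nil
}

func main() {
	// "localhost:9092" is a placeholder; the expected broker format is an assumption.
	src := []byte(`{"brokers":"localhost:9092","acks":-1,"auto_commit":true,"offset_initial":-2}`)
	o, err := parseOptions(src)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", o)
}
```

In this document, acks of -1 corresponds to WaitForAll and offset_initial of -2 corresponds to OffsetOldest.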

type RequiredAcks

type RequiredAcks int16

const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)
