kafka

package
v0.0.33 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2022 License: MIT Imports: 9 Imported by: 0

README

Introduction

Usage

For Initialization:

import (
    // ...
    "github.com/hirpc/hrpc/mq/kafka"
    "github.com/hirpc/hrpc/log"
    "github.com/hirpc/hrpc/option"
)

func main() {
    s, err := hrpc.NewServer(
		option.WithMessageQueues(kafka.New(kafka.WithVersion("1.1.1"))),
	)
    if err != nil {
        panic(err)
    }
    // ....
}

For Producer:

import (
    "github.com/hirpc/hrpc/log"
    "github.com/hirpc/hrpc/mq/kafka"
)

func Foo() error {
    if err := kafka.Produce(
		"TOPIC_NAME",
		*kafka.NewProduceMessage([]byte("MESSAGE")),
	); err != nil {
		log.WithFields(ctx).Error(err)
		return err
	}
	return nil
}

For Consumer:

import (
    // ...
    "github.com/hirpc/hrpc/mq/kafka"
    "github.com/hirpc/hrpc/log"
    "github.com/hirpc/hrpc/option"
)

type tmp struct{}

// Handle will receive messages from the kafka with topic, key, value, partition, offset/
// Every message received should return nil or error. The difference between these two values is:
//  nil   -> do commit for this message
//  error -> do NOT commit for this message
func (t tmp) Handle(ctx context.Context, topic string, key, value []byte, partition int32, offset int64) error {
	log.WithFields(ctx, "KKK", "Kafka").Warn(topic, string(value), partition, offset)
	return nil
}

func main() {
    s, err := hrpc.NewServer(
        // ...
		option.WithMessageQueues(kafka.New(kafka.WithVersion("1.1.1"))),
        // ...
		option.WithHealthCheck(),
	)
    if err != nil {
        panic(err)
    }

    kafka.RegisterGroupConsumer(hrpc.BackgroundContext(), tmp{}, "GROUP_NAME", "TOPIC_NAME")

    if err := s.Serve(); err != nil {
		panic(err)
	}
}

Documentation

Index

Constants

View Source
const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest int64 = -1
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest int64 = -2
)

Variables

This section is empty.

Functions

func Client

func Client() sarama.Client

func Consume

func Consume(topic string, partition int, f func(m *Message)) error

func Produce

func Produce(topic string, msgs ...Message) error

func ProduceAsync

func ProduceAsync(topic string, msg Message) error

func RegisterGroupConsumer

func RegisterGroupConsumer(ctx context.Context, h Handler, groupID string, topics ...string)

Types

type Handler

type Handler interface {
	Handle(ctx context.Context, topic string, key, value []byte, partition int32, offset int64) error
}

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

func New

func New(opts ...Option) *Kafka

func (*Kafka) Connect

func (k *Kafka) Connect() error

func (*Kafka) Destory

func (k *Kafka) Destory()

func (*Kafka) Load

func (k *Kafka) Load(src []byte) error

Load can load a set of configurations represented by a JSON string However, it is invalid if someone provides options at the contruction function Because the configurations provided at the New() function have the highest priority

func (*Kafka) Name

func (k *Kafka) Name() string

type Message

type Message struct {
	Topic     string
	Key       []byte
	Value     []byte
	Offset    int64
	Partition int32
	Timestamp time.Time
}

Message represents the kafka message

func NewProduceMessage

func NewProduceMessage(value []byte) *Message

type Option

type Option func(*Options)

func WithAcks

func WithAcks(ack RequiredAcks) Option

func WithAutoCommit

func WithAutoCommit() Option

func WithOffsetInitial

func WithOffsetInitial(i int64) Option

func WithSASL

func WithSASL(username, password string) Option

func WithVersion added in v0.0.10

func WithVersion(v string) Option

type Options

type Options struct {
	Brokers       string       `json:"brokers"`
	SASL          sasl         `json:"sasl"`
	Acks          RequiredAcks `json:"acks"`
	AutoCommit    bool         `json:"auto_commit"`
	OffsetInitial int64        `json:"offset_initial"`
	// contains filtered or unexported fields
}

type RequiredAcks

type RequiredAcks int16
const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL