kafkago

package module
v0.0.0-...-1bd268d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2023 License: MIT Imports: 2 Imported by: 0

README

kafka-go

A wrapper around the Kafka client that implements retries and a dead-letter queue (DLQ).

Usage
import "fmt"

// LogFunc writes l to standard output followed by a newline. In the usage
// example it is passed to kafkago.NewClient as the regular log callback.
func LogFunc(l string) {
	fmt.Printf("%s\n", l)
}

// LogErrorFunc writes l to standard output followed by a newline. In the usage
// example it is passed to kafkago.NewClient as the error log callback.
func LogErrorFunc(l string) {
	fmt.Print(l + "\n")
}

// main walks through the intended usage: build a KafkaConfig, create a client
// with the two log callbacks, register topic channels for consumption, and
// publish a message.
func main() {
	// Sample config. KafkaConfig declares exported (capitalised) fields, so
	// they must be spelled that way when set from outside the kafkago package.
	myConfig := &kafkago.KafkaConfig{
		Producer:         true,
		Consumer:         true,
		Server:           "address",
		GroupId:          "gid",
		ClientId:         "cid",
		AutoOffsetReset:  "latest",
		SecurityProtocol: "SASL_SSL",
		SaslMechanism:    "PLAIN",
		SaslUser:         "user",
		SaslPass:         "pass",
		ServiceDLQ:       "mysqServiceDLQ",
	}

	// Create the client.
	client, err := kafkago.NewClient(myConfig, LogFunc, LogErrorFunc)
	if err != nil {
		// Handle the failure as appropriate for your service. The returned
		// client should not be used when NewClient reports an error, so stop
		// here instead of falling through.
		LogErrorFunc(err.Error())
		return
	}

	// Sample mapping of topic name => channel.
	myChannel := make(chan []byte)
	myTopicsAndChannels := map[string]chan []byte{
		"topic1": myChannel,
	}

	// Consume.
	client.Consume(myTopicsAndChannels)

	// Produce.
	client.Produce("topic1", []byte("My message"))
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

// Client is the set of operations a Kafka client wrapper exposes. The
// *KafkaClient methods listed on this page have matching signatures.
type Client interface {
	// Disconnect takes no arguments and returns nothing.
	// NOTE(review): presumably closes the underlying consumer/producer — confirm.
	Disconnect()
	// Consume takes a map from string keys to byte-slice channels; the README
	// example keys it by topic name.
	Consume(map[string]chan []byte)
	// Produce takes a topic name and a message payload.
	Produce(topic string, message []byte)
}

type DLQMessage

// DLQMessage pairs a topic name with a message body and carries JSON tags for
// both fields.
// NOTE(review): presumably the envelope written to the DLQ topic — confirm
// against the implementation.
type DLQMessage struct {
	// Topic is encoded under the JSON key "topic".
	Topic string `json:"topic"`
	// Body is encoded under the JSON key "body".
	Body  string `json:"body"`
}

type KafkaClient

// KafkaClient holds a Kafka consumer and producer together with a DLQ topic
// name and two string-logging callbacks. Instances are returned by NewClient.
type KafkaClient struct {
	// Consumer and Producer are the underlying kafka package handles.
	// NOTE(review): presumably either may be nil depending on
	// KafkaConfig.Consumer / KafkaConfig.Producer — confirm.
	Consumer *kafka.Consumer
	Producer *kafka.Producer

	// TopicDLQ is a topic name.
	// NOTE(review): presumably populated from KafkaConfig.ServiceDLQ — confirm.
	TopicDLQ         string
	// LogFunction and ErrorLogFunction each accept a single string.
	// NOTE(review): presumably the callbacks passed to NewClient — confirm.
	LogFunction      func(string)
	ErrorLogFunction func(string)
	// contains filtered or unexported fields
}

func NewClient

func NewClient(c *KafkaConfig, logFunction func(string), errorFunction func(string)) (*KafkaClient, error)

func (*KafkaClient) Consume

func (k *KafkaClient) Consume(channels map[string]chan []byte)

func (*KafkaClient) Disconnect

func (k *KafkaClient) Disconnect()

func (*KafkaClient) Produce

func (k *KafkaClient) Produce(topic string, message []byte)

type KafkaConfig

// KafkaConfig is the configuration passed to NewClient. All fields are
// exported, so callers outside the package set them with capitalised names.
type KafkaConfig struct {
	// Producer and Consumer are boolean switches.
	// NOTE(review): presumably they select which underlying handles NewClient
	// creates — confirm.
	Producer         bool
	Consumer         bool
	// Server is a string; the README example sets it to a placeholder address.
	Server           string
	// GroupId and ClientId are string identifiers.
	// NOTE(review): presumably the Kafka group.id and client.id — confirm.
	GroupId          string
	ClientId         string
	// AutoOffsetReset is a string; the README example uses "latest".
	AutoOffsetReset  string
	// SecurityProtocol and SaslMechanism are strings; the README example uses
	// "SASL_SSL" and "PLAIN".
	SecurityProtocol string
	SaslMechanism    string
	// SaslUser and SaslPass are the SASL credentials as plain strings.
	SaslUser         string
	SaslPass         string
	// ServiceDLQ is a string.
	// NOTE(review): presumably the dead-letter-queue topic name — confirm.
	ServiceDLQ       string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL