gokafkaclient

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2019 License: Apache-2.0 Imports: 12 Imported by: 0

README

go-kafka-client

Kafka Golang library using confluent golang library and librdkafka

Build Status codecov Go Report Card CircleCI Coverage Status

Getting Started

You can get the library using go get

go get github.com/ervitis/go-kafka-client

In this repository there is a docker-compose.yml repository where we can try the library using a docker image from landoop

For an example how to use this library, see inside the folder examples.

We can try it if you run the following commands:

go run examples/simple_consumer/main.go

Open another terminal and type:

go run examples/simple_producer/main.go

To stop the consumer send a kill signal to the process

API
Kafka client

To build a client just call the function

package main

import gkc "github.com/ervitis/go-kafka-client"

client := gkc.NewKafkaClient()

Created the client we can set the configuration for the kafka producer and consumer

producer := client.SetProducerConfig(...)
                  .SetProducerTopicConfig("topic", gkc.PartitionAny)
                  .BuildProducer()

consumer := client.SetConsumerConfig(...).BuildConsumer()
Producer

When using the producer, we can set it schema to validate the message it will produce.

producer.SetSchema("topic", "create-user", "1.0.0").ActivateValidator()

Then, produce an event

err := producer.Produce([]byte(`hello`))
if err != nil {
	panic(err)
}
Consumer

Using the consumer function is simple as using the producer, it need the topic from it will
listen the event, two handler functions and an array of condition to filter the events.

The consumer will be waiting until it can read a message, so we should use go routines to handle them.

func handlerEvent(msg []byte) { fmt.Println("ey!") }

func handlerError(msg []byte, err error) { fmt.Println(err) }

conditions := []ConsumerConditions{ {Key: "EventType", Value: "create-user"} }

consumer.Subscribe("topic", handlerEvent, handlerError, conditions)
Prerequisites

This library is using confluent kafka go client version 1.0.0 and librdkafka 1.0.1 version

git clone -b v1.0.1 https://github.com/edenhill/librdkafka

./configure
make
sudo make install

Running the tests

go test -race -v ./...

Built With

  • Golang - GoLang programming language

Versioning

We use SemVer for versioning. For the versions available, see the tags on this repository.

Authors

  • Victor Martin - Initial work - ervitis

License

This project is licensed under the Apache 2.0 - see the LICENSE.md file for details

Documentation

Index

Constants

View Source
const (

	/**
	Type of partition, it will be handled by kafka library
	*/
	PartitionAny = kafka.PartitionAny
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerConditions

type ConsumerConditions struct {
	Key   string
	Value string
}

*

Consumer filter conditions

type ConsumerErrorHandler

type ConsumerErrorHandler func(msg []byte, err error)

*

Consumer error handler

type ConsumerHandler

type ConsumerHandler func(msg []byte)

*

Consumer handler type of function

type JsonValidator

type JsonValidator struct{}

func (*JsonValidator) IsReachable

func (jv *JsonValidator) IsReachable(schema schema) bool

func (*JsonValidator) ValidateData

func (jv *JsonValidator) ValidateData(msg []byte, schema schema) (bool, error)

type KafkaClient

type KafkaClient struct {
	// contains filtered or unexported fields
}

*

Kafka client

func NewKafkaClient

func NewKafkaClient() *KafkaClient

* Constructor of the kafka client

func (*KafkaClient) BuildConsumer

func (kc *KafkaClient) BuildConsumer() (*consumerClient, error)

* Consumer builder

func (*KafkaClient) BuildProducer

func (kc *KafkaClient) BuildProducer() (*producerClient, error)

* Producer builder

func (*KafkaClient) SetConsumerConfig

func (kc *KafkaClient) SetConsumerConfig(cfg map[string]interface{}) *KafkaClient

* Consumer config setter

func (*KafkaClient) SetProducerConfig

func (kc *KafkaClient) SetProducerConfig(cfg map[string]interface{}) *KafkaClient

* Producer config setter

func (*KafkaClient) SetProducerTopicConfig

func (kc *KafkaClient) SetProducerTopicConfig(topicName string, partitionType int32) *KafkaClient

* Topic configuration setter for the producer. It sets what partition should be write the message into the topic passed as parameter

func (*KafkaClient) SetTimeoutPolling

func (kc *KafkaClient) SetTimeoutPolling(polling int) *KafkaClient

* Timeout polling setter for the consumer when reading messages from kafka

type ProducerHeader

type ProducerHeader struct {
	Key   string
	Value string
}

*

Producer header

type Validator

type Validator interface {
	ValidateData(msg []byte, schema schema) (bool, error)
	IsReachable(schema schema) bool
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL