kafkaesque

package module
v0.0.0-...-b99eba7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2020 License: Apache-2.0 Imports: 4 Imported by: 0

README

kafkaesque logo

Reactive Observables for Apache Kafka

Go Report Card PkgGoDev License
This is a simple package for using Kafka in Golang with the Observer-pattern.

Example

We have two Kafka-topics in JSON and want to serialize the events in a pipeline.

Create new type

in order to serialize our events

type BookRequest struct {
	Book   string `json:"book"`
	Person string `json:"person"`
}
Create a new config
const config = &kafkaesque.Config{
		AutoOffsetReset:  kafkaesque.AutoOffsetReset.Latest,
		BootstrapServers: []string{"localhost:9092"},
		IsolationLevel:   kafkaesque.IsolationLevel.ReadCommitted,
		MessageMaxBytes:  1048588,
		ClientID:         "test_1",
		GroupID:          "test_group",
		SecurityProtocol: kafkaesque.SecurityProtocol.Plaintext,
    }
Create the consumer
testConsumer, err := kafkaesque.NewConsumer(config, "book-requests", "external-book-requests")
if err != nil {
	    panic(err)
    }
Serialize BookRequest
bookRequests := testConsumer.Events.
		Map(func(_ context.Context, item interface{}) (interface{}, error) {
			var request BookRequest
			err := json.Unmarshal(kafkaesque.ValueToBytes(item), &request)
			return request, err
		})
Show Types
for event := range bookRequests.Observe() {
		log.Println(fmt.Sprintf("%T", event.V))
	}
Test

Test Program

  1. Start your Confluent Plattform:

    cd example
    docker-compose up
    
  2. Start the program:

    go run . -name test_consumer -topics test
    
  3. In a new terminal run:

    docker exec -it broker bin/kafka-console-producer --topic test --bootstrap-server localhost:9092
    
  4. Insert a BookRequest in JSON:

    {"book": "Moby Dick", "person": "Max Musterknabe"}
    

    The output should be:

    2020/10/03 15:32:08 Showing events for [test test1 test2] on [localhost:9092]:
    2020/10/03 15:32:16 Book:	Moby Dick	Person:	Max Musterknabe
    

    Or start the example as producer:

    go run . -mode producer -name test_producer -topics test
    

    The output should be:

    2020/10/03 15:32:08 Showing events for [test test1 test2] on [localhost:9092]:
    2020/10/03 15:32:16 Book:	10 Awesome Ways to Photograph Blue Bottles	Person:	Pamela Osborne
    2020/10/03 15:33:02 Book:	10 Awesome Ways to Photograph Blue Bottles	Person:	Pamela Osborne
    2020/10/03 15:33:02 Book:	21 Myths About Blue bottles Debunked	Person:	Inayah Brown
    2020/10/03 15:33:02 Book:	How to Make Your Own Vast Hat for less than £5	Person:	Ikrah Blair
    2020/10/03 15:33:03 Book:	An analysis of handsome jugs	Person:	Aurora Stafford
                                    ...
    

Documentation

Index

Constants

This section is empty.

Variables

View Source
var AutoOffsetReset = struct {
	Earliest AutoOffsetResetOption
	Latest   AutoOffsetResetOption
	Largest  AutoOffsetResetOption
	Smallest AutoOffsetResetOption
}{
	Earliest: "earliest",
	Latest:   "latest",
	Largest:  "largest",
	Smallest: "smallest",
}

AutoOffsetReset commit the message you brought from Kafka using Zookeeper to persist the last 'offset' which it read.

View Source
var IsolationLevel = struct {
	ReadUncommitted IsolationLevelOption
	ReadCommitted   IsolationLevelOption
}{
	ReadUncommitted: "read_uncommitted",
	ReadCommitted:   "read_committed",
}

IsolationLevel the kafka isolation level for transactions.

View Source
var SecurityProtocol = struct {
	Plaintext     SecurityProtocolOption
	SSL           SecurityProtocolOption
	SASLPlaintext SecurityProtocolOption
	SASLSSL       SecurityProtocolOption
}{
	Plaintext:     "plaintext",
	SSL:           "ssl",
	SASLPlaintext: "sasl_plaintext",
	SASLSSL:       "sasl_ssl",
}

SecurityProtocol the security protocols for kafka.

Functions

func ValueToBytes

func ValueToBytes(e interface{}) []byte

ValueToBytes convert event to byte array

func ValueToString

func ValueToString(e interface{}) string

ValueToString convert event to string.

Types

type AutoOffsetResetOption

type AutoOffsetResetOption string

AutoOffsetResetOption wether to commit the message you brought from Kafka using Zookeeper to persist the last 'offset' which it read.

type Config

type Config struct {
	Acks                   int
	AutoOffsetReset        AutoOffsetResetOption
	BootstrapServers       []string
	ClientID               string
	GroupID                string
	IsolationLevel         IsolationLevelOption
	MessageMaxBytes        uint32
	SecurityProtocol       SecurityProtocolOption
	SslCertificateLocation string
	SslCaLocation          string
	SslKeyLocation         string
	SslValidate            bool
}

Config the configuration for your kafka consumer.

func (*Config) Map

func (c *Config) Map() *kafka.ConfigMap

Map the Config-struct to a ConfigMap for confluent.

type Consumer

type Consumer struct {
	Config *Config
	Topics []string

	Infos  rxgo.Observable
	Events rxgo.Observable
	// contains filtered or unexported fields
}

Consumer holds the observables and channels in one struct.

func NewConsumer

func NewConsumer(config *Config, topics ...string) (*Consumer, error)

NewConsumer creates a Consumer via config and topics.

func (*Consumer) Close

func (c *Consumer) Close()

Close stop the consumer.

func (*Consumer) Open

func (c *Consumer) Open() (*Consumer, error)

Open start the consumer.

type ErrorEvent

type ErrorEvent struct {
	Error error
}

ErrorEvent will be exported when producer crashes

func (ErrorEvent) String

func (ee ErrorEvent) String() string

type Event

type Event *kafka.Message

Event mapping of confluent message

func ToEvent

func ToEvent(e interface{}) Event

ToEvent convert Item to Event

type Header kafka.Header

Header mapping of kafka header

type InfoEvent

type InfoEvent struct {
	Error   error
	Message Event
}

InfoEvent hold error and message

type IsolationLevelOption

type IsolationLevelOption string

IsolationLevelOption what isolation level to use.

type Producer

type Producer struct {
	Config *Config
	Topic  string
	Key    []byte

	Events rxgo.Observable
	// contains filtered or unexported fields
}

Producer holds the observables and channels in one struct.

func NewProducer

func NewProducer(config *Config, topic string, key []byte, events rxgo.Observable, header ...Header) (*Producer, error)

NewProducer creates a new producer instance from a observable

func (*Producer) Close

func (p *Producer) Close()

Close the producer

func (*Producer) GetInfos

func (p *Producer) GetInfos() rxgo.Observable

GetInfos return infos from producer as Observable

type SecurityProtocolOption

type SecurityProtocolOption string

SecurityProtocolOption what security protocol to use.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL