kafka

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2020 License: MIT Imports: 12 Imported by: 0

README

package pubsub/kafka

package pubsub/kafka provides a set of interfaces for publishing events to Kafka and subscribing to events from Kafka.

Usage

Basic

import (
	"github.com/Shopify/sarama"

	"github.com/mo-work/mo-kit/pubsub/kafka"
)

func main() {
  consumer, err := kafka.NewConsumer(kafkaBrokers, kafkaGroup, "2.2.0", log.With(logger, "component", "kafka"), nr)
	if err != nil {
		panic(fmt.Sprintf("Error initializing kafka: %s", err))
	}
	handlers := no.NewEventHandlers(us)

	errs := make(chan error)

	go func() {
		consumer.Start(kafkaTopics, handlers)
	}()
}

Create event handlers and assign to the module

With topic and key given

import (
	"github.com/Shopify/sarama"

	"github.com/mo-work/mo-kit/pubsub/kafka"
)


// NewEventHandlers returns a pointer to a handler table containing a single
// entry: momentHandler registered under the "moment_created" key.
func NewEventHandlers(s Service) *kafka.EventHandlers {
	registry := kafka.EventHandlers{
		"moment_created": momentHandler(s),
	}

	return &registry
}

// momentHandler returns an EventHandler that prints a fixed notice for every
// message it is given and always reports success. The Service argument is
// accepted but not used by this example.
func momentHandler(s Service) kafka.EventHandler {
	onMessage := func(_ *sarama.ConsumerMessage) error {
		fmt.Println("Message has been received")
		return nil
	}

	return onMessage
}

With only topic given

import (
	"github.com/Shopify/sarama"

	"github.com/mo-work/mo-kit/pubsub/kafka"
)


// NewEventHandlers returns a pointer to a handler table containing a single
// entry: momentHandler registered under the "moment" key.
func NewEventHandlers(s Service) *kafka.EventHandlers {
	registry := kafka.EventHandlers{
		"moment": momentHandler(s),
	}

	return &registry
}

// momentHandler returns an EventHandler that prints a fixed notice for every
// message it is given and always reports success. The Service argument is
// accepted but not used by this example.
func momentHandler(s Service) kafka.EventHandler {
	onMessage := func(_ *sarama.ConsumerMessage) error {
		fmt.Println("Message has been received")
		return nil
	}

	return onMessage
}

With both a topic-only handler and a topic / key combination handler

import (
	"github.com/Shopify/sarama"

	"github.com/mo-work/mo-kit/pubsub/kafka"
)


// NewEventHandlers returns a pointer to a handler table with two entries:
// momentUniqueHandler under "moment_unique" and momentHandler under "moment".
func NewEventHandlers(s Service) *kafka.EventHandlers {
	// Entries are listed in the same order the handlers are constructed.
	registry := kafka.EventHandlers{
		"moment_unique": momentUniqueHandler(s),
		"moment":        momentHandler(s),
	}

	return &registry
}

// momentHandler returns an EventHandler that prints a fixed notice for every
// message it is given and always reports success. The Service argument is
// accepted but not used by this example.
func momentHandler(s Service) kafka.EventHandler {
	onMessage := func(_ *sarama.ConsumerMessage) error {
		fmt.Println("Message has been received")
		return nil
	}

	return onMessage
}

// momentUniqueHandler returns an EventHandler that prints a fixed notice for
// every message it is given and always reports success. The Service argument
// is accepted but not used by this example.
func momentUniqueHandler(s Service) kafka.EventHandler {
	onMessage := func(_ *sarama.ConsumerMessage) error {
		fmt.Println("Moment Unique Event has been received")
		return nil
	}

	return onMessage
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a Sarama consumer group consumer

func NewConsumer

func NewConsumer(
	brokers string,
	group string,
	version string,
	logger log.Logger,
	nr newrelic.Application,
	handler func(error),
) (c *Consumer, err error)

NewConsumer creates a new Kafka consumer and wires all provided middlewares

func (*Consumer) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the current Kafka session.

func (*Consumer) ConsumeClaim

func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Consumer) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

func (*Consumer) Start

func (c *Consumer) Start(topics string, handlers *EventHandlers) error

Start starts listening for Kafka events.

type EventHandler

type EventHandler func(message *sarama.ConsumerMessage) (err error)

EventHandler represents the event handling logic for a specific topic / key

type EventHandlers

type EventHandlers map[string]EventHandler

EventHandlers is a map from a "topic_key" string to its event handler

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer provides means of emitting events to Kafka

func NewProducer

func NewProducer(brokers string, clientID string) (*Producer, error)

NewProducer returns a pointer to a new Producer so that messages can be sent

func (Producer) SendEvent added in v0.1.0

func (p Producer) SendEvent(event *pubsub.Event) error

SendEvent sends a given Event

func (Producer) SendMessage

func (p Producer) SendMessage(topic string, key string, val string) error

SendMessage sends a Kafka message

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL