titan

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 24, 2020 License: MIT Imports: 3 Imported by: 0

README

titan

Event Listener for Go using Kafka

Install
go get github.com/ramadani/titan
Dependencies
  • github.com/Shopify/sarama
Usage
Emitter

Create event

type userRegistration struct {
	Name  string `json:"name"`
	Email string `json:"email"`
}

type userRegisteredEvent struct {
	data *userRegistration
}

func (u userRegisteredEvent) Header() string {
	return "userRegistered"
}

func (u userRegisteredEvent) Body() ([]byte, error) {
	return json.Marshal(u.data)
}

Setup emitter

config := sarama.NewConfig()
config.Producer.Retry.Max = 5
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true

addresses := []string{"localhost:9092"}

prd, err := sarama.NewSyncProducer(addresses, config)
if err != nil {
    log.Fatal(err)
}
defer prd.Close()

event := titan.NewEmitter(prd)

Send event

ctx := context.Background()
userRegistered := &userRegistration{
    Name:  "Ramadani",
    Email: "dani@gmail.com",
}

if err = event.Emit(ctx, &userRegisteredEvent{data: userRegistered}); err != nil {
    log.Fatal(err)
}
Event Listener

Create listeners

type sendEmailVerificationListener struct{}

func (l *sendEmailVerificationListener) Handle(ctx context.Context, value []byte) (err error) {
	data := &userRegistration{}
	_ = json.Unmarshal(value, data)

	log.Println("send email verification to", data.Email)
	return
}

Listen the events

saramaCog := sarama.NewConfig()
saramaCog.Version = sarama.V0_10_2_0
saramaCog.Consumer.Offsets.Initial = sarama.OffsetOldest
saramaCog.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin

addresses := []string{"localhost:9092"}
consumerGroup, err := sarama.NewConsumerGroup(addresses, "titan-1", saramaCog)
if err != nil {
    log.Panic(err)
}

ctx, cancel := context.WithCancel(context.Background())
event := titan.DefaultConsumerGroupEventListener(consumerGroup)

_ = event.On(ctx, "userRegistered", []saturn.Listener{
    &sendEmailVerificationListener{},
})

go func() {    
    if err = event.Listen(ctx); err != nil {
        log.Panic(err)
    }
}()

<-event.Ready()

log.Println("Titan up and running!...")

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
    log.Println("terminating: context cancelled")
case <-sigterm:
    log.Println("terminating: via signal")
}
cancel()

if err = consumerGroup.Close(); err != nil {
    log.Panicf("Error closing consumer group: %v", err)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewEmitter

func NewEmitter(prd sarama.SyncProducer) saturn.Emitter

Types

type EventListener

type EventListener interface {
	saturn.EventListener
	Ready() <-chan struct{}
}

func DefaultConsumerGroupEventListener

func DefaultConsumerGroupEventListener(consumer sarama.ConsumerGroup) EventListener

func NewConsumerGroupEventListener

func NewConsumerGroupEventListener(consumer sarama.ConsumerGroup, evLst eventListenersMap) EventListener

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL