event

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2024 License: CC0-1.0 Imports: 18 Imported by: 0

README

event

Event Emitter

Supported driver

  • Logger (sender & writer)
  • Kafka (sender)
  • MongoDB outbox (sender & writer)
  • SQL outbox (sender & writer)

Support for hybrid mode, combination of sender and writer

Hybrid mode can be used to combine outbox pattern with direct publish more efficiently. For exmaple, with the combination of SQL writer and Kafka sender, when Publish is called, it will try to write the event in SQL database first, and then asynchronously trying to send the event to kafka topic after that. If the publishing is success, then the SQL writer will delete the record. In an exception case, when the sender is failed to send the event, another service suchs as JobLst could be used to query the database and send the event to kafka.

Usage

API

Publish(ctx context.Context, event, key string, message interface{}, metadata map[string]interface{}) error
Event Config

Event config can be used to map internal event name into actual topic name and to add or replace default metadata

package main

import (
	"github.com/diki-haryadi/govega/event"
)

func main() {
	conf := &EmitterConfig{
		Sender: &DriverConfig{
			Type: "logger",
		},
		EventConfig : &EventConfig{
			EventMap : map[string]string{
				"test" : "kafka-test-topic", // map event test into topic kakfa-test-topic
			},
			Metadata : map[string]map[string]interface{} {
				"test" : { // add metdata on event test
					"schema": "test-schema",	
				},
			}
		}
	}

    ctx := context.Background()

	em, err := event.New(ctx, conf)
    em.Publish(ctx, "test", "t123", "testdata", nil)
}
Example (Logger)
package main

import (
	"github.com/diki-haryadi/govega/event"
)

func main() {
	conf := &EmitterConfig{
		Sender: &DriverConfig{
			Type: "logger",
		},
	}

    ctx := context.Background()

	em, err := event.New(ctx, conf)
    em.Publish(ctx, "test", "t123", "testdata", nil)
}
Example (Kafka Sender)
package main

import (
	"github.com/diki-haryadi/govega/event"
    _ "github.com/diki-haryadi/govega/event/kafka"
)

func main() {
	conf := &event.EmitterConfig{
		Sender: &event.DriverConfig{
			Type: "kafka",
			Config: map[string]interface{}{
				"brokers": []string{"localhost:9092"},
			},
		},
	}

    ctx := context.Background()

	em, err := event.New(ctx, conf)
    em.Publish(ctx, "test", "t123", "testdata", nil)
}
Example (Hybrid Kafka + SQL)
package main

import (
	"github.com/diki-haryadi/govega/database"
	"github.com/diki-haryadi/govega/event"
	"github.com/diki-haryadi/govega/event/sql"
    _ "github.com/diki-haryadi/govega/event/kafka"
    _ "github.com/diki-haryadi/govega/event/sql"
)

func main() {
	dbc := database.DBConfig{
		MasterDSN:     "root:password@(localhost:3306)/outbox?parseTime=true",
		SlaveDSN:      "root:password@(localhost:3306)/outbox?parseTime=true",
		RetryInterval: 10,
		MaxIdleConn:   5,
		MaxConn:       10,
	}

	db := database.New(dbc, database.DriverMySQL)

	conf := &event.EmitterConfig{
		Sender: &event.DriverConfig{
			Type: "kafka",
			Config: map[string]interface{}{
				"brokers": []string{"localhost:9092"},
			},
		},
        Writer: &event.DriverConfig{
			Type: "sql",
			Config: map[string]interface{}{
				"driver": "mysql",
				"table":  "outbox",
				"connection": db,
			},
		},
	}

    ctx := context.Background()

    //Start SQL transaction
    tx, err := db.Master.Begin()

    //Attach transaction into context
	ttx := context.WithValue(ctx, sql.NewSQLTxContext(sql.DefaultContextKey), tx)

	em, err := event.New(ctx, conf)
    em.Publish(ttx, "test", "t123", "testdata", nil)

    //Commit transaction
    tx.Commit()
}

Event Consumer

Supported driver

  • Logger
  • Kafka

Usage

API

//Use add middlewares to actual event handler before accessing the actual handler
Use(middlewares ...EventMiddleware)
//Subscribe to a topic with specific group
Subscribe(ctx context.Context, topic, group string, handler EventHandler) error
//Start activate the consumer and start receiving event
Start() error
//Stop gracefully stop the consumer waiting for all workers to complete before exiting
Stop() error
Event Config

Event config can be used to map internal event name into actual topic name and group name

package main

import (
	"github.com/diki-haryadi/govega/event"
	"github.com/diki-haryadi/govega/log"
)

func main() {
	conf := &ConsumerConfig{
		Listener: &DriverConfig{
			Type: "logger",
		},
		EventConfig : &EventConfig{
			EventMap : map[string]string{
				"test" : "kafka-test-topic", // map event test into topic kakfa-test-topic
			},
			GroupMap : map[string]string{
				"test" : "kafka-test-group", // map group test into group kakfa-test-group
			},
		}
	}

    ctx := context.Background()

	consumer, err := event.NewConsumer(ctx, conf)
	if err != nil {
		//handle error
	}

	consumer.Subscribe(context.Background(), "test", "test",
		func(ctx context.Context, msg *event.EventMessage) error {
			log.WithFields(log.Fields{
				"message": fmt.Sprintf("%+v", msg),
			}).Println("new message")
			return nil
		},
	)
}
Worker Pool Config

Worker pool config can be used to set the number or workes for each topic group subscriber

The number of worker pool for each topic group subscriber will be decided with order:

  • specific topic group from config
  • topic default worker pool from config
  • default worker pool from config
  • default worker pool (1)
package main

import (
	"github.com/diki-haryadi/govega/event"
	"github.com/diki-haryadi/govega/log"
)

func main() {
	conf := &ConsumerConfig{
		Listener: &DriverConfig{
			Type: "logger",
		},
		EventConfig : &EventConfig{
			EventMap : map[string]string{
				"test" : "kafka-test-topic", // map event test into topic kakfa-test-topic
			},
			GroupMap : map[string]string{
				"test" : "kafka-test-group", // map group test into group kakfa-test-group
			},
		},
		WorkerPoolConfig: &WorkerPoolConfig{
			"default": 2, //default worker pool for each subscriber
			"kafka-test-topic": {
				"default": 1, // default worker pool for kafka-test-topic Topic subscriber
				"kafka-test-group": 3 // default worker pool for kafka-test-topic Topic with kafka-test-group Group
			}
		}
	}

    ctx := context.Background()
	consumer, err := event.NewConsumer(ctx, conf)
	if err != nil {
		//handle error
	}

	consumer.Subscribe(context.Background(), "test", "test",
		func(ctx context.Context, msg *event.EventMessage) error {
			log.WithFields(log.Fields{
				"message": fmt.Sprintf("%+v", msg),
			}).Println("new message")
			return nil
		},
	)
}
Example (Kafka)
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"

	"github.com/diki-haryadi/govega/event"
	_ "github.com/diki-haryadi/govega/event/kafka"
	"github.com/diki-haryadi/govega/log"
)

func main() {
	conf := &event.ConsumerConfig{
		Listener: &event.DriverConfig{
			Type: "kafka",
			Config: map[string]interface{}{
				"brokers": []string{"localhost:9092"},
			},
		},
		EventConfig: &event.EventConfig{
			EventMap: map[string]string{
				"test": "kafka-test-topic", // map event test into topic kakfa-test-topic
			},
			GroupMap: map[string]string{
				"test": "kafka-test-group", // map group test into group kakfa-test-group
			},
		},
		WorkerPoolConfig: &event.WorkerPoolConfig{
			"default": 2, //default worker pool for each subscriber
			"kafka-test-topic": map[string]interface{}{
				"default":          1, // default worker pool for kafka-test-topic Topic subscriber
				"kafka-test-group": 3, // default worker pool for kafka-test-topic Topic with kafka-test-group Group
			},
		},
		ConsumeStrategy: &event.DriverConfig{
			Type:   "commit_on_success", // available strategy commit_on_success and always_commit, default: commit_on_success
			Config: map[string]interface{}{},
		},
	}

	ctx := context.Background()
	consumer, err := event.NewConsumer(ctx, conf)
	if err != nil {
		log.WithError(err).Fatalln("failed to configure consumer")
	}

	err = consumer.Subscribe(context.Background(), "test", "test",
		func(_ context.Context, msg *event.EventConsumeMessage) error {
			log.WithFields(log.Fields{
				"message": fmt.Sprintf("%+v", msg),
			}).Println("new message")
			return nil
		},
	)
	if err != nil {
		log.WithError(err).Panicln("failed to subscribe topic")
	}

	if err := consumer.Start(); err != nil {
		log.WithError(err).Fatalln("failed to start consumer")
	}

	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt)

	<-signalCh

	if err := consumer.Stop(); err != nil {
		log.WithError(err).Errorln("error on stopping consumer")
	}

	log.Println("bye 👋")
}

Documentation

Index

Constants

View Source
const (
	MetaHash    = "hash"
	MetaTime    = "timestamp"
	MetaEvent   = "event"
	MetaVersion = "version"
	MetaDefault = "default"
)
View Source
const (
	DefaultConsumerWorkers = 1
)

Variables

View Source
var (
	ErrConsumerStarted = errors.New("Consumer already started")
)

Functions

func AlwaysCommitStrategy

func AlwaysCommitStrategy(ctx context.Context, message ConsumeMessage, handler EventHandler) error

AlwaysCommitStrategy will always commit the message no matter what is the handler result

func CommitOnSuccessStrategy

func CommitOnSuccessStrategy(ctx context.Context, message ConsumeMessage, handler EventHandler) error

CommitOnSuccessStrategy will only commit the message if handler doesn't return error

func GetConsumerGroupFromContext

func GetConsumerGroupFromContext(ctx context.Context) string

GetConsumerGroupFromContext return consumer group from contet if any

func RegisterConsumeStrategy

func RegisterConsumeStrategy(name string, fn ConsumeStrategyFactory)

RegisterConsumeStrategy register consumestrategy

func RegisterListener

func RegisterListener(name string, fn ListenerFactory)

func RegisterSender

func RegisterSender(name string, fn SenderFactory)

func RegisterWriter

func RegisterWriter(name string, fn WriterFactory)

Types

type Closer

type Closer interface {
	Close() error
}

type ConsumeMessage

type ConsumeMessage interface {
	GetEventConsumeMessage(ctx context.Context) (*EventConsumeMessage, error)
	Commit(ctx context.Context) error
}

type ConsumeStrategy

type ConsumeStrategy func(ctx context.Context, message ConsumeMessage, handler EventHandler) error

type ConsumeStrategyFactory

type ConsumeStrategyFactory func(ctx context.Context, config interface{}) (ConsumeStrategy, error)

func NoConfigConsumeStrategyFactory

func NoConfigConsumeStrategyFactory(strategy ConsumeStrategy) ConsumeStrategyFactory

NoConfigConsumeStrategyFactory util function to create factory for consume strategy which doesn't need any config

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(ctx context.Context, config *ConsumerConfig) (*Consumer, error)

NewConsumer create new instance of consumer

func (*Consumer) Start

func (c *Consumer) Start() error

Start activate the consumer and start receiving event

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop gracefully stop the consumer waiting for all workers to complete before exiting

func (*Consumer) StopContext

func (c *Consumer) StopContext(ctx context.Context) error

StopContext gracefully stop the consumer or until the context timeout

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(ctx context.Context, topic, group string,
	handler EventHandler) error

Subscribe to a topic with specific group this should be call before Start the consumer

func (*Consumer) Use

func (c *Consumer) Use(middlewares ...EventMiddleware)

Use add middlewares to actual event handler before accessing the actual handler Please add your middlewares before calling subscribe or it may not work properly

func (*Consumer) WithConsumeStrategy

func (c *Consumer) WithConsumeStrategy(strategy ConsumeStrategy)

WithConsumeStrategy, set consume strategy for this consumer

type ConsumerConfig

type ConsumerConfig struct {
	Listener         *DriverConfig     `json:"listener" mapstructure:"listener"`
	EventConfig      *EventConfig      `json:"event_config" mapstructure:"event_config"`
	WorkerPoolConfig *WorkerPoolConfig `json:"worker_pool_config" mapstructure:"worker_pool_config"`
	ConsumeStrategy  *DriverConfig     `json:"consume_strategy" mapstructure:"consume_strategy"`
}

type DriverConfig

type DriverConfig struct {
	Type   string      `json:"type" mapstructure:"type"`
	Config interface{} `json:"config" mapstructure:"config"`
}

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, config *EmitterConfig) (*Emitter, error)

func (*Emitter) Publish

func (e *Emitter) Publish(ctx context.Context, event, key string, message interface{}, metadata map[string]interface{}) error

type EmitterConfig

type EmitterConfig struct {
	Sender      *DriverConfig `json:"sender" mapstructure:"sender"`
	Writer      *DriverConfig `json:"writer" mapstructure:"writer"`
	EventConfig *EventConfig  `json:"event_config" mapstructure:"event_config"`
}

type EventConfig

type EventConfig struct {
	Metadata map[string]map[string]interface{} `json:"metadata,omitempty" mapstructure:"metadata"`
	EventMap map[string]string                 `json:"event_map,omitempty" mapstructure:"event_map"`
	GroupMap map[string]string                 `json:"group_map,omitempty" mapstructure:"group_map"`
}

func NewEventConfig

func NewEventConfig() *EventConfig

type EventConsumeMessage

type EventConsumeMessage struct {
	Topic    string
	Key      string
	Metadata map[string]interface{}
	Data     []byte
}

func NewEventConsumeMessage

func NewEventConsumeMessage(v []byte) (*EventConsumeMessage, error)

NewEventConsumeMessage return event consume message from byte data

type EventHandler

type EventHandler func(ctx context.Context, message *EventConsumeMessage) error

type EventLogger

type EventLogger struct {
	// contains filtered or unexported fields
}

func NewEventLogger

func NewEventLogger(ctx context.Context, config interface{}) (*EventLogger, error)

func (*EventLogger) Delete

func (e *EventLogger) Delete(ctx context.Context, message *EventMessage) error

func (*EventLogger) Listen

func (e *EventLogger) Listen(ctx context.Context, topic, group string) (Iterator, error)

func (*EventLogger) Next

func (*EventLogger) Send

func (e *EventLogger) Send(ctx context.Context, message *EventMessage) error

type EventMessage

type EventMessage struct {
	Topic    string                 `json:"-"`
	Key      string                 `json:"-"`
	Data     interface{}            `json:"data,omitempty" mapstructure:"data"`
	Metadata map[string]interface{} `json:"metadata,omitempty" mapstructure:"metadata"`
	RawData  []byte                 `json:"-"` //To provide raw data to consumer
}

func (*EventMessage) Hash

func (m *EventMessage) Hash() (string, error)

func (*EventMessage) ToBytes

func (m *EventMessage) ToBytes() ([]byte, error)

type EventMiddleware

type EventMiddleware func(next EventHandler) EventHandler

type Iterator

type Iterator interface {
	Next(ctx context.Context) (ConsumeMessage, error)
}

type IteratorFunc

type IteratorFunc func(ctx context.Context) (ConsumeMessage, error)

func (IteratorFunc) Next

type Job

type Job func(ctx context.Context) error

type Listener

type Listener interface {
	Listen(ctx context.Context, topic, group string) (Iterator, error)
}

func EventLoggerListener

func EventLoggerListener(ctx context.Context, config interface{}) (Listener, error)

type ListenerFactory

type ListenerFactory func(ctx context.Context, config interface{}) (Listener, error)

type ListenerWorkerPool

type ListenerWorkerPool struct {
	// contains filtered or unexported fields
}

type OutboxRecord

type OutboxRecord struct {
	ID        string    `json:"id,omitempty" mapstructure:"id"`
	Topic     string    `json:"topic,omitempty" mapstructure:"topic"`
	Key       string    `json:"key,omitempty" mapstructure:"key"`
	Value     string    `json:"value,omitempty" mapstructure:"value"`
	CreatedAt time.Time `json:"created_at,omitempty" mapstructure:"created_at"`
}

OutboxRecord outbox model

func OutboxFromMessage

func OutboxFromMessage(msg *EventMessage) (*OutboxRecord, error)

func (*OutboxRecord) GenerateID

func (o *OutboxRecord) GenerateID() *OutboxRecord

GenerateID generate record ID

func (*OutboxRecord) Hash

func (o *OutboxRecord) Hash() []byte

Hash calculate request hash

type Sender

type Sender interface {
	Send(ctx context.Context, message *EventMessage) error
}

func EventLoggerSender

func EventLoggerSender(ctx context.Context, config interface{}) (Sender, error)

type SenderFactory

type SenderFactory func(ctx context.Context, config interface{}) (Sender, error)

type WorkerPoolConfig

type WorkerPoolConfig map[string]interface{}

type Writer

type Writer interface {
	Sender
	Delete(ctx context.Context, message *EventMessage) error
}

func EventLoggerWriter

func EventLoggerWriter(ctx context.Context, config interface{}) (Writer, error)

type WriterFactory

type WriterFactory func(ctx context.Context, config interface{}) (Writer, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL