gobus

package module
v0.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2022 License: MIT Imports: 7 Imported by: 0

README

go-bus

Universal pub-sub library for rabbitmq and kafka in Go
Dev version - only for tests

Installation

go get github.com/hetacode/go-bus

Consumer implementation

General implementation

// fakeLogger is only for tests - you should use own implementation with own logger
type fakeLogger struct{}

func (l *fakeLogger) Infof(message string, args ...interface{}) {
	log.Printf(message, args...)
}
func (l *fakeLogger) Errorf(message string, args ...interface{}) {
	log.Printf(message, args...)
}

func main() {
	eventsMapper := new(goeh.EventsMapper)
	eventsMapper.Register(new(TestEvent))

	done := make(<-chan os.Signal)

	kind := gobus.RabbitMQServiceBusOptionsFanOutKind
	bus := gobus.NewRabbitMQServiceBus(eventsMapper,  new(fakeLogger), &gobus.RabbitMQServiceBusOptions{
		Kind:      &kind,
		Exchanage: "test-ex",
		Queue:     "test-queue",
		Server:    "amqp://rabbit:5672",
	})
	go func() {
		msgCh, errCh := bus.Consume()
		for {
			select {
			case msg := <-msgCh:
				log.Printf("Do something with received event: %+v", msg)
			case err := <-errCh:
				if err != nil {
					panic(err)
				}
			}
		}
	}()

	<-done
	log.Printf("the end")
}
Bind queue to multiple exchanges

Consumer queue can be bind to multiple exchanges.

Important! In this case can be only zero or one routing key attach between exchange <-> queue

kind := gobus.RabbitMQServiceBusOptionsFanOutKind
bus := gobus.NewRabbitMQServiceBus(eventsMapper,  new(fakeLogger), &gobus.RabbitMQServiceBusOptions{
		Kind:      &kind,
	Exchange: "test-ex1|test-ex2",
	Queue:     "test-queue",
	RoutingKey: "" // or "routing-key"
	Server:    "amqp://rabbit:5672",
})
Bind queue to exchange with multiple routing keys

Consumer queue can be bind to only one exchange with multiple routing keys.

kind := gobus.RabbitMQServiceBusOptionsFanOutKind
bus := gobus.NewRabbitMQServiceBus(eventsMapper,  new(fakeLogger), &gobus.RabbitMQServiceBusOptions{
		Kind:      &kind,
	Exchange: "test-ex", // one exchange
	Queue:     "test-queue",
	RoutingKey: "routing-key1|routing-key2"
	Server:    "amqp://rabbit:5672",
})

Producer implementation

// fakeLogger is only for tests - you should use own implementation with own logger
type fakeLogger struct{}

func (l *fakeLogger) Infof(message string, args ...interface{}) {
	log.Printf(message, args...)
}
func (l *fakeLogger) Errorf(message string, args ...interface{}) {
	log.Printf(message, args...)
}

func main() {
	eventsMapper := new(goeh.EventsMapper)
	eventsMapper.Register(new(TestEvent))

	kind := gobus.RabbitMQServiceBusOptionsFanOutKind
	bus := gobus.NewRabbitMQServiceBus(eventsMapper, new(fakeLogger), &gobus.RabbitMQServiceBusOptions{
		Kind:      &kind,
		Exchanage: "test-ex",
		Server:    "amqp://rabbit:5672",
	})
    
    n := 1
	for n < 10 {
		bus.Publish(&TestEvent{
			EventData: &goeh.EventData{ID: strconv.Itoa(n)},
			FullName:  fmt.Sprintf("Janusz %d", n),
		})
    }
    
	log.Printf("the end")
}

Retry option

It's a retry mechanism that can be add to both kafka and rabbit implementation of service bus. When send event function return any error a library try send the event again after some delay.

type RetryOptions struct {
	Attempts int
	Delay    time.Duration
}

The RetryOption can be add to the both RabbitMQServiceBusOptions or KafkaServiceBusOptions

Documentation

Index

Constants

View Source
const (
	// RabbitMQServiceBusOptionsTopicKind topic kind of rabbitmq
	RabbitMQServiceBusOptionsTopicKind string = "topic"
	// RabbitMQServiceBusOptionsFanOutKind fanout kind of rabbitmq
	RabbitMQServiceBusOptionsFanOutKind string = "fanout"
)

Variables

View Source
var ServiceBusModeNameMapping map[ServiceBusMode]string = map[ServiceBusMode]string{
	PublisherServiceBusMode: "publisher",
	ConsumerServiceBusMode:  "consumer",
}

Functions

This section is empty.

Types

type BusMiddleware added in v0.0.8

type BusMiddleware interface {
	Before(ev goeh.Event, err error)
	After(ev goeh.Event, err error)
}

type KafkaServiceBus

type KafkaServiceBus struct {
	// contains filtered or unexported fields
}

KafkaServiceBus implementation of service bus

func (*KafkaServiceBus) Consume

func (s *KafkaServiceBus) Consume() (<-chan goeh.Event, <-chan error)

Consume events from kafka partition

func (*KafkaServiceBus) Publish

func (s *KafkaServiceBus) Publish(event goeh.Event) error

Publish event to kafka topic Event ID should represent kafka message key - it means that can be same for multiple events which should were put on the same partition

func (*KafkaServiceBus) PublishWithRouting added in v0.0.7

func (s *KafkaServiceBus) PublishWithRouting(key string, event goeh.Event) error

PublishWithRouting - routing is only for RabbitMQ implementation so Kafka version should behave like Publish

type KafkaServiceBusOptions

type KafkaServiceBusOptions struct {
	Servers           string
	Topic             string
	Retry             *RetryOptions
	MiddlewareOptions MiddlewareOptions

	// For consumer
	GroupName               string
	IsGroupNameAutoGenerate bool
}

KafkaServiceBusOptions configuration struct for kafka service bus

type MiddlewareOptions added in v0.0.8

type MiddlewareOptions struct {
	ConsumerMiddlewares  []BusMiddleware
	PublisherMiddlewares []BusMiddleware
}

type RabbitMQServiceBus

type RabbitMQServiceBus struct {
	// contains filtered or unexported fields
}

RabbitMQServiceBus implementation of service bus

func (*RabbitMQServiceBus) Consume

func (b *RabbitMQServiceBus) Consume() (<-chan goeh.Event, <-chan error)

Consume events

func (*RabbitMQServiceBus) Publish

func (b *RabbitMQServiceBus) Publish(event goeh.Event) error

Publish message

func (*RabbitMQServiceBus) PublishWithRouting added in v0.0.7

func (b *RabbitMQServiceBus) PublishWithRouting(key string, event goeh.Event) error

PublishWithRouting - send message with specific routing key

type RabbitMQServiceBusOptions

type RabbitMQServiceBusOptions struct {
	Server string
	Queue  string
	// Exchange - queue can be bind to multiple exchanges ex1|ex2...
	Exchange string
	// RoutingKey - queue can be bind to exchange with multiple routing keys rk1|rk2...
	RoutingKey        string
	Kind              *string
	Retry             *RetryOptions
	MiddlewareOptions MiddlewareOptions
}

RabbitMQServiceBusOptions struct with configuration for rabbitmq service bus

type RetryOptions added in v0.0.7

type RetryOptions struct {
	Attempts int
	Delay    time.Duration
}

type ServiceBus

type ServiceBus interface {
	Consume() (<-chan goeh.Event, <-chan error)
	Publish(message goeh.Event) error
	PublishWithRouting(key string, message goeh.Event) error
}

ServiceBus general abstraction for bus

func NewKafkaServiceBus

func NewKafkaServiceBus(mode ServiceBusMode, eventsMapper *goeh.EventsMapper, options *KafkaServiceBusOptions, logger ServiceBusLogger) ServiceBus

NewKafkaServiceBus instance eventsMapper is using only in consumer mode

func NewRabbitMQServiceBus

func NewRabbitMQServiceBus(mode ServiceBusMode, eventsMapper *goeh.EventsMapper, logger ServiceBusLogger, options *RabbitMQServiceBusOptions) ServiceBus

NewRabbitMQServiceBus new instance of queue

type ServiceBusLogger added in v0.0.7

type ServiceBusLogger interface {
	Infof(message string, args ...interface{})
	Errorf(message string, args ...interface{})
}

type ServiceBusMode added in v0.0.7

type ServiceBusMode int8
const (
	PublisherServiceBusMode ServiceBusMode = 1
	ConsumerServiceBusMode  ServiceBusMode = 2
)

Directories

Path Synopsis
Package mock_gobus is a generated GoMock package.
Package mock_gobus is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL