broker

package
v1.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2021 License: Apache-2.0 Imports: 13 Imported by: 0

README

Broker

Include default broker (Kafka & RabbitMQ), or other broker (GCP PubSub, STOMP/AMQ) can be found in candi plugin.

Kafka

Register Kafka broker in service config

File configs/configs.go in your service

package configs

import (
	"github.com/golangid/candi/broker"
...

// LoadServiceConfigs load selected dependency configuration in this service
func LoadServiceConfigs(baseCfg *config.Config) (deps dependency.Dependency) {
	
		...

		brokerDeps := broker.InitBrokers(
			broker.SetKafka(broker.NewKafkaBroker()),
		)

		... 
}

If you want to use Kafka consumer, just set USE_KAFKA_CONSUMER=true in environment variable, and follow this example.

If you want to use Kafka publisher in your usecase, follow this example code:

package usecase

import (
	"context"

	"github.com/golangid/candi/candishared"
	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

type usecaseImpl {
	kafkaPub interfaces.Publisher
}

func NewUsecase(deps dependency.Dependency) Usecase {
	return &usecaseImpl{
		kafkaPub: deps.GetBroker(types.Kafka).GetPublisher(),
	}
}

func (uc *usecaseImpl) UsecaseToPublishMessage(ctx context.Context) error {
	err := uc.kafkaPub.PublishMessage(ctx, &candishared.PublisherArgument{
		Topic:  "example-topic",
		Data:   "hello world",
	})
	return err
}

RabbitMQ

Register RabbitMQ broker in service config

File configs/configs.go in your service

package configs

import (
	"github.com/golangid/candi/broker"
...

// LoadServiceConfigs load selected dependency configuration in this service
func LoadServiceConfigs(baseCfg *config.Config) (deps dependency.Dependency) {
	
		...

		brokerDeps := broker.InitBrokers(
			broker.SetRabbitMQ(broker.NewRabbitMQBroker()),
		)

		... 
}

If you want to use RabbitMQ consumer, just set USE_RABBITMQ_CONSUMER=true in environment variable, and follow this example.

If you want to use RabbitMQ publisher in your usecase, follow this example code:

package usecase

import (
	"context"

	"github.com/golangid/candi/broker"
	"github.com/golangid/candi/candishared"
	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

type usecaseImpl {
	rabbitmqPub interfaces.Publisher
}

func NewUsecase(deps dependency.Dependency) Usecase {
	return &usecaseImpl{
		rabbitmqPub: deps.GetBroker(types.RabbitMQ).GetPublisher(),
	}
}

func (uc *usecaseImpl) UsecaseToPublishMessage(ctx context.Context) error {
	err := uc.rabbitmqPub.PublishMessage(ctx, &candishared.PublisherArgument{
		Topic:  "example-topic",
		Data:   "hello world"
		Header: map[string]interface{}{
			broker.RabbitMQDelayHeader: 5000, // if you want set delay consume your message by active consumer for 5 seconds
		},
	})
	return err
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewKafkaPublisher

func NewKafkaPublisher(client sarama.Client, async bool) interfaces.Publisher

NewKafkaPublisher setup only kafka publisher with client connection

func NewRabbitMQPublisher

func NewRabbitMQPublisher(conn *amqp.Connection) interfaces.Publisher

NewRabbitMQPublisher setup only rabbitmq publisher with client connection

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker model

func InitBrokers

func InitBrokers(opts ...OptionFunc) *Broker

InitBrokers register all broker for publisher or consumer

* for kafka, pass NewKafkaBroker(...KafkaOptionFunc) in param, init kafka broker configuration from env KAFKA_BROKERS, KAFKA_CLIENT_ID, KAFKA_CLIENT_VERSION

* for rabbitmq, pass NewRabbitMQBroker(...RabbitMQOptionFunc) in param, init rabbitmq broker configuration from env RABBITMQ_BROKER, RABBITMQ_CONSUMER_GROUP, RABBITMQ_EXCHANGE_NAME

func (*Broker) Disconnect

func (b *Broker) Disconnect(ctx context.Context) error

Disconnect disconnect all registered broker

func (*Broker) GetBrokers

func (b *Broker) GetBrokers() map[types.Worker]interfaces.Broker

GetBrokers get all registered broker

func (*Broker) RegisterBroker

func (b *Broker) RegisterBroker(brokerName types.Worker, bk interfaces.Broker)

RegisterBroker register new broker

type KafkaBroker

type KafkaBroker struct {
	// contains filtered or unexported fields
}

KafkaBroker configuration

func NewKafkaBroker

func NewKafkaBroker(opts ...KafkaOptionFunc) *KafkaBroker

NewKafkaBroker setup kafka configuration for publisher or consumer, empty option param for default configuration

func (*KafkaBroker) Disconnect

func (k *KafkaBroker) Disconnect(ctx context.Context) error

Disconnect method

func (*KafkaBroker) GetConfiguration

func (k *KafkaBroker) GetConfiguration() interface{}

GetConfiguration method

func (*KafkaBroker) GetPublisher

func (k *KafkaBroker) GetPublisher() interfaces.Publisher

GetPublisher method

func (*KafkaBroker) Health

func (k *KafkaBroker) Health() map[string]error

Health method

type KafkaOptionFunc

type KafkaOptionFunc func(*KafkaBroker)

KafkaOptionFunc func type

func KafkaSetConfig

func KafkaSetConfig(cfg *sarama.Config) KafkaOptionFunc

KafkaSetConfig set custom sarama configuration

func KafkaSetPublisher

func KafkaSetPublisher(pub interfaces.Publisher) KafkaOptionFunc

KafkaSetPublisher set custom publisher

type OptionFunc

type OptionFunc func(*Broker)

OptionFunc type

func SetKafka

func SetKafka(bk interfaces.Broker) OptionFunc

SetKafka setup kafka broker for publisher or consumer

func SetRabbitMQ

func SetRabbitMQ(bk interfaces.Broker) OptionFunc

SetRabbitMQ setup rabbitmq broker for publisher or consumer

type RabbitMQBroker

type RabbitMQBroker struct {
	// contains filtered or unexported fields
}

RabbitMQBroker broker

func NewRabbitMQBroker

func NewRabbitMQBroker(opts ...RabbitMQOptionFunc) *RabbitMQBroker

NewRabbitMQBroker setup rabbitmq configuration for publisher or consumer, connection from RABBITMQ_BROKER environment

func (*RabbitMQBroker) Disconnect

func (r *RabbitMQBroker) Disconnect(ctx context.Context) error

Disconnect method

func (*RabbitMQBroker) GetConfiguration

func (r *RabbitMQBroker) GetConfiguration() interface{}

GetConfiguration method

func (*RabbitMQBroker) GetPublisher

func (r *RabbitMQBroker) GetPublisher() interfaces.Publisher

GetPublisher method

func (*RabbitMQBroker) Health

func (r *RabbitMQBroker) Health() map[string]error

Health method

type RabbitMQOptionFunc

type RabbitMQOptionFunc func(*RabbitMQBroker)

RabbitMQOptionFunc func type

func RabbitMQSetChannel

func RabbitMQSetChannel(ch *amqp.Channel) RabbitMQOptionFunc

RabbitMQSetChannel set custom channel configuration

func RabbitMQSetPublisher

func RabbitMQSetPublisher(pub interfaces.Publisher) RabbitMQOptionFunc

RabbitMQSetPublisher set custom publisher

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL