broker

package
v1.14.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

README

Broker

Includes the default brokers (Kafka and RabbitMQ). Other brokers (GCP PubSub, STOMP/AMQ) can be found in the candi plugin.

Kafka

Register Kafka broker in service config

Modify configs/configs.go in your service

package configs

import (
	"github.com/golangid/candi/broker"
...

// LoadServiceConfigs load selected dependency configuration in this service
func LoadServiceConfigs(baseCfg *config.Config) (deps dependency.Dependency) {
	
		...

		brokerDeps := broker.InitBrokers(
			broker.NewKafkaBroker(),
		)

		... 
}

If you want to use the Kafka consumer, set USE_KAFKA_CONSUMER=true in your environment variables and follow this example.

If you want to use Kafka publisher in your usecase, follow this example code:

package usecase

import (
	"context"

	"github.com/golangid/candi/candishared"
	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

type usecaseImpl struct {
	kafkaPub interfaces.Publisher
}

func NewUsecase(deps dependency.Dependency) Usecase {
	return &usecaseImpl{
		kafkaPub: deps.GetBroker(types.Kafka).GetPublisher(),
	}
}

func (uc *usecaseImpl) UsecaseToPublishMessage(ctx context.Context) error {
	err := uc.kafkaPub.PublishMessage(ctx, &candishared.PublisherArgument{
		Topic:  "example-topic",
		Data:   "hello world",
	})
	return err
}

RabbitMQ

Register RabbitMQ broker in service config

Modify configs/configs.go in your service

package configs

import (
	"github.com/golangid/candi/broker"
...

// LoadServiceConfigs load selected dependency configuration in this service
func LoadServiceConfigs(baseCfg *config.Config) (deps dependency.Dependency) {
	
		...

		brokerDeps := broker.InitBrokers(
			broker.NewRabbitMQBroker(),
		)

		... 
}

If you want to use the RabbitMQ consumer, set USE_RABBITMQ_CONSUMER=true in your environment variables and follow this example.

If you want to use RabbitMQ publisher in your usecase, follow this example code:

package usecase

import (
	"context"

	"github.com/golangid/candi/broker"
	"github.com/golangid/candi/candishared"
	"github.com/golangid/candi/codebase/factory/dependency"
	"github.com/golangid/candi/codebase/factory/types"
	"github.com/golangid/candi/codebase/interfaces"
)

type usecaseImpl struct {
	rabbitmqPub interfaces.Publisher
}

func NewUsecase(deps dependency.Dependency) Usecase {
	return &usecaseImpl{
		rabbitmqPub: deps.GetBroker(types.RabbitMQ).GetPublisher(),
	}
}

func (uc *usecaseImpl) UsecaseToPublishMessage(ctx context.Context) error {
	err := uc.rabbitmqPub.PublishMessage(ctx, &candishared.PublisherArgument{
		Topic:  "example-topic",
		Data:   "hello world",
		Header: map[string]interface{}{
			broker.RabbitMQDelayHeader: 5000, // optional: delay consumption of this message by 5 seconds (value is in milliseconds)
		},
	})
	return err
}

Documentation

Index

Constants

View Source
const (
	// RabbitMQDelayHeader header key, value in millisecond
	RabbitMQDelayHeader = "x-delay"
)

Variables

This section is empty.

Functions

func GenerateKeyDeleteRedisPubSubMessage added in v1.14.9

func GenerateKeyDeleteRedisPubSubMessage(topic string, message interface{}) string

GenerateKeyDeleteRedisPubSubMessage delete redis key pubsub message pattern

func GetDefaultKafkaConfig added in v1.11.23

func GetDefaultKafkaConfig(additionalConfigFunc ...func(*sarama.Config)) *sarama.Config

GetDefaultKafkaConfig construct default kafka config

func NewKafkaPublisher

func NewKafkaPublisher(client sarama.Client, async bool) interfaces.Publisher

NewKafkaPublisher setup only kafka publisher with client connection

func NewRabbitMQPublisher

func NewRabbitMQPublisher(conn *amqp.Connection) interfaces.Publisher

NewRabbitMQPublisher setup only rabbitmq publisher with client connection

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker model

func InitBrokers

func InitBrokers(brokers ...interfaces.Broker) *Broker

InitBrokers registers all brokers for publishers or consumers.

* For Kafka, pass NewKafkaBroker(...KafkaOptionFunc) as a parameter; the Kafka broker configuration is initialized from the environment variables KAFKA_BROKERS, KAFKA_CLIENT_ID, and KAFKA_CLIENT_VERSION.

* For RabbitMQ, pass NewRabbitMQBroker(...RabbitMQOptionFunc) as a parameter; the RabbitMQ broker configuration is initialized from the environment variables RABBITMQ_BROKER, RABBITMQ_CONSUMER_GROUP, and RABBITMQ_EXCHANGE_NAME.

func (*Broker) Disconnect

func (b *Broker) Disconnect(ctx context.Context) error

Disconnect disconnects all registered brokers.

func (*Broker) GetBrokers

func (b *Broker) GetBrokers() map[types.Worker]interfaces.Broker

GetBrokers get all registered broker

func (*Broker) RegisterBroker

func (b *Broker) RegisterBroker(brokerName types.Worker, bk interfaces.Broker)

RegisterBroker register new broker

type KafkaBroker

type KafkaBroker struct {
	// contains filtered or unexported fields
}

KafkaBroker configuration

func NewKafkaBroker

func NewKafkaBroker(opts ...KafkaOptionFunc) *KafkaBroker

NewKafkaBroker sets up the Kafka configuration for a publisher or consumer; pass no options to use the default configuration.

func (*KafkaBroker) Disconnect

func (k *KafkaBroker) Disconnect(ctx context.Context) error

Disconnect method

func (*KafkaBroker) GetConfiguration

func (k *KafkaBroker) GetConfiguration() interface{}

GetConfiguration method

func (*KafkaBroker) GetName added in v1.7.2

func (k *KafkaBroker) GetName() types.Worker

GetName method

func (*KafkaBroker) GetPublisher

func (k *KafkaBroker) GetPublisher() interfaces.Publisher

GetPublisher method

func (*KafkaBroker) Health

func (k *KafkaBroker) Health() map[string]error

Health method

type KafkaOptionFunc

type KafkaOptionFunc func(*KafkaBroker)

KafkaOptionFunc func type

func KafkaSetBrokerHost added in v1.7.2

func KafkaSetBrokerHost(brokers []string) KafkaOptionFunc

KafkaSetBrokerHost set custom broker host

func KafkaSetConfig

func KafkaSetConfig(cfg *sarama.Config) KafkaOptionFunc

KafkaSetConfig set custom sarama configuration

func KafkaSetPublisher

func KafkaSetPublisher(pub interfaces.Publisher) KafkaOptionFunc

KafkaSetPublisher set custom publisher

type RabbitMQBroker

type RabbitMQBroker struct {
	// contains filtered or unexported fields
}

RabbitMQBroker broker

func NewRabbitMQBroker

func NewRabbitMQBroker(opts ...RabbitMQOptionFunc) *RabbitMQBroker

NewRabbitMQBroker sets up the RabbitMQ configuration for a publisher or consumer; by default the connection is taken from the RABBITMQ_BROKER environment variable.

func (*RabbitMQBroker) Disconnect

func (r *RabbitMQBroker) Disconnect(ctx context.Context) error

Disconnect method

func (*RabbitMQBroker) GetConfiguration

func (r *RabbitMQBroker) GetConfiguration() interface{}

GetConfiguration method

func (*RabbitMQBroker) GetName added in v1.7.2

func (r *RabbitMQBroker) GetName() types.Worker

GetName method

func (*RabbitMQBroker) GetPublisher

func (r *RabbitMQBroker) GetPublisher() interfaces.Publisher

GetPublisher method

func (*RabbitMQBroker) Health

func (r *RabbitMQBroker) Health() map[string]error

Health method

type RabbitMQOptionFunc

type RabbitMQOptionFunc func(*RabbitMQBroker)

RabbitMQOptionFunc func type

func RabbitMQSetBrokerHost added in v1.7.2

func RabbitMQSetBrokerHost(brokers string) RabbitMQOptionFunc

RabbitMQSetBrokerHost set custom broker host

func RabbitMQSetChannel

func RabbitMQSetChannel(ch *amqp.Channel) RabbitMQOptionFunc

RabbitMQSetChannel set custom channel configuration

func RabbitMQSetPublisher

func RabbitMQSetPublisher(pub interfaces.Publisher) RabbitMQOptionFunc

RabbitMQSetPublisher set custom publisher

type RedisBroker added in v1.14.9

type RedisBroker struct {
	// contains filtered or unexported fields
}

func NewRedisBroker added in v1.14.9

func NewRedisBroker(pool *redis.Pool, opts ...RedisOptionFunc) *RedisBroker

NewRedisBroker setup redis for publish message

func (*RedisBroker) Disconnect added in v1.14.9

func (r *RedisBroker) Disconnect(ctx context.Context) error

Disconnect method

func (*RedisBroker) GetConfiguration added in v1.14.9

func (r *RedisBroker) GetConfiguration() interface{}

GetConfiguration method, return redis pubsub connection

func (*RedisBroker) GetName added in v1.14.9

func (r *RedisBroker) GetName() types.Worker

GetName method

func (*RedisBroker) GetPublisher added in v1.14.9

func (r *RedisBroker) GetPublisher() interfaces.Publisher

GetPublisher method

func (*RedisBroker) Health added in v1.14.9

func (r *RedisBroker) Health() map[string]error

Health method

func (*RedisBroker) PublishMessage added in v1.14.9

func (r *RedisBroker) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error)

PublishMessage method

type RedisMessage added in v1.14.9

type RedisMessage struct {
	HandlerName string `json:"h"`
	Message     string `json:"message"`
	EventID     string `json:"id,omitempty"`
}

RedisMessage messaging model for redis subscriber key

func ParseRedisPubSubKeyTopic added in v1.14.9

func ParseRedisPubSubKeyTopic(key []byte) (redisMessage RedisMessage)

ParseRedisPubSubKeyTopic parse key to redis message

type RedisOptionFunc added in v1.14.9

type RedisOptionFunc func(*RedisBroker)

RedisOptionFunc func type

func RedisSetConfigCommands added in v1.14.9

func RedisSetConfigCommands(commands ...string) RedisOptionFunc

RedisSetConfigCommands set config commands

func RedisSetSubscribeChannels added in v1.14.9

func RedisSetSubscribeChannels(channels ...string) RedisOptionFunc

RedisSetSubscribeChannels set channels

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL