Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewKafkaPublisher ¶
func NewKafkaPublisher(client sarama.Client, async bool) interfaces.Publisher
NewKafkaPublisher setup only kafka publisher with client connection
func NewRabbitMQPublisher ¶
func NewRabbitMQPublisher(conn *amqp.Connection) interfaces.Publisher
NewRabbitMQPublisher setup only rabbitmq publisher with client connection
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker model
func InitBrokers ¶
func InitBrokers(opts ...OptionFunc) *Broker
InitBrokers register all broker for publisher or consumer
* for kafka, pass NewKafkaBroker(...KafkaOptionFunc) in param, init kafka broker configuration from env KAFKA_BROKERS, KAFKA_CLIENT_ID, KAFKA_CLIENT_VERSION
* for rabbitmq, pass NewRabbitMQBroker(...RabbitMQOptionFunc) in param, init rabbitmq broker configuration from env RABBITMQ_BROKER, RABBITMQ_CONSUMER_GROUP, RABBITMQ_EXCHANGE_NAME
func (*Broker) Disconnect ¶
Disconnect disconnect all registered broker
func (*Broker) GetBrokers ¶
func (b *Broker) GetBrokers() map[types.Worker]interfaces.Broker
GetBrokers get all registered broker
func (*Broker) RegisterBroker ¶
func (b *Broker) RegisterBroker(brokerName types.Worker, bk interfaces.Broker)
RegisterBroker register new broker
type KafkaBroker ¶
type KafkaBroker struct {
// contains filtered or unexported fields
}
KafkaBroker configuration
func NewKafkaBroker ¶
func NewKafkaBroker(opts ...KafkaOptionFunc) *KafkaBroker
NewKafkaBroker setup kafka configuration for publisher or consumer, empty option param for default configuration
func (*KafkaBroker) Disconnect ¶
func (k *KafkaBroker) Disconnect(ctx context.Context) error
Disconnect method
func (*KafkaBroker) GetConfiguration ¶
func (k *KafkaBroker) GetConfiguration() interface{}
GetConfiguration method
func (*KafkaBroker) GetPublisher ¶
func (k *KafkaBroker) GetPublisher() interfaces.Publisher
GetPublisher method
type KafkaOptionFunc ¶
type KafkaOptionFunc func(*KafkaBroker)
KafkaOptionFunc func type
func KafkaSetConfig ¶
func KafkaSetConfig(cfg *sarama.Config) KafkaOptionFunc
KafkaSetConfig set custom sarama configuration
func KafkaSetPublisher ¶
func KafkaSetPublisher(pub interfaces.Publisher) KafkaOptionFunc
KafkaSetPublisher set custom publisher
type OptionFunc ¶
type OptionFunc func(*Broker)
OptionFunc type
func SetKafka ¶
func SetKafka(bk interfaces.Broker) OptionFunc
SetKafka setup kafka broker for publisher or consumer
func SetRabbitMQ ¶
func SetRabbitMQ(bk interfaces.Broker) OptionFunc
SetRabbitMQ setup rabbitmq broker for publisher or consumer
type RabbitMQBroker ¶
type RabbitMQBroker struct {
// contains filtered or unexported fields
}
RabbitMQBroker broker
func NewRabbitMQBroker ¶
func NewRabbitMQBroker(opts ...RabbitMQOptionFunc) *RabbitMQBroker
NewRabbitMQBroker setup rabbitmq configuration for publisher or consumer, connection from RABBITMQ_BROKER environment
func (*RabbitMQBroker) Disconnect ¶
func (r *RabbitMQBroker) Disconnect(ctx context.Context) error
Disconnect method
func (*RabbitMQBroker) GetConfiguration ¶
func (r *RabbitMQBroker) GetConfiguration() interface{}
GetConfiguration method
func (*RabbitMQBroker) GetPublisher ¶
func (r *RabbitMQBroker) GetPublisher() interfaces.Publisher
GetPublisher method
type RabbitMQOptionFunc ¶
type RabbitMQOptionFunc func(*RabbitMQBroker)
RabbitMQOptionFunc func type
func RabbitMQSetChannel ¶
func RabbitMQSetChannel(ch *amqp.Channel) RabbitMQOptionFunc
RabbitMQSetChannel set custom channel configuration
func RabbitMQSetPublisher ¶
func RabbitMQSetPublisher(pub interfaces.Publisher) RabbitMQOptionFunc
RabbitMQSetPublisher set custom publisher