Documentation ¶
Index ¶
Constants ¶
const (
// RabbitMQDelayHeader header key, value in millisecond
RabbitMQDelayHeader = "x-delay"
)
Variables ¶
This section is empty.
Functions ¶
func NewKafkaPublisher ¶
func NewKafkaPublisher(client sarama.Client, async bool) interfaces.Publisher
NewKafkaPublisher setup only kafka publisher with client connection
func NewRabbitMQPublisher ¶
func NewRabbitMQPublisher(conn *amqp.Connection) interfaces.Publisher
NewRabbitMQPublisher setup only rabbitmq publisher with client connection
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker model
func InitBrokers ¶
func InitBrokers(brokers ...interfaces.Broker) *Broker
InitBrokers register all broker for publisher or consumer
* for Kafka, pass NewKafkaBroker(...KafkaOptionFunc) in param, init kafka broker configuration from env KAFKA_BROKERS, KAFKA_CLIENT_ID, KAFKA_CLIENT_VERSION
* for RabbitMQ, pass NewRabbitMQBroker(...RabbitMQOptionFunc) in param, init rabbitmq broker configuration from env RABBITMQ_BROKER, RABBITMQ_CONSUMER_GROUP, RABBITMQ_EXCHANGE_NAME
func (*Broker) Disconnect ¶
Disconnect disconnect all registered broker
func (*Broker) GetBrokers ¶
func (b *Broker) GetBrokers() map[types.Worker]interfaces.Broker
GetBrokers get all registered broker
func (*Broker) RegisterBroker ¶
func (b *Broker) RegisterBroker(brokerName types.Worker, bk interfaces.Broker)
RegisterBroker register new broker
type KafkaBroker ¶
type KafkaBroker struct {
// contains filtered or unexported fields
}
KafkaBroker configuration
func NewKafkaBroker ¶
func NewKafkaBroker(opts ...KafkaOptionFunc) *KafkaBroker
NewKafkaBroker setup kafka configuration for publisher or consumer, empty option param for default configuration
func (*KafkaBroker) Disconnect ¶
func (k *KafkaBroker) Disconnect(ctx context.Context) error
Disconnect method
func (*KafkaBroker) GetConfiguration ¶
func (k *KafkaBroker) GetConfiguration() interface{}
GetConfiguration method
func (*KafkaBroker) GetName ¶ added in v1.7.2
func (k *KafkaBroker) GetName() types.Worker
GetName method
func (*KafkaBroker) GetPublisher ¶
func (k *KafkaBroker) GetPublisher() interfaces.Publisher
GetPublisher method
type KafkaOptionFunc ¶
type KafkaOptionFunc func(*KafkaBroker)
KafkaOptionFunc func type
func KafkaSetBrokerHost ¶ added in v1.7.2
func KafkaSetBrokerHost(brokers []string) KafkaOptionFunc
KafkaSetBrokerHost set custom broker host
func KafkaSetConfig ¶
func KafkaSetConfig(cfg *sarama.Config) KafkaOptionFunc
KafkaSetConfig set custom sarama configuration
func KafkaSetPublisher ¶
func KafkaSetPublisher(pub interfaces.Publisher) KafkaOptionFunc
KafkaSetPublisher set custom publisher
type RabbitMQBroker ¶
type RabbitMQBroker struct {
// contains filtered or unexported fields
}
RabbitMQBroker broker
func NewRabbitMQBroker ¶
func NewRabbitMQBroker(opts ...RabbitMQOptionFunc) *RabbitMQBroker
NewRabbitMQBroker setup rabbitmq configuration for publisher or consumer, default connection from RABBITMQ_BROKER environment
func (*RabbitMQBroker) Disconnect ¶
func (r *RabbitMQBroker) Disconnect(ctx context.Context) error
Disconnect method
func (*RabbitMQBroker) GetConfiguration ¶
func (r *RabbitMQBroker) GetConfiguration() interface{}
GetConfiguration method
func (*RabbitMQBroker) GetName ¶ added in v1.7.2
func (r *RabbitMQBroker) GetName() types.Worker
GetName method
func (*RabbitMQBroker) GetPublisher ¶
func (r *RabbitMQBroker) GetPublisher() interfaces.Publisher
GetPublisher method
type RabbitMQOptionFunc ¶
type RabbitMQOptionFunc func(*RabbitMQBroker)
RabbitMQOptionFunc func type
func RabbitMQSetBrokerHost ¶ added in v1.7.2
func RabbitMQSetBrokerHost(brokers string) RabbitMQOptionFunc
RabbitMQSetBrokerHost set custom broker host
func RabbitMQSetChannel ¶
func RabbitMQSetChannel(ch *amqp.Channel) RabbitMQOptionFunc
RabbitMQSetChannel set custom channel configuration
func RabbitMQSetPublisher ¶
func RabbitMQSetPublisher(pub interfaces.Publisher) RabbitMQOptionFunc
RabbitMQSetPublisher set custom publisher