Documentation ¶
Index ¶
- Constants
- func GenerateKeyDeleteRedisPubSubMessage(topic string, message interface{}) string
- func GetDefaultKafkaConfig(additionalConfigFunc ...func(*sarama.Config)) *sarama.Config
- func NewKafkaPublisher(client sarama.Client, async bool) interfaces.Publisher
- func NewRabbitMQPublisher(conn *amqp.Connection) interfaces.Publisher
- type Broker
- type KafkaBroker
- type KafkaOptionFunc
- type RabbitMQBroker
- type RabbitMQOptionFunc
- type RedisBroker
- func (r *RedisBroker) Disconnect(ctx context.Context) error
- func (r *RedisBroker) GetConfiguration() interface{}
- func (r *RedisBroker) GetName() types.Worker
- func (r *RedisBroker) GetPublisher() interfaces.Publisher
- func (r *RedisBroker) Health() map[string]error
- func (r *RedisBroker) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error)
- type RedisMessage
- type RedisOptionFunc
Constants ¶
const (
// RabbitMQDelayHeader header key, value in millisecond
RabbitMQDelayHeader = "x-delay"
)
Variables ¶
This section is empty.
Functions ¶
func GenerateKeyDeleteRedisPubSubMessage ¶ added in v1.14.9
GenerateKeyDeleteRedisPubSubMessage delete redis key pubsub message pattern
func GetDefaultKafkaConfig ¶ added in v1.11.23
GetDefaultKafkaConfig construct default kafka config
func NewKafkaPublisher ¶
func NewKafkaPublisher(client sarama.Client, async bool) interfaces.Publisher
NewKafkaPublisher setup only kafka publisher with client connection
func NewRabbitMQPublisher ¶
func NewRabbitMQPublisher(conn *amqp.Connection) interfaces.Publisher
NewRabbitMQPublisher setup only rabbitmq publisher with client connection
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker model
func InitBrokers ¶
func InitBrokers(brokers ...interfaces.Broker) *Broker
InitBrokers register all broker for publisher or consumer
* for Kafka, pass NewKafkaBroker(...KafkaOptionFunc) in param, init kafka broker configuration from env KAFKA_BROKERS, KAFKA_CLIENT_ID, KAFKA_CLIENT_VERSION
* for RabbitMQ, pass NewRabbitMQBroker(...RabbitMQOptionFunc) in param, init rabbitmq broker configuration from env RABBITMQ_BROKER, RABBITMQ_CONSUMER_GROUP, RABBITMQ_EXCHANGE_NAME
func (*Broker) Disconnect ¶
Disconnect disconnect all registered broker
func (*Broker) GetBrokers ¶
func (b *Broker) GetBrokers() map[types.Worker]interfaces.Broker
GetBrokers get all registered broker
func (*Broker) RegisterBroker ¶
func (b *Broker) RegisterBroker(brokerName types.Worker, bk interfaces.Broker)
RegisterBroker register new broker
type KafkaBroker ¶
type KafkaBroker struct {
// contains filtered or unexported fields
}
KafkaBroker configuration
func NewKafkaBroker ¶
func NewKafkaBroker(opts ...KafkaOptionFunc) *KafkaBroker
NewKafkaBroker setup kafka configuration for publisher or consumer, empty option param for default configuration
func (*KafkaBroker) Disconnect ¶
func (k *KafkaBroker) Disconnect(ctx context.Context) error
Disconnect method
func (*KafkaBroker) GetConfiguration ¶
func (k *KafkaBroker) GetConfiguration() interface{}
GetConfiguration method
func (*KafkaBroker) GetName ¶ added in v1.7.2
func (k *KafkaBroker) GetName() types.Worker
GetName method
func (*KafkaBroker) GetPublisher ¶
func (k *KafkaBroker) GetPublisher() interfaces.Publisher
GetPublisher method
type KafkaOptionFunc ¶
type KafkaOptionFunc func(*KafkaBroker)
KafkaOptionFunc func type
func KafkaSetBrokerHost ¶ added in v1.7.2
func KafkaSetBrokerHost(brokers []string) KafkaOptionFunc
KafkaSetBrokerHost set custom broker host
func KafkaSetConfig ¶
func KafkaSetConfig(cfg *sarama.Config) KafkaOptionFunc
KafkaSetConfig set custom sarama configuration
func KafkaSetPublisher ¶
func KafkaSetPublisher(pub interfaces.Publisher) KafkaOptionFunc
KafkaSetPublisher set custom publisher
type RabbitMQBroker ¶
type RabbitMQBroker struct {
// contains filtered or unexported fields
}
RabbitMQBroker broker
func NewRabbitMQBroker ¶
func NewRabbitMQBroker(opts ...RabbitMQOptionFunc) *RabbitMQBroker
NewRabbitMQBroker setup rabbitmq configuration for publisher or consumer, default connection from RABBITMQ_BROKER environment
func (*RabbitMQBroker) Disconnect ¶
func (r *RabbitMQBroker) Disconnect(ctx context.Context) error
Disconnect method
func (*RabbitMQBroker) GetConfiguration ¶
func (r *RabbitMQBroker) GetConfiguration() interface{}
GetConfiguration method
func (*RabbitMQBroker) GetName ¶ added in v1.7.2
func (r *RabbitMQBroker) GetName() types.Worker
GetName method
func (*RabbitMQBroker) GetPublisher ¶
func (r *RabbitMQBroker) GetPublisher() interfaces.Publisher
GetPublisher method
type RabbitMQOptionFunc ¶
type RabbitMQOptionFunc func(*RabbitMQBroker)
RabbitMQOptionFunc func type
func RabbitMQSetBrokerHost ¶ added in v1.7.2
func RabbitMQSetBrokerHost(brokers string) RabbitMQOptionFunc
RabbitMQSetBrokerHost set custom broker host
func RabbitMQSetChannel ¶
func RabbitMQSetChannel(ch *amqp.Channel) RabbitMQOptionFunc
RabbitMQSetChannel set custom channel configuration
func RabbitMQSetPublisher ¶
func RabbitMQSetPublisher(pub interfaces.Publisher) RabbitMQOptionFunc
RabbitMQSetPublisher set custom publisher
type RedisBroker ¶ added in v1.14.9
type RedisBroker struct {
// contains filtered or unexported fields
}
func NewRedisBroker ¶ added in v1.14.9
func NewRedisBroker(pool *redis.Pool, opts ...RedisOptionFunc) *RedisBroker
NewRedisBroker setup redis for publish message
func (*RedisBroker) Disconnect ¶ added in v1.14.9
func (r *RedisBroker) Disconnect(ctx context.Context) error
Disconnect method
func (*RedisBroker) GetConfiguration ¶ added in v1.14.9
func (r *RedisBroker) GetConfiguration() interface{}
GetConfiguration method, return redis pubsub connection
func (*RedisBroker) GetName ¶ added in v1.14.9
func (r *RedisBroker) GetName() types.Worker
GetName method
func (*RedisBroker) GetPublisher ¶ added in v1.14.9
func (r *RedisBroker) GetPublisher() interfaces.Publisher
GetPublisher method
func (*RedisBroker) Health ¶ added in v1.14.9
func (r *RedisBroker) Health() map[string]error
Health method
func (*RedisBroker) PublishMessage ¶ added in v1.14.9
func (r *RedisBroker) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error)
PublishMessage method
type RedisMessage ¶ added in v1.14.9
type RedisMessage struct { HandlerName string `json:"h"` Message string `json:"message"` EventID string `json:"id,omitempty"` }
RedisMessage messaging model for redis subscriber key
func ParseRedisPubSubKeyTopic ¶ added in v1.14.9
func ParseRedisPubSubKeyTopic(key []byte) (redisMessage RedisMessage)
ParseRedisPubSubKeyTopic parse key to redis message
type RedisOptionFunc ¶ added in v1.14.9
type RedisOptionFunc func(*RedisBroker)
RedisOptionFunc func type
func RedisSetConfigCommands ¶ added in v1.14.9
func RedisSetConfigCommands(commands ...string) RedisOptionFunc
RedisSetConfigCommands set config commands
func RedisSetSubscribeChannels ¶ added in v1.14.9
func RedisSetSubscribeChannels(channels ...string) RedisOptionFunc
RedisSetSubscribeChannels set channels