Documentation ¶
Index ¶
- Constants
- func GenerateKeyDeleteRedisPubSubMessage(topic string, message any) string
- func GetDefaultKafkaConfig(additionalConfigFunc ...func(*sarama.Config)) *sarama.Config
- func NewKafkaPublisher(client sarama.Client, async bool) interfaces.Publisher
- type Broker
- type KafkaBroker
- type KafkaOptionFunc
- type RabbitMQBroker
- type RabbitMQOptionFunc
- func RabbitMQSetBrokerHost(brokers string) RabbitMQOptionFunc
- func RabbitMQSetChannel(ch *amqp.Channel) RabbitMQOptionFunc
- func RabbitMQSetExchange(exchange string) RabbitMQOptionFunc
- func RabbitMQSetPublisher(pub interfaces.Publisher) RabbitMQOptionFunc
- func RabbitMQSetWorkerType(workerType types.Worker) RabbitMQOptionFunc
- type RabbitMQPublisher
- type RedisBroker
- func (r *RedisBroker) Disconnect(ctx context.Context) error
- func (r *RedisBroker) GetName() types.Worker
- func (r *RedisBroker) GetPublisher() interfaces.Publisher
- func (r *RedisBroker) Health() map[string]error
- func (r *RedisBroker) InitPubSubConn() *redis.PubSubConn
- func (r *RedisBroker) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error)
- type RedisMessage
- type RedisOptionFunc
Constants ¶
const (
// RabbitMQDelayHeader header key, value in millisecond
RabbitMQDelayHeader = "x-delay"
)
const (
// RedisBrokerKey key constant
RedisBrokerKey = "dynamic_scheduling"
)
Variables ¶
This section is empty.
Functions ¶
func GenerateKeyDeleteRedisPubSubMessage ¶ added in v1.14.9
GenerateKeyDeleteRedisPubSubMessage delete redis key pubsub message pattern
func GetDefaultKafkaConfig ¶ added in v1.11.23
GetDefaultKafkaConfig construct default kafka config
func NewKafkaPublisher ¶
func NewKafkaPublisher(client sarama.Client, async bool) interfaces.Publisher
NewKafkaPublisher setup only kafka publisher with client connection
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker model
func InitBrokers ¶
func InitBrokers(brokers ...interfaces.Broker) *Broker
InitBrokers register all broker for publisher or consumer
* for Kafka, pass NewKafkaBroker(...KafkaOptionFunc) in param, init kafka broker configuration from env KAFKA_BROKERS, KAFKA_CLIENT_ID, KAFKA_CLIENT_VERSION
* for RabbitMQ, pass NewRabbitMQBroker(...RabbitMQOptionFunc) in param, init rabbitmq broker configuration from env RABBITMQ_BROKER, RABBITMQ_CONSUMER_GROUP, RABBITMQ_EXCHANGE_NAME
func (*Broker) Disconnect ¶
Disconnect disconnect all registered broker
func (*Broker) GetBrokers ¶
func (b *Broker) GetBrokers() map[types.Worker]interfaces.Broker
GetBrokers get all registered broker
func (*Broker) RegisterBroker ¶
func (b *Broker) RegisterBroker(brokerName types.Worker, bk interfaces.Broker)
RegisterBroker register new broker
type KafkaBroker ¶
type KafkaBroker struct { WorkerType types.Worker BrokerHost []string Config *sarama.Config Client sarama.Client // contains filtered or unexported fields }
KafkaBroker configuration
func NewKafkaBroker ¶
func NewKafkaBroker(opts ...KafkaOptionFunc) *KafkaBroker
NewKafkaBroker setup kafka configuration for publisher or consumer, empty option param for default configuration (with default worker type is types.Kafka)
func (*KafkaBroker) Disconnect ¶
func (k *KafkaBroker) Disconnect(ctx context.Context) error
Disconnect method
func (*KafkaBroker) GetName ¶ added in v1.7.2
func (k *KafkaBroker) GetName() types.Worker
GetName method
func (*KafkaBroker) GetPublisher ¶
func (k *KafkaBroker) GetPublisher() interfaces.Publisher
GetPublisher method
type KafkaOptionFunc ¶
type KafkaOptionFunc func(*KafkaBroker)
KafkaOptionFunc func type
func KafkaSetBrokerHost ¶ added in v1.7.2
func KafkaSetBrokerHost(brokers []string) KafkaOptionFunc
KafkaSetBrokerHost set custom broker host
func KafkaSetConfig ¶
func KafkaSetConfig(cfg *sarama.Config) KafkaOptionFunc
KafkaSetConfig set custom sarama configuration
func KafkaSetPublisher ¶
func KafkaSetPublisher(pub interfaces.Publisher) KafkaOptionFunc
KafkaSetPublisher set custom publisher
func KafkaSetWorkerType ¶ added in v1.17.2
func KafkaSetWorkerType(workerType types.Worker) KafkaOptionFunc
KafkaSetWorkerType set worker type
type RabbitMQBroker ¶
type RabbitMQBroker struct { WorkerType types.Worker BrokerHost string Exchange string Conn *amqp.Connection Channel *amqp.Channel // contains filtered or unexported fields }
RabbitMQBroker broker
func NewRabbitMQBroker ¶
func NewRabbitMQBroker(opts ...RabbitMQOptionFunc) *RabbitMQBroker
NewRabbitMQBroker setup rabbitmq configuration for publisher or consumer, default connection from RABBITMQ_BROKER environment (with default worker type is types.RabbitMQ)
func (*RabbitMQBroker) Disconnect ¶
func (r *RabbitMQBroker) Disconnect(ctx context.Context) error
Disconnect method
func (*RabbitMQBroker) GetName ¶ added in v1.7.2
func (r *RabbitMQBroker) GetName() types.Worker
GetName method
func (*RabbitMQBroker) GetPublisher ¶
func (r *RabbitMQBroker) GetPublisher() interfaces.Publisher
GetPublisher method
type RabbitMQOptionFunc ¶
type RabbitMQOptionFunc func(*RabbitMQBroker)
RabbitMQOptionFunc func type
func RabbitMQSetBrokerHost ¶ added in v1.7.2
func RabbitMQSetBrokerHost(brokers string) RabbitMQOptionFunc
RabbitMQSetBrokerHost set custom broker host
func RabbitMQSetChannel ¶
func RabbitMQSetChannel(ch *amqp.Channel) RabbitMQOptionFunc
RabbitMQSetChannel set custom channel configuration
func RabbitMQSetExchange ¶ added in v1.17.2
func RabbitMQSetExchange(exchange string) RabbitMQOptionFunc
RabbitMQSetExchange set exchange
func RabbitMQSetPublisher ¶
func RabbitMQSetPublisher(pub interfaces.Publisher) RabbitMQOptionFunc
RabbitMQSetPublisher set custom publisher
func RabbitMQSetWorkerType ¶ added in v1.17.2
func RabbitMQSetWorkerType(workerType types.Worker) RabbitMQOptionFunc
RabbitMQSetWorkerType set worker type
type RabbitMQPublisher ¶ added in v1.17.12
type RabbitMQPublisher struct {
// contains filtered or unexported fields
}
RabbitMQPublisher rabbitmq
func NewRabbitMQPublisher ¶
func NewRabbitMQPublisher(conn *amqp.Connection, exchange string) *RabbitMQPublisher
NewRabbitMQPublisher setup only rabbitmq publisher with client connection
func (*RabbitMQPublisher) PublishMessage ¶ added in v1.17.12
func (r *RabbitMQPublisher) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error)
PublishMessage method
type RedisBroker ¶ added in v1.14.9
type RedisBroker struct { WorkerType types.Worker Pool *redis.Pool // contains filtered or unexported fields }
func NewRedisBroker ¶ added in v1.14.9
func NewRedisBroker(pool *redis.Pool, opts ...RedisOptionFunc) *RedisBroker
NewRedisBroker setup redis for publish message (with default worker type is types.RedisSubscriber)
func (*RedisBroker) Disconnect ¶ added in v1.14.9
func (r *RedisBroker) Disconnect(ctx context.Context) error
Disconnect method
func (*RedisBroker) GetName ¶ added in v1.14.9
func (r *RedisBroker) GetName() types.Worker
GetName method
func (*RedisBroker) GetPublisher ¶ added in v1.14.9
func (r *RedisBroker) GetPublisher() interfaces.Publisher
GetPublisher method
func (*RedisBroker) Health ¶ added in v1.14.9
func (r *RedisBroker) Health() map[string]error
Health method
func (*RedisBroker) InitPubSubConn ¶ added in v1.17.2
func (r *RedisBroker) InitPubSubConn() *redis.PubSubConn
InitPubSubConn method, return redis pubsub connection
func (*RedisBroker) PublishMessage ¶ added in v1.14.9
func (r *RedisBroker) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error)
PublishMessage method
type RedisMessage ¶ added in v1.14.9
type RedisMessage struct { HandlerName string `json:"h"` Key string `json:"key"` Message string `json:"message,omitempty"` EventID string `json:"id,omitempty"` }
RedisMessage messaging model for redis subscriber key
func ParseRedisPubSubKeyTopic ¶ added in v1.14.9
func ParseRedisPubSubKeyTopic(key []byte) (redisMessage RedisMessage)
ParseRedisPubSubKeyTopic parse key to redis message
type RedisOptionFunc ¶ added in v1.14.9
type RedisOptionFunc func(*RedisBroker)
RedisOptionFunc func type
func RedisSetConfigCommands ¶ added in v1.14.9
func RedisSetConfigCommands(commands ...string) RedisOptionFunc
RedisSetConfigCommands set config commands
func RedisSetSubscribeChannels ¶ added in v1.14.9
func RedisSetSubscribeChannels(channels ...string) RedisOptionFunc
RedisSetSubscribeChannels set channels
func RedisSetWorkerType ¶ added in v1.17.2
func RedisSetWorkerType(workerType types.Worker) RedisOptionFunc
RedisSetWorkerType set worker type