Documentation ¶
Index ¶
- Constants
- Variables
- func ExtractRabbitHeaders(ctx context.Context, carrier rabbitmq.Table) context.Context
- func InjectRabbitHeaders(ctx context.Context) rabbitmq.Table
- func NewError(errorMessage string, errorCode grpc.ErrorCode) *grpc.Error
- func WithConcurrentReplyConsumer(number int) func(options *Options)
- func WithHeader(header []HeaderValue) func(options *PublisherOptions)
- func WithLogger(logger rabbitmq.Logger) func(options *Options)
- func WithMultiplePublishers(number int) func(options *Options)
- type Configuration
- type Consumer
- func (cm *Consumer) NewChargePointConsumer(topic Topic, handler HandlerFunc) (*TopicConsumer, error)
- func (cm *Consumer) NewNotificationConsumer(topic Topic, queueName string, handler HandlerFunc, routines int) (*TopicConsumer, error)
- func (cm *Consumer) NewService1Consumer(topic Topic, handler HandlerFunc) (*TopicConsumer, error)
- func (cm *Consumer) NewServiceConsumer(topic Topic, handler HandlerFunc, durable bool, routines int) (*TopicConsumer, error)
- type Exchange
- type HandlerFunc
- type Header
- type HeaderKey
- type HeaderReplyType
- type HeaderValue
- type HeadersCarrier
- type Options
- type PublishRequest
- type Publisher
- type PublisherOptions
- type PublisherPool
- func (pp *PublisherPool) Publish(ctx context.Context, topic Topic, message proto.Message, ...) error
- func (pp *PublisherPool) PublishRPC(ctx context.Context, topic Topic, message proto.Message, ...) ([]byte, error)
- func (pp *PublisherPool) PublishRPCWithMultipleResponses(ctx context.Context, topic Topic, message proto.Message, nrResponses int, ...) (chan ReplyResponse, error)
- func (pp *PublisherPool) Respond(ctx context.Context, correlationID string, topic Topic, message proto.Message, ...) error
- func (pp *PublisherPool) RespondWithError(ctx context.Context, correlationID string, topic Topic, message *grpc.Error, ...) error
- type Rabbit
- type ReplyPool
- type ReplyRequest
- type ReplyResponse
- type Topic
- type TopicBuilder
- type TopicConsumer
- type TopicWord
Constants ¶
const ( TopicExchange exchangeType = "topic" DirectExchange exchangeType = "direct" FanoutExchange exchangeType = "fanout" )
Variables ¶
var ErrResponse = errors.New("responded with an error")
Functions ¶
func ExtractRabbitHeaders ¶
func InjectRabbitHeaders ¶
func WithConcurrentReplyConsumer ¶
WithConcurrentReplyConsumer sets the number of go routines that will handle the reply queue
func WithHeader ¶
func WithHeader(header []HeaderValue) func(options *PublisherOptions)
func WithLogger ¶
func WithLogger(logger rabbitmq.Logger) func(options *Options)
func WithMultiplePublishers ¶
WithMultiplePublishers sets the number of routines running a publisher, each has it's own TCP connection
Types ¶
type Configuration ¶
type Configuration struct { // Address is the address for connecting to the RabbitMQ instance Address string `validate:"required" json:"address" yaml:"address"` // Username for authentication to the RabbitMQ instance Username string `json:"username" yaml:"username"` // Password for authentication to the RabbitMQ instance Password string `json:"password" yaml:"password"` }
Configuration AMQP basic configuration for the message bus
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) NewChargePointConsumer ¶
func (cm *Consumer) NewChargePointConsumer(topic Topic, handler HandlerFunc) (*TopicConsumer, error)
func (*Consumer) NewNotificationConsumer ¶
func (cm *Consumer) NewNotificationConsumer(topic Topic, queueName string, handler HandlerFunc, routines int) (*TopicConsumer, error)
NewNotificationConsumer creates a new consumer of notifications for given topic, with routing key and handler function, the returned consumer should only be used for disconnecting - routines sets the number of go routines handling the consumer, 1 should suffice in general - returns the consumer and error, only initialize them if necessary - to achieve load balancing, queueName should be set in the form of <consumer-service>.<topic>
func (*Consumer) NewService1Consumer ¶
func (cm *Consumer) NewService1Consumer(topic Topic, handler HandlerFunc) (*TopicConsumer, error)
NewServiceConsumer creates a new service topic consumer for given topic and handler function If durable is set to true, the queue will be of quorum type which is durable and persists through restarts routines sets the number of go routines handling the consumer, 1 should suffice in general returns the consumer and error, only initialize them if necessary the returned consumer should only be used for disconnecting
func (*Consumer) NewServiceConsumer ¶
func (cm *Consumer) NewServiceConsumer(topic Topic, handler HandlerFunc, durable bool, routines int) (*TopicConsumer, error)
NewServiceConsumer creates a new service topic consumer for given topic and handler function If durable is set to true, the queue will be of quorum type which is durable and persists through restarts routines sets the number of go routines handling the consumer, 1 should suffice in general returns the consumer and error, only initialize them if necessary the returned consumer should only be used for disconnecting
type HandlerFunc ¶
type Header ¶
type Header struct {
// contains filtered or unexported fields
}
func (*Header) Build ¶
func (rh *Header) Build() []HeaderValue
func (*Header) WithField ¶
When adding custom fields, make sure the value is supported: https://pkg.go.dev/github.com/wagslane/go-rabbitmq#Table
func (*Header) WithMethod ¶
type HeaderReplyType ¶
type HeaderReplyType string
const ( HeaderReplyTypeAcknowledge HeaderReplyType = "acknowledge" HeaderReplyTypeInterceptedMessage HeaderReplyType = "intercepted_message" )
type HeaderValue ¶
type HeadersCarrier ¶
type HeadersCarrier rabbitmq.Table
func (HeadersCarrier) Get ¶
func (c HeadersCarrier) Get(key string) string
func (HeadersCarrier) Keys ¶
func (c HeadersCarrier) Keys() []string
func (HeadersCarrier) Set ¶
func (c HeadersCarrier) Set(key string, value string)
type PublishRequest ¶
type Publisher ¶
type Publisher struct { Publisher *rabbitmq.Publisher // contains filtered or unexported fields }
type PublisherOptions ¶
type PublisherOptions struct {
// contains filtered or unexported fields
}
type PublisherPool ¶
type PublisherPool struct {
// contains filtered or unexported fields
}
func (*PublisherPool) Publish ¶
func (pp *PublisherPool) Publish(ctx context.Context, topic Topic, message proto.Message, options ...func(*PublisherOptions)) error
Publish publishes a rabbit message Returns an error, only initialize it if needed, error already logged
func (*PublisherPool) PublishRPC ¶
func (pp *PublisherPool) PublishRPC(ctx context.Context, topic Topic, message proto.Message, options ...func(options *PublisherOptions)) ([]byte, error)
PublishRPC publishes a RPC message and waits for the reply
func (*PublisherPool) PublishRPCWithMultipleResponses ¶
func (pp *PublisherPool) PublishRPCWithMultipleResponses(ctx context.Context, topic Topic, message proto.Message, nrResponses int, options ...func(publisherOptions *PublisherOptions)) (chan ReplyResponse, error)
func (*PublisherPool) Respond ¶
func (pp *PublisherPool) Respond(ctx context.Context, correlationID string, topic Topic, message proto.Message, options ...func(*PublisherOptions)) error
Respond publishes a response to a rabbit message Set isError to true if the reply is an error, otherwise pass false to indicate valid response Returns an error, only initialize it if necessary
func (*PublisherPool) RespondWithError ¶
func (pp *PublisherPool) RespondWithError(ctx context.Context, correlationID string, topic Topic, message *grpc.Error, options ...func(*PublisherOptions)) error
type Rabbit ¶
type Rabbit struct { Consumer Consumer Publisher PublisherPool Exchange Exchange // contains filtered or unexported fields }
func New ¶
func New(configuration Configuration, serviceExchange Exchange, obs observability.Observability, optionFuncts ...func(*Options)) (*Rabbit, error)
New creates and returns a new rabbit client with given configuration
func (*Rabbit) Disconnect ¶
Disconnect disconnects all rabbit connections
type ReplyPool ¶
type ReplyPool struct { Request chan ReplyRequest Response chan ReplyResponse Cancel chan string Clients map[string]*client }
func NewReplyPool ¶
NewReplyPool creates and returns a new ReplyPool
type ReplyRequest ¶
type ReplyRequest struct { CorrelationId string RequestChan chan ReplyResponse // ExpectedResponsesNr represents the number of responses // expected for this client ExpectedResponsesNr int }
type ReplyResponse ¶
type TopicBuilder ¶
type TopicBuilder struct {
// contains filtered or unexported fields
}
func NewTopic ¶
func NewTopic(service Exchange) *TopicBuilder
NewTopic creates a new topic that starts with the exchange of the receiving service
func (*TopicBuilder) AddWord ¶
func (rh *TopicBuilder) AddWord(word TopicWord) *TopicBuilder
AddWord adds a new word to the topic
type TopicConsumer ¶
type TopicConsumer struct {
// contains filtered or unexported fields
}