rabbit

package
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TopicExchange  exchangeType = "topic"
	DirectExchange exchangeType = "direct"
	FanoutExchange exchangeType = "fanout"
)

Variables

View Source
var ErrResponse = errors.New("responded with an error")

Functions

func ExtractRabbitHeaders

func ExtractRabbitHeaders(ctx context.Context, carrier rabbitmq.Table) context.Context

func InjectRabbitHeaders

func InjectRabbitHeaders(ctx context.Context) rabbitmq.Table

func NewError

func NewError(errorMessage string, errorCode grpc.ErrorCode) *grpc.Error

func WithConcurrentReplyConsumer

func WithConcurrentReplyConsumer(number int) func(options *Options)

WithConcurrentReplyConsumer sets the number of goroutines that will handle the reply queue.

func WithHeader

func WithHeader(header []HeaderValue) func(options *PublisherOptions)

func WithLogger

func WithLogger(logger rabbitmq.Logger) func(options *Options)

func WithMultiplePublishers

func WithMultiplePublishers(number int) func(options *Options)

WithMultiplePublishers sets the number of goroutines running a publisher; each has its own TCP connection.

Types

type Configuration

type Configuration struct {
	// Address is the address for connecting to the RabbitMQ instance
	Address string `validate:"required" json:"address" yaml:"address"`

	// Username for authentication to the RabbitMQ instance
	Username string `json:"username" yaml:"username"`

	// Password for authentication to the RabbitMQ instance
	Password string `json:"password" yaml:"password"`
}

Configuration is the basic AMQP configuration for the message bus.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) NewChargePointConsumer

func (cm *Consumer) NewChargePointConsumer(topic Topic, handler HandlerFunc) (*TopicConsumer, error)

func (*Consumer) NewNotificationConsumer

func (cm *Consumer) NewNotificationConsumer(topic Topic, queueName string, handler HandlerFunc, routines int) (*TopicConsumer, error)

NewNotificationConsumer creates a new consumer of notifications for the given topic, with routing key and handler function. The returned consumer should only be used for disconnecting. routines sets the number of goroutines handling the consumer; 1 should suffice in general. It returns the consumer and an error; only initialize them if necessary. To achieve load balancing, queueName should be set in the form <consumer-service>.<topic>.

func (*Consumer) NewService1Consumer

func (cm *Consumer) NewService1Consumer(topic Topic, handler HandlerFunc) (*TopicConsumer, error)

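NewService1Consumer creates a new service topic consumer for the given topic and handler function. Note: this description appears to have been copied from NewServiceConsumer; the durable and routines parameters it refers to are not part of this method's signature. (As described for NewServiceConsumer: if durable is set to true, the queue will be of quorum type, which is durable and persists through restarts; routines sets the number of goroutines handling the consumer, and 1 should suffice in general.) It returns the consumer and an error; only initialize them if necessary. The returned consumer should only be used for disconnecting.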

func (*Consumer) NewServiceConsumer

func (cm *Consumer) NewServiceConsumer(topic Topic, handler HandlerFunc, durable bool, routines int) (*TopicConsumer, error)

NewServiceConsumer creates a new service topic consumer for the given topic and handler function. If durable is set to true, the queue will be of quorum type, which is durable and persists through restarts. routines sets the number of goroutines handling the consumer; 1 should suffice in general. It returns the consumer and an error; only initialize them if necessary. The returned consumer should only be used for disconnecting.

type Exchange

type Exchange string
const (
	CentralExchange Exchange = "CENTRAL"

	GlobalNotificationExchange Exchange = "NOTIFICATION"
)

Exchanges

func (Exchange) String

func (t Exchange) String() string

type HandlerFunc

type HandlerFunc func(ctx context.Context, d rabbitmq.Delivery) (action rabbitmq.Action)
type Header struct {
	// contains filtered or unexported fields
}

func NewHeader

func NewHeader() *Header

func (*Header) Build

func (rh *Header) Build() []HeaderValue

func (*Header) WithError

func (rh *Header) WithError(err bool) *Header

func (*Header) WithField

func (rh *Header) WithField(key HeaderKey, value any) *Header

When adding custom fields, make sure the value is supported: https://pkg.go.dev/github.com/wagslane/go-rabbitmq#Table

func (*Header) WithMethod

func (rh *Header) WithMethod(method TopicWord) *Header

type HeaderKey

type HeaderKey string
const (
	HeaderKeyError     HeaderKey = "error"
	HeaderKeyMethod    HeaderKey = "method"
	HeaderKeyReplyType HeaderKey = "reply_type"
)

type HeaderReplyType

type HeaderReplyType string
const (
	HeaderReplyTypeAcknowledge        HeaderReplyType = "acknowledge"
	HeaderReplyTypeInterceptedMessage HeaderReplyType = "intercepted_message"
)

type HeaderValue

type HeaderValue struct {
	Key   HeaderKey
	Value any
}

type HeadersCarrier

type HeadersCarrier rabbitmq.Table

func (HeadersCarrier) Get

func (c HeadersCarrier) Get(key string) string

func (HeadersCarrier) Keys

func (c HeadersCarrier) Keys() []string

func (HeadersCarrier) Set

func (c HeadersCarrier) Set(key string, value string)

type Options

type Options struct {
	// contains filtered or unexported fields
}

type PublishRequest

type PublishRequest struct {
	Ctx             context.Context
	Topic           Topic
	CorrelationId   string
	Message         proto.Message
	Options         []func(*PublisherOptions)
	ResponseChannel chan error
}

type Publisher

type Publisher struct {
	Publisher *rabbitmq.Publisher
	// contains filtered or unexported fields
}

func (*Publisher) Publish

func (pb *Publisher) Publish(ctx context.Context, topic string, message proto.Message, correlationID string, replyTopic Topic, optionFuncs ...func(*PublisherOptions)) error

type PublisherOptions

type PublisherOptions struct {
	// contains filtered or unexported fields
}

type PublisherPool

type PublisherPool struct {
	// contains filtered or unexported fields
}

func (*PublisherPool) Publish

func (pp *PublisherPool) Publish(ctx context.Context, topic Topic, message proto.Message, options ...func(*PublisherOptions)) error

Publish publishes a rabbit message. It returns an error; only initialize it if needed. The error has already been logged.

func (*PublisherPool) PublishRPC

func (pp *PublisherPool) PublishRPC(ctx context.Context, topic Topic, message proto.Message, options ...func(options *PublisherOptions)) ([]byte, error)

PublishRPC publishes an RPC message and waits for the reply.

func (*PublisherPool) PublishRPCWithMultipleResponses

func (pp *PublisherPool) PublishRPCWithMultipleResponses(ctx context.Context, topic Topic, message proto.Message, nrResponses int, options ...func(publisherOptions *PublisherOptions)) (chan ReplyResponse, error)

func (*PublisherPool) Respond

func (pp *PublisherPool) Respond(ctx context.Context, correlationID string, topic Topic, message proto.Message, options ...func(*PublisherOptions)) error

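Respond publishes a response to a rabbit message. The original comment says "Set isError to true if the reply is an error, otherwise pass false to indicate valid response", but the current signature has no isError parameter, so that instruction appears to be stale; error replies are presumably sent with RespondWithError instead. It returns an error; only initialize it if necessary.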

func (*PublisherPool) RespondWithError

func (pp *PublisherPool) RespondWithError(ctx context.Context, correlationID string, topic Topic, message *grpc.Error, options ...func(*PublisherOptions)) error

type Rabbit

type Rabbit struct {
	Consumer  Consumer
	Publisher PublisherPool

	Exchange Exchange
	// contains filtered or unexported fields
}

func New

func New(configuration Configuration, serviceExchange Exchange, obs observability.Observability, optionFuncts ...func(*Options)) (*Rabbit, error)

New creates and returns a new rabbit client with given configuration

func (*Rabbit) Disconnect

func (c *Rabbit) Disconnect() error

Disconnect disconnects all rabbit connections

func (*Rabbit) Name

func (c *Rabbit) Name() string

func (*Rabbit) Pass

func (c *Rabbit) Pass() bool

type ReplyPool

type ReplyPool struct {
	Request  chan ReplyRequest
	Response chan ReplyResponse
	Cancel   chan string
	Clients  map[string]*client
}

func NewReplyPool

func NewReplyPool(bufferSize int) ReplyPool

NewReplyPool creates and returns a new ReplyPool

type ReplyRequest

type ReplyRequest struct {
	CorrelationId string
	RequestChan   chan ReplyResponse
	// ExpectedResponsesNr represents the number of responses
	// expected for this client
	ExpectedResponsesNr int
}

type ReplyResponse

type ReplyResponse struct {
	CorrelationId string
	Body          []byte
	Error         bool
	Headers       map[string]any
}

type Topic

type Topic string

func (Topic) String

func (t Topic) String() string

type TopicBuilder

type TopicBuilder struct {
	// contains filtered or unexported fields
}

func NewTopic

func NewTopic(service Exchange) *TopicBuilder

NewTopic creates a new topic that starts with the exchange of the receiving service

func (*TopicBuilder) AddWord

func (rh *TopicBuilder) AddWord(word TopicWord) *TopicBuilder

AddWord adds a new word to the topic

func (*TopicBuilder) Build

func (rh *TopicBuilder) Build() Topic

Build returns a topic string

type TopicConsumer

type TopicConsumer struct {
	// contains filtered or unexported fields
}

func (*TopicConsumer) Close

func (tc *TopicConsumer) Close()

Close closes the consumer

type TopicWord

type TopicWord string

func (TopicWord) String

func (t TopicWord) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL