Documentation
¶
Index ¶
- Constants
- Variables
- func LogMessage(msg ...string) string
- func NewDispatcher(logger logging.Logger, channel AMQPChannel, ...) *dispatcher
- func NewPublisher(logger logging.Logger, configs *configs.Configs, channel AMQPChannel) *publisher
- func NewRabbitMQError(msg string) error
- func NewTopology(l logging.Logger) *topology
- type AMQPChannel
- type AMQPConnection
- type ConsumerDefinition
- type ConsumerHandler
- type Dispatcher
- type ExchangeBindingDefinition
- type ExchangeDefinition
- type ExchangeKind
- type Publisher
- type QueueBindingDefinition
- type QueueDefinition
- func (q *QueueDefinition) DLQName() string
- func (q *QueueDefinition) Delete(d bool) *QueueDefinition
- func (q *QueueDefinition) Durable(d bool) *QueueDefinition
- func (q *QueueDefinition) Exclusive(e bool) *QueueDefinition
- func (q *QueueDefinition) RetryName() string
- func (q *QueueDefinition) WithDQL() *QueueDefinition
- func (q *QueueDefinition) WithRetry(ttl time.Duration, retries int64) *QueueDefinition
- func (q *QueueDefinition) WithTTL(ttl time.Duration) *QueueDefinition
- type RabbitMQError
- type Topology
Constants ¶
View Source
const (
JsonContentType = "application/json"
)
Variables ¶
View Source
var (
	NullableChannelError                      = NewRabbitMQError("channel cant be null")
	NotFoundQueueDefinitionError              = NewRabbitMQError("not found queue definition")
	InvalidDispatchParamsError                = NewRabbitMQError("register dispatch with invalid parameters")
	QueueDefinitionNotFoundError              = NewRabbitMQError("any queue definition was founded to the given queue")
	ReceivedMessageWithUnformattedHeaderError = NewRabbitMQError("received message with unformatted headers")
	RetryableError                            = NewRabbitMQError("error to process this message, retry latter")
)
Functions ¶
func LogMessage ¶ added in v1.0.258
func LogMessage(msg ...string) string
func NewDispatcher ¶
func NewDispatcher(logger logging.Logger, channel AMQPChannel, queueDefinitions map[string]*QueueDefinition) *dispatcher
func NewPublisher ¶ added in v1.0.258
func NewPublisher(logger logging.Logger, configs *configs.Configs, channel AMQPChannel) *publisher
func NewRabbitMQError ¶ added in v1.0.258
func NewRabbitMQError(msg string) error
func NewTopology ¶
func NewTopology(l logging.Logger) *topology
Types ¶
type AMQPChannel ¶
type AMQPChannel interface {
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	ExchangeBind(destination, key, source string, noWait bool, args amqp.Table) error
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
	Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
	Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
}
func NewChannel ¶ added in v1.0.258
func NewChannel(cfg *configs.RabbitMQConfigs, logger logging.Logger) (AMQPChannel, error)
type AMQPConnection ¶
type ConsumerDefinition ¶ added in v1.0.258
type ConsumerDefinition struct {
// contains filtered or unexported fields
}
type ConsumerHandler ¶
type Dispatcher ¶
type ExchangeBindingDefinition ¶ added in v1.0.258
type ExchangeBindingDefinition struct {
// contains filtered or unexported fields
}
func NewExchangeBiding ¶ added in v1.0.258
func NewExchangeBiding() *ExchangeBindingDefinition
type ExchangeDefinition ¶ added in v1.0.258
type ExchangeDefinition struct {
// contains filtered or unexported fields
}
func NewDirectExchange ¶ added in v1.0.258
func NewDirectExchange(name string) *ExchangeDefinition
func NewDirectExchanges ¶ added in v1.0.258
func NewDirectExchanges(names []string) []*ExchangeDefinition
func NewFanoutExchange ¶ added in v1.0.258
func NewFanoutExchange(name string) *ExchangeDefinition
func NewFanoutExchanges ¶ added in v1.0.258
func NewFanoutExchanges(names []string) []*ExchangeDefinition
func (*ExchangeDefinition) Delete ¶ added in v1.0.258
func (e *ExchangeDefinition) Delete(d bool) *ExchangeDefinition
func (*ExchangeDefinition) Durable ¶ added in v1.0.258
func (e *ExchangeDefinition) Durable(d bool) *ExchangeDefinition
func (*ExchangeDefinition) Params ¶ added in v1.0.258
func (e *ExchangeDefinition) Params(p map[string]any) *ExchangeDefinition
type ExchangeKind ¶
type ExchangeKind string
var (
	FanoutExchange ExchangeKind = "fanout"
	DirectExchange ExchangeKind = "direct"
)
func (ExchangeKind) String ¶ added in v1.0.258
func (k ExchangeKind) String() string
type QueueBindingDefinition ¶ added in v1.0.258
type QueueBindingDefinition struct {
// contains filtered or unexported fields
}
func NewQueueBinding ¶ added in v1.0.258
func NewQueueBinding() *QueueBindingDefinition
func (*QueueBindingDefinition) Exchange ¶ added in v1.0.258
func (b *QueueBindingDefinition) Exchange(name string) *QueueBindingDefinition
func (*QueueBindingDefinition) Queue ¶ added in v1.0.258
func (b *QueueBindingDefinition) Queue(name string) *QueueBindingDefinition
func (*QueueBindingDefinition) RoutingKey ¶ added in v1.0.258
func (b *QueueBindingDefinition) RoutingKey(key string) *QueueBindingDefinition
type QueueDefinition ¶ added in v1.0.258
type QueueDefinition struct {
// contains filtered or unexported fields
}
func NewQueue ¶
func NewQueue(name string) *QueueDefinition
func (*QueueDefinition) DLQName ¶ added in v1.0.258
func (q *QueueDefinition) DLQName() string
func (*QueueDefinition) Delete ¶ added in v1.0.258
func (q *QueueDefinition) Delete(d bool) *QueueDefinition
func (*QueueDefinition) Durable ¶ added in v1.0.258
func (q *QueueDefinition) Durable(d bool) *QueueDefinition
func (*QueueDefinition) Exclusive ¶ added in v1.0.258
func (q *QueueDefinition) Exclusive(e bool) *QueueDefinition
func (*QueueDefinition) RetryName ¶ added in v1.0.258
func (q *QueueDefinition) RetryName() string
func (*QueueDefinition) WithDQL ¶ added in v1.0.258
func (q *QueueDefinition) WithDQL() *QueueDefinition
func (*QueueDefinition) WithRetry ¶ added in v1.0.258
func (q *QueueDefinition) WithRetry(ttl time.Duration, retries int64) *QueueDefinition
func (*QueueDefinition) WithTTL ¶ added in v1.0.258
func (q *QueueDefinition) WithTTL(ttl time.Duration) *QueueDefinition
type RabbitMQError ¶ added in v1.0.258
type RabbitMQError struct {
// contains filtered or unexported fields
}
func (*RabbitMQError) Error ¶ added in v1.0.258
func (e *RabbitMQError) Error() string
type Topology ¶
type Topology interface {
	Channel(c AMQPChannel) Topology
	Queue(q *QueueDefinition) Topology
	Queues(queues []*QueueDefinition) Topology
	Exchange(e *ExchangeDefinition) Topology
	Exchanges(e []*ExchangeDefinition) Topology
	ExchangeBinding(b *ExchangeBindingDefinition) Topology
	QueueBinding(b *QueueBindingDefinition) Topology
	GetQueuesDefinition() map[string]*QueueDefinition
	GetQueueDefinition(queueName string) (*QueueDefinition, error)
	Apply() error
}
Click to show internal directories.
Click to hide internal directories.