Documentation ¶
Index ¶
- Variables
- func InitializeContext(ctx context.Context) context.Context
- func WaitForWithWantedErrorNormalizer(wantErr bool, err error, propertiesType string) error
- type AMQPService
- func (a AMQPService) ChannelClose(channel *amqp.Channel) error
- func (a AMQPService) ChannelConsume(channel *amqp.Channel, queue string) (<-chan amqp.Delivery, error)
- func (a AMQPService) ChannelExchangeDeclare(channel *amqp.Channel, name string) error
- func (a AMQPService) ChannelPublish(channel *amqp.Channel, exchange string, msg amqp.Publishing) error
- func (a AMQPService) ChannelQueueBind(channel *amqp.Channel, name, exchange string) error
- func (a AMQPService) ChannelQueueDeclare(channel *amqp.Channel) (amqp.Queue, error)
- func (a AMQPService) ConnectionChannel(connection *amqp.Connection) (*amqp.Channel, error)
- func (a AMQPService) Dial(url string) (*amqp.Connection, error)
- type AMQPServiceFuncMock
- func (a AMQPServiceFuncMock) ChannelClose(channel *amqp.Channel) error
- func (a AMQPServiceFuncMock) ChannelConsume(channel *amqp.Channel, queue string) (<-chan amqp.Delivery, error)
- func (a AMQPServiceFuncMock) ChannelExchangeDeclare(channel *amqp.Channel, name string) error
- func (a AMQPServiceFuncMock) ChannelPublish(channel *amqp.Channel, exchange string, msg amqp.Publishing) error
- func (a AMQPServiceFuncMock) ChannelQueueBind(channel *amqp.Channel, name, exchange string) error
- func (a AMQPServiceFuncMock) ChannelQueueDeclare(channel *amqp.Channel) (amqp.Queue, error)
- func (a AMQPServiceFuncMock) ConnectionChannel(c *amqp.Connection) (*amqp.Channel, error)
- func (a AMQPServiceFuncMock) Dial(url string) (*amqp.Connection, error)
- type AMQPServiceFunctions
- type ContextKey
- type Logger
- type Session
- func (s *Session) ConfigureConnection(ctx context.Context, uri string) error
- func (s *Session) ConfigureHeaders(ctx context.Context, t *godog.Table) error
- func (s *Session) ConfigureStandardProperties(ctx context.Context, t *godog.Table) error
- func (s *Session) PublishJSONMessage(ctx context.Context, topic string, t *godog.Table) error
- func (s *Session) PublishTextMessage(ctx context.Context, topic, message string) error
- func (s *Session) SubscribeTopic(ctx context.Context, topic string) error
- func (s *Session) Unsubscribe(ctx context.Context) error
- func (s *Session) ValidateMessageHeaders(ctx context.Context, t *godog.Table) error
- func (s *Session) ValidateMessageJSONBody(ctx context.Context, t *godog.Table, pos int) error
- func (s *Session) ValidateMessageStandardProperties(props amqp.Delivery) error
- func (s *Session) ValidateMessageTextBody(ctx context.Context, expectedMsg string) error
- func (s *Session) WaitForJSONMessageWithProperties(ctx context.Context, timeout time.Duration, t *godog.Table, wantErr bool) error
- func (s *Session) WaitForMessagesWithStandardProperties(ctx context.Context, timeout time.Duration, count int, strictly bool, ...) error
- func (s *Session) WaitForTextMessage(ctx context.Context, timeout time.Duration, expectedMsg string) error
- type Steps
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func InitializeContext ¶
InitializeContext adds the rabbit session to the context. The new context is returned because context is immutable.
Types ¶
type AMQPService ¶ added in v0.16.0
type AMQPService struct{}
func NewAMQPService ¶ added in v0.16.0
func NewAMQPService() *AMQPService
func (AMQPService) ChannelClose ¶ added in v0.16.0
func (a AMQPService) ChannelClose(channel *amqp.Channel) error
func (AMQPService) ChannelConsume ¶ added in v0.16.0
func (AMQPService) ChannelExchangeDeclare ¶ added in v0.16.0
func (a AMQPService) ChannelExchangeDeclare(channel *amqp.Channel, name string) error
func (AMQPService) ChannelPublish ¶ added in v0.16.0
func (a AMQPService) ChannelPublish(channel *amqp.Channel, exchange string, msg amqp.Publishing) error
func (AMQPService) ChannelQueueBind ¶ added in v0.16.0
func (a AMQPService) ChannelQueueBind(channel *amqp.Channel, name, exchange string) error
func (AMQPService) ChannelQueueDeclare ¶ added in v0.16.0
func (AMQPService) ConnectionChannel ¶ added in v0.16.0
func (a AMQPService) ConnectionChannel(connection *amqp.Connection) (*amqp.Channel, error)
func (AMQPService) Dial ¶ added in v0.16.0
func (a AMQPService) Dial(url string) (*amqp.Connection, error)
type AMQPServiceFuncMock ¶ added in v0.16.0
type AMQPServiceFuncMock struct{}
func (AMQPServiceFuncMock) ChannelClose ¶ added in v0.16.0
func (a AMQPServiceFuncMock) ChannelClose(channel *amqp.Channel) error
func (AMQPServiceFuncMock) ChannelConsume ¶ added in v0.16.0
func (AMQPServiceFuncMock) ChannelExchangeDeclare ¶ added in v0.16.0
func (a AMQPServiceFuncMock) ChannelExchangeDeclare(channel *amqp.Channel, name string) error
func (AMQPServiceFuncMock) ChannelPublish ¶ added in v0.16.0
func (a AMQPServiceFuncMock) ChannelPublish(channel *amqp.Channel, exchange string, msg amqp.Publishing) error
func (AMQPServiceFuncMock) ChannelQueueBind ¶ added in v0.16.0
func (a AMQPServiceFuncMock) ChannelQueueBind(channel *amqp.Channel, name, exchange string) error
func (AMQPServiceFuncMock) ChannelQueueDeclare ¶ added in v0.16.0
func (AMQPServiceFuncMock) ConnectionChannel ¶ added in v0.16.0
func (a AMQPServiceFuncMock) ConnectionChannel(c *amqp.Connection) (*amqp.Channel, error)
func (AMQPServiceFuncMock) Dial ¶ added in v0.16.0
func (a AMQPServiceFuncMock) Dial(url string) (*amqp.Connection, error)
type AMQPServiceFunctions ¶ added in v0.16.0
type AMQPServiceFunctions interface {
	Dial(url string) (*amqp.Connection, error)
	ConnectionChannel(c *amqp.Connection) (*amqp.Channel, error)
	ChannelExchangeDeclare(channel *amqp.Channel, name string) error
	ChannelQueueDeclare(channel *amqp.Channel) (amqp.Queue, error)
	ChannelQueueBind(channel *amqp.Channel, name, exchange string) error
	ChannelConsume(channel *amqp.Channel, queue string) (<-chan amqp.Delivery, error)
	ChannelClose(channel *amqp.Channel) error
	ChannelPublish(channel *amqp.Channel, exchange string, msg amqp.Publishing) error
}
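The interface above, together with AMQPService and AMQPServiceFuncMock, suggests that a Session can be given either implementation through its AMQPServiceClient field. The following is only a simplified sketch of that pattern and does not use the package's real types: dialer, realDialer, mockDialer and session are hypothetical stand-ins, and the connection is reduced to a plain string so the example runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// dialer is a hypothetical, one-method analog of AMQPServiceFunctions.
// A string replaces *amqp.Connection so that no broker is needed.
type dialer interface {
	Dial(url string) (string, error)
}

// realDialer stands in for AMQPService; a real one would open a network connection.
type realDialer struct{}

func (realDialer) Dial(url string) (string, error) {
	return "", errors.New("no broker available in this sketch")
}

// mockDialer stands in for AMQPServiceFuncMock and never touches the network.
type mockDialer struct{}

func (mockDialer) Dial(url string) (string, error) {
	return "mock connection to " + url, nil
}

// session depends only on the interface, mirroring the idea of Session.AMQPServiceClient.
type session struct {
	client dialer
}

// connect delegates to whichever implementation was injected.
func (s session) connect(url string) (string, error) {
	return s.client.Dial(url)
}

func main() {
	s := session{client: mockDialer{}}
	conn, err := s.connect("amqp://localhost:5672/")
	fmt.Println(conn, err)
}
```

Because session only knows the interface, a unit test can inject the mock while production code injects the real service.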
type ContextKey ¶
type ContextKey string
ContextKey defines a type to store the rabbit session in context.Context.
type Logger ¶
Logger logs to a configurable file.
func GetLogger ¶
func GetLogger() *Logger
GetLogger returns the logger for rabbit messages in publish/subscribe. If the logger has not been created yet, it creates a new instance of Logger.
func (Logger) LogPublishedMessage ¶
LogPublishedMessage logs a rabbit message published to a topic.
func (Logger) LogReceivedMessage ¶
LogReceivedMessage logs a rabbit message received from a topic.
func (Logger) LogSubscribedTopic ¶
LogSubscribedTopic logs the subscription to a rabbit topic.
type Session ¶
type Session struct {
	Connection *amqp.Connection
	// Messages received from the publish/subscribe channel
	Messages []amqp.Delivery
	// Messages consumed from the publish/subscribe channel
	ConsumedMessages []amqp.Delivery
	// Correlator is used to correlate the messages for a specific session
	Correlator string
	// amqp service
	AMQPServiceClient AMQPServiceFunctions
	// contains filtered or unexported fields
}
Session contains the information of a rabbit session.
func GetSession ¶
GetSession returns the rabbit session stored in the context. Note that the context should first be initialized with the InitializeContext function.
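The relationship between InitializeContext and GetSession can be sketched with the standard context package. This is a minimal illustration, not the package's implementation: contextKey, session, sessionKey, initializeContext and getSession are hypothetical lowercase stand-ins, and the key value is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// contextKey is a hypothetical stand-in for the package's ContextKey type.
type contextKey string

// session is a hypothetical stand-in for the package's Session type.
type session struct {
	Correlator string
}

// sessionKey is an assumed key name; the real key is not shown in this documentation.
const sessionKey contextKey = "rabbitSession"

// initializeContext returns a child context carrying a fresh session.
// The parent context is left unchanged, so callers must keep the returned value.
func initializeContext(ctx context.Context) context.Context {
	return context.WithValue(ctx, sessionKey, &session{})
}

// getSession returns the stored session, or nil if the context was never initialized.
func getSession(ctx context.Context) *session {
	s, _ := ctx.Value(sessionKey).(*session)
	return s
}

// roundTrip stores a correlator in the session and reads it back through the context.
func roundTrip(correlator string) string {
	ctx := initializeContext(context.Background())
	getSession(ctx).Correlator = correlator
	return getSession(ctx).Correlator
}

// parentHasSession reports whether the parent context sees the session (it should not).
func parentHasSession() bool {
	parent := context.Background()
	_ = initializeContext(parent)
	return getSession(parent) != nil
}

func main() {
	fmt.Println(roundTrip("abc"))
	fmt.Println(parentHasSession())
}
```

The second helper shows why the returned context matters: discarding it leaves the original context without a session.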
func (*Session) ConfigureConnection ¶
ConfigureConnection creates a rabbit connection based on the URI.
func (*Session) ConfigureHeaders ¶
ConfigureHeaders stores a table of rabbit headers in the application context.
func (*Session) ConfigureStandardProperties ¶
ConfigureStandardProperties stores a table of rabbit properties in the application context.
func (*Session) PublishJSONMessage ¶
PublishJSONMessage publishes a JSON message to a rabbit topic.
func (*Session) PublishTextMessage ¶
PublishTextMessage publishes a text message to a rabbit topic.
func (*Session) SubscribeTopic ¶
SubscribeTopic subscribes to a rabbit topic to receive messages via a channel.
func (*Session) Unsubscribe ¶
Unsubscribe unsubscribes from rabbit by closing the associated channel. If this method is not invoked, the goroutine created with SubscribeTopic is never stopped and keeps processing messages from the topic until the program finishes.
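Why a missing Unsubscribe leaves a goroutine running can be illustrated with plain Go channels. The sketch below assumes the consumer goroutine ranges over the delivery channel and stops only when that channel is closed; consume and run are hypothetical helpers, and strings stand in for amqp.Delivery values.

```go
package main

import (
	"fmt"
	"sync"
)

// consume starts a goroutine that collects deliveries until the channel is closed.
// The returned function blocks until that goroutine exits and returns what it received.
func consume(deliveries <-chan string) (wait func() []string) {
	var (
		received []string
		wg       sync.WaitGroup
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		// The range loop ends only when the channel is closed;
		// without a close, this goroutine would never exit.
		for d := range deliveries {
			received = append(received, d)
		}
	}()
	return func() []string {
		wg.Wait()
		return received
	}
}

// run sends n messages, closes the channel (the "unsubscribe" step)
// and returns how many messages the consumer saw.
func run(n int) int {
	deliveries := make(chan string)
	wait := consume(deliveries)
	for i := 0; i < n; i++ {
		deliveries <- fmt.Sprintf("message %d", i)
	}
	close(deliveries)
	return len(wait())
}

func main() {
	fmt.Println(run(3))
}
```

In a test scenario this corresponds to calling Unsubscribe once the assertions are done, so that the consumer goroutine does not outlive the scenario.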
func (*Session) ValidateMessageHeaders ¶
ValidateMessageHeaders checks if the rabbit headers of the message are equal to the expected values.
func (*Session) ValidateMessageJSONBody ¶
ValidateMessageJSONBody checks if the JSON body properties of the message in position 'pos' are equal to the expected values. If pos == -1, the last message stored is used, that is, the one stored in s.msg.
func (*Session) ValidateMessageStandardProperties ¶
ValidateMessageStandardProperties checks if the standard rabbit properties of the message are equal to the expected values.
func (*Session) ValidateMessageTextBody ¶
ValidateMessageTextBody checks if the message text body is equal to the expected value.
func (*Session) WaitForJSONMessageWithProperties ¶
func (s *Session) WaitForJSONMessageWithProperties(ctx context.Context, timeout time.Duration, t *godog.Table, wantErr bool) error
WaitForJSONMessageWithProperties waits up to timeout and verifies whether a message with the requested properties has been received in the topic. When wantErr is set to true, the function returns an error if a message with the JSON properties is found, and returns no error when no such message is found after the timeout.
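The wantErr behaviour described above amounts to inverting the outcome of the wait. The sketch below is an assumption about how such logic could look, not the package's code: waitFor, normalize and errNotFound are hypothetical names, with normalize borrowing only the parameter list of WaitForWithWantedErrorNormalizer from the index.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotFound is a hypothetical sentinel returned when the timeout elapses.
var errNotFound = errors.New("message not found before timeout")

// waitFor polls found until it returns true or the timeout elapses.
func waitFor(timeout time.Duration, found func() bool) error {
	deadline := time.Now().Add(timeout)
	for {
		if found() {
			return nil
		}
		if time.Now().After(deadline) {
			return errNotFound
		}
		time.Sleep(time.Millisecond)
	}
}

// normalize inverts the result when wantErr is true:
// finding a message becomes an error, and not finding one becomes success.
func normalize(wantErr bool, err error, propertiesType string) error {
	switch {
	case wantErr && err == nil:
		return fmt.Errorf("unexpected message found with %s properties", propertiesType)
	case wantErr:
		return nil
	default:
		return err
	}
}

func main() {
	never := func() bool { return false }
	err := waitFor(10*time.Millisecond, never)
	fmt.Println(normalize(true, err, "JSON"))
	fmt.Println(normalize(false, err, "JSON"))
}
```

With wantErr set to true, a scenario can assert that a message is not delivered within the timeout.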
func (*Session) WaitForMessagesWithStandardProperties ¶
func (s *Session) WaitForMessagesWithStandardProperties(ctx context.Context, timeout time.Duration, count int, strictly bool, t *godog.Table, wantErr bool) error
WaitForMessagesWithStandardProperties waits for 'count' messages with standard rabbit properties that are equal to the expected values. If strictly is set to true, the function loops through all received messages to check that the amount exactly matches the expected count. When wantErr is set to true, the function returns an error if a message with the expected properties is found, and returns no error when no such message is found after the timeout.
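The difference between strict and non-strict counting can be sketched as follows. This is an illustration under assumptions: properties, matches and satisfied are hypothetical, messages are reduced to string maps, and the non-strict case is assumed to mean "at least count matches", which the documentation does not state explicitly.

```go
package main

import "fmt"

// properties is a hypothetical flat view of a message's standard properties.
type properties map[string]string

// matches reports whether every expected property has the same value in got.
func matches(got, expected properties) bool {
	for k, v := range expected {
		if got[k] != v {
			return false
		}
	}
	return true
}

// satisfied checks the received messages against the expected properties.
// Strict mode scans every message and requires exactly count matches.
// Non-strict mode (an assumption) succeeds as soon as count matches are seen.
func satisfied(received []properties, expected properties, count int, strictly bool) bool {
	found := 0
	for _, p := range received {
		if !matches(p, expected) {
			continue
		}
		found++
		if !strictly && found >= count {
			return true
		}
	}
	if strictly {
		return found == count
	}
	return found >= count
}

func main() {
	received := []properties{
		{"ContentType": "application/json"},
		{"ContentType": "application/json"},
		{"ContentType": "text/plain"},
	}
	expected := properties{"ContentType": "application/json"}
	fmt.Println(satisfied(received, expected, 1, false))
	fmt.Println(satisfied(received, expected, 1, true))
}
```

Strict mode is the one to use when a scenario must also rule out duplicate or extra messages.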