Documentation ¶
Index ¶
- Variables
- type CallbackFunction
- type CallbackRegistry
- type Consumer
- type ConsumerBackend
- type Decoder
- type Encoder
- type EncoderDecoder
- type GetLoggerFunc
- type Instrumenter
- type ListenRequest
- type Logger
- type LoggingFields
- type Message
- type MessageRouting
- type MessageTypeMajorVersion
- type MetaAttributes
- type Publisher
- type PublisherBackend
- type QueueConsumer
- func (c *QueueConsumer) ListenForMessages(ctx context.Context, request ListenRequest) error
- func (c *QueueConsumer) RequeueDLQ(ctx context.Context, request ListenRequest) error
- func (c *QueueConsumer) WithInstrumenter(instrumenter Instrumenter) *QueueConsumer
- func (c *QueueConsumer) WithUseTransportMessageAttributes(useTransportMessageAttributes bool)
- type ReceivedMessage
- type StdLogger
Constants ¶
This section is empty.
Variables ¶
var ErrRetry = errors.New("Retry error")
ErrRetry should cause the task to retry, but not treat the retry as an error
Functions ¶
This section is empty.
Types ¶
type CallbackFunction ¶
CallbackFunction is the function signature for a hedwig callback function
type CallbackRegistry ¶
type CallbackRegistry map[MessageTypeMajorVersion]CallbackFunction
CallbackRegistry is a map of message type and major versions to callback functions
type ConsumerBackend ¶ added in v0.8.0
type ConsumerBackend interface { // Receive messages from configured queue(s) and provide it through the channel. This should run indefinitely // until the context is canceled. Provider metadata should include all info necessary to ack/nack a message. // The channel must not be closed by the backend. Receive(ctx context.Context, numMessages uint32, visibilityTimeout time.Duration, messageCh chan<- ReceivedMessage) error // NackMessage nacks a message on the queue NackMessage(ctx context.Context, providerMetadata interface{}) error // AckMessage acknowledges a message on the queue AckMessage(ctx context.Context, providerMetadata interface{}) error // RequeueDLQ re-queues everything in the Hedwig DLQ back into the Hedwig queue RequeueDLQ(ctx context.Context, numMessages uint32, visibilityTimeout time.Duration) error }
ConsumerBackend is used for consuming messages from a transport
type Decoder ¶ added in v0.8.0
type Decoder interface { // DecodeData validates and decodes data DecodeData(messageType string, version *semver.Version, data interface{}) (interface{}, error) // ExtractData extracts data from the on-the-wire payload when not using message transport ExtractData(messagePayload []byte, attributes map[string]string) (MetaAttributes, interface{}, error) // DecodeMessageType decodes message type from meta attributes DecodeMessageType(schema string) (string, *semver.Version, error) }
Decoder is responsible for decoding the message payload in appropriate format from over the wire transport format
type Encoder ¶ added in v0.8.0
type Encoder interface { // EncodeData encodes the message with appropriate format for transport over the wire EncodeData(data interface{}, useMessageTransport bool, metaAttrs MetaAttributes) ([]byte, error) // EncodeMessageType encodes the message type with appropriate format for transport over the wire EncodeMessageType(messageType string, version *semver.Version) string // VerifyKnownMinorVersion checks that message version is known to us VerifyKnownMinorVersion(messageType string, version *semver.Version) error }
Encoder is responsible for encoding the message payload in appropriate format for over the wire transport
type EncoderDecoder ¶ added in v0.9.0
EncoderDecoder can both encode and decode messages
type GetLoggerFunc ¶
GetLoggerFunc returns the logger object
func LogrusGetLoggerFunc ¶
func LogrusGetLoggerFunc(fn func(ctx context.Context) *logrus.Entry) GetLoggerFunc
type Instrumenter ¶ added in v0.7.0
type Instrumenter interface { // OnReceive is called as soon as possible after a message is received from the backend. Caller must call // the returned finalized function when processing for the message is finished (typically done via defer). // The context must be replaced with the returned context for the remainder of the operation. // This is where a new span must be started. OnReceive(ctx context.Context, attributes map[string]string) (context.Context, func()) // OnMessageDeserialized is called when a message has been received from the backend and decoded // This is where span attributes, such as name, may be updated. OnMessageDeserialized(ctx context.Context, message *Message) // OnPublish is called right before a message is published. Caller must call // the returned finalized function when publishing for the message is finished (typically done via defer). // The attributes may be updated to include trace id for downstream consumers. OnPublish(ctx context.Context, message *Message, attributes map[string]string) (context.Context, map[string]string, func()) }
Instrumenter defines the interface for Hedwig's instrumentation
type ListenRequest ¶
type ListenRequest struct { // How many messages to fetch at one time NumMessages uint32 // default 1 // How long should the message be hidden from other consumers? VisibilityTimeout time.Duration // defaults to queue configuration // How many goroutines to spin for processing messages concurrently NumConcurrency uint32 // default 1 }
ListenRequest represents a request to listen for messages
type Logger ¶ added in v0.8.0
type Logger interface { // Error logs an error with a message. `fields` can be used as additional metadata for structured logging. // You can generally expect one of these fields to be available: message_sqs_id, message_sns_id. // By default fields are logged as a map using fmt.Sprintf Error(err error, message string, fields LoggingFields) // Warn logs a warn level log with a message. `fields` param works the same as `Error`. Warn(err error, message string, fields LoggingFields) // Info logs a debug level log with a message. `fields` param works the same as `Error`. Info(message string, fields LoggingFields) // Debug logs a debug level log with a message. `fields` param works the same as `Error`. Debug(message string, fields LoggingFields) }
Logger represents an logging interface that this library expects
type LoggingFields ¶
type LoggingFields map[string]interface{}
type Message ¶
type Message struct { Data interface{} Type string DataSchemaVersion *semver.Version ID string Metadata metadata }
Message model for hedwig messages.
type MessageRouting ¶ added in v0.2.0
type MessageRouting map[MessageTypeMajorVersion]string
MessageRouting is a map of message type and major versions to Hedwig topics
type MessageTypeMajorVersion ¶ added in v0.2.0
type MessageTypeMajorVersion struct { // Message type MessageType string // Message major version MajorVersion uint }
MessageTypeMajorVersion is a tuple of message typa and major version
type MetaAttributes ¶
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher handles hedwig publishing
func NewPublisher ¶
func NewPublisher(backend PublisherBackend, encoderDecoder EncoderDecoder, routing MessageRouting) *Publisher
NewPublisher creates a new Publisher.
`messageRouting`: Maps message type and major version to topic names
<message type>, <message version> => topic name
An entry is required for every message type that the app wants to publish. It is recommended that major versions of a message be published on separate topics.
func (*Publisher) WithInstrumenter ¶ added in v0.8.0
func (p *Publisher) WithInstrumenter(instrumenter Instrumenter)
func (*Publisher) WithUseTransportMessageAttributes ¶ added in v0.8.0
type PublisherBackend ¶ added in v0.8.0
type PublisherBackend interface { // Publish a message represented by the payload, with specified attributes to the specific topic Publish(ctx context.Context, message *Message, payload []byte, attributes map[string]string, topic string) (string, error) }
PublisherBackend is used to publish messages to a transport
type QueueConsumer ¶ added in v0.8.0
type QueueConsumer struct {
Consumer
}
func NewQueueConsumer ¶
func NewQueueConsumer(backend ConsumerBackend, decoder Decoder, getLogger GetLoggerFunc, registry CallbackRegistry) *QueueConsumer
func (*QueueConsumer) ListenForMessages ¶ added in v0.8.0
func (c *QueueConsumer) ListenForMessages(ctx context.Context, request ListenRequest) error
ListenForMessages starts a hedwig listener for the provided message types
func (*QueueConsumer) RequeueDLQ ¶ added in v0.8.0
func (c *QueueConsumer) RequeueDLQ(ctx context.Context, request ListenRequest) error
RequeueDLQ re-queues everything in the Hedwig DLQ back into the Hedwig queue
func (*QueueConsumer) WithInstrumenter ¶ added in v0.8.0
func (c *QueueConsumer) WithInstrumenter(instrumenter Instrumenter) *QueueConsumer
func (*QueueConsumer) WithUseTransportMessageAttributes ¶ added in v0.8.0
func (c *QueueConsumer) WithUseTransportMessageAttributes(useTransportMessageAttributes bool)
type ReceivedMessage ¶ added in v0.9.0
type ReceivedMessage struct { Payload []byte Attributes map[string]string ProviderMetadata interface{} }
ReceivedMessage is the message as received by a transport backend.
type StdLogger ¶ added in v0.8.0
type StdLogger struct{}
func (*StdLogger) Debug ¶ added in v0.8.0
func (s *StdLogger) Debug(message string, fields LoggingFields)
func (*StdLogger) Error ¶ added in v0.8.0
func (s *StdLogger) Error(err error, message string, fields LoggingFields)
func (*StdLogger) Info ¶ added in v0.8.0
func (s *StdLogger) Info(message string, fields LoggingFields)