Documentation ¶
Constants ¶
const DataTypeNumber = dataType("Number")
DataTypeNumber represents the Number datatype. Use it when creating custom attributes.
const DataTypeString = dataType("String")
DataTypeString represents the String datatype. Use it when creating custom attributes.
Variables ¶
var ErrBodyOverflow = newSQSErr("message surpasses sqs limit of 262144, please truncate body")
ErrBodyOverflow is returned when a message exceeds the AWS SQS payload limit of 262144 bytes. Oversized messages must either be routed to S3 or truncated.
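A caller can guard against this limit before publishing. The following is a minimal sketch, not gosqs code: `maxBodyBytes`, `errBodyTooLarge`, and `checkBody` are hypothetical local names, and the check assumes that a payload of exactly 262144 bytes is still accepted.

```go
package main

import (
	"errors"
	"fmt"
)

// maxBodyBytes mirrors the 262144-byte limit quoted in the ErrBodyOverflow description.
const maxBodyBytes = 262144

// errBodyTooLarge is a hypothetical local stand-in for gosqs.ErrBodyOverflow.
var errBodyTooLarge = errors.New("message surpasses sqs limit of 262144, please truncate body")

// checkBody returns errBodyTooLarge when the payload is larger than the limit.
func checkBody(body []byte) error {
	if len(body) > maxBodyBytes {
		return errBodyTooLarge
	}
	return nil
}

func main() {
	fmt.Println(checkBody(make([]byte, maxBodyBytes)))   // payload at the limit passes
	fmt.Println(checkBody(make([]byte, maxBodyBytes+1))) // one byte over is rejected
}
```

When the check fails, the caller would either truncate the body or store it elsewhere (for example in S3) and publish a reference instead.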
var ErrGetMessage = newSQSErr("unable to retrieve message")
ErrGetMessage fires when a request to retrieve messages from SQS fails
var ErrInvalidCreds = newSQSErr("invalid aws credentials")
ErrInvalidCreds is returned when the AWS credentials are invalid
var ErrInvalidVal = newSQSErr("value type does not match specified datatype")
ErrInvalidVal is returned when a custom attribute value does not match the type of its declared datatype
var ErrMarshal = newSQSErr("unable to marshal request")
ErrMarshal unable to marshal request
var ErrMessageProcessing = newSQSErr("processing time exceeding limit")
ErrMessageProcessing occurs when a message has exceeded the consumption time limit set by AWS SQS
var ErrNoRoute = newSQSErr("message received without a route")
ErrNoRoute message received without a route
var ErrPublish = newSQSErr("message publish failure. Retrying...")
ErrPublish is returned when there is an error publishing a message. gosqs will wait 10 seconds and try again, up to the configured retry count
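The wait-and-retry behaviour described for ErrPublish can be sketched as a simple loop. This is an illustration under assumptions, not the library's implementation: `publishWithRetry` and `errTemporary` are hypothetical names, the retry count is interpreted as "one initial attempt plus up to retryCount retries", and the wait is passed in so the example does not have to sleep for 10 seconds.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a hypothetical error used to simulate a failed publish.
var errTemporary = errors.New("temporary publish failure")

// publishWithRetry calls send once and then retries up to retryCount times,
// sleeping for wait between attempts. It returns the number of attempts made
// and the last error, which is nil on success.
func publishWithRetry(send func() error, retryCount int, wait time.Duration) (int, error) {
	if retryCount < 0 {
		retryCount = 0
	}
	var err error
	for attempt := 1; attempt <= retryCount+1; attempt++ {
		if err = send(); err == nil {
			return attempt, nil
		}
		if attempt <= retryCount {
			time.Sleep(wait) // the description above mentions a 10 second wait
		}
	}
	return retryCount + 1, err
}

func main() {
	calls := 0
	send := func() error {
		calls++
		if calls < 3 {
			return errTemporary // fail the first two attempts
		}
		return nil
	}
	attempts, err := publishWithRetry(send, 5, 10*time.Millisecond)
	fmt.Println(attempts, err)
}
```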
var ErrQueueURL = newSQSErr("undefined queueURL")
ErrQueueURL undefined queueURL
var ErrUnableToDelete = newSQSErr("unable to delete item in queue")
ErrUnableToDelete unable to delete item
var ErrUnableToExtend = newSQSErr("unable to extend message processing time")
ErrUnableToExtend unable to extend message processing time
var ErrUndefinedPublisher = newSQSErr("sqs publisher is undefined")
ErrUndefinedPublisher sqs publisher is undefined
Functions ¶
Types ¶
type Adapter ¶
Adapter implements adapters in the context
func WithMiddleware ¶
WithMiddleware adds middleware to the consumer service
func WithRecovery ¶
func WithRecovery(recovery func()) Adapter
WithRecovery is an adapter that logs a panic and recovers the service from a failed state
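The general shape of a recovery adapter can be sketched as follows. The `handler` and `adapter` types here are hypothetical stand-ins, because this page does not show the real Handler and Adapter signatures, and converting the panic into an error is an assumption of this sketch rather than documented gosqs behaviour.

```go
package main

import (
	"fmt"
	"log"
)

// handler and adapter are hypothetical stand-ins for the gosqs Handler and
// Adapter types, whose signatures are not shown on this page.
type handler func(body string) error
type adapter func(handler) handler

// withRecovery wraps a handler so that a panic is logged, turned into an
// error, and followed by a call to the supplied recovery callback.
func withRecovery(recovery func()) adapter {
	return func(next handler) handler {
		return func(body string) (err error) {
			defer func() {
				if r := recover(); r != nil {
					log.Printf("recovered from panic: %v", r)
					err = fmt.Errorf("handler panicked: %v", r)
					if recovery != nil {
						recovery()
					}
				}
			}()
			return next(body)
		}
	}
}

func main() {
	wrapped := withRecovery(func() { log.Println("recovery callback ran") })(
		func(string) error { panic("boom") },
	)
	fmt.Println(wrapped("payload")) // prints "handler panicked: boom"
}
```

Because the deferred function runs inside the wrapper, the goroutine that processes the message survives the panic and the worker can continue with the next message.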
type Config ¶
type Config struct {
	// a way to provide custom session setup. A default based on key/secret will be used if not provided
	SessionProvider SessionProviderFunc
	// private key to access aws
	Key string
	// secret to access aws
	Secret string
	// region for aws and used for determining the topic ARN
	Region string
	// provided automatically by aws, but must be set for emulators or local testing
	Hostname string
	// account ID of the aws account, used for determining the topic ARN
	AWSAccountID string
	// environment name, used for determining the topic ARN
	Env string
	// prefix of the topic, this is set as a prefix to the environment
	TopicPrefix string
	// optional address of the topic, if this is not provided it will be created using other variables
	TopicARN string
	// optional address of queue, if this is not provided it will be retrieved during setup
	QueueURL string
	// used to extend the allowed processing time of a message
	VisibilityTimeout int
	// used to determine how many attempts exponential backoff should use before logging an error
	RetryCount int
	// defines the total amount of goroutines that can be run by the consumer
	WorkerPool int
	// defines the total number of processing extensions that occur. Each processing extension will double the
	// visibilitytimeout counter, ensuring the handler has more time to process the message. Default is 2 extensions (1m30s processing time)
	// set to 0 to turn off extension processing
	ExtensionLimit *int
	// Add custom attributes to the message. This might be a correlationId or client meta information
	// custom attributes will be viewable on the sqs dashboard as meta data
	Attributes []customAttribute
	// Add a custom logger, the default will be log.Println
	Logger Logger
}
Config defines the gosqs configuration
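The WorkerPool field caps how many goroutines the consumer may run. One common way to enforce such a cap is a buffered channel used as a semaphore. The sketch below illustrates that technique only; `processAll` is a hypothetical helper, and nothing here is taken from the gosqs source.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll runs handle for every message while keeping at most workerPool
// handlers in flight. It returns how many messages were processed and the
// highest number of handlers that ran at the same time.
func processAll(messages []string, workerPool int, handle func(string)) (processed, peak int) {
	if workerPool < 1 {
		workerPool = 1
	}
	sem := make(chan struct{}, workerPool)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
	)
	for _, m := range messages {
		sem <- struct{}{} // blocks while workerPool handlers are already running
		wg.Add(1)
		go func(m string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot once the handler is done

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			handle(m)

			mu.Lock()
			running--
			processed++
			mu.Unlock()
		}(m)
	}
	wg.Wait()
	return processed, peak
}

func main() {
	messages := []string{"a", "b", "c", "d", "e", "f"}
	processed, peak := processAll(messages, 2, func(string) {})
	fmt.Println(processed, peak <= 2)
}
```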
func (*Config) NewCustomAttribute ¶
NewCustomAttribute adds a custom attribute to SNS and SQS messages. This can include correlationIds, logIds, or any additional information you would like separate from the payload body. These attributes can be easily seen from the SQS console.
The datatype must be gosqs.DataTypeNumber or gosqs.DataTypeString, and the value must match the datatype provided.
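The rule that a value must match its declared datatype can be sketched with a type switch. This is an illustration, not the library's validation code: `validateAttribute`, `typeNumber`, `typeString`, and `errInvalidVal` are hypothetical local names, and the set of Go types accepted as a Number is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// dataType and its two values are local stand-ins for the gosqs constants.
type dataType string

const (
	typeNumber dataType = "Number"
	typeString dataType = "String"
)

// errInvalidVal is a hypothetical local stand-in for gosqs.ErrInvalidVal.
var errInvalidVal = errors.New("value type does not match specified datatype")

// validateAttribute checks that value is a Go type compatible with dt.
// Treating every built-in integer and float type as a Number is an assumption.
func validateAttribute(dt dataType, value interface{}) error {
	switch dt {
	case typeString:
		if _, ok := value.(string); ok {
			return nil
		}
	case typeNumber:
		switch value.(type) {
		case int, int8, int16, int32, int64,
			uint, uint8, uint16, uint32, uint64,
			float32, float64:
			return nil
		}
	}
	return errInvalidVal
}

func main() {
	fmt.Println(validateAttribute(typeString, "abc-123")) // matching types pass
	fmt.Println(validateAttribute(typeNumber, "42"))      // a string is not a Number
}
```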
type Consumer ¶
type Consumer interface {
	// Consume polls for new messages and if it finds one, decodes it, sends it to the handler and deletes it
	//
	// A message is not considered dequeued until it has been successfully processed and deleted. There is a 30 second
	// delay between receiving a single message and receiving the same message. This delay can be adjusted in the AWS
	// console and can also be extended during operation. If a message is successfully received 4 times but not deleted,
	// it will be considered unprocessable and sent to the DLQ automatically
	//
	// Consume uses long-polling to check and retrieve messages, if it is unable to make a connection, the aws-SDK will use its
	// advanced retrying mechanism (including exponential backoff), if all of the retries fail, then we will wait 10s before
	// trying again.
	//
	// When a new message is received, it runs in a separate go-routine that will handle the full consuming of the message, error reporting
	// and deleting
	Consume()
	// RegisterHandler registers an event listener and an associated handler. If the event matches, the handler will
	// be run
	RegisterHandler(name string, h Handler, adapters ...Adapter)
	// Message serves as the direct messaging capability within the consumer. A worker can send direct messages to other workers
	Message(ctx context.Context, queue, event string, body interface{})
	// MessageSelf serves as the self messaging capability within the consumer, a worker can send messages to itself for continued
	// processing and resiliency
	MessageSelf(ctx context.Context, event string, body interface{})
}
Consumer provides an interface for receiving messages through AWS SQS and SNS
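Matching an incoming event name to a registered handler can be sketched with a map. The `router` type below is hypothetical and much simpler than a real consumer: `handler` is a stand-in signature, `errNoRoute` mirrors the message of ErrNoRoute, and returning `errNoHandler` for an unregistered event is an assumption made for this example only.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoRoute mirrors gosqs.ErrNoRoute; errNoHandler is purely hypothetical.
var (
	errNoRoute   = errors.New("message received without a route")
	errNoHandler = errors.New("no handler registered for route")
)

// handler is a stand-in signature; the real Handler type is not shown on this page.
type handler func(body string) error

// router maps event names to handlers.
type router struct {
	handlers map[string]handler
}

// register associates an event name with a handler.
func (r *router) register(name string, h handler) {
	if r.handlers == nil {
		r.handlers = make(map[string]handler)
	}
	r.handlers[name] = h
}

// dispatch runs the handler whose name matches the route of the message.
func (r *router) dispatch(route, body string) error {
	if route == "" {
		return errNoRoute
	}
	h, ok := r.handlers[route]
	if !ok {
		return errNoHandler
	}
	return h(body)
}

func main() {
	r := &router{}
	r.register("post_published", func(body string) error {
		fmt.Println("handling:", body)
		return nil
	})
	fmt.Println(r.dispatch("post_published", `{"id":7}`))
	fmt.Println(r.dispatch("", `{"id":8}`))
}
```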
type Handler ¶
Handler provides a standardized handler method. This is the required function signature for event handlers
type Logger ¶
type Logger interface {
Println(v ...interface{})
}
Logger provides a simple interface to implement your own logging platform or use the default
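Any type with a matching Println method satisfies this interface. The sketch below copies the interface locally so that it compiles on its own; `memoryLogger` is a hypothetical implementation that collects log lines in memory, which can be handy in tests.

```go
package main

import (
	"fmt"
	"strings"
)

// Logger is copied from the interface shown above so the example is self-contained.
type Logger interface {
	Println(v ...interface{})
}

// memoryLogger is a hypothetical Logger that keeps every line in a slice.
type memoryLogger struct {
	lines []string
}

// Println formats its operands the way fmt.Println does and stores the result
// without the trailing newline.
func (l *memoryLogger) Println(v ...interface{}) {
	l.lines = append(l.lines, strings.TrimSuffix(fmt.Sprintln(v...), "\n"))
}

func main() {
	mem := &memoryLogger{}
	var logger Logger = mem // compile-time proof that the interface is satisfied
	logger.Println("message received", 42)
	fmt.Println(mem.lines[0]) // prints "message received 42"
}
```

A value like this would be assigned to the Logger field of Config in place of the default.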
type Message ¶
type Message interface {
	// Route returns the event name that is used for routing within a worker, e.g. post_published
	Route() string
	// Decode will unmarshal the message into a supplied output using json
	Decode(out interface{}) error
	// DecodeModified is used for decoding the modification message, it will populate the body with the actual message and a
	// map[string]interface{} to view original values from that message
	DecodeModified(out interface{}, changes interface{}) error
	// Attribute will return the custom attribute that was sent throughout the request.
	Attribute(key string) string
}
Message serves as the message interface for handling the message
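Handlers are easier to test with an in-memory stand-in for a message. The `fakeMessage` type below is hypothetical and covers only Route, Decode, and Attribute; it leaves out DecodeModified because this page does not describe the wire format of modification messages. The `post` type and the `correlationId` attribute name are also illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fakeMessage is a hypothetical in-memory message with a JSON body.
type fakeMessage struct {
	route      string
	body       []byte
	attributes map[string]string
}

// Route returns the event name used for routing.
func (m fakeMessage) Route() string { return m.route }

// Decode unmarshals the JSON body into out.
func (m fakeMessage) Decode(out interface{}) error { return json.Unmarshal(m.body, out) }

// Attribute returns the custom attribute stored under key, or "" if absent.
func (m fakeMessage) Attribute(key string) string { return m.attributes[key] }

// post is an example payload type.
type post struct {
	ID    int    `json:"id"`
	Title string `json:"title"`
}

func main() {
	msg := fakeMessage{
		route:      "post_published",
		body:       []byte(`{"id":7,"title":"hello"}`),
		attributes: map[string]string{"correlationId": "abc-123"},
	}
	var p post
	if err := msg.Decode(&p); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(msg.Route(), p.ID, p.Title, msg.Attribute("correlationId"))
}
```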
type Notifier ¶
type Notifier interface {
ModelName() string
}
Notifier is used for broadcasting messages
type Publisher ¶
type Publisher interface {
	// Create sends a message using a notifier, the modelname will be prepended to the static event, e.g. post_created
	Create(n Notifier)
	// Delete sends a message using a notifier, the modelname will be prepended to the static event, e.g. post_deleted
	Delete(n Notifier)
	// Update sends a message using a notifier, the modelname will be prepended to the static event, e.g. post_updated
	Update(n Notifier)
	// Modify sends a message using a notifier, as a map of changes. The modelname will be prepended to the static event, e.g. post_modified
	//
	// a special decoder will need to be used to process these events
	Modify(n Notifier, changes interface{})
	// Dispatch sends a message using a notifier, the modelname will be prepended to the provided event, e.g. post_published
	Dispatch(n Notifier, event string)
	// Message sends a direct message to an individual queue, the queueName(receiver) must be provided. The event will be sent
	// as is, no prepending will take place. No other queues will receive this message.
	Message(queue, message string, body interface{})
}
Publisher provides an interface for sending messages through AWS SQS and SNS
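The event names in the comments above (post_created, post_published) are built from the notifier's model name and an event. The sketch below shows that naming rule with a local copy of the Notifier interface; `Post` and `eventName` are hypothetical, and joining with an underscore is inferred from the examples rather than from the library source.

```go
package main

import "fmt"

// Notifier is copied from the interface shown above so the example is self-contained.
type Notifier interface {
	ModelName() string
}

// Post is a hypothetical model that can be broadcast.
type Post struct {
	ID int
}

// ModelName returns the name that is prepended to every event for this model.
func (Post) ModelName() string { return "post" }

// eventName joins the model name and the event with an underscore,
// matching examples such as post_created.
func eventName(n Notifier, event string) string {
	return n.ModelName() + "_" + event
}

func main() {
	p := Post{ID: 7}
	fmt.Println(eventName(p, "created"))   // post_created
	fmt.Println(eventName(p, "published")) // post_published
}
```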
func Dispatcher ¶
Dispatcher retrieves the sqs dispatcher from the context for sending messages
func MustDispatcher ¶
MustDispatcher retrieves the sqs dispatcher from the context for sending messages, or panics if the Dispatcher does not exist in the context
func NewPublisher ¶
NewPublisher creates a new SQS/SNS publisher instance
type SQSError ¶
type SQSError struct {
	Err string `json:"err"`
	// contains filtered or unexported fields
}
SQSError defines the error handler for the gosqs package. SQSError satisfies the error interface and can be used safely with other error handlers
type SessionProviderFunc ¶
SessionProviderFunc can be used to add custom AWS session setup to the gosqs.Config. Callers simply need to implement this function type and set it as Config.SessionProvider. If Config.SessionProvider is not set (is nil), a default provider based on AWS Key/Secret will be used.