Documentation ¶
Index ¶
- func Log(v ...interface{})
- func Logf(format string, a ...interface{})
- func Marshal(m proto.Message, optFns ...func(*Metadata)) ([]byte, error)
- func MessageName(m proto.Message) string
- func SetLogger(l Logger)
- func WithCorrelationID(id string) func(*Metadata)
- func WithErrorHandler(fn func(error)) func(*SubscriberOptions)
- func WithPrefixNaming(stage, service string) func(*RegistryOptions)
- func WithQueueRegistry(r *Registry) func(*SubscriberOptions)
- func WithStore(s Store) func(*RegistryOptions)
- func WithTopicRegistry(r *Registry) func(*PublisherOptions)
- type Handler
- type Logger
- type Message
- type Metadata
- type Publisher
- type PublisherOptions
- type QueueOptions
- type Registry
- type RegistryOptions
- type SNS
- type SQS
- type Store
- type Subscriber
- type SubscriberOptions
- type TopicOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MessageName ¶
MessageName returns the message name with hyphen separation, e.g. my.package.MessageName -> my-package-MessageName
func WithCorrelationID ¶
WithCorrelationID sets the message correlation id
func WithErrorHandler ¶
func WithErrorHandler(fn func(error)) func(*SubscriberOptions)
WithErrorHandler configures the subscriber to use the specified error handler func
func WithPrefixNaming ¶
func WithPrefixNaming(stage, service string) func(*RegistryOptions)
WithPrefixNaming configures the registry to use prefix naming to support complex message routing It applies the following format, assuming a protobuf type name of package.Message:
topic: stage-package-Message queue: stage-service-package-Message error: stage-service-package-Message_error
func WithQueueRegistry ¶
func WithQueueRegistry(r *Registry) func(*SubscriberOptions)
WithQueueRegistry configures the subscriber to use the specified registry to resolve queues, creating them if they do not exist
func WithStore ¶
func WithStore(s Store) func(*RegistryOptions)
WithStore configures the registry to use the specified store
func WithTopicRegistry ¶
func WithTopicRegistry(r *Registry) func(*PublisherOptions)
WithTopicRegistry configures the subscriber to use the specified registry to resolve topics, creating them if they do not exist
Types ¶
type Handler ¶
type Handler interface { Message() proto.Message Handle(ctx context.Context, m proto.Message, md Metadata) error }
Handler represents a message handler
type Logger ¶
type Logger interface { Print(v ...interface{}) Printf(format string, a ...interface{}) }
Logger represents a logger
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher represents a publisher
func NewPublisher ¶
func NewPublisher(client SNS, optFns ...func(*PublisherOptions)) *Publisher
NewPublisher returns a new publisher
type PublisherOptions ¶
PublisherOptions represents a set of publisher options
type QueueOptions ¶
type QueueOptions struct { NameFn func(proto.Message) string ErrorNameFn func(proto.Message) string MaxReceiveCount int }
QueueOptions represents a set of queue options
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry represents an infrastructure registry
func NewRegistry ¶
func NewRegistry(snsc SNS, sqsc SQS, optFns ...func(*RegistryOptions)) *Registry
NewRegistry returns a new registry
type RegistryOptions ¶
type RegistryOptions struct { Store Store Topic TopicOptions Queue QueueOptions }
RegistryOptions represents a set of registry options
type SNS ¶
type SNS interface { Publish(ctx context.Context, params *sns.PublishInput, optFns ...func(*sns.Options)) (*sns.PublishOutput, error) aws.SNS }
SNS represents an sns client interface
type SQS ¶
type SQS interface { ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) aws.SQS }
SQS represents an sqs client interface
type Store ¶
type Store interface { GetOrSetTopicARN(ctx context.Context, topicName string, fn func() (string, error)) (string, error) GetOrSetQueueURL(ctx context.Context, queueName string, fn func() (string, error)) (string, error) }
Store represents a key value store
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber represents a subscriber
func NewSubscriber ¶
func NewSubscriber(client SQS, optFns ...func(*SubscriberOptions)) *Subscriber
NewSubscriber returns a new subscriber
type SubscriberOptions ¶
type SubscriberOptions struct { QueueURLFn func(context.Context, proto.Message) (string, error) ErrorFn func(error) MaxNumberOfMessages int ReceiveInterval time.Duration WaitTimeSeconds int VisibilityTimeoutSeconds int }
SubscriberOptions represents a set of subscriber options
type TopicOptions ¶
TopicOptions represents a set of topic options