Documentation
¶
Index ¶
- Variables
- type AfterBatchFunc
- type Client
- type DecodeRequestFunc
- type DecoderError
- type HandlerError
- type InputFactory
- type Option
- func UseDecodeRequest(decodeRequest DecodeRequestFunc) Option
- func UseHandler(handler endpoint.Endpoint) Option
- func UseInputFactory(inputFactory InputFactory) Option
- func UseResponseHandler(responseHandler ...ResponseFunc) Option
- func WithAfterBatch(afterBatch AfterBatchFunc) Option
- func WithBaseContext(ctxFac func() context.Context) Option
- func WithBefore(before ...RequestFunc) Option
- func WithErrorHandler(errorHandler transport.ErrorHandler) Option
- func WithRunner(runner Runner) Option
- type RequestFunc
- type ResponseFunc
- type Runner
- type Subscriber
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
ErrAlreadyStarted = errors.New("already started")
)
Functions ¶
This section is empty.
Types ¶
type AfterBatchFunc ¶ added in v0.2.0
type Client ¶
type Client interface { ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) }
type DecodeRequestFunc ¶
type DecoderError ¶ added in v0.4.0
func (*DecoderError) Error ¶ added in v0.4.0
func (obj *DecoderError) Error() string
type HandlerError ¶ added in v0.3.0
HandlerError is used for triggering the error handler when the handler returns an error.
func (*HandlerError) Error ¶ added in v0.3.0
func (obj *HandlerError) Error() string
type InputFactory ¶ added in v0.5.0
type InputFactory func() (params *sqs.ReceiveMessageInput, optFns []func(*sqs.Options))
type Option ¶ added in v0.5.0
type Option func(*Subscriber)
func UseDecodeRequest ¶ added in v0.5.0
func UseDecodeRequest(decodeRequest DecodeRequestFunc) Option
UseDecodeRequest sets the decoder function which is required.
func UseHandler ¶ added in v0.5.0
UseHandler sets the handler function which is required.
func UseInputFactory ¶ added in v0.5.0
func UseInputFactory(inputFactory InputFactory) Option
UseInputFactory sets the InputFactory function which is required. It must return a non-nil params. It can return nil for optFns.
func UseResponseHandler ¶ added in v0.5.0
func UseResponseHandler(responseHandler ...ResponseFunc) Option
UseResponseHandler adds a ResponseHandler function which is required. Can be called multiple times. Any actions required after executing handler can take place here. Like deleting the message after being successfully processed.
func WithAfterBatch ¶ added in v0.5.0
func WithAfterBatch(afterBatch AfterBatchFunc) Option
WithAfterBatch adds an AfterBatch function which is optional. It is called after a batch of messages passed to the Runner.
func WithBaseContext ¶ added in v0.5.0
WithBaseContext sets the base context for the subscriber. If not provided, will be context.Background.
func WithBefore ¶ added in v0.5.0
func WithBefore(before ...RequestFunc) Option
WithBefore adds RequestFunc which is optional. Can be called multiple times. Can be used for starting a keep-in-flight hearbeat - an example. They run before DecodeRequest and can put additional data inside the context. If returns a nil context, it causes a panic.
func WithErrorHandler ¶ added in v0.5.0
func WithErrorHandler(errorHandler transport.ErrorHandler) Option
WithErrorHandler sets the error handler which is optional.
func WithRunner ¶ added in v0.5.0
WithRunner sets the Runner. If not provided, the default runner will be used. All the Befor functions, decoding the message, handling the message and handling the response are executed by the Runner.
type ResponseFunc ¶ added in v0.3.0
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber is a go-kit sqs transport. Before, DecodeRequest, Handler, and ResponseHandler run inside the same anonymous function. This anonymous function creates a context, which uses the BaseContext as the parent, and is canceled when the function execution is finished. This anonymous function is passed to the runner. AfterBatch is run after each batch of messages.
Example ¶
client := mockClient10msec() ctx, cancel := context.WithCancel(context.Background()) defer cancel() handled := make(chan struct{}) sut := New( UseHandler(func(ctx context.Context, request interface{}) (response interface{}, err error) { select { case <-handled: return nil, nil default: } defer close(handled) // processing the request inside the endpoint fmt.Println(request) return nil, nil }), UseInputFactory(defaultInputFactory), UseDecodeRequest(nopDecodeRequest), UseResponseHandler(nopResponseHandler...), ) go func() { _ = sut.Serve(ctx, client) }() <-handled
Output: MSG-1
func New ¶ added in v0.5.0
func New(options ...Option) *Subscriber
New returns a new Subscriber. Mandatory options start with "Use...". (maybe they should be explicit, might change later ¯\_(ツ)_/¯)
func (*Subscriber) Serve ¶
func (obj *Subscriber) Serve(ctx context.Context, l Client) error
Serve starts receiving messages from the queue and calling the handler on each.
func (*Subscriber) Shutdown ¶
func (obj *Subscriber) Shutdown()