async

package
v0.61.0
Warning

This package is not in the latest version of its module.

Published: Oct 25, 2021 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package async provides async consumer abstractions and a component that runs them.

Functions

func DetermineDecoder

func DetermineDecoder(contentType string) (encoding.DecodeRawFunc, error)

DetermineDecoder determines the decoder based on the content type.
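As a rough illustration of content-type-based decoder selection, the sketch below maps a content type string to a decode function. It is not the package's implementation: `decodeRawFunc` and `determineDecoder` are local, hypothetical stand-ins, the shape of `encoding.DecodeRawFunc` is assumed, and only JSON is handled here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// decodeRawFunc is a local stand-in for encoding.DecodeRawFunc (assumed shape).
type decodeRawFunc func(data []byte, v interface{}) error

// determineDecoder is a hypothetical helper for illustration only: it maps a
// content type to a decoder and returns an error for unsupported types.
func determineDecoder(contentType string) (decodeRawFunc, error) {
	// Drop parameters such as "; charset=utf-8" before matching.
	mediaType := strings.TrimSpace(strings.Split(contentType, ";")[0])
	switch strings.ToLower(mediaType) {
	case "application/json":
		return json.Unmarshal, nil
	default:
		return nil, fmt.Errorf("unsupported content type: %q", contentType)
	}
}

func main() {
	dec, err := determineDecoder("application/json; charset=utf-8")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	var out map[string]string
	if err := dec([]byte(`{"id":"42"}`), &out); err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(out["id"])
}
```

Returning an error for unknown content types lets the caller decide whether to reject the message or fall back to a default decoder.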

Types

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder gathers all required properties in order to construct a component.

func New

func New(name string, cf ConsumerFactory, proc ProcessorFunc) *Builder

New initializes a new builder for a component with the given name. By default, the failure strategy is NackExitStrategy.

func (*Builder) Create

func (cb *Builder) Create() (*Component, error)

Create constructs the Component by applying the properties gathered by the Builder.
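The builder methods documented in this section append validation errors to the builder rather than returning them, which suggests that Create is where those errors surface. The following is a minimal sketch of that pattern, assuming Create fails when any error was collected. All lowercase names (`builder`, `component`, `failStrategy`, and so on) are hypothetical local types, not the package's internals.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type failStrategy int

const (
	nackExitStrategy failStrategy = iota
	nackStrategy
	ackStrategy
)

// component holds the settings a finished builder produces.
type component struct {
	name      string
	strategy  failStrategy
	retryWait time.Duration
}

// builder collects settings and validation errors until create is called.
type builder struct {
	comp component
	errs []error
}

func newBuilder(name string) *builder {
	return &builder{comp: component{name: name, strategy: nackExitStrategy}}
}

func (b *builder) withFailureStrategy(fs failStrategy) *builder {
	if fs < nackExitStrategy || fs > ackStrategy {
		b.errs = append(b.errs, errors.New("invalid failure strategy"))
		return b
	}
	b.comp.strategy = fs
	return b
}

func (b *builder) withRetryWait(d time.Duration) *builder {
	if d < 0 {
		b.errs = append(b.errs, errors.New("retry wait must not be negative"))
		return b
	}
	b.comp.retryWait = d
	return b
}

// create returns the component, or a single error summarizing every
// validation failure recorded by the chained calls.
func (b *builder) create() (*component, error) {
	if len(b.errs) > 0 {
		return nil, fmt.Errorf("invalid configuration: %v", b.errs)
	}
	c := b.comp
	return &c, nil
}

func main() {
	c, err := newBuilder("orders").withFailureStrategy(ackStrategy).withRetryWait(2 * time.Second).create()
	fmt.Println(c != nil, err)

	_, err = newBuilder("orders").withFailureStrategy(failStrategy(42)).withRetryWait(-time.Second).create()
	fmt.Println(err)
}
```

Deferring errors to the final call keeps the chained calls free of intermediate error checks while still reporting every invalid setting at once.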

func (*Builder) WithConcurrency added in v0.48.0

func (cb *Builder) WithConcurrency(concurrency uint) *Builder

WithConcurrency specifies the number of worker goroutines for processing messages in parallel. The default value is '1'. Do NOT enable concurrency for in-order consumers, such as Kafka or FIFO SQS.
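To see why concurrency and in-order consumption do not mix, consider a generic worker pool. The sketch below is an illustration, not the component's code: `processConcurrently` is a hypothetical helper that fans messages out to several goroutines. Every message is processed, but the order in which workers finish is not guaranteed.

```go
package main

import (
	"fmt"
	"sync"
)

// processConcurrently fans msgs out to `concurrency` worker goroutines and
// returns how many messages were processed without error.
func processConcurrently(msgs []string, concurrency uint, proc func(string) error) int {
	if concurrency == 0 {
		concurrency = 1
	}
	ch := make(chan string)
	var (
		wg sync.WaitGroup
		mu sync.Mutex
		ok int
	)
	for i := uint(0); i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range ch {
				if err := proc(m); err == nil {
					mu.Lock()
					ok++
					mu.Unlock()
				}
			}
		}()
	}
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	wg.Wait()
	return ok
}

func main() {
	n := processConcurrently([]string{"a", "b", "c", "d"}, 2, func(m string) error {
		fmt.Println("processing", m) // line order may differ between runs
		return nil
	})
	fmt.Println("processed:", n)
}
```

With a concurrency of 1, a single worker drains the channel and the original order is preserved.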

func (*Builder) WithFailureStrategy

func (cb *Builder) WithFailureStrategy(fs FailStrategy) *Builder

WithFailureStrategy defines the failure strategy to be used. The default value is NackExitStrategy. It appends an error to the builder if the strategy is not one of the pre-defined ones.

func (*Builder) WithRetries

func (cb *Builder) WithRetries(retries uint) *Builder

WithRetries specifies the number of retry events for the component. The default value is '0'.

func (*Builder) WithRetryWait

func (cb *Builder) WithRetryWait(retryWait time.Duration) *Builder

WithRetryWait specifies the duration for the component to wait between retries. The default value is '0'. It appends an error to the builder if the value is smaller than '0'.
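The interaction between a retry count and a retry wait can be sketched with a generic loop. The documentation does not state which unit of work the component retries, so this example assumes an arbitrary operation. `processWithRetries` and `errTransient` are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient failure")

// processWithRetries calls proc once and then up to `retries` more times,
// sleeping retryWait between attempts. It returns the number of attempts made
// and the last error (nil on success).
func processWithRetries(retries uint, retryWait time.Duration, proc func() error) (int, error) {
	var err error
	attempts := 0
	for i := uint(0); i <= retries; i++ {
		attempts++
		if err = proc(); err == nil {
			return attempts, nil
		}
		if i < retries {
			time.Sleep(retryWait) // wait only when another attempt follows
		}
	}
	return attempts, err
}

func main() {
	calls := 0
	attempts, err := processWithRetries(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println(attempts, err) // prints "3 <nil>"
}
```

With the default of zero retries, the operation runs exactly once and its error is returned immediately.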

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component is the implementation of an async component.

func (*Component) Run

func (c *Component) Run(ctx context.Context) error

Run starts the consumer's message processing loop.
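A processing loop of this kind typically selects over a message channel, an error channel, and the context. The sketch below shows that general shape with plain strings standing in for messages. It is an assumption about how such a loop might look, not the component's actual code, and `runLoop`, `runAll`, and `errRejected` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errRejected = errors.New("message rejected")

// runLoop handles messages until the context is cancelled, the message
// channel closes, the consumer reports an error, or proc fails.
func runLoop(ctx context.Context, msgs <-chan string, errs <-chan error, proc func(string) error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err, ok := <-errs:
			if !ok {
				errs = nil // a closed error channel is ignored from now on
				continue
			}
			return fmt.Errorf("consumer error: %w", err)
		case m, ok := <-msgs:
			if !ok {
				return nil
			}
			if err := proc(m); err != nil {
				return fmt.Errorf("processing %q: %w", m, err)
			}
		}
	}
}

// runAll feeds items through runLoop with a background context and returns
// how many items were processed successfully.
func runAll(items []string, proc func(string) error) (int, error) {
	msgs := make(chan string, len(items))
	for _, it := range items {
		msgs <- it
	}
	close(msgs)
	n := 0
	err := runLoop(context.Background(), msgs, make(chan error), func(m string) error {
		if err := proc(m); err != nil {
			return err
		}
		n++
		return nil
	})
	return n, err
}

func main() {
	n, err := runAll([]string{"a", "b", "c"}, func(m string) error {
		if m == "c" {
			return errRejected
		}
		return nil
	})
	fmt.Println(n, err)
}
```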

type Consumer

type Consumer interface {
	Consume(context.Context) (<-chan Message, <-chan error, error)
	OutOfOrder() bool
	Close() error
}

Consumer is the interface that every specific consumer has to implement.

type ConsumerFactory

type ConsumerFactory interface {
	Create() (Consumer, error)
}

ConsumerFactory is the interface for creating consumers.
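To make the contract concrete, here is a minimal in-memory implementation of these interfaces. The interface declarations are local copies of the ones documented on this page. `memMessage`, `memConsumer`, `memFactory`, and `drain` are hypothetical names, and the meaning of OutOfOrder (whether messages may be handled out of order) is an assumption, since the documentation does not describe it.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Local copies of the interfaces documented in this package.
type Message interface {
	Context() context.Context
	Decode(v interface{}) error
	Ack() error
	Nack() error
	Source() string
	Payload() []byte
	Raw() interface{}
}

type Consumer interface {
	Consume(context.Context) (<-chan Message, <-chan error, error)
	OutOfOrder() bool
	Close() error
}

type ConsumerFactory interface {
	Create() (Consumer, error)
}

// memMessage is a hypothetical in-memory message carrying a JSON payload.
type memMessage struct {
	ctx     context.Context
	payload []byte
	acked   bool
}

func (m *memMessage) Context() context.Context { return m.ctx }
func (m *memMessage) Decode(v interface{}) error { return json.Unmarshal(m.payload, v) }
func (m *memMessage) Ack() error { m.acked = true; return nil }
func (m *memMessage) Nack() error { m.acked = false; return nil }
func (m *memMessage) Source() string { return "memory" }
func (m *memMessage) Payload() []byte { return m.payload }
func (m *memMessage) Raw() interface{} { return m.payload }

// memConsumer emits a fixed set of payloads and then closes its channel.
type memConsumer struct{ payloads [][]byte }

func (c *memConsumer) Consume(ctx context.Context) (<-chan Message, <-chan error, error) {
	msgs := make(chan Message)
	errs := make(chan error) // never written to in this sketch
	go func() {
		defer close(msgs)
		for _, p := range c.payloads {
			select {
			case msgs <- &memMessage{ctx: ctx, payload: p}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return msgs, errs, nil
}

// OutOfOrder returns false: payloads are emitted in slice order (assumed meaning).
func (c *memConsumer) OutOfOrder() bool { return false }
func (c *memConsumer) Close() error { return nil }

type memFactory struct{ payloads [][]byte }

func (f memFactory) Create() (Consumer, error) { return &memConsumer{payloads: f.payloads}, nil }

// drain creates a consumer from the factory, acknowledges every message it
// emits, and returns the payloads as strings.
func drain(f ConsumerFactory) ([]string, error) {
	c, err := f.Create()
	if err != nil {
		return nil, err
	}
	defer c.Close()
	msgs, _, err := c.Consume(context.Background())
	if err != nil {
		return nil, err
	}
	var out []string
	for m := range msgs {
		out = append(out, string(m.Payload()))
		if err := m.Ack(); err != nil {
			return out, err
		}
	}
	return out, nil
}

func main() {
	got, err := drain(memFactory{payloads: [][]byte{[]byte(`{"id":1}`), []byte(`{"id":2}`)}})
	fmt.Println(got, err)
}
```

Going through a factory rather than holding a consumer directly lets the caller obtain a fresh consumer whenever one is needed, for example after a failure.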

type FailStrategy

type FailStrategy int

FailStrategy defines how the component handles a message whose processing failed.

const (
	// NackExitStrategy does not acknowledge the message and exits the application on error.
	NackExitStrategy FailStrategy = iota
	// NackStrategy does not acknowledge the message, leaving it for reprocessing, and continues.
	NackStrategy
	// AckStrategy acknowledges message and continues.
	AckStrategy
)
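The three strategies can be read as a small decision table. The sketch below applies each one to a failed message, following the comments on the constants. `handleFailure`, `acker`, `recorder`, and `errProcessing` are hypothetical names, and the package's real handling may differ in detail.

```go
package main

import (
	"errors"
	"fmt"
)

type FailStrategy int

const (
	NackExitStrategy FailStrategy = iota
	NackStrategy
	AckStrategy
)

// acker is the subset of the Message interface this sketch needs.
type acker interface {
	Ack() error
	Nack() error
}

var errProcessing = errors.New("processing failed")

// handleFailure applies the strategy to a message whose processing failed
// with procErr. A non-nil return means the caller should stop processing.
func handleFailure(fs FailStrategy, msg acker, procErr error) error {
	switch fs {
	case NackExitStrategy:
		if err := msg.Nack(); err != nil {
			return fmt.Errorf("nack failed: %w (processing error: %v)", err, procErr)
		}
		return procErr // propagate so the caller can exit
	case NackStrategy:
		return msg.Nack() // leave the message for reprocessing and continue
	case AckStrategy:
		return msg.Ack() // drop the failed message and continue
	default:
		return fmt.Errorf("unknown failure strategy: %d", fs)
	}
}

// recorder remembers which acknowledgement method was called.
type recorder struct{ acked, nacked bool }

func (r *recorder) Ack() error { r.acked = true; return nil }
func (r *recorder) Nack() error { r.nacked = true; return nil }

func main() {
	for _, fs := range []FailStrategy{NackExitStrategy, NackStrategy, AckStrategy} {
		r := &recorder{}
		err := handleFailure(fs, r, errProcessing)
		fmt.Printf("strategy=%d acked=%t nacked=%t stop=%t\n", fs, r.acked, r.nacked, err != nil)
	}
}
```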

type Message

type Message interface {
	Context() context.Context
	Decode(v interface{}) error
	Ack() error
	Nack() error
	Source() string
	Payload() []byte
	Raw() interface{}
}

Message is the interface for messages that are handled by the async component.

type ProcessorFunc

type ProcessorFunc func(Message) error

ProcessorFunc is the definition of an async processor.
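A processor usually decodes the message into a domain type and returns an error if the message cannot be handled. The sketch below shows one possible processor. `Message` and `ProcessorFunc` are local copies of the declarations on this page. `order`, `jsonMessage`, and `processOrder` are hypothetical, and the stub message implements only Decode.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Local copies of the declarations documented in this package.
type Message interface {
	Context() context.Context
	Decode(v interface{}) error
	Ack() error
	Nack() error
	Source() string
	Payload() []byte
	Raw() interface{}
}

type ProcessorFunc func(Message) error

// order is a hypothetical domain type.
type order struct {
	ID       string `json:"id"`
	Quantity int    `json:"quantity"`
}

// jsonMessage is a stub: it embeds the interface and implements only Decode.
// Calling any other method on it would panic.
type jsonMessage struct {
	Message
	payload []byte
}

func (m jsonMessage) Decode(v interface{}) error { return json.Unmarshal(m.payload, v) }

// processOrder decodes the message into an order and validates it.
func processOrder(msg Message) error {
	var o order
	if err := msg.Decode(&o); err != nil {
		return fmt.Errorf("decode order: %w", err)
	}
	if o.Quantity <= 0 {
		return errors.New("quantity must be positive")
	}
	return nil
}

func main() {
	var proc ProcessorFunc = processOrder
	fmt.Println(proc(jsonMessage{payload: []byte(`{"id":"a1","quantity":2}`)}))
	fmt.Println(proc(jsonMessage{payload: []byte(`{"id":"a1","quantity":0}`)}))
}
```

The error returned by the processor is what the component's failure strategy acts on, so it is worth returning a descriptive error rather than logging and returning nil.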

Directories

Path      Synopsis
amqp      Package amqp provides consumer implementation with included tracing capabilities.
kafka     Package kafka provides consumer abstractions and base functionality with included tracing capabilities.
group     Package group provides a consumer group implementation.
simple    Package simple provides a simple consumer implementation without consumer groups.
sqs       Package sqs provides consumer implementation with included tracing capabilities.