stream

package
v0.33.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2025 License: MIT Imports: 50 Imported by: 0

Documentation

Index

Constants

View Source
const (
	InputTypeFile     = "file"
	InputTypeInMemory = "inMemory"
	InputTypeKinesis  = "kinesis"
	InputTypeRedis    = "redis"
	InputTypeSns      = "sns"
	InputTypeSqs      = "sqs"
	InputTypeKafka    = "kafka"
)
View Source
const (
	AttributeSqsMessageId     = "sqsMessageId"
	AttributeSqsReceiptHandle = "sqsReceiptHandle"
)
View Source
const (
	AttributeEncoding    = "encoding"
	AttributeCompression = "compression"
)
View Source
const (
	AttributeKafkaKey            = "KafkaKey"
	MetaDataKafkaOriginalMessage = "KafkaOriginal"
)
View Source
const (
	PrmHandlerName      = "stream_messages_per_runner"
	PerRunnerMetricName = "StreamMessages"
)
View Source
const (
	OutputTypeFile     = "file"
	OutputTypeInMemory = "inMemory"
	OutputTypeKinesis  = "kinesis"
	OutputTypeMultiple = "multiple"
	OutputTypeNoOp     = "noop"
	OutputTypeRedis    = "redis"
	OutputTypeSns      = "sns"
	OutputTypeSqs      = "sqs"
	OutputTypeKafka    = "kafka"
)
View Source
const (
	AttributeKinesisPartitionKey    = "gosoline.kinesis.partitionKey"
	AttributeKinesisExplicitHashKey = "gosoline.kinesis.explicitHashKey"
)
View Source
const (
	AttributeAggregate      = "goso.aggregate"
	AttributeAggregateCount = "goso.aggregate.count"
)
View Source
const (
	AttributeRetry   = "goso.retry"
	AttributeRetryId = "goso.retry.id"
)
View Source
const (
	UnmarshallerMsg = "msg"
	UnmarshallerRaw = "raw"
	UnmarshallerSns = "sns"
)
View Source
const (
	ConfigKeyStream = "stream"
)
View Source
const MetadataKeyProducers = "stream.producers"
View Source
const SqsOutputBatchSize = 10

Variables

This section is empty.

Functions

func AddDefaultEncodeHandler

func AddDefaultEncodeHandler(handler EncodeHandler)

func AddMessageBodyEncoder

func AddMessageBodyEncoder(encoding EncodingType, encoder MessageBodyEncoder)

func ByteChunkToInterfaces

func ByteChunkToInterfaces(chunk Chunk) []interface{}

func ByteChunkToStrings

func ByteChunkToStrings(chunk Chunk) []string

func CompressMessage

func CompressMessage(compression CompressionType, body []byte) ([]byte, error)

func ConfigurableConsumerKey

func ConfigurableConsumerKey(name string) string

func ConfigurableConsumerRetryKey

func ConfigurableConsumerRetryKey(name string) string

func ConfigurableInputKey

func ConfigurableInputKey(name string) string

func ConfigurableOutputKey

func ConfigurableOutputKey(name string) string

func ConfigurableProducerKey

func ConfigurableProducerKey(name string) string

func ConsumerFactory

func ConsumerFactory(callbacks ConsumerCallbackMap) (map[string]kernel.ModuleFactory, error)

func DecodeMessage

func DecodeMessage(encoding EncodingType, data []byte, out interface{}) error

func DecompressMessage

func DecompressMessage(compression CompressionType, body []byte) ([]byte, error)

func EncodeMessage

func EncodeMessage(encoding EncodingType, data interface{}) ([]byte, error)

func GetAllConsumerNames added in v0.26.2

func GetAllConsumerNames(config cfg.Config) []string

func GosoToKafkaMessage

func GosoToKafkaMessage(msg *Message) kafka.Message

func GosoToKafkaMessages

func GosoToKafkaMessages(msgs ...*Message) []kafka.Message

func KafkaHeadersToGosoAttributes added in v0.12.0

func KafkaHeadersToGosoAttributes(headers []kafka.Header) map[string]string

func MessagesPerRunnerHandlerFactory added in v0.22.0

func MessagesPerRunnerHandlerFactory(ctx context.Context, config cfg.Config, logger log.Logger, calculatorSettings *calculator.CalculatorSettings) (calculator.Handler, error)

func NewBaseConsumer

func NewBaseConsumer(ctx context.Context, config cfg.Config, logger log.Logger, name string, consumerCallback BaseConsumerCallback) (*baseConsumer, error)

func NewBaseConsumerWithInterfaces

func NewBaseConsumerWithInterfaces(
	uuidGen uuid.Uuid,
	logger log.Logger,
	metricWriter metric.Writer,
	tracer tracing.Tracer,
	input Input,
	encoder MessageEncoder,
	retryInput Input,
	retryHandler RetryHandler,
	consumerCallback any,
	settings ConsumerSettings,
	name string,
	appId cfg.AppId,
) *baseConsumer

func NewBatchConsumer

func NewBatchConsumer(name string, callbackFactory BatchConsumerCallbackFactory) func(ctx context.Context, config cfg.Config, logger log.Logger) (kernel.Module, error)

func NewConsumer

func NewConsumer(name string, callbackFactory ConsumerCallbackFactory) func(ctx context.Context, config cfg.Config, logger log.Logger) (kernel.Module, error)

func NewConsumerFactory

func NewConsumerFactory(callbacks ConsumerCallbackMap) kernel.ModuleMultiFactory

func NewKafkaMessage

func NewKafkaMessage(writable WritableMessage) kafka.Message

func NewKafkaMessageAttrs

func NewKafkaMessageAttrs(key string) map[string]interface{}

func NewKafkaMessages

func NewKafkaMessages(ms []WritableMessage) []kafka.Message

func NewKinesisMessageHandler

func NewKinesisMessageHandler(channel chan *Message) kinesis.MessageHandler

func NewMessageEncoder

func NewMessageEncoder(config *MessageEncoderSettings) *messageEncoder

func NewMessagesPerRunnerHandlerWithInterfaces added in v0.22.0

func NewMessagesPerRunnerHandlerWithInterfaces(
	clock clock.Clock,
	cwClient gosoCloudwatch.Client,
	baseHandler calculator.PerRunnerMetricHandler,
	calculatorSettings *calculator.CalculatorSettings,
	handlerSettings *calculator.PerRunnerMetricSettings,
	queueNames []string,
) calculator.Handler

func NewOutputTracer

func NewOutputTracer(ctx context.Context, config cfg.Config, logger log.Logger, base Output, name string) (*outputTracer, error)

func NewOutputTracerWithInterfaces

func NewOutputTracerWithInterfaces(tracer tracing.Tracer, base Output, name string) *outputTracer

func NewProducer

func NewProducer(ctx context.Context, config cfg.Config, logger log.Logger, name string, handlers ...EncodeHandler) (*producer, error)

func NewProducerDaemon

func NewProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)

func NewProducerDaemonWithInterfaces

func NewProducerDaemonWithInterfaces(
	logger log.Logger,
	metric metric.Writer,
	aggregator ProducerDaemonAggregator,
	output Output,
	clock clock.Clock,
	name string,
	settings ProducerDaemonSettings,
) *producerDaemon

func NewProducerWithInterfaces

func NewProducerWithInterfaces(encoder MessageEncoder, output Output) *producer

func NewRetryHandler

func NewRetryHandler(ctx context.Context, config cfg.Config, logger log.Logger, consumerSettings *ConsumerRetrySettings, name string) (Input, RetryHandler, error)

func NewRetryHandlerSqs

func NewRetryHandlerSqs(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, RetryHandler, error)

func NewSnsInput

func NewSnsInput(ctx context.Context, config cfg.Config, logger log.Logger, settings *SnsInputSettings, targets []SnsInputTarget) (*snsInput, error)

func NewSnsInputWithInterfaces

func NewSnsInputWithInterfaces(sqsInput *sqsInput) *snsInput

func NewSqsInput

func NewSqsInput(ctx context.Context, config cfg.Config, logger log.Logger, settings *SqsInputSettings) (*sqsInput, error)

func NewSqsInputWithInterfaces

func NewSqsInputWithInterfaces(logger log.Logger, queue sqs.Queue, unmarshaller UnmarshallerFunc, settings *SqsInputSettings) *sqsInput

func ProducerDaemonFactory

func ProducerDaemonFactory(ctx context.Context, config cfg.Config, logger log.Logger) (map[string]kernel.ModuleFactory, error)

func ProvideProducerDaemon

func ProvideProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)

func ResetInMemoryInputs

func ResetInMemoryInputs()

func ResetInMemoryOutputs

func ResetInMemoryOutputs()

func SetInputFactory

func SetInputFactory(typ string, factory InputFactory)

func SnsMarshaller

func SnsMarshaller(msg *Message) (*string, error)

func WithDefaultMessageBodyEncoding

func WithDefaultMessageBodyEncoding(encoding EncodingType)

Types

type AcknowledgeableInput

type AcknowledgeableInput interface {
	Input
	// Ack acknowledges a single message. If possible, prefer calling AckBatch as it is more efficient.
	Ack(ctx context.Context, msg *Message, ack bool) error
	// AckBatch does the same as calling Ack for every single message would, but it might use fewer calls to an external
	// service.
	AckBatch(ctx context.Context, msgs []*Message, acks []bool) error
}

An AcknowledgeableInput is an Input with the additional ability to mark messages as successfully consumed. For example, an SQS queue would provide a message a second time, once its visibility timeout has expired, if we didn't acknowledge it.

type AggregateFlush

type AggregateFlush struct {
	Attributes   map[string]string
	Body         string
	MessageCount int
}

type BaseConsumerCallback

type BaseConsumerCallback interface {
	GetModel(attributes map[string]string) any
}

type BaseOutputConfiguration

type BaseOutputConfiguration struct {
	Tracing BaseOutputConfigurationTracing `cfg:"tracing"`
}

func (*BaseOutputConfiguration) SetTracing

func (b *BaseOutputConfiguration) SetTracing(enabled bool)

type BaseOutputConfigurationAware

type BaseOutputConfigurationAware interface {
	SetTracing(enabled bool)
}

type BaseOutputConfigurationTracing

type BaseOutputConfigurationTracing struct {
	Enabled bool `cfg:"enabled" default:"true"`
}

type BatchConsumer

type BatchConsumer struct {
	// contains filtered or unexported fields
}

func NewBatchConsumerWithInterfaces

func NewBatchConsumerWithInterfaces(base *baseConsumer, callback BatchConsumerCallback, ticker *time.Ticker, settings *BatchConsumerSettings) *BatchConsumer

func (*BatchConsumer) Run

func (c *BatchConsumer) Run(kernelCtx context.Context) error

type BatchConsumerCallback

type BatchConsumerCallback interface {
	BaseConsumerCallback
	Consume(ctx context.Context, models []interface{}, attributes []map[string]string) ([]bool, error)
}

type BatchConsumerCallbackFactory

type BatchConsumerCallbackFactory func(ctx context.Context, config cfg.Config, logger log.Logger) (BatchConsumerCallback, error)

type BatchConsumerSettings

type BatchConsumerSettings struct {
	IdleTimeout time.Duration `cfg:"idle_timeout" default:"10s"`
	BatchSize   int           `cfg:"batch_size" default:"1"`
}

type Chunk

type Chunk [][]byte

type Chunks

type Chunks []Chunk

func BuildChunks

func BuildChunks(batch []WritableMessage, size int) (Chunks, error)

type CompressionType

type CompressionType string
const (
	CompressionNone CompressionType = "none"
	CompressionGZip CompressionType = "application/gzip"
)

func GetCompressionAttribute

func GetCompressionAttribute(attributes map[string]string) *CompressionType

GetCompressionAttribute returns the compression attribute if one is set and nil if none is set.

func (CompressionType) String

func (s CompressionType) String() string

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumerWithInterfaces

func NewConsumerWithInterfaces(base *baseConsumer, callback ConsumerCallback) *Consumer

func (*Consumer) Run

func (c *Consumer) Run(kernelCtx context.Context) error

type ConsumerAcknowledge

type ConsumerAcknowledge struct {
	// contains filtered or unexported fields
}

func NewConsumerAcknowledgeWithInterfaces

func NewConsumerAcknowledgeWithInterfaces(logger log.Logger, input Input) ConsumerAcknowledge

func (*ConsumerAcknowledge) Acknowledge

func (c *ConsumerAcknowledge) Acknowledge(ctx context.Context, cdata *consumerData, ack bool)

func (*ConsumerAcknowledge) AcknowledgeBatch

func (c *ConsumerAcknowledge) AcknowledgeBatch(ctx context.Context, cdata []*consumerData, acks []bool)

type ConsumerCallback

type ConsumerCallback interface {
	BaseConsumerCallback
	Consume(ctx context.Context, model interface{}, attributes map[string]string) (bool, error)
}

type ConsumerCallbackFactory

type ConsumerCallbackFactory func(ctx context.Context, config cfg.Config, logger log.Logger) (ConsumerCallback, error)

type ConsumerCallbackMap

type ConsumerCallbackMap map[string]ConsumerCallbackFactory

type ConsumerMetadata

type ConsumerMetadata struct {
	Name         string `json:"name"`
	RetryEnabled bool   `json:"retry_enabled"`
	RetryType    string `json:"retry_type"`
	RunnerCount  int    `json:"runner_count"`
}

type ConsumerRetrySettings

type ConsumerRetrySettings struct {
	Enabled   bool          `cfg:"enabled"`
	Type      string        `cfg:"type" default:"sqs"`
	GraceTime time.Duration `cfg:"grace_time" default:"10s"`
}

type ConsumerSettings

type ConsumerSettings struct {
	Input       string                `cfg:"input" default:"consumer" validate:"required"`
	RunnerCount int                   `cfg:"runner_count" default:"1" validate:"min=1"`
	Encoding    EncodingType          `cfg:"encoding" default:"application/json"`
	IdleTimeout time.Duration         `cfg:"idle_timeout" default:"10s"`
	Retry       ConsumerRetrySettings `cfg:"retry"`
}

func ReadConsumerSettings added in v0.28.3

func ReadConsumerSettings(config cfg.Config, name string) ConsumerSettings

type EncodeHandler

type EncodeHandler interface {
	Encode(ctx context.Context, data interface{}, attributes map[string]string) (context.Context, map[string]string, error)
	Decode(ctx context.Context, data interface{}, attributes map[string]string) (context.Context, map[string]string, error)
}

type EncodingType

type EncodingType string
const (
	EncodingJson     EncodingType = "application/json"
	EncodingProtobuf EncodingType = "application/x-protobuf"
)

func GetEncodingAttribute

func GetEncodingAttribute(attributes map[string]string) *EncodingType

GetEncodingAttribute returns the encoding attribute if one is set and nil if none is set.

func (EncodingType) String

func (s EncodingType) String() string

type FileOutputMode

type FileOutputMode string
const (
	FileOutputModeAppend   FileOutputMode = "append"
	FileOutputModeSingle   FileOutputMode = "single"
	FileOutputModeTruncate FileOutputMode = "truncate"
)

type FileOutputSettings

type FileOutputSettings struct {
	Filename string         `cfg:"filename"`
	Mode     FileOutputMode `cfg:"mode"     default:"append"`
}

type FileSettings

type FileSettings struct {
	Filename string `cfg:"filename"`
	Blocking bool   `cfg:"blocking"`
}

type InMemoryInput

type InMemoryInput struct {
	// contains filtered or unexported fields
}

func NewInMemoryInput

func NewInMemoryInput(settings *InMemorySettings) *InMemoryInput

func ProvideInMemoryInput

func ProvideInMemoryInput(name string, settings *InMemorySettings) *InMemoryInput

func (*InMemoryInput) Data

func (i *InMemoryInput) Data() <-chan *Message

func (*InMemoryInput) Publish

func (i *InMemoryInput) Publish(messages ...*Message)

func (*InMemoryInput) Reset

func (i *InMemoryInput) Reset()

func (*InMemoryInput) Run

func (i *InMemoryInput) Run(ctx context.Context) error

func (*InMemoryInput) Stop

func (i *InMemoryInput) Stop()

type InMemoryOutput

type InMemoryOutput struct {
	// contains filtered or unexported fields
}

func NewInMemoryOutput

func NewInMemoryOutput() *InMemoryOutput

func ProvideInMemoryOutput

func ProvideInMemoryOutput(name string) *InMemoryOutput

func (*InMemoryOutput) Clear

func (o *InMemoryOutput) Clear()

func (*InMemoryOutput) ContainsBody

func (o *InMemoryOutput) ContainsBody(body string) bool

func (*InMemoryOutput) Get

func (o *InMemoryOutput) Get(i int) (*Message, bool)

func (*InMemoryOutput) Len

func (o *InMemoryOutput) Len() int

func (*InMemoryOutput) Size

func (o *InMemoryOutput) Size() int

func (*InMemoryOutput) Write

func (o *InMemoryOutput) Write(_ context.Context, batch []WritableMessage) error

func (*InMemoryOutput) WriteOne

func (o *InMemoryOutput) WriteOne(ctx context.Context, msg WritableMessage) error

type InMemoryOutputConfiguration

type InMemoryOutputConfiguration struct {
	BaseOutputConfiguration
	Type string `cfg:"type" default:"inMemory"`
}

type InMemorySettings

type InMemorySettings struct {
	Size int `cfg:"size" default:"1"`
}

type Input

type Input interface {
	// Run provides a steady stream of messages, returned via Data. Run does not return until Stop is called and thus
	// should be called in its own goroutine. The only exception to this is if we either fail to produce messages and
	// return an error or if the input is depleted (like an InMemoryInput).
	//
	// Run should only be called once, not all inputs can be resumed.
	Run(ctx context.Context) error
	// Stop causes Run to return as fast as possible. Calling Stop is preferable to canceling the context passed to Run
	// as it allows Run to shut down cleaner (and might take a bit longer, e.g., to finish processing the current batch
	// of messages).
	Stop()
	// Data returns a channel containing the messages produced by this input.
	Data() <-chan *Message
}

An Input provides you with a steady stream of messages until you Stop it.

func NewConfigurableInput

func NewConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)

func NewFileInput

func NewFileInput(_ cfg.Config, logger log.Logger, settings FileSettings) Input

func NewFileInputWithInterfaces

func NewFileInputWithInterfaces(logger log.Logger, settings FileSettings) Input

func NewKinesisInput

func NewKinesisInput(ctx context.Context, config cfg.Config, logger log.Logger, settings kinesis.Settings) (Input, error)

func NewNoopInput added in v0.14.0

func NewNoopInput() Input

func NewRedisListInput

func NewRedisListInput(ctx context.Context, config cfg.Config, logger log.Logger, settings *RedisListInputSettings) (Input, error)

func NewRedisListInputWithInterfaces

func NewRedisListInputWithInterfaces(logger log.Logger, client redis.Client, mw metric.Writer, settings *RedisListInputSettings) Input

func ProvideConfigurableInput

func ProvideConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)

type InputFactory

type InputFactory func(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)

type KafkaInput

type KafkaInput struct {
	// contains filtered or unexported fields
}

func NewKafkaInput

func NewKafkaInput(ctx context.Context, config cfg.Config, logger log.Logger, key string) (*KafkaInput, error)

func NewKafkaInputWithInterfaces

func NewKafkaInputWithInterfaces(consumer *kafkaConsumer.Consumer) (*KafkaInput, error)

func (*KafkaInput) Ack

func (i *KafkaInput) Ack(ctx context.Context, msg *Message, _ bool) error

Ack acknowledges a single message. If possible, prefer calling AckBatch as it is more efficient.

func (*KafkaInput) AckBatch

func (i *KafkaInput) AckBatch(ctx context.Context, msgs []*Message, _ []bool) error

AckBatch does the same as calling Ack for every single message would, but it might use fewer calls to an external service.

func (*KafkaInput) Data

func (i *KafkaInput) Data() <-chan *Message

Data returns a channel containing the messages produced by this input.

func (*KafkaInput) Run

func (i *KafkaInput) Run(ctx context.Context) error

Run provides a steady stream of messages, returned via Data. Run does not return until Stop is called and thus should be called in its own goroutine. The only exception to this is if we either fail to produce messages and return an error or if the input is depleted (like an InMemoryInput).

Run should only be called once, not all inputs can be resumed.

func (*KafkaInput) Stop

func (i *KafkaInput) Stop()

Stop causes Run to return as fast as possible. Calling Stop is preferable to canceling the context passed to Run as it allows Run to shut down cleaner (and might take a bit longer, e.g., to finish processing the current batch of messages).

type KafkaOutput

type KafkaOutput struct {
	// contains filtered or unexported fields
}

func NewKafkaOutput

func NewKafkaOutput(ctx context.Context, config cfg.Config, logger log.Logger, key string) (*KafkaOutput, error)

func NewKafkaOutputWithInterfaces

func NewKafkaOutputWithInterfaces(ctx context.Context, producer *kafkaProducer.Producer) (*KafkaOutput, error)

func (*KafkaOutput) Write

func (o *KafkaOutput) Write(ctx context.Context, ms []WritableMessage) error

func (*KafkaOutput) WriteOne

func (o *KafkaOutput) WriteOne(ctx context.Context, m WritableMessage) error

type KafkaSourceMessage

type KafkaSourceMessage struct {
	kafka.Message
}

func (KafkaSourceMessage) MarshalJSON

func (k KafkaSourceMessage) MarshalJSON() ([]byte, error)

type KinesisInputConfiguration

type KinesisInputConfiguration struct {
	kinesis.Settings
	Type string `cfg:"type" default:"kinesis"`
}

type KinesisOutputConfiguration

type KinesisOutputConfiguration struct {
	BaseOutputConfiguration
	Type        string `cfg:"type" default:"kinesis"`
	Project     string `cfg:"project"`
	Family      string `cfg:"family"`
	Group       string `cfg:"group"`
	Application string `cfg:"application"`
	ClientName  string `cfg:"client_name" default:"default"`
	StreamName  string `cfg:"stream_name"`
}

type KinesisOutputSettings

type KinesisOutputSettings struct {
	cfg.AppId
	ClientName string
	StreamName string
}

func (KinesisOutputSettings) GetAppId

func (s KinesisOutputSettings) GetAppId() cfg.AppId

func (KinesisOutputSettings) GetClientName

func (s KinesisOutputSettings) GetClientName() string

func (KinesisOutputSettings) GetStreamName

func (s KinesisOutputSettings) GetStreamName() string

type Message

type Message struct {
	Attributes map[string]string `json:"attributes"`
	Body       string            `json:"body"`
	// contains filtered or unexported fields
}

func BuildAggregateMessage

func BuildAggregateMessage(aggregateBody string, attributes ...map[string]string) *Message

func KafkaToGosoMessage

func KafkaToGosoMessage(k kafka.Message) *Message

func MarshalJsonMessage

func MarshalJsonMessage(body interface{}, attributes ...map[string]string) (*Message, error)

func MarshalProtobufMessage

func MarshalProtobufMessage(body ProtobufEncodable, attributes ...map[string]string) (*Message, error)

func MessageUnmarshaller

func MessageUnmarshaller(data *string) (*Message, error)

func NewJsonMessage

func NewJsonMessage(body string, attributes ...map[string]string) *Message

func NewMessage

func NewMessage(body string, attributes ...map[string]string) *Message

func NewProtobufMessage

func NewProtobufMessage(body string, attributes ...map[string]string) *Message

func RawUnmarshaller

func RawUnmarshaller(data *string) (*Message, error)

func SnsUnmarshaller

func SnsUnmarshaller(data *string) (*Message, error)

func (*Message) GetAttributes

func (m *Message) GetAttributes() map[string]string

func (*Message) MarshalToBytes

func (m *Message) MarshalToBytes() ([]byte, error)

func (*Message) MarshalToString

func (m *Message) MarshalToString() (string, error)

func (*Message) UnmarshalFromBytes

func (m *Message) UnmarshalFromBytes(data []byte) error

func (*Message) UnmarshalFromString

func (m *Message) UnmarshalFromString(data string) error

type MessageBodyCompressor

type MessageBodyCompressor interface {
	Compress(body []byte) ([]byte, error)
	Decompress(body []byte) ([]byte, error)
}

type MessageBodyEncoder

type MessageBodyEncoder interface {
	Encode(data interface{}) ([]byte, error)
	Decode(data []byte, out interface{}) error
}

func NewJsonEncoder

func NewJsonEncoder() MessageBodyEncoder

func NewProtobufEncoder

func NewProtobufEncoder() MessageBodyEncoder

type MessageEncoder

type MessageEncoder interface {
	Encode(ctx context.Context, data interface{}, attributeSets ...map[string]string) (*Message, error)
	Decode(ctx context.Context, msg *Message, out interface{}) (context.Context, map[string]string, error)
}

type MessageEncoderSettings

type MessageEncoderSettings struct {
	Encoding       EncodingType
	Compression    CompressionType
	EncodeHandlers []EncodeHandler
}

type ModelMsg

type ModelMsg struct {
	CrudType string
	Version  int
	ModelId  string
	Body     string
}

func CreateModelMsg

func CreateModelMsg(raw *Message) (*ModelMsg, error)

type NoOpOutput

type NoOpOutput struct{}

func (*NoOpOutput) Write

func (o *NoOpOutput) Write(_ context.Context, _ []WritableMessage) error

func (*NoOpOutput) WriteOne

func (o *NoOpOutput) WriteOne(_ context.Context, _ WritableMessage) error

type Output

type Output interface {
	WriteOne(ctx context.Context, msg WritableMessage) error
	Write(ctx context.Context, batch []WritableMessage) error
}

func NewConfigurableMultiOutput

func NewConfigurableMultiOutput(ctx context.Context, config cfg.Config, logger log.Logger, base string) (Output, error)

func NewConfigurableOutput

func NewConfigurableOutput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Output, error)

func NewFileOutput

func NewFileOutput(_ cfg.Config, logger log.Logger, settings *FileOutputSettings) Output

func NewKinesisOutput

func NewKinesisOutput(ctx context.Context, config cfg.Config, logger log.Logger, settings *KinesisOutputSettings) (Output, error)

func NewKinesisOutputWithInterfaces

func NewKinesisOutputWithInterfaces(recordWriter gosoKinesis.RecordWriter) Output

func NewRedisListOutput

func NewRedisListOutput(ctx context.Context, config cfg.Config, logger log.Logger, settings *RedisListOutputSettings) (Output, error)

func NewRedisListOutputWithInterfaces

func NewRedisListOutputWithInterfaces(logger log.Logger, mw metric.Writer, client redis.Client, settings *RedisListOutputSettings) Output

func NewSnsOutput

func NewSnsOutput(ctx context.Context, config cfg.Config, logger log.Logger, settings *SnsOutputSettings) (Output, error)

func NewSnsOutputWithInterfaces

func NewSnsOutputWithInterfaces(logger log.Logger, topic sns.Topic) Output

func NewSqsOutput

func NewSqsOutput(ctx context.Context, config cfg.Config, logger log.Logger, settings *SqsOutputSettings) (Output, error)

func NewSqsOutputWithInterfaces

func NewSqsOutputWithInterfaces(logger log.Logger, queue sqs.Queue, settings *SqsOutputSettings) Output

type OutputChannel

type OutputChannel interface {
	Read() ([]WritableMessage, bool)
	Write(msg []WritableMessage)
	Close()
	IsClosed() bool
}

func NewOutputChannel

func NewOutputChannel(logger log.Logger, bufferSize int) OutputChannel

type OutputFactory

type OutputFactory func(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Output, error)

type PartitionedOutput

type PartitionedOutput interface {
	Output
	// IsPartitionedOutput returns true if the output is writing to more than one shard/partition/bucket, and we need to
	// take care about writing messages to the correct partition.
	IsPartitionedOutput() bool
}

type PartitionerRand

type PartitionerRand interface {
	Intn(n int) int
}

type Producer

type Producer interface {
	WriteOne(ctx context.Context, model interface{}, attributeSets ...map[string]string) error
	Write(ctx context.Context, models interface{}, attributeSets ...map[string]string) error
}

func NewMultiProducer

func NewMultiProducer(producers ...Producer) Producer

type ProducerDaemonAggregator

type ProducerDaemonAggregator interface {
	Write(ctx context.Context, msg *Message) ([]AggregateFlush, error)
	Flush() ([]AggregateFlush, error)
}

func NewProducerDaemonAggregator

func NewProducerDaemonAggregator(settings ProducerDaemonSettings, compression CompressionType, attributeSets ...map[string]string) (ProducerDaemonAggregator, error)

func NewProducerDaemonPartitionedAggregator

func NewProducerDaemonPartitionedAggregator(logger log.Logger, settings ProducerDaemonSettings, compression CompressionType) (ProducerDaemonAggregator, error)

func NewProducerDaemonPartitionedAggregatorWithInterfaces

func NewProducerDaemonPartitionedAggregatorWithInterfaces(logger log.Logger, rand PartitionerRand, aggregators int, createAggregator func(attributes map[string]string) (ProducerDaemonAggregator, error)) (ProducerDaemonAggregator, error)

type ProducerDaemonBatcher

type ProducerDaemonBatcher interface {
	Append(msg *Message) ([]WritableMessage, error)
	Flush() []WritableMessage
}

func NewProducerDaemonBatcher

func NewProducerDaemonBatcher(settings ProducerDaemonSettings) ProducerDaemonBatcher

type ProducerDaemonSettings

type ProducerDaemonSettings struct {
	Enabled bool `cfg:"enabled" default:"false"`

	// Amount of time spent waiting for messages before sending out a batch.
	Interval time.Duration `cfg:"interval" default:"1m"`

	// Size of the buffer channel, i.e., how many messages can be in-flight at once? Generally it is a good idea to match
	// this with the number of runners.
	BufferSize int `cfg:"buffer_size" default:"10" validate:"min=1"`

	// Number of daemons running in the background, writing complete batches to the output.
	RunnerCount int `cfg:"runner_count" default:"10" validate:"min=1"`

	// How many SQS messages do we submit in a single batch? SQS can accept up to 10 messages per batch.
	// SNS doesn't support batching, so the value doesn't matter for SNS.
	BatchSize int `cfg:"batch_size" default:"10" validate:"min=1"`

	// How large may the sum of all messages in the aggregation be? For SQS you can't send more than 256 KB in one batch,
	// for SNS a single message can't be larger than 256 KB. We use 252 KB as default to leave some room for request
	// encoding and overhead.
	BatchMaxSize int `cfg:"batch_max_size" default:"258048" validate:"min=0"`

	// How many stream.Messages do we pack together in a single batch (one message in SQS) at once?
	AggregationSize int `cfg:"aggregation_size" default:"1" validate:"min=1"`

	// Maximum size in bytes of a batch. Defaults to 64 KB to leave some room for encoding overhead.
	// Set to 0 to disable limiting the maximum size for a batch (it will still not put more than BatchSize messages
	// in a batch).
	//
	// Note: Gosoline can't ensure your messages stay below this size if your messages are quite large (especially when
	// using compression). Imagine you already aggregated 40 kb of compressed messages (around 53 kb when base64 encoded)
	// and are now writing a message that compresses to 20 kb. Now your buffer reaches 60 kb and 80 kb base64 encoded.
	// Gosoline will not already output a 53 kb message if you requested 64 kb messages (it would accept a 56 kb message),
	// but after writing the next message, the aggregate (80 kb base64 encoded) already exceeds the requested size.
	AggregationMaxSize int `cfg:"aggregation_max_size" default:"65536" validate:"min=0"`

	// If you are writing to an output using a partition key, we ensure messages are still distributed to a partition
	// according to their partition key (although not necessarily the same partition as without the producer daemon).
	// For this, we split the messages into buckets while collecting them, thus potentially aggregating more messages in
	// memory (depending on the number of buckets you configure).
	//
	// Note: This still does not guarantee that your messages are perfectly ordered - this is impossible as soon as you
	// have more than one producer. However, messages with the same partition key will end up in the same shard, so if
	// you are reading two different shards and one is much further behind than the other, you will not see messages
	// *massively* out of order - it should be roughly bounded by the time you buffer messages (the Interval setting) and
	// thus be not much more than a minute (using the default setting) instead of hours (if one shard is half a day behind
	// while the other is up-to-date).
	//
	// Second note: If you change the amount of partitions, messages might move between buckets and thus end up in different
	// shards than before. Thus, only do this if you can handle it (e.g., because no shard is currently lagging behind).
	PartitionBucketCount int `cfg:"partition_bucket_count" default:"128" validate:"min=1"`

	// Additional attributes we append to each message
	MessageAttributes map[string]string `cfg:"message_attributes"`
}

type ProducerMetadata

// ProducerMetadata describes a producer by its name and by whether its daemon
// is enabled. Both fields carry json tags ("name" and "daemon_enabled"), so a
// value can be serialized as JSON.
type ProducerMetadata struct {
	Name          string `json:"name"`
	DaemonEnabled bool   `json:"daemon_enabled"`
}

type ProducerSettings

// ProducerSettings groups the settings of a producer: its output, the encoding
// and compression it uses and the nested settings of its daemon.
//
// Each field carries a cfg tag naming the key it is bound to ("output",
// "encoding", "compression", "daemon"). Compression declares a default of
// "none"; Output and Encoding declare no default.
type ProducerSettings struct {
	Output      string                 `cfg:"output"`
	Encoding    EncodingType           `cfg:"encoding"`
	Compression CompressionType        `cfg:"compression" default:"none"`
	Daemon      ProducerDaemonSettings `cfg:"daemon"`
}

type ProtobufEncodable

// ProtobufEncodable is implemented by types that can be converted to and from
// a proto.Message.
//
//   - ToMessage returns a proto.Message for the value, or an error.
//   - EmptyMessage returns a proto.Message without an error path
//     (NOTE(review): presumably a blank instance used as the decode target —
//     confirm against the protobuf encoder).
//   - FromMessage takes a proto.Message and returns an error on failure
//     (presumably it populates the receiver from the message — confirm).
type ProtobufEncodable interface {
	ToMessage() (proto.Message, error)
	EmptyMessage() proto.Message
	FromMessage(message proto.Message) error
}

type RawMessage

// RawMessage pairs an arbitrary message body with the MessageBodyEncoder that
// is used to encode that body. Use NewRawMessage to supply a custom encoder or
// NewRawJsonMessage to have the body marshalled as JSON.
type RawMessage struct {
	Body    interface{}
	Encoder MessageBodyEncoder
}

func NewRawJsonMessage

func NewRawJsonMessage(body interface{}) *RawMessage

NewRawJsonMessage works like NewRawMessage with the encoder set to marshal the body as JSON.

func NewRawMessage

func NewRawMessage(body interface{}, encoder MessageBodyEncoder) *RawMessage

NewRawMessage creates a new RawMessage. It uses the provided encoder to encode the message body.

func (*RawMessage) MarshalToBytes

func (m *RawMessage) MarshalToBytes() ([]byte, error)

func (*RawMessage) MarshalToString

func (m *RawMessage) MarshalToString() (string, error)

type RedisListInputSettings

// RedisListInputSettings holds the settings of a redis list input: the
// embedded cfg.AppId, the name of the redis server, the key to use and a wait
// time.
//
// NOTE(review): none of the fields carry cfg tags, so this struct is presumably
// filled in code rather than unmarshalled from config — confirm at the call site.
type RedisListInputSettings struct {
	cfg.AppId
	ServerName string
	Key        string
	WaitTime   time.Duration
}

type RedisListOutputSettings

// RedisListOutputSettings holds the settings of a redis list output: the
// embedded cfg.AppId, the name of the redis server, the key to use and a batch
// size.
//
// NOTE(review): none of the fields carry cfg tags, so this struct is presumably
// filled in code rather than unmarshalled from config — confirm at the call site.
type RedisListOutputSettings struct {
	cfg.AppId
	ServerName string
	Key        string
	BatchSize  int
}

type RetryHandler

// RetryHandler accepts a message via Put and reports failure through the
// returned error.
//
// NOTE(review): presumably Put hands the message over for a later retry —
// the exact semantics depend on the implementation.
type RetryHandler interface {
	Put(ctx context.Context, msg *Message) error
}

func NewManualSqsRetryHandler added in v0.14.0

func NewManualSqsRetryHandler(logger log.Logger, queue sqs.Queue, settings *SqsOutputSettings) RetryHandler

func NewManualSqsRetryHandlerFromInterfaces added in v0.14.0

func NewManualSqsRetryHandlerFromInterfaces(output Output) RetryHandler

type RetryHandlerFactory

// RetryHandlerFactory is the signature of functions that, given a context, the
// config, a logger and a name, return an Input together with a RetryHandler,
// or an error if they could not be created.
type RetryHandlerFactory func(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, RetryHandler, error)

type RetryHandlerNoop

// RetryHandlerNoop is a stateless (empty struct) type with a Put method on a
// value receiver; create it with NewRetryHandlerNoopWithInterfaces.
//
// NOTE(review): judging by the name, Put presumably does nothing — confirm in
// the implementation.
type RetryHandlerNoop struct{}

func NewRetryHandlerNoopWithInterfaces

func NewRetryHandlerNoopWithInterfaces() RetryHandlerNoop

func (RetryHandlerNoop) Put

type RetryHandlerSettings

// RetryHandlerSettings holds the settings shared by retry handlers.
//
// After is bound to the key "after" and declares a default of "1m";
// MaxAttempts is bound to "max_attempts" and declares a default of "3".
type RetryHandlerSettings struct {
	After       time.Duration `cfg:"after" default:"1m"`
	MaxAttempts int           `cfg:"max_attempts" default:"3"`
}

type RetryHandlerSqs

// RetryHandlerSqs exposes no exported fields; create it with
// NewRetryHandlerSqsWithInterfaces, which takes an Output and
// *RetryHandlerSqsSettings. Its Put method on the pointer receiver has the
// signature required by RetryHandler.
type RetryHandlerSqs struct {
	// contains filtered or unexported fields
}

func NewRetryHandlerSqsWithInterfaces

func NewRetryHandlerSqsWithInterfaces(output Output, settings *RetryHandlerSqsSettings) *RetryHandlerSqs

func (*RetryHandlerSqs) Put

func (r *RetryHandlerSqs) Put(ctx context.Context, msg *Message) error

type RetryHandlerSqsSettings

// RetryHandlerSqsSettings holds the settings of the SQS based retry handler.
// It embeds cfg.AppId and the generic RetryHandlerSettings and adds:
//
//   - ClientName ("client_name"), default "default".
//   - MaxNumberOfMessages ("max_number_of_messages"), default "10", validated
//     to lie between 1 and 10.
//   - WaitTime ("wait_time"), default "10" (unit not stated here).
//   - RunnerCount ("runner_count"), default "1".
//   - QueueId ("queue_id"), no default.
type RetryHandlerSqsSettings struct {
	cfg.AppId
	RetryHandlerSettings
	ClientName          string `cfg:"client_name" default:"default"`
	MaxNumberOfMessages int32  `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
	WaitTime            int32  `cfg:"wait_time" default:"10"`
	RunnerCount         int    `cfg:"runner_count" default:"1"`
	QueueId             string `cfg:"queue_id"`
}

type RetryingInput added in v0.14.0

// RetryingInput is implemented by types that can hand out an Input together
// with a RetryHandler via GetRetryHandler.
type RetryingInput interface {
	GetRetryHandler() (Input, RetryHandler)
}

type RunnableBatchConsumerCallback

// RunnableBatchConsumerCallback combines BatchConsumerCallback and
// RunnableCallback: an implementation must satisfy both interfaces.
type RunnableBatchConsumerCallback interface {
	BatchConsumerCallback
	RunnableCallback
}

type RunnableCallback

// RunnableCallback is implemented by callbacks that expose a Run method taking
// a context and returning an error.
type RunnableCallback interface {
	Run(ctx context.Context) error
}

type RunnableConsumerCallback

// RunnableConsumerCallback combines ConsumerCallback and RunnableCallback: an
// implementation must satisfy both interfaces.
type RunnableConsumerCallback interface {
	ConsumerCallback
	RunnableCallback
}

type SizeRestrictedOutput

// SizeRestrictedOutput is an Output that additionally reports its limits on
// message size and batch size. Both getters return a pointer so that nil can
// express "no limit".
type SizeRestrictedOutput interface {
	Output
	// GetMaxMessageSize returns the maximum size of a message for this output (or nil if there is no limit on message size).
	GetMaxMessageSize() *int
	// GetMaxBatchSize returns the maximum number of messages we can write at once to the output (or nil if there is no limit).
	GetMaxBatchSize() *int
}

type SnsInputConfiguration

// SnsInputConfiguration is the configuration of an SNS input as bound through
// its cfg tags.
//
// Defaults and validation declared by the tags:
//   - Type defaults to "sns" (the value of InputTypeSns).
//   - ConsumerId ("id") is required.
//   - Family, Group and Application default to the empty string.
//   - Targets must contain at least one entry.
//   - MaxNumberOfMessages defaults to "10" and must lie between 1 and 10.
//   - WaitTime defaults to "3", VisibilityTimeout to "30" and RunnerCount to
//     "1"; each must be at least 1 (units are not stated here).
//   - ClientName defaults to "default".
//
// RedrivePolicy declares neither default nor validation on this struct.
type SnsInputConfiguration struct {
	Type                string                        `cfg:"type" default:"sns"`
	ConsumerId          string                        `cfg:"id" validate:"required"`
	Family              string                        `cfg:"family" default:""`
	Group               string                        `cfg:"group" default:""`
	Application         string                        `cfg:"application" default:""`
	Targets             []SnsInputTargetConfiguration `cfg:"targets" validate:"min=1"`
	MaxNumberOfMessages int32                         `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
	WaitTime            int32                         `cfg:"wait_time" default:"3" validate:"min=1"`
	VisibilityTimeout   int                           `cfg:"visibility_timeout" default:"30" validate:"min=1"`
	RunnerCount         int                           `cfg:"runner_count" default:"1" validate:"min=1"`
	RedrivePolicy       sqs.RedrivePolicy             `cfg:"redrive_policy"`
	ClientName          string                        `cfg:"client_name" default:"default"`
}

type SnsInputSettings

// SnsInputSettings holds the settings of an SNS input. It embeds cfg.AppId and
// offers the accessors GetAppId, GetClientName, GetQueueId and IsFifoEnabled.
//
// Only MaxNumberOfMessages declares a default ("10") and validation (between 1
// and 10) on this struct; the remaining fields carry just their cfg key.
type SnsInputSettings struct {
	cfg.AppId
	QueueId             string            `cfg:"queue_id"`
	MaxNumberOfMessages int32             `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
	WaitTime            int32             `cfg:"wait_time"`
	RedrivePolicy       sqs.RedrivePolicy `cfg:"redrive_policy"`
	VisibilityTimeout   int               `cfg:"visibility_timeout"`
	RunnerCount         int               `cfg:"runner_count"`
	ClientName          string            `cfg:"client_name"`
}

func (SnsInputSettings) GetAppId

func (s SnsInputSettings) GetAppId() cfg.AppId

func (SnsInputSettings) GetClientName

func (s SnsInputSettings) GetClientName() string

func (SnsInputSettings) GetQueueId

func (s SnsInputSettings) GetQueueId() string

func (SnsInputSettings) IsFifoEnabled

func (s SnsInputSettings) IsFifoEnabled() bool

type SnsInputTarget

// SnsInputTarget identifies one target of an SNS input: the embedded
// cfg.AppId, a topic id, a map of string attributes and a client name. It
// offers the accessors GetAppId, GetClientName and GetTopicId.
//
// NOTE(review): the fields carry no cfg tags; the tagged counterpart is
// SnsInputTargetConfiguration.
type SnsInputTarget struct {
	cfg.AppId
	TopicId    string
	Attributes map[string]string
	ClientName string
}

func (SnsInputTarget) GetAppId

func (t SnsInputTarget) GetAppId() cfg.AppId

func (SnsInputTarget) GetClientName

func (t SnsInputTarget) GetClientName() string

func (SnsInputTarget) GetTopicId

func (t SnsInputTarget) GetTopicId() string

type SnsInputTargetConfiguration

// SnsInputTargetConfiguration is the configuration of a single SNS input
// target as bound through its cfg tags.
//
// Group, Application and TopicId are required; Family and Attributes are
// optional; ClientName defaults to "default".
type SnsInputTargetConfiguration struct {
	Family      string            `cfg:"family"`
	Group       string            `cfg:"group" validate:"required"`
	Application string            `cfg:"application" validate:"required"`
	TopicId     string            `cfg:"topic_id" validate:"required"`
	Attributes  map[string]string `cfg:"attributes"`
	ClientName  string            `cfg:"client_name" default:"default"`
}

type SnsOutputConfiguration

// SnsOutputConfiguration is the configuration of an SNS output as bound
// through its cfg tags. It embeds BaseOutputConfiguration.
//
// Type defaults to "sns" (the value of OutputTypeSns), TopicId is required and
// ClientName defaults to "default". Project, Family, Group and Application
// declare neither default nor validation.
type SnsOutputConfiguration struct {
	BaseOutputConfiguration
	Type        string `cfg:"type" default:"sns"`
	Project     string `cfg:"project"`
	Family      string `cfg:"family"`
	Group       string `cfg:"group"`
	Application string `cfg:"application"`
	TopicId     string `cfg:"topic_id" validate:"required"`
	ClientName  string `cfg:"client_name" default:"default"`
}

type SnsOutputSettings

// SnsOutputSettings holds the settings of an SNS output: the embedded
// cfg.AppId, a topic id and a client name. It offers the accessors GetAppId,
// GetClientName and GetTopicId.
type SnsOutputSettings struct {
	cfg.AppId
	TopicId    string
	ClientName string
}

func (SnsOutputSettings) GetAppId

func (s SnsOutputSettings) GetAppId() cfg.AppId

func (SnsOutputSettings) GetClientName

func (s SnsOutputSettings) GetClientName() string

func (SnsOutputSettings) GetTopicId

func (s SnsOutputSettings) GetTopicId() string

type SqsInputSettings

// SqsInputSettings holds the settings of an SQS input. It embeds cfg.AppId and
// offers the accessors GetAppId, GetClientName, GetQueueId and IsFifoEnabled.
//
// Defaults and validation declared by the tags:
//   - MaxNumberOfMessages defaults to "10" and must lie between 1 and 10.
//   - Unmarshaller defaults to "msg" (the value of UnmarshallerMsg).
//
// All other fields carry just their cfg key on this struct.
type SqsInputSettings struct {
	cfg.AppId
	QueueId             string            `cfg:"queue_id"`
	MaxNumberOfMessages int32             `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
	WaitTime            int32             `cfg:"wait_time"`
	VisibilityTimeout   int               `cfg:"visibility_timeout"`
	RunnerCount         int               `cfg:"runner_count"`
	Fifo                sqs.FifoSettings  `cfg:"fifo"`
	RedrivePolicy       sqs.RedrivePolicy `cfg:"redrive_policy"`
	ClientName          string            `cfg:"client_name"`
	Unmarshaller        string            `cfg:"unmarshaller" default:"msg"`
}

func (SqsInputSettings) GetAppId

func (s SqsInputSettings) GetAppId() cfg.AppId

func (SqsInputSettings) GetClientName

func (s SqsInputSettings) GetClientName() string

func (SqsInputSettings) GetQueueId

func (s SqsInputSettings) GetQueueId() string

func (SqsInputSettings) IsFifoEnabled

func (s SqsInputSettings) IsFifoEnabled() bool

type SqsOutputConfiguration

// SqsOutputConfiguration is the configuration of an SQS output as bound
// through its cfg tags. It embeds BaseOutputConfiguration.
//
// Defaults and validation declared by the tags:
//   - Type defaults to "sqs" (the value of OutputTypeSqs).
//   - QueueId is required.
//   - VisibilityTimeout defaults to "30" and must be greater than 0 (unit not
//     stated here).
//   - ClientName defaults to "default".
//
// Project, Family, Group, Application, RedrivePolicy and Fifo declare neither
// default nor validation on this struct.
type SqsOutputConfiguration struct {
	BaseOutputConfiguration
	Type              string            `cfg:"type" default:"sqs"`
	Project           string            `cfg:"project"`
	Family            string            `cfg:"family"`
	Group             string            `cfg:"group"`
	Application       string            `cfg:"application"`
	QueueId           string            `cfg:"queue_id" validate:"required"`
	VisibilityTimeout int               `cfg:"visibility_timeout" default:"30" validate:"gt=0"`
	RedrivePolicy     sqs.RedrivePolicy `cfg:"redrive_policy"`
	Fifo              sqs.FifoSettings  `cfg:"fifo"`
	ClientName        string            `cfg:"client_name" default:"default"`
}

type SqsOutputSettings

// SqsOutputSettings holds the settings of an SQS output: the embedded
// cfg.AppId, a client name, FIFO settings, a queue id, a redrive policy and a
// visibility timeout. It offers the accessors GetAppId, GetClientName,
// GetQueueId and IsFifoEnabled.
//
// NOTE(review): the fields carry no cfg tags; the tagged counterpart is
// SqsOutputConfiguration.
type SqsOutputSettings struct {
	cfg.AppId
	ClientName        string
	Fifo              sqs.FifoSettings
	QueueId           string
	RedrivePolicy     sqs.RedrivePolicy
	VisibilityTimeout int
}

func (SqsOutputSettings) GetAppId

func (s SqsOutputSettings) GetAppId() cfg.AppId

func (SqsOutputSettings) GetClientName

func (s SqsOutputSettings) GetClientName() string

func (SqsOutputSettings) GetQueueId

func (s SqsOutputSettings) GetQueueId() string

func (SqsOutputSettings) IsFifoEnabled

func (s SqsOutputSettings) IsFifoEnabled() bool

type UnmarshallerFunc

// UnmarshallerFunc is the signature of functions that turn the string pointed
// to by data into a *Message, or return an error.
type UnmarshallerFunc func(data *string) (*Message, error)

type WritableMessage

// WritableMessage is implemented by messages that can marshal themselves
// either to a byte slice or to a string; both methods may fail with an error.
// *RawMessage provides both methods.
type WritableMessage interface {
	MarshalToBytes() ([]byte, error)
	MarshalToString() (string, error)
}

func MessagesToWritableMessages

func MessagesToWritableMessages(batch []*Message) []WritableMessage

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL