Documentation ¶
Index ¶
- Constants
- func AddDefaultEncodeHandler(handler EncodeHandler)
- func AddMessageBodyEncoder(encoding EncodingType, encoder MessageBodyEncoder)
- func ByteChunkToInterfaces(chunk Chunk) []interface{}
- func ByteChunkToStrings(chunk Chunk) []string
- func CompressMessage(compression CompressionType, body []byte) ([]byte, error)
- func ConfigurableConsumerKey(name string) string
- func ConfigurableConsumerRetryKey(name string) string
- func ConfigurableInputKey(name string) string
- func ConfigurableOutputKey(name string) string
- func ConfigurableProducerKey(name string) string
- func ConsumerFactory(callbacks ConsumerCallbackMap) (map[string]kernel.ModuleFactory, error)
- func DecodeMessage(encoding EncodingType, data []byte, out interface{}) error
- func DecompressMessage(compression CompressionType, body []byte) ([]byte, error)
- func EncodeMessage(encoding EncodingType, data interface{}) ([]byte, error)
- func GosoToKafkaMessage(msg *Message) kafka.Message
- func GosoToKafkaMessages(msgs ...*Message) []kafka.Message
- func KafkaToGosoAttributes(headers []kafka.Header, attributes map[string]interface{}) map[string]interface{}
- func MessagesPerRunnerMetricWriterFactory(_ context.Context, config cfg.Config, _ log.Logger) (map[string]kernel.ModuleFactory, error)
- func NewBaseConsumer(ctx context.Context, config cfg.Config, logger log.Logger, name string, ...) (*baseConsumer, error)
- func NewBaseConsumerWithInterfaces(uuidGen uuid.Uuid, logger log.Logger, metricWriter metric.Writer, ...) *baseConsumer
- func NewBatchConsumer(name string, callbackFactory BatchConsumerCallbackFactory) ...
- func NewConsumer(name string, callbackFactory ConsumerCallbackFactory) ...
- func NewConsumerFactory(callbacks ConsumerCallbackMap) kernel.ModuleMultiFactory
- func NewKafkaMessage(writable WritableMessage) kafka.Message
- func NewKafkaMessageAttrs(key string) map[string]interface{}
- func NewKafkaMessages(ms []WritableMessage) []kafka.Message
- func NewKinesisMessageHandler(channel chan *Message) kinesis.MessageHandler
- func NewMessageEncoder(config *MessageEncoderSettings) *messageEncoder
- func NewMessagesPerRunnerMetricWriter(settings *MessagesPerRunnerMetricSettings) kernel.ModuleFactory
- func NewOutputTracer(config cfg.Config, logger log.Logger, base Output, name string) (*outputTracer, error)
- func NewOutputTracerWithInterfaces(tracer tracing.Tracer, base Output, name string) *outputTracer
- func NewProducer(ctx context.Context, config cfg.Config, logger log.Logger, name string, ...) (*producer, error)
- func NewProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)
- func NewProducerDaemonWithInterfaces(logger log.Logger, metric metric.Writer, aggregator ProducerDaemonAggregator, ...) *producerDaemon
- func NewProducerWithInterfaces(encoder MessageEncoder, output Output) *producer
- func NewSnsInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (*snsInput, error)
- func NewSnsInputWithInterfaces(sqsInput *sqsInput) *snsInput
- func NewSqsInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (*sqsInput, error)
- func NewSqsInputWithInterfaces(logger log.Logger, queue sqs.Queue, unmarshaller UnmarshallerFunc, ...) *sqsInput
- func ProducerDaemonFactory(ctx context.Context, config cfg.Config, logger log.Logger) (map[string]kernel.ModuleFactory, error)
- func ProvideProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)
- func ResetInMemoryInputs()
- func ResetInMemoryOutputs()
- func ResetProducerDaemons()
- func SetInputFactory(typ string, factory InputFactory)
- func SnsMarshaller(msg *Message) (*string, error)
- func WithDefaultMessageBodyEncoding(encoding EncodingType)
- type AcknowledgeableInput
- type AggregateFlush
- type BaseConsumerCallback
- type BaseOutputConfiguration
- type BaseOutputConfigurationAware
- type BaseOutputConfigurationTracing
- type BatchConsumer
- type BatchConsumerCallback
- type BatchConsumerCallbackFactory
- type BatchConsumerSettings
- type Chunk
- type Chunks
- type CompressionType
- type Consumer
- type ConsumerAcknowledge
- type ConsumerCallback
- type ConsumerCallbackFactory
- type ConsumerCallbackMap
- type ConsumerMetadata
- type ConsumerRetrySettings
- type ConsumerSettings
- type EncodeHandler
- type EncodingType
- type FileOutputMode
- type FileOutputSettings
- type FileSettings
- type InMemoryInput
- type InMemoryOutput
- func (o *InMemoryOutput) Clear()
- func (o *InMemoryOutput) ContainsBody(body string) bool
- func (o *InMemoryOutput) Get(i int) (*Message, bool)
- func (o *InMemoryOutput) Len() int
- func (o *InMemoryOutput) Size() int
- func (o *InMemoryOutput) Write(_ context.Context, batch []WritableMessage) error
- func (o *InMemoryOutput) WriteOne(ctx context.Context, msg WritableMessage) error
- type InMemoryOutputConfiguration
- type InMemorySettings
- type Input
- func NewConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)
- func NewFileInput(_ cfg.Config, logger log.Logger, settings FileSettings) Input
- func NewFileInputWithInterfaces(logger log.Logger, settings FileSettings) Input
- func NewKinesisInput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Input, error)
- func NewRedisListInput(config cfg.Config, logger log.Logger, settings *RedisListInputSettings) (Input, error)
- func NewRedisListInputWithInterfaces(logger log.Logger, client redis.Client, mw metric.Writer, ...) Input
- func ProvideConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)
- type InputFactory
- type KafkaInput
- type KafkaOutput
- type KafkaSourceMessage
- type KinesisInputConfiguration
- type KinesisOutputConfiguration
- type KinesisOutputSettings
- type Message
- func BuildAggregateMessage(aggregateBody string, attributes ...map[string]interface{}) *Message
- func KafkaToGosoMessage(k kafka.Message) *Message
- func MarshalJsonMessage(body interface{}, attributes ...map[string]interface{}) (*Message, error)
- func MarshalProtobufMessage(body ProtobufEncodable, attributes ...map[string]interface{}) (*Message, error)
- func MessageUnmarshaller(data *string) (*Message, error)
- func NewJsonMessage(body string, attributes ...map[string]interface{}) *Message
- func NewMessage(body string, attributes ...map[string]interface{}) *Message
- func NewProtobufMessage(body string, attributes ...map[string]interface{}) *Message
- func RawUnmarshaller(data *string) (*Message, error)
- func SnsUnmarshaller(data *string) (*Message, error)
- type MessageBodyCompressor
- type MessageBodyEncoder
- type MessageEncoder
- type MessageEncoderSettings
- type MessagesPerRunnerCwNamingSettings
- type MessagesPerRunnerCwServiceNamingSettings
- type MessagesPerRunnerDdbNamingSettings
- type MessagesPerRunnerDdbServiceNamingSettings
- type MessagesPerRunnerEcsSettings
- type MessagesPerRunnerMetricSettings
- type MessagesPerRunnerMetricWriter
- type MessagesPerRunnerMetricWriterSettings
- type ModelMsg
- type NoOpOutput
- type Output
- func NewConfigurableMultiOutput(ctx context.Context, config cfg.Config, logger log.Logger, base string) (Output, error)
- func NewConfigurableOutput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Output, error)
- func NewFileOutput(_ cfg.Config, logger log.Logger, settings *FileOutputSettings) Output
- func NewKinesisOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewKinesisOutputWithInterfaces(recordWriter gosoKinesis.RecordWriter) Output
- func NewRedisListOutput(config cfg.Config, logger log.Logger, settings *RedisListOutputSettings) (Output, error)
- func NewRedisListOutputWithInterfaces(logger log.Logger, mw metric.Writer, client redis.Client, ...) Output
- func NewSnsOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewSnsOutputWithInterfaces(logger log.Logger, topic sns.Topic) Output
- func NewSqsOutput(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Output, error)
- func NewSqsOutputWithInterfaces(logger log.Logger, queue sqs.Queue, settings *SqsOutputSettings) Output
- type OutputChannel
- type OutputFactory
- type PartitionedOutput
- type PartitionerRand
- type Producer
- type ProducerDaemonAggregator
- func NewProducerDaemonAggregator(settings ProducerDaemonSettings, compression CompressionType, ...) (ProducerDaemonAggregator, error)
- func NewProducerDaemonPartitionedAggregator(logger log.Logger, settings ProducerDaemonSettings, ...) (ProducerDaemonAggregator, error)
- func NewProducerDaemonPartitionedAggregatorWithInterfaces(logger log.Logger, rand PartitionerRand, aggregators int, ...) (ProducerDaemonAggregator, error)
- type ProducerDaemonBatcher
- type ProducerDaemonSettings
- type ProducerMetadata
- type ProducerSettings
- type ProtobufEncodable
- type RawMessage
- type RedisListInputSettings
- type RedisListOutputSettings
- type RetryHandler
- func NewRetryHandler(ctx context.Context, config cfg.Config, logger log.Logger, ...) (RetryHandler, error)
- func NewRetryHandlerNoop(ctx context.Context, config cfg.Config, logger log.Logger, name string) (RetryHandler, error)
- func NewRetryHandlerSqs(ctx context.Context, config cfg.Config, logger log.Logger, name string) (RetryHandler, error)
- type RetryHandlerFactory
- type RetryHandlerNoop
- type RetryHandlerSettings
- type RetryHandlerSqs
- type RetryHandlerSqsSettings
- type RetryableInput
- type RunnableBatchConsumerCallback
- type RunnableCallback
- type RunnableConsumerCallback
- type SizeRestrictedOutput
- type SnsInputConfiguration
- type SnsInputSettings
- type SnsInputTarget
- type SnsInputTargetConfiguration
- type SnsOutputConfiguration
- type SnsOutputSettings
- type SqsInputSettings
- type SqsOutputConfiguration
- type SqsOutputSettings
- type UnmarshallerFunc
- type WritableMessage
Constants ¶
const (
    InputTypeFile     = "file"
    InputTypeInMemory = "inMemory"
    InputTypeKinesis  = "kinesis"
    InputTypeRedis    = "redis"
    InputTypeSns      = "sns"
    InputTypeSqs      = "sqs"
    InputTypeKafka    = "kafka"
)
const (
    AttributeSqsMessageId     = "sqsMessageId"
    AttributeSqsReceiptHandle = "sqsReceiptHandle"
)
const (
    AttributeEncoding    = "encoding"
    AttributeCompression = "compression"
)
const (
    AttributeKafkaOriginalMessage = "KafkaOriginal"
    AttributeKafkaKey             = "KafkaKey"
)
const (
    OutputTypeFile     = "file"
    OutputTypeInMemory = "inMemory"
    OutputTypeKinesis  = "kinesis"
    OutputTypeMultiple = "multiple"
    OutputTypeNoOp     = "noop"
    OutputTypeRedis    = "redis"
    OutputTypeSns      = "sns"
    OutputTypeSqs      = "sqs"
    OutputTypeKafka    = "kafka"
)
const (
    AttributeKinesisPartitionKey    = "gosoline.kinesis.partitionKey"
    AttributeKinesisExplicitHashKey = "gosoline.kinesis.explicitHashKey"
)
const (
    AttributeAggregate      = "goso.aggregate"
    AttributeAggregateCount = "goso.aggregate.count"
)
const (
    AttributeRetry   = "goso.retry"
    AttributeRetryId = "goso.retry.id"
)
const (
    UnmarshallerMsg = "msg"
    UnmarshallerRaw = "raw"
    UnmarshallerSns = "sns"
)
const (
ConfigKeyStream = "stream"
)
const MetadataKeyProducers = "stream.producers"
const SqsOutputBatchSize = 10
Variables ¶
This section is empty.
Functions ¶
func AddDefaultEncodeHandler ¶
func AddDefaultEncodeHandler(handler EncodeHandler)
func AddMessageBodyEncoder ¶
func AddMessageBodyEncoder(encoding EncodingType, encoder MessageBodyEncoder)
func ByteChunkToInterfaces ¶
func ByteChunkToInterfaces(chunk Chunk) []interface{}
func ByteChunkToStrings ¶
func ByteChunkToStrings(chunk Chunk) []string
func CompressMessage ¶
func CompressMessage(compression CompressionType, body []byte) ([]byte, error)
func ConfigurableConsumerKey ¶
func ConfigurableConsumerKey(name string) string
func ConfigurableConsumerRetryKey ¶
func ConfigurableConsumerRetryKey(name string) string
func ConfigurableInputKey ¶
func ConfigurableInputKey(name string) string
func ConfigurableOutputKey ¶
func ConfigurableOutputKey(name string) string
func ConfigurableProducerKey ¶
func ConfigurableProducerKey(name string) string
func ConsumerFactory ¶
func ConsumerFactory(callbacks ConsumerCallbackMap) (map[string]kernel.ModuleFactory, error)
func DecodeMessage ¶
func DecodeMessage(encoding EncodingType, data []byte, out interface{}) error
func DecompressMessage ¶
func DecompressMessage(compression CompressionType, body []byte) ([]byte, error)
func EncodeMessage ¶
func EncodeMessage(encoding EncodingType, data interface{}) ([]byte, error)
func GosoToKafkaMessage ¶
func GosoToKafkaMessage(msg *Message) kafka.Message
func GosoToKafkaMessages ¶
func GosoToKafkaMessages(msgs ...*Message) []kafka.Message
func KafkaToGosoAttributes ¶
func KafkaToGosoAttributes(headers []kafka.Header, attributes map[string]interface{}) map[string]interface{}
func MessagesPerRunnerMetricWriterFactory ¶
func MessagesPerRunnerMetricWriterFactory(_ context.Context, config cfg.Config, _ log.Logger) (map[string]kernel.ModuleFactory, error)
func NewBaseConsumer ¶
func NewBaseConsumerWithInterfaces ¶
func NewBaseConsumerWithInterfaces(
    uuidGen uuid.Uuid,
    logger log.Logger,
    metricWriter metric.Writer,
    tracer tracing.Tracer,
    input Input,
    encoder MessageEncoder,
    retryHandler RetryHandler,
    consumerCallback interface{},
    settings *ConsumerSettings,
    name string,
    appId cfg.AppId,
) *baseConsumer
func NewBatchConsumer ¶
func NewConsumer ¶
func NewConsumerFactory ¶
func NewConsumerFactory(callbacks ConsumerCallbackMap) kernel.ModuleMultiFactory
func NewKafkaMessage ¶
func NewKafkaMessage(writable WritableMessage) kafka.Message
func NewKafkaMessageAttrs ¶
func NewKafkaMessageAttrs(key string) map[string]interface{}
func NewKafkaMessages ¶
func NewKafkaMessages(ms []WritableMessage) []kafka.Message
func NewKinesisMessageHandler ¶
func NewKinesisMessageHandler(channel chan *Message) kinesis.MessageHandler
func NewMessageEncoder ¶
func NewMessageEncoder(config *MessageEncoderSettings) *messageEncoder
func NewMessagesPerRunnerMetricWriter ¶
func NewMessagesPerRunnerMetricWriter(settings *MessagesPerRunnerMetricSettings) kernel.ModuleFactory
func NewOutputTracer ¶
func NewOutputTracer(config cfg.Config, logger log.Logger, base Output, name string) (*outputTracer, error)
func NewOutputTracerWithInterfaces ¶
func NewOutputTracerWithInterfaces(tracer tracing.Tracer, base Output, name string) *outputTracer
func NewProducer ¶
func NewProducerDaemon ¶
func NewProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)
func NewProducerDaemonWithInterfaces ¶
func NewProducerDaemonWithInterfaces(logger log.Logger, metric metric.Writer, aggregator ProducerDaemonAggregator, output Output, clock clock.Clock, name string, settings ProducerDaemonSettings) *producerDaemon
func NewProducerWithInterfaces ¶
func NewProducerWithInterfaces(encoder MessageEncoder, output Output) *producer
func NewSnsInput ¶
func NewSnsInput(ctx context.Context, config cfg.Config, logger log.Logger, settings *SnsInputSettings, targets []SnsInputTarget) (*snsInput, error)
func NewSnsInputWithInterfaces ¶
func NewSnsInputWithInterfaces(sqsInput *sqsInput) *snsInput
func NewSqsInput ¶
func NewSqsInputWithInterfaces ¶
func NewSqsInputWithInterfaces(logger log.Logger, queue sqs.Queue, unmarshaller UnmarshallerFunc, settings *SqsInputSettings) *sqsInput
func ProducerDaemonFactory ¶
func ProducerDaemonFactory(ctx context.Context, config cfg.Config, logger log.Logger) (map[string]kernel.ModuleFactory, error)
func ProvideProducerDaemon ¶
func ProvideProducerDaemon(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*producerDaemon, error)
func ResetInMemoryInputs ¶
func ResetInMemoryInputs()
func ResetInMemoryOutputs ¶
func ResetInMemoryOutputs()
func ResetProducerDaemons ¶
func ResetProducerDaemons()
func SetInputFactory ¶
func SetInputFactory(typ string, factory InputFactory)
func SnsMarshaller ¶
func SnsMarshaller(msg *Message) (*string, error)
func WithDefaultMessageBodyEncoding ¶
func WithDefaultMessageBodyEncoding(encoding EncodingType)
Types ¶
type AcknowledgeableInput ¶
type AcknowledgeableInput interface {
    Input
    // Ack acknowledges a single message. If possible, prefer calling AckBatch as it is more efficient.
    Ack(ctx context.Context, msg *Message, ack bool) error
    // AckBatch has the same effect as calling Ack for every single message, but it might use fewer calls to an
    // external service.
    AckBatch(ctx context.Context, msgs []*Message, acks []bool) error
}
An AcknowledgeableInput is an Input with the additional ability to mark messages as successfully consumed. For example, an SQS queue would deliver a message a second time after its visibility timeout if we didn't acknowledge it.
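Since AckBatch takes the messages and their acknowledge flags as parallel slices, batching the acknowledgements is mostly bookkeeping. A minimal sketch (ackAll is a hypothetical helper; the import path assumes gosoline's repository layout):

package main

import (
    "context"

    "github.com/justtrackio/gosoline/pkg/stream"
)

// ackAll marks every message as successfully consumed in a single AckBatch
// call, which may need fewer external service calls than one Ack per message.
func ackAll(ctx context.Context, in stream.AcknowledgeableInput, msgs []*stream.Message) error {
    acks := make([]bool, len(msgs)) // one flag per message, in the same order
    for i := range acks {
        acks[i] = true
    }

    return in.AckBatch(ctx, msgs, acks)
}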
type AggregateFlush ¶
type BaseConsumerCallback ¶
type BaseConsumerCallback interface {
GetModel(attributes map[string]interface{}) interface{}
}
type BaseOutputConfiguration ¶
type BaseOutputConfiguration struct {
Tracing BaseOutputConfigurationTracing `cfg:"tracing"`
}
func (*BaseOutputConfiguration) SetTracing ¶
func (b *BaseOutputConfiguration) SetTracing(enabled bool)
type BaseOutputConfigurationAware ¶
type BaseOutputConfigurationAware interface {
SetTracing(enabled bool)
}
type BaseOutputConfigurationTracing ¶
type BaseOutputConfigurationTracing struct {
Enabled bool `cfg:"enabled" default:"true"`
}
type BatchConsumer ¶
type BatchConsumer struct {
// contains filtered or unexported fields
}
func NewBatchConsumerWithInterfaces ¶
func NewBatchConsumerWithInterfaces(base *baseConsumer, callback BatchConsumerCallback, ticker *time.Ticker, settings *BatchConsumerSettings) *BatchConsumer
type BatchConsumerCallback ¶
type BatchConsumerSettings ¶
type Chunk ¶
type Chunks ¶
type Chunks []Chunk
func BuildChunks ¶
func BuildChunks(batch []WritableMessage, size int) (Chunks, error)
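A sketch of how BuildChunks might combine with ByteChunkToStrings and the SqsOutputBatchSize constant to split a marshalled batch into SQS-sized groups (chunkBodies is a hypothetical helper, assuming gosoline's import path):

package main

import "github.com/justtrackio/gosoline/pkg/stream"

// chunkBodies marshals a batch into chunks of at most SqsOutputBatchSize
// entries each and converts every chunk's byte slices into strings.
func chunkBodies(batch []stream.WritableMessage) ([][]string, error) {
    chunks, err := stream.BuildChunks(batch, stream.SqsOutputBatchSize)
    if err != nil {
        return nil, err
    }

    bodies := make([][]string, len(chunks))
    for i, chunk := range chunks {
        bodies[i] = stream.ByteChunkToStrings(chunk)
    }

    return bodies, nil
}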
type CompressionType ¶
type CompressionType string
const (
    CompressionNone CompressionType = "none"
    CompressionGZip CompressionType = "application/gzip"
)
func GetCompressionAttribute ¶
func GetCompressionAttribute(attributes map[string]interface{}) (*CompressionType, error)
GetCompressionAttribute returns the compression attribute if one is set, nil if none is set, and an error if the set value is of the wrong type.
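The three possible results map naturally onto a decompression helper. A sketch (decompressBody is hypothetical; the import path assumes gosoline's repository layout):

package main

import "github.com/justtrackio/gosoline/pkg/stream"

// decompressBody returns the message body, decompressing it first if the
// message carries a compression attribute.
func decompressBody(msg *stream.Message) ([]byte, error) {
    compression, err := stream.GetCompressionAttribute(msg.Attributes)
    if err != nil {
        return nil, err // the attribute is set, but to a value of the wrong type
    }

    if compression == nil || *compression == stream.CompressionNone {
        return []byte(msg.Body), nil // nothing to decompress
    }

    return stream.DecompressMessage(*compression, []byte(msg.Body))
}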
func (CompressionType) String ¶
func (s CompressionType) String() string
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumerWithInterfaces ¶
func NewConsumerWithInterfaces(base *baseConsumer, callback ConsumerCallback) *Consumer
type ConsumerAcknowledge ¶
type ConsumerAcknowledge struct {
// contains filtered or unexported fields
}
func NewConsumerAcknowledgeWithInterfaces ¶
func NewConsumerAcknowledgeWithInterfaces(logger log.Logger, input Input) ConsumerAcknowledge
func (*ConsumerAcknowledge) Acknowledge ¶
func (c *ConsumerAcknowledge) Acknowledge(ctx context.Context, cdata *consumerData, ack bool)
func (*ConsumerAcknowledge) AcknowledgeBatch ¶
func (c *ConsumerAcknowledge) AcknowledgeBatch(ctx context.Context, cdata []*consumerData, acks []bool)
type ConsumerCallback ¶
type ConsumerCallbackFactory ¶
type ConsumerCallbackMap ¶
type ConsumerCallbackMap map[string]ConsumerCallbackFactory
type ConsumerMetadata ¶
type ConsumerRetrySettings ¶
type ConsumerSettings ¶
type ConsumerSettings struct {
    Input       string                `cfg:"input" default:"consumer" validate:"required"`
    RunnerCount int                   `cfg:"runner_count" default:"1" validate:"min=1"`
    Encoding    EncodingType          `cfg:"encoding" default:"application/json"`
    IdleTimeout time.Duration         `cfg:"idle_timeout" default:"10s"`
    Retry       ConsumerRetrySettings `cfg:"retry"`
}
type EncodeHandler ¶
type EncodingType ¶
type EncodingType string
const (
    EncodingJson     EncodingType = "application/json"
    EncodingProtobuf EncodingType = "application/x-protobuf"
)
func GetEncodingAttribute ¶
func GetEncodingAttribute(attributes map[string]interface{}) (*EncodingType, error)
GetEncodingAttribute returns the encoding attribute if one is set, nil if none is set, and an error if the set value is of the wrong type.
func (EncodingType) String ¶
func (s EncodingType) String() string
type FileOutputMode ¶
type FileOutputMode string
const (
    FileOutputModeAppend   FileOutputMode = "append"
    FileOutputModeSingle   FileOutputMode = "single"
    FileOutputModeTruncate FileOutputMode = "truncate"
)
type FileOutputSettings ¶
type FileOutputSettings struct {
    Filename string         `cfg:"filename"`
    Mode     FileOutputMode `cfg:"mode" default:"append"`
}
type FileSettings ¶
type InMemoryInput ¶
type InMemoryInput struct {
// contains filtered or unexported fields
}
func NewInMemoryInput ¶
func NewInMemoryInput(settings *InMemorySettings) *InMemoryInput
func ProvideInMemoryInput ¶
func ProvideInMemoryInput(name string, settings *InMemorySettings) *InMemoryInput
func (*InMemoryInput) Data ¶
func (i *InMemoryInput) Data() <-chan *Message
func (*InMemoryInput) Publish ¶
func (i *InMemoryInput) Publish(messages ...*Message)
func (*InMemoryInput) Reset ¶
func (i *InMemoryInput) Reset()
func (*InMemoryInput) Stop ¶
func (i *InMemoryInput) Stop()
type InMemoryOutput ¶
type InMemoryOutput struct {
// contains filtered or unexported fields
}
func NewInMemoryOutput ¶
func NewInMemoryOutput() *InMemoryOutput
func ProvideInMemoryOutput ¶
func ProvideInMemoryOutput(name string) *InMemoryOutput
func (*InMemoryOutput) Clear ¶
func (o *InMemoryOutput) Clear()
func (*InMemoryOutput) ContainsBody ¶
func (o *InMemoryOutput) ContainsBody(body string) bool
func (*InMemoryOutput) Len ¶
func (o *InMemoryOutput) Len() int
func (*InMemoryOutput) Size ¶
func (o *InMemoryOutput) Size() int
func (*InMemoryOutput) Write ¶
func (o *InMemoryOutput) Write(_ context.Context, batch []WritableMessage) error
func (*InMemoryOutput) WriteOne ¶
func (o *InMemoryOutput) WriteOne(ctx context.Context, msg WritableMessage) error
type InMemoryOutputConfiguration ¶
type InMemoryOutputConfiguration struct {
    BaseOutputConfiguration
    Type string `cfg:"type" default:"inMemory"`
}
type InMemorySettings ¶
type InMemorySettings struct {
Size int `cfg:"size" default:"1"`
}
type Input ¶
type Input interface {
    // Run provides a steady stream of messages, returned via Data. Run does not return until Stop is called and thus
    // should be called in its own go routine. The only exception to this is if we either fail to produce messages and
    // return an error or if the input is depleted (like an InMemoryInput).
    //
    // Run should only be called once, not all inputs can be resumed.
    Run(ctx context.Context) error
    // Stop causes Run to return as fast as possible. Calling Stop is preferable to canceling the context passed to Run
    // as it allows Run to shut down cleaner (and might take a bit longer, e.g., to finish processing the current batch
    // of messages).
    Stop()
    // Data returns a channel containing the messages produced by this input.
    Data() <-chan *Message
}
An Input provides you with a steady stream of messages until you Stop it.
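The contract above translates into a small consume loop: Run lives in its own goroutine, messages arrive via Data, and Stop (rather than canceling Run's context) triggers the clean shutdown. A sketch with a hypothetical consume helper, assuming gosoline's import path:

package main

import (
    "context"

    "github.com/justtrackio/gosoline/pkg/stream"
)

// consume runs an input in its own goroutine (as the Run contract requires),
// handles every message from Data, and stops the input once ctx is done.
func consume(ctx context.Context, in stream.Input, handle func(*stream.Message)) error {
    runErr := make(chan error, 1)
    go func() {
        // Run gets its own context so that canceling ctx shuts the input down
        // via Stop instead, which the Input contract prefers.
        runErr <- in.Run(context.Background())
    }()

    data := in.Data()
    for {
        select {
        case <-ctx.Done():
            in.Stop() // allows a cleaner shutdown than canceling Run's context
            return <-runErr
        case msg, ok := <-data:
            if !ok {
                return <-runErr // input is depleted or was stopped
            }
            handle(msg)
        }
    }
}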
func NewConfigurableInput ¶
func NewConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)
func NewFileInput ¶
func NewFileInput(_ cfg.Config, logger log.Logger, settings FileSettings) Input
func NewFileInputWithInterfaces ¶
func NewFileInputWithInterfaces(logger log.Logger, settings FileSettings) Input
func NewKinesisInput ¶
func NewRedisListInput ¶
func NewRedisListInput(config cfg.Config, logger log.Logger, settings *RedisListInputSettings) (Input, error)
func ProvideConfigurableInput ¶
func ProvideConfigurableInput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Input, error)
type InputFactory ¶
type KafkaInput ¶
type KafkaInput struct {
// contains filtered or unexported fields
}
func NewKafkaInput ¶
func NewKafkaInputWithInterfaces ¶
func NewKafkaInputWithInterfaces(consumer *kafkaConsumer.Consumer) (*KafkaInput, error)
func (*KafkaInput) Ack ¶
func (i *KafkaInput) Ack(ctx context.Context, msg *Message, ack bool) error
Ack acknowledges a single message. If possible, prefer calling AckBatch as it is more efficient.
func (*KafkaInput) AckBatch ¶
func (i *KafkaInput) AckBatch(ctx context.Context, msgs []*Message, acks []bool) error
AckBatch has the same effect as calling Ack for every single message, but it might use fewer calls to an external service.
func (*KafkaInput) Data ¶
func (i *KafkaInput) Data() <-chan *Message
Data returns a channel containing the messages produced by this input.
func (*KafkaInput) Run ¶
func (i *KafkaInput) Run(ctx context.Context) error
Run provides a steady stream of messages, returned via Data. Run does not return until Stop is called and thus should be called in its own go routine. The only exception to this is if we either fail to produce messages and return an error or if the input is depleted (like an InMemoryInput).
Run should only be called once, not all inputs can be resumed.
func (*KafkaInput) Stop ¶
func (i *KafkaInput) Stop()
Stop causes Run to return as fast as possible. Calling Stop is preferable to canceling the context passed to Run as it allows Run to shut down cleaner (and might take a bit longer, e.g., to finish processing the current batch of messages).
type KafkaOutput ¶
type KafkaOutput struct {
// contains filtered or unexported fields
}
func NewKafkaOutput ¶
func NewKafkaOutputWithInterfaces ¶
func NewKafkaOutputWithInterfaces(ctx context.Context, producer *kafkaProducer.Producer) (*KafkaOutput, error)
func (*KafkaOutput) Write ¶
func (o *KafkaOutput) Write(ctx context.Context, ms []WritableMessage) error
func (*KafkaOutput) WriteOne ¶
func (o *KafkaOutput) WriteOne(ctx context.Context, m WritableMessage) error
type KafkaSourceMessage ¶
type KafkaSourceMessage struct {
kafka.Message
}
func (KafkaSourceMessage) MarshalJSON ¶
func (k KafkaSourceMessage) MarshalJSON() ([]byte, error)
type KinesisOutputConfiguration ¶
type KinesisOutputConfiguration struct {
    BaseOutputConfiguration
    Type        string `cfg:"type" default:"kinesis"`
    Project     string `cfg:"project"`
    Family      string `cfg:"family"`
    Group       string `cfg:"group"`
    Application string `cfg:"application"`
    ClientName  string `cfg:"client_name" default:"default"`
    StreamName  string `cfg:"stream_name"`
}
type KinesisOutputSettings ¶
func (KinesisOutputSettings) GetAppId ¶
func (s KinesisOutputSettings) GetAppId() cfg.AppId
func (KinesisOutputSettings) GetClientName ¶
func (s KinesisOutputSettings) GetClientName() string
func (KinesisOutputSettings) GetStreamName ¶
func (s KinesisOutputSettings) GetStreamName() string
type Message ¶
type Message struct {
    Attributes map[string]interface{} `json:"attributes"`
    Body       string                 `json:"body"`
}
func BuildAggregateMessage ¶
func BuildAggregateMessage(aggregateBody string, attributes ...map[string]interface{}) *Message
func KafkaToGosoMessage ¶
func KafkaToGosoMessage(k kafka.Message) *Message
func MarshalJsonMessage ¶
func MarshalJsonMessage(body interface{}, attributes ...map[string]interface{}) (*Message, error)
func MarshalProtobufMessage ¶
func MarshalProtobufMessage(body ProtobufEncodable, attributes ...map[string]interface{}) (*Message, error)
func MessageUnmarshaller ¶
func MessageUnmarshaller(data *string) (*Message, error)
func NewJsonMessage ¶
func NewJsonMessage(body string, attributes ...map[string]interface{}) *Message
func NewMessage ¶
func NewMessage(body string, attributes ...map[string]interface{}) *Message
func NewProtobufMessage ¶
func NewProtobufMessage(body string, attributes ...map[string]interface{}) *Message
func RawUnmarshaller ¶
func RawUnmarshaller(data *string) (*Message, error)
func SnsUnmarshaller ¶
func SnsUnmarshaller(data *string) (*Message, error)
func (*Message) GetAttributes ¶
func (*Message) MarshalToBytes ¶
func (m *Message) MarshalToBytes() ([]byte, error)
func (*Message) MarshalToString ¶
func (m *Message) MarshalToString() (string, error)
func (*Message) UnmarshalFromBytes ¶
func (*Message) UnmarshalFromString ¶
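A sketch tying the constructors and marshalling together: build a JSON message carrying one attribute set, then marshal the whole envelope (attributes plus body) to a string (buildOrder and the field names are hypothetical; the import path assumes gosoline's repository layout):

package main

import "github.com/justtrackio/gosoline/pkg/stream"

// buildOrder wraps an example payload in a Message with one attribute set and
// marshals the whole envelope to a string.
func buildOrder() (string, error) {
    msg, err := stream.MarshalJsonMessage(
        map[string]interface{}{"orderId": 42},        // becomes the JSON body
        map[string]interface{}{"source": "checkout"}, // optional attribute set
    )
    if err != nil {
        return "", err
    }

    return msg.MarshalToString()
}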
type MessageBodyCompressor ¶
type MessageBodyEncoder ¶
type MessageBodyEncoder interface {
    Encode(data interface{}) ([]byte, error)
    Decode(data []byte, out interface{}) error
}
func NewJsonEncoder ¶
func NewJsonEncoder() MessageBodyEncoder
func NewProtobufEncoder ¶
func NewProtobufEncoder() MessageBodyEncoder
type MessageEncoder ¶
type MessageEncoderSettings ¶
type MessageEncoderSettings struct {
    Encoding       EncodingType
    Compression    CompressionType
    EncodeHandlers []EncodeHandler
}
type MessagesPerRunnerCwNamingSettings ¶
type MessagesPerRunnerCwNamingSettings struct {
Pattern string `cfg:"pattern,nodecode" default:"{project}/{env}/{family}/{group}-{app}"`
}
type MessagesPerRunnerCwServiceNamingSettings ¶
type MessagesPerRunnerCwServiceNamingSettings struct {
Naming MessagesPerRunnerCwNamingSettings `cfg:"naming"`
}
type MessagesPerRunnerDdbNamingSettings ¶
type MessagesPerRunnerDdbNamingSettings struct {
Pattern string `cfg:"pattern,nodecode" default:"{project}-{env}-{family}-{modelId}"`
}
type MessagesPerRunnerDdbServiceNamingSettings ¶
type MessagesPerRunnerDdbServiceNamingSettings struct {
Naming MessagesPerRunnerDdbNamingSettings `cfg:"naming"`
}
type MessagesPerRunnerMetricSettings ¶
type MessagesPerRunnerMetricSettings struct {
    Enabled            bool                                      `cfg:"enabled"`
    Ecs                MessagesPerRunnerEcsSettings              `cfg:"ecs"`
    LeaderElection     string                                    `cfg:"leader_election" default:"streamMprMetrics"`
    MaxIncreasePercent float64                                   `cfg:"max_increase_percent" default:"200"`
    MaxIncreasePeriod  time.Duration                             `cfg:"max_increase_period" default:"5m"`
    DynamoDb           MessagesPerRunnerDdbServiceNamingSettings `cfg:"dynamodb"`
    Cloudwatch         MessagesPerRunnerCwServiceNamingSettings  `cfg:"cloudwatch"`
    Period             time.Duration                             `cfg:"period" default:"1m"`
    TargetValue        float64                                   `cfg:"target_value" default:"0"`
}
type MessagesPerRunnerMetricWriter ¶
type MessagesPerRunnerMetricWriter struct {
    kernel.EssentialModule
    kernel.ServiceStage
    // contains filtered or unexported fields
}
func NewMessagesPerRunnerMetricWriterWithInterfaces ¶
func NewMessagesPerRunnerMetricWriterWithInterfaces(logger log.Logger, leaderElection ddb.LeaderElection, cwClient gosoCloudwatch.Client, metricWriter metric.Writer, clock clock.Clock, ticker clock.Ticker, settings *MessagesPerRunnerMetricWriterSettings) (*MessagesPerRunnerMetricWriter, error)
type ModelMsg ¶
func CreateModelMsg ¶
type NoOpOutput ¶
type NoOpOutput struct{}
func (*NoOpOutput) Write ¶
func (o *NoOpOutput) Write(_ context.Context, _ []WritableMessage) error
func (*NoOpOutput) WriteOne ¶
func (o *NoOpOutput) WriteOne(_ context.Context, _ WritableMessage) error
type Output ¶
type Output interface {
    WriteOne(ctx context.Context, msg WritableMessage) error
    Write(ctx context.Context, batch []WritableMessage) error
}
func NewConfigurableMultiOutput ¶
func NewConfigurableMultiOutput(ctx context.Context, config cfg.Config, logger log.Logger, base string) (Output, error)
func NewConfigurableOutput ¶
func NewConfigurableOutput(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Output, error)
func NewFileOutput ¶
func NewFileOutput(_ cfg.Config, logger log.Logger, settings *FileOutputSettings) Output
func NewKinesisOutput ¶
func NewKinesisOutputWithInterfaces ¶
func NewKinesisOutputWithInterfaces(recordWriter gosoKinesis.RecordWriter) Output
func NewRedisListOutput ¶
func NewRedisListOutput(config cfg.Config, logger log.Logger, settings *RedisListOutputSettings) (Output, error)
func NewSnsOutput ¶
func NewSnsOutputWithInterfaces ¶
func NewSnsOutputWithInterfaces(logger log.Logger, topic sns.Topic) Output
func NewSqsOutput ¶
func NewSqsOutputWithInterfaces ¶
func NewSqsOutputWithInterfaces(logger log.Logger, queue sqs.Queue, settings *SqsOutputSettings) Output
type OutputChannel ¶
type OutputChannel interface {
    Read() ([]WritableMessage, bool)
    Write(msg []WritableMessage)
    Close()
}
func NewOutputChannel ¶
func NewOutputChannel(logger log.Logger, bufferSize int) OutputChannel
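A sketch of the intended producer/consumer split, assuming Read's boolean mirrors a channel receive (false once the channel is closed and drained); pump is a hypothetical helper and the buffer size of 10 is an arbitrary choice:

package main

import (
    "github.com/justtrackio/gosoline/pkg/log"
    "github.com/justtrackio/gosoline/pkg/stream"
)

// pump writes batches into an OutputChannel from one goroutine and drains
// them in the caller until the channel is closed.
func pump(logger log.Logger, batches [][]stream.WritableMessage, write func([]stream.WritableMessage)) {
    ch := stream.NewOutputChannel(logger, 10)

    go func() {
        defer ch.Close()
        for _, batch := range batches {
            ch.Write(batch)
        }
    }()

    for {
        batch, ok := ch.Read()
        if !ok {
            return // channel closed and drained
        }
        write(batch)
    }
}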
type OutputFactory ¶
type PartitionedOutput ¶
type PartitionerRand ¶
type Producer ¶
type Producer interface {
    WriteOne(ctx context.Context, model interface{}, attributeSets ...map[string]interface{}) error
    Write(ctx context.Context, models interface{}, attributeSets ...map[string]interface{}) error
}
func NewMultiProducer ¶
type ProducerDaemonAggregator ¶
type ProducerDaemonAggregator interface {
    Write(ctx context.Context, msg *Message) ([]AggregateFlush, error)
    Flush() ([]AggregateFlush, error)
}
func NewProducerDaemonAggregator ¶
func NewProducerDaemonAggregator(settings ProducerDaemonSettings, compression CompressionType, attributeSets ...map[string]interface{}) (ProducerDaemonAggregator, error)
func NewProducerDaemonPartitionedAggregator ¶
func NewProducerDaemonPartitionedAggregator(logger log.Logger, settings ProducerDaemonSettings, compression CompressionType) (ProducerDaemonAggregator, error)
func NewProducerDaemonPartitionedAggregatorWithInterfaces ¶
func NewProducerDaemonPartitionedAggregatorWithInterfaces(logger log.Logger, rand PartitionerRand, aggregators int, createAggregator func(attributes map[string]interface{}) (ProducerDaemonAggregator, error)) (ProducerDaemonAggregator, error)
type ProducerDaemonBatcher ¶
type ProducerDaemonBatcher interface {
    Append(msg *Message) ([]WritableMessage, error)
    Flush() []WritableMessage
}
func NewProducerDaemonBatcher ¶
func NewProducerDaemonBatcher(settings ProducerDaemonSettings) ProducerDaemonBatcher
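A sketch of the Append/Flush cycle, assuming Append returns the completed batch (if any) once the configured limits are reached and Flush returns whatever is still buffered (drain is a hypothetical helper):

package main

import (
    "context"

    "github.com/justtrackio/gosoline/pkg/stream"
)

// drain feeds messages into a batcher and writes every completed batch to the
// output, flushing the final partial batch at the end.
func drain(ctx context.Context, out stream.Output, settings stream.ProducerDaemonSettings, msgs []*stream.Message) error {
    batcher := stream.NewProducerDaemonBatcher(settings)

    for _, msg := range msgs {
        ready, err := batcher.Append(msg)
        if err != nil {
            return err
        }
        if len(ready) > 0 {
            if err := out.Write(ctx, ready); err != nil {
                return err
            }
        }
    }

    if remainder := batcher.Flush(); len(remainder) > 0 {
        return out.Write(ctx, remainder)
    }

    return nil
}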
type ProducerDaemonSettings ¶
type ProducerDaemonSettings struct {
    Enabled bool `cfg:"enabled" default:"false"`
    // Amount of time spent waiting for messages before sending out a batch.
    Interval time.Duration `cfg:"interval" default:"1m"`
    // Size of the buffer channel, i.e., how many messages can be in-flight at once? Generally it is a good idea to match
    // this with the number of runners.
    BufferSize int `cfg:"buffer_size" default:"10" validate:"min=1"`
    // Number of daemons running in the background, writing complete batches to the output.
    RunnerCount int `cfg:"runner_count" default:"10" validate:"min=1"`
    // How many SQS messages do we submit in a single batch? SQS can accept up to 10 messages per batch.
    // SNS doesn't support batching, so the value doesn't matter for SNS.
    BatchSize int `cfg:"batch_size" default:"10" validate:"min=1"`
    // How large may the sum of all messages in the aggregation be? For SQS you can't send more than 256 KB in one batch,
    // for SNS a single message can't be larger than 256 KB. We use 252 KB as default to leave some room for request
    // encoding and overhead.
    BatchMaxSize int `cfg:"batch_max_size" default:"258048" validate:"min=0"`
    // How many stream.Messages do we pack together in a single batch (one message in SQS) at once?
    AggregationSize int `cfg:"aggregation_size" default:"1" validate:"min=1"`
    // Maximum size in bytes of a batch. Defaults to 64 KB to leave some room for encoding overhead.
    // Set to 0 to disable limiting the maximum size for a batch (it will still not put more than BatchSize messages
    // in a batch).
    //
    // Note: Gosoline can't ensure your messages stay below this size if your messages are quite large (especially when
    // using compression). Imagine you already aggregated 40 KB of compressed messages (around 53 KB when base64 encoded)
    // and are now writing a message that compresses to 20 KB. Now your buffer reaches 60 KB and 80 KB base64 encoded.
    // Gosoline will not already output a 53 KB message if you requested 64 KB messages (it would accept a 56 KB message),
    // but after writing the next message the aggregate already exceeds the requested size, so the flushed message ends
    // up larger than configured.
    AggregationMaxSize int `cfg:"aggregation_max_size" default:"65536" validate:"min=0"`
    // If you are writing to an output using a partition key, we ensure messages are still distributed to a partition
    // according to their partition key (although not necessarily the same partition as without the producer daemon).
    // For this, we split the messages into buckets while collecting them, thus potentially aggregating more messages in
    // memory (depending on the number of buckets you configure).
    //
    // Note: This still does not guarantee that your messages are perfectly ordered - this is impossible as soon as you
    // have more than one producer. However, messages with the same partition key will end up in the same shard, so if
    // you are reading two different shards and one is much further behind than the other, you will not see messages
    // *massively* out of order - it should be roughly bounded by the time you buffer messages (the Interval setting) and
    // thus be not much more than a minute (using the default setting) instead of hours (if one shard is half a day behind
    // while the other is up-to-date).
    //
    // Second note: If you change the number of partitions, messages might move between buckets and thus end up in different
    // shards than before. Thus, only do this if you can handle it (e.g., because no shard is currently lagging behind).
    PartitionBucketCount int `cfg:"partition_bucket_count" default:"128" validate:"min=1"`
    // Additional attributes we append to each message
    MessageAttributes map[string]interface{} `cfg:"message_attributes"`
}
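These settings normally come from config via the cfg tags above; a Go sketch nonetheless shows how the knobs relate for an SQS output (the concrete values are illustrative, not recommendations):

package main

import (
    "time"

    "github.com/justtrackio/gosoline/pkg/stream"
)

// sqsDaemonSettings sketches a daemon tuned for SQS: batches of up to 10 SQS
// messages, each aggregating up to 25 stream.Messages, flushed at least once
// per second.
func sqsDaemonSettings() stream.ProducerDaemonSettings {
    return stream.ProducerDaemonSettings{
        Enabled:            true,
        Interval:           time.Second,
        BufferSize:         10,
        RunnerCount:        10,
        BatchSize:          10,    // SQS accepts at most 10 messages per batch
        AggregationSize:    25,    // stream.Messages packed into one SQS message
        AggregationMaxSize: 65536, // keep each aggregate under 64 KB
    }
}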
type ProducerMetadata ¶
type ProducerSettings ¶
type ProducerSettings struct {
    Output      string                 `cfg:"output"`
    Encoding    EncodingType           `cfg:"encoding"`
    Compression CompressionType        `cfg:"compression" default:"none"`
    Daemon      ProducerDaemonSettings `cfg:"daemon"`
}
type ProtobufEncodable ¶
type RawMessage ¶
type RawMessage struct {
    Body    interface{}
    Encoder MessageBodyEncoder
}
func NewRawJsonMessage ¶
func NewRawJsonMessage(body interface{}) *RawMessage
NewRawJsonMessage works like NewRawMessage with the encoder set to marshal the body as JSON.
func NewRawMessage ¶
func NewRawMessage(body interface{}, encoder MessageBodyEncoder) *RawMessage
NewRawMessage creates a new RawMessage. It uses the provided encoder to encode the message body.
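Because RawMessage implements WritableMessage, it can go straight to any Output, marshalling only the body rather than the usual attributes/body envelope. A sketch with a hypothetical writeRaw helper:

package main

import (
    "context"

    "github.com/justtrackio/gosoline/pkg/stream"
)

// writeRaw sends a payload as plain JSON instead of wrapping it in the
// standard envelope of a stream.Message.
func writeRaw(ctx context.Context, out stream.Output, payload interface{}) error {
    return out.WriteOne(ctx, stream.NewRawJsonMessage(payload))
}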
func (*RawMessage) MarshalToBytes ¶
func (m *RawMessage) MarshalToBytes() ([]byte, error)
func (*RawMessage) MarshalToString ¶
func (m *RawMessage) MarshalToString() (string, error)
type RedisListInputSettings ¶
type RedisListOutputSettings ¶
type RetryHandler ¶
func NewRetryHandler ¶
func NewRetryHandler(ctx context.Context, config cfg.Config, logger log.Logger, consumerSettings *ConsumerRetrySettings, name string) (RetryHandler, error)
func NewRetryHandlerNoop ¶
func NewRetryHandlerNoop(ctx context.Context, config cfg.Config, logger log.Logger, name string) (RetryHandler, error)
func NewRetryHandlerSqs ¶
func NewRetryHandlerSqs(ctx context.Context, config cfg.Config, logger log.Logger, name string) (RetryHandler, error)
type RetryHandlerFactory ¶
type RetryHandlerNoop ¶
type RetryHandlerNoop struct {
// contains filtered or unexported fields
}
func NewRetryHandlerNoopWithInterfaces ¶
func NewRetryHandlerNoopWithInterfaces() *RetryHandlerNoop
func (*RetryHandlerNoop) Data ¶
func (r *RetryHandlerNoop) Data() <-chan *Message
func (*RetryHandlerNoop) Put ¶
func (r *RetryHandlerNoop) Put(ctx context.Context, msg *Message) error
func (*RetryHandlerNoop) Stop ¶
func (r *RetryHandlerNoop) Stop()
type RetryHandlerSettings ¶
type RetryHandlerSqs ¶
type RetryHandlerSqs struct {
    AcknowledgeableInput
    // contains filtered or unexported fields
}
func NewRetryHandlerSqsWithInterfaces ¶
func NewRetryHandlerSqsWithInterfaces(input AcknowledgeableInput, output Output, settings *RetryHandlerSqsSettings) *RetryHandlerSqs
type RetryHandlerSqsSettings ¶
type RetryHandlerSqsSettings struct {
    cfg.AppId
    RetryHandlerSettings
    ClientName          string `cfg:"client_name" default:"default"`
    MaxNumberOfMessages int32  `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
    WaitTime            int32  `cfg:"wait_time" default:"10"`
    RunnerCount         int    `cfg:"runner_count" default:"1"`
    QueueId             string `cfg:"queue_id"`
}
type RetryableInput ¶
type RetryableInput interface {
HasRetry() bool
}
type RunnableBatchConsumerCallback ¶
type RunnableBatchConsumerCallback interface {
    BatchConsumerCallback
    RunnableCallback
}
type RunnableCallback ¶
type RunnableConsumerCallback ¶
type RunnableConsumerCallback interface {
    ConsumerCallback
    RunnableCallback
}
type SizeRestrictedOutput ¶
type SizeRestrictedOutput interface {
    Output
    // GetMaxMessageSize returns the maximum size of a message for this output (or nil if there is no limit on message size).
    GetMaxMessageSize() *int
    // GetMaxBatchSize returns the maximum number of messages we can write at once to the output (or nil if there is no limit).
    GetMaxBatchSize() *int
}
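A sketch of honoring the batch-size limit before writing, treating a nil result as "no limit" (writeRespectingLimit is a hypothetical helper):

package main

import (
    "context"

    "github.com/justtrackio/gosoline/pkg/stream"
)

// writeRespectingLimit splits a batch so that no single Write exceeds the
// output's maximum batch size.
func writeRespectingLimit(ctx context.Context, out stream.SizeRestrictedOutput, batch []stream.WritableMessage) error {
    size := len(batch)
    if max := out.GetMaxBatchSize(); max != nil && *max > 0 && *max < size {
        size = *max
    }

    for len(batch) > 0 {
        n := size
        if n > len(batch) {
            n = len(batch)
        }
        if err := out.Write(ctx, batch[:n]); err != nil {
            return err
        }
        batch = batch[n:]
    }

    return nil
}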
type SnsInputConfiguration ¶
type SnsInputConfiguration struct {
    Type                string                        `cfg:"type" default:"sns"`
    ConsumerId          string                        `cfg:"id" validate:"required"`
    Family              string                        `cfg:"family" default:""`
    Group               string                        `cfg:"group" default:""`
    Application         string                        `cfg:"application" default:""`
    Targets             []SnsInputTargetConfiguration `cfg:"targets" validate:"min=1"`
    MaxNumberOfMessages int32                         `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
    WaitTime            int32                         `cfg:"wait_time" default:"3" validate:"min=1"`
    VisibilityTimeout   int                           `cfg:"visibility_timeout" default:"30" validate:"min=1"`
    RunnerCount         int                           `cfg:"runner_count" default:"1" validate:"min=1"`
    RedrivePolicy       sqs.RedrivePolicy             `cfg:"redrive_policy"`
    ClientName          string                        `cfg:"client_name" default:"default"`
}
type SnsInputSettings ¶
type SnsInputSettings struct {
    cfg.AppId
    QueueId             string            `cfg:"queue_id"`
    MaxNumberOfMessages int32             `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
    WaitTime            int32             `cfg:"wait_time"`
    RedrivePolicy       sqs.RedrivePolicy `cfg:"redrive_policy"`
    VisibilityTimeout   int               `cfg:"visibility_timeout"`
    RunnerCount         int               `cfg:"runner_count"`
    ClientName          string            `cfg:"client_name"`
}
func (SnsInputSettings) GetAppId ¶
func (s SnsInputSettings) GetAppId() cfg.AppId
func (SnsInputSettings) GetClientName ¶
func (s SnsInputSettings) GetClientName() string
func (SnsInputSettings) GetQueueId ¶
func (s SnsInputSettings) GetQueueId() string
func (SnsInputSettings) IsFifoEnabled ¶
func (s SnsInputSettings) IsFifoEnabled() bool
type SnsInputTarget ¶
type SnsInputTarget struct {
    cfg.AppId
    TopicId    string
    Attributes map[string]interface{}
    ClientName string
}
func (SnsInputTarget) GetAppId ¶
func (t SnsInputTarget) GetAppId() cfg.AppId
func (SnsInputTarget) GetClientName ¶
func (t SnsInputTarget) GetClientName() string
func (SnsInputTarget) GetTopicId ¶
func (t SnsInputTarget) GetTopicId() string
type SnsInputTargetConfiguration ¶
type SnsInputTargetConfiguration struct {
    Family      string                 `cfg:"family"`
    Group       string                 `cfg:"group" validate:"required"`
    Application string                 `cfg:"application" validate:"required"`
    TopicId     string                 `cfg:"topic_id" validate:"required"`
    Attributes  map[string]interface{} `cfg:"attributes"`
    ClientName  string                 `cfg:"client_name" default:"default"`
}
type SnsOutputConfiguration ¶
type SnsOutputConfiguration struct {
    BaseOutputConfiguration
    Type        string `cfg:"type" default:"sns"`
    Project     string `cfg:"project"`
    Family      string `cfg:"family"`
    Group       string `cfg:"group"`
    Application string `cfg:"application"`
    TopicId     string `cfg:"topic_id" validate:"required"`
    ClientName  string `cfg:"client_name" default:"default"`
}
type SnsOutputSettings ¶
func (SnsOutputSettings) GetAppId ¶
func (s SnsOutputSettings) GetAppId() cfg.AppId
func (SnsOutputSettings) GetClientName ¶
func (s SnsOutputSettings) GetClientName() string
func (SnsOutputSettings) GetTopicId ¶
func (s SnsOutputSettings) GetTopicId() string
type SqsInputSettings ¶
type SqsInputSettings struct {
    cfg.AppId
    QueueId             string            `cfg:"queue_id"`
    MaxNumberOfMessages int32             `cfg:"max_number_of_messages" default:"10" validate:"min=1,max=10"`
    WaitTime            int32             `cfg:"wait_time"`
    VisibilityTimeout   int               `cfg:"visibility_timeout"`
    RunnerCount         int               `cfg:"runner_count"`
    Fifo                sqs.FifoSettings  `cfg:"fifo"`
    RedrivePolicy       sqs.RedrivePolicy `cfg:"redrive_policy"`
    ClientName          string            `cfg:"client_name"`
    Unmarshaller        string            `cfg:"unmarshaller" default:"msg"`
}
func (SqsInputSettings) GetAppId ¶
func (s SqsInputSettings) GetAppId() cfg.AppId
func (SqsInputSettings) GetClientName ¶
func (s SqsInputSettings) GetClientName() string
func (SqsInputSettings) GetQueueId ¶
func (s SqsInputSettings) GetQueueId() string
func (SqsInputSettings) IsFifoEnabled ¶
func (s SqsInputSettings) IsFifoEnabled() bool
type SqsOutputConfiguration ¶
type SqsOutputConfiguration struct {
    BaseOutputConfiguration
    Type              string            `cfg:"type" default:"sqs"`
    Project           string            `cfg:"project"`
    Family            string            `cfg:"family"`
    Group             string            `cfg:"group"`
    Application       string            `cfg:"application"`
    QueueId           string            `cfg:"queue_id" validate:"required"`
    VisibilityTimeout int               `cfg:"visibility_timeout" default:"30" validate:"gt=0"`
    RedrivePolicy     sqs.RedrivePolicy `cfg:"redrive_policy"`
    Fifo              sqs.FifoSettings  `cfg:"fifo"`
    ClientName        string            `cfg:"client_name" default:"default"`
}
type SqsOutputSettings ¶
type SqsOutputSettings struct {
    cfg.AppId
    ClientName        string
    Fifo              sqs.FifoSettings
    QueueId           string
    QueueNamePattern  string
    RedrivePolicy     sqs.RedrivePolicy
    VisibilityTimeout int
}
func (SqsOutputSettings) GetAppId ¶
func (s SqsOutputSettings) GetAppId() cfg.AppId
func (SqsOutputSettings) GetClientName ¶
func (s SqsOutputSettings) GetClientName() string
func (SqsOutputSettings) GetQueueId ¶
func (s SqsOutputSettings) GetQueueId() string
func (SqsOutputSettings) IsFifoEnabled ¶
func (s SqsOutputSettings) IsFifoEnabled() bool
type UnmarshallerFunc ¶
type WritableMessage ¶
type WritableMessage interface {
    MarshalToBytes() ([]byte, error)
    MarshalToString() (string, error)
}
func MessagesToWritableMessages ¶
func MessagesToWritableMessages(batch []*Message) []WritableMessage
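Outputs consume WritableMessage values, so a consumer holding a []*Message converts it before a batch write; a short sketch (flush is a hypothetical helper):

package main

import (
    "context"

    "github.com/justtrackio/gosoline/pkg/stream"
)

// flush converts the concrete messages into WritableMessages and writes them
// to the output as one batch.
func flush(ctx context.Context, out stream.Output, msgs []*stream.Message) error {
    return out.Write(ctx, stream.MessagesToWritableMessages(msgs))
}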
Source Files ¶
- chunking.go
- compression.go
- config.go
- consumer.go
- consumer_acknowledge.go
- consumer_base.go
- consumer_batch.go
- consumer_module_factory.go
- encoding.go
- encoding_json.go
- encoding_protobuf.go
- input.go
- input_configurable.go
- input_file.go
- input_in_memory.go
- input_kafka.go
- input_kinesis.go
- input_redis_list.go
- input_sns.go
- input_sqs.go
- message.go
- message_builder.go
- message_encoding.go
- message_kafka.go
- message_model.go
- mpr_config_postprocessor.go
- mpr_metric_module.go
- mpr_queue_names.go
- mpr_settings.go
- multi_producer.go
- output.go
- output_channel.go
- output_configurable.go
- output_configurable_multiple.go
- output_file.go
- output_in_memory.go
- output_kafka.go
- output_kinesis.go
- output_noop.go
- output_redis_list.go
- output_sns.go
- output_sqs.go
- output_tracer.go
- producer.go
- producer_daemon.go
- producer_daemon_aggregator.go
- producer_daemon_batcher.go
- producer_daemon_config_postprocessor.go
- producer_daemon_factory.go
- producer_daemon_partitioned_aggregator.go
- raw_message.go
- retry.go
- retry_noop.go
- retry_sqs.go
- unmarshal.go