Documentation ¶
Index ¶
- Constants
- Variables
- func NewClient(ctx context.Context, config cfg.Config, logger log.Logger, name string, ...) (*kinesis.Client, error)
- func NewNoSuchStreamError(stream Stream) error
- func NewStreamBusyError(stream Stream) error
- func ProvideClient(ctx context.Context, config cfg.Config, logger log.Logger, name string, ...) (*kinesis.Client, error)
- type BackoffDelayer
- type BaseRecord
- type Checkpoint
- type CheckpointRecord
- type CheckpointWithoutRelease
- type Client
- type ClientConfig
- type ClientId
- type ClientOption
- type ClientRecord
- type ClientSettings
- type FullRecord
- type Kinsumer
- type KinsumerMetadata
- type MessageHandler
- type MetadataRepository
- type NoSuchStreamError
- type Record
- type RecordWriter
- type RecordWriterMetadata
- type RecordWriterSettings
- type SequenceNumber
- type Settings
- type SettingsInitialPosition
- type ShardId
- type ShardReader
- type Stream
- type StreamBusyError
- type StreamDescription
- type StreamNameSettingsAware
- type StreamNamingSettings
Constants ¶
const (
MetadataKeyKinsumers = "cloud.aws.kinesis.kinsumers"
)
const ShardTimeout = time.Hour * 24 * 8
ShardTimeout describes the timeout until we remove records about shards from ddb. kinesis has a maximum retention time of 7 days, so we are safe to remove old data from ddb after 8 days (or 7 days and 1 second...). We need this to clean up the database if there are old shards for a stream (i.e., after a stream was scaled and the number of shards was reduced or a shard was replaced by different shards).
Variables ¶
Functions ¶
func NewNoSuchStreamError ¶
func NewStreamBusyError ¶
Types ¶
type BackoffDelayer ¶
type BackoffDelayer struct { *gosoAws.BackoffDelayer // contains filtered or unexported fields }
func NewBackoffDelayer ¶
func (*BackoffDelayer) BackoffDelay ¶
type BaseRecord ¶
type BaseRecord struct { Namespace string `json:"namespace" ddb:"key=hash"` Resource string `json:"resource" ddb:"key=range"` UpdatedAt time.Time `json:"updatedAt"` Ttl *int64 `json:"ttl,omitempty" ddb:"ttl=enabled"` }
A BaseRecord contains all the fields shared between the main table and all LSIs.
type Checkpoint ¶
type Checkpoint interface { CheckpointWithoutRelease // Release releases ownership over a shard. Do not use the Checkpoint afterwards. Release(ctx context.Context) error }
A Checkpoint describes our position in a shard of the stream.
type CheckpointRecord ¶
type CheckpointRecord struct { BaseRecord OwningClientId ClientId `json:"owningClientId,omitempty"` SequenceNumber SequenceNumber `json:"sequenceNumber,omitempty"` FinishedAt *time.Time `json:"finishedAt"` }
type CheckpointWithoutRelease ¶
type CheckpointWithoutRelease interface { GetSequenceNumber() SequenceNumber // Advance updates our Checkpoint to include all the sequence numbers up to (and including) the new sequence number. Advance(sequenceNumber SequenceNumber) error // Done marks a shard as completely consumed, i.e., there are no further records left to consume. Done(sequenceNumber SequenceNumber) error // Persist writes the current Checkpoint to the database and renews our lock on the shard. Thus, you have to call // Persist from time to time, otherwise you will lose your hold on that lock. If we finished the shard (by calling Done), // Persist will tell us to Release the Checkpoint (and shard). Persist(ctx context.Context) (shouldRelease bool, err error) }
CheckpointWithoutRelease consists of the Checkpoint interface without the release method. We only use this internally to ensure Release can only be called when we have taken ownership of the Checkpoint.
type Client ¶
type Client interface { AddTagsToStream(ctx context.Context, params *kinesis.AddTagsToStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.AddTagsToStreamOutput, error) CreateStream(ctx context.Context, params *kinesis.CreateStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.CreateStreamOutput, error) DecreaseStreamRetentionPeriod(ctx context.Context, params *kinesis.DecreaseStreamRetentionPeriodInput, optFns ...func(*kinesis.Options)) (*kinesis.DecreaseStreamRetentionPeriodOutput, error) DeleteStream(ctx context.Context, params *kinesis.DeleteStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DeleteStreamOutput, error) DeregisterStreamConsumer(ctx context.Context, params *kinesis.DeregisterStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.DeregisterStreamConsumerOutput, error) DescribeLimits(ctx context.Context, params *kinesis.DescribeLimitsInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeLimitsOutput, error) DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) DescribeStreamConsumer(ctx context.Context, params *kinesis.DescribeStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamConsumerOutput, error) DescribeStreamSummary(ctx context.Context, params *kinesis.DescribeStreamSummaryInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamSummaryOutput, error) DisableEnhancedMonitoring(ctx context.Context, params *kinesis.DisableEnhancedMonitoringInput, optFns ...func(*kinesis.Options)) (*kinesis.DisableEnhancedMonitoringOutput, error) EnableEnhancedMonitoring(ctx context.Context, params *kinesis.EnableEnhancedMonitoringInput, optFns ...func(*kinesis.Options)) (*kinesis.EnableEnhancedMonitoringOutput, error) GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) IncreaseStreamRetentionPeriod(ctx context.Context, params *kinesis.IncreaseStreamRetentionPeriodInput, optFns ...func(*kinesis.Options)) (*kinesis.IncreaseStreamRetentionPeriodOutput, error) ListShards(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error) ListStreamConsumers(ctx context.Context, params *kinesis.ListStreamConsumersInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamConsumersOutput, error) ListStreams(ctx context.Context, params *kinesis.ListStreamsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamsOutput, error) ListTagsForStream(ctx context.Context, params *kinesis.ListTagsForStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.ListTagsForStreamOutput, error) MergeShards(ctx context.Context, params *kinesis.MergeShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.MergeShardsOutput, error) PutRecord(ctx context.Context, params *kinesis.PutRecordInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordOutput, error) PutRecords(ctx context.Context, params *kinesis.PutRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordsOutput, error) RegisterStreamConsumer(ctx context.Context, params *kinesis.RegisterStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.RegisterStreamConsumerOutput, error) RemoveTagsFromStream(ctx context.Context, params *kinesis.RemoveTagsFromStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.RemoveTagsFromStreamOutput, error) SplitShard(ctx context.Context, params *kinesis.SplitShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SplitShardOutput, error) StartStreamEncryption(ctx context.Context, params *kinesis.StartStreamEncryptionInput, optFns ...func(*kinesis.Options)) (*kinesis.StartStreamEncryptionOutput, error) StopStreamEncryption(ctx context.Context, params *kinesis.StopStreamEncryptionInput, optFns ...func(*kinesis.Options)) (*kinesis.StopStreamEncryptionOutput, error) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) UpdateShardCount(ctx context.Context, params *kinesis.UpdateShardCountInput, optFns ...func(*kinesis.Options)) (*kinesis.UpdateShardCountOutput, error) UpdateStreamMode(ctx context.Context, params *kinesis.UpdateStreamModeInput, optFns ...func(*kinesis.Options)) (*kinesis.UpdateStreamModeOutput, error) }
type ClientConfig ¶
type ClientConfig struct { Settings ClientSettings LoadOptions []func(options *awsCfg.LoadOptions) error RetryOptions []func(*retry.StandardOptions) }
func (ClientConfig) GetLoadOptions ¶
func (c ClientConfig) GetLoadOptions() []func(options *awsCfg.LoadOptions) error
func (ClientConfig) GetRetryOptions ¶
func (c ClientConfig) GetRetryOptions() []func(*retry.StandardOptions)
func (ClientConfig) GetSettings ¶
func (c ClientConfig) GetSettings() gosoAws.ClientSettings
type ClientOption ¶
type ClientOption func(cfg *ClientConfig)
type ClientRecord ¶
type ClientRecord struct {
BaseRecord
}
type ClientSettings ¶
type ClientSettings struct { gosoAws.ClientSettings ReadProvisionedThroughputDelay time.Duration `cfg:"read_provisioned_throughput_exceeded_delay" default:"1s"` }
type FullRecord ¶
type FullRecord struct { BaseRecord OwningClientId ClientId `json:"owningClientId,omitempty"` SequenceNumber SequenceNumber `json:"sequenceNumber,omitempty"` FinishedAt *time.Time `json:"finishedAt"` }
type Kinsumer ¶
type Kinsumer interface { Run(ctx context.Context, handler MessageHandler) error Stop() }
func NewKinsumer ¶
type KinsumerMetadata ¶
type KinsumerMetadata struct { AwsClientName string `json:"aws_client_name"` ClientId ClientId `json:"client_id"` Name string `json:"name"` OpenShardCount int `json:"open_shard_count"` StreamAppId cfg.AppId `json:"stream_app_id"` StreamArn string `json:"stream_arn"` StreamName string `json:"stream_name"` StreamNameFull Stream `json:"stream_name_full"` }
type MessageHandler ¶
func NewChannelHandler ¶
func NewChannelHandler(records chan []byte) MessageHandler
type MetadataRepository ¶
type MetadataRepository interface { // RegisterClient either creates or refreshes our registration and returns our current index as well as the number of // clients working together on the stream. RegisterClient(ctx context.Context) (clientIndex int, totalClients int, err error) // DeregisterClient removes our client and thus indirectly informs the other clients to take over our work DeregisterClient(ctx context.Context) error // IsShardFinished checks if a shard has already been finished. It might cache this status as normally a shard, once // finished, should never change that status again. IsShardFinished(ctx context.Context, shardId ShardId) (bool, error) // AcquireShard locks a shard for us and ensures no other client is working on it. It might fail to do so and returns // nil in this case. AcquireShard(ctx context.Context, shardId ShardId) (Checkpoint, error) }
func NewMetadataRepository ¶
type NoSuchStreamError ¶
type NoSuchStreamError struct {
// contains filtered or unexported fields
}
func (*NoSuchStreamError) As ¶
func (e *NoSuchStreamError) As(target interface{}) bool
func (*NoSuchStreamError) Error ¶
func (e *NoSuchStreamError) Error() string
type RecordWriter ¶
type RecordWriter interface { PutRecord(ctx context.Context, record *Record) error PutRecords(ctx context.Context, batch []*Record) error }
func NewRecordWriter ¶
func NewRecordWriter(ctx context.Context, config cfg.Config, logger log.Logger, settings *RecordWriterSettings) (RecordWriter, error)
type RecordWriterMetadata ¶
type RecordWriterSettings ¶
type RecordWriterSettings struct { ClientName string StreamName string Backoff exec.BackoffSettings }
type SequenceNumber ¶
type SequenceNumber string
type Settings ¶
type Settings struct { cfg.AppId // Name of the kinesis client to use ClientName string `cfg:"client_name" default:"default"` // Name of the kinsumer Name string // Name of the stream (before expanding with project, env, family & application prefix) StreamName string `cfg:"stream_name" validate:"required"` // The shard reader will sleep until the age of the record is older than this delay ConsumeDelay time.Duration `cfg:"consume_delay" default:"0"` // InitialPosition of a new kinsumer. Defines the starting position on the stream if no metadata is present. InitialPosition SettingsInitialPosition `cfg:"initial_position"` // How many records the shard reader should fetch in a single call MaxBatchSize int `cfg:"max_batch_size" default:"10000" validate:"gt=0,lte=10000"` // Time between reads from empty shards. This defines how fast the kinsumer begins its work. Min = 1ms WaitTime time.Duration `cfg:"wait_time" default:"1s" validate:"min=1000000"` // Time between writing checkpoints to ddb. This defines how much work you might lose. Min = 100ms PersistFrequency time.Duration `cfg:"persist_frequency" default:"5s" validate:"min=100000000"` // Time between checks for new shards. This defines how fast it reacts to shard changes. Min = 1s DiscoverFrequency time.Duration `cfg:"discover_frequency" default:"1m" validate:"min=1000000000"` // How long we extend the deadline of a context when releasing a shard or when deregistering a client. Min = 1s ReleaseDelay time.Duration `cfg:"release_delay" default:"5s" validate:"min=1000000000"` // Should we write how many milliseconds behind each shard is or only the whole stream? ShardLevelMetrics bool `cfg:"shard_level_metrics" default:"false"` }
func (Settings) GetClientName ¶
func (Settings) GetStreamName ¶
type SettingsInitialPosition ¶
type SettingsInitialPosition struct { Type types.ShardIteratorType `cfg:"type" default:"TRIM_HORIZON"` Timestamp time.Time `cfg:"timestamp"` }
type ShardReader ¶
type ShardReader interface { // Run reads records from this shard until we either run out of records to read (i.e., are done with the shard) or our context // is canceled (i.e., we should terminate, maybe, because shards got reassigned, so we need to restart all consumers) Run(ctx context.Context, handler func(record []byte) error) error }
type Stream ¶
type Stream string
func GetStreamName ¶
func GetStreamName(config cfg.Config, settings StreamNameSettingsAware) (Stream, error)
type StreamBusyError ¶
type StreamBusyError struct {
// contains filtered or unexported fields
}
func (*StreamBusyError) As ¶
func (e *StreamBusyError) As(target interface{}) bool
func (*StreamBusyError) Error ¶
func (e *StreamBusyError) Error() string
type StreamDescription ¶
type StreamNameSettingsAware ¶
type StreamNamingSettings ¶
type StreamNamingSettings struct {
Pattern string `cfg:"pattern,nodecode" default:"{project}-{env}-{family}-{group}-{streamName}"`
}