kinesis

package
v0.31.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2024 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MetadataKeyKinsumers = "cloud.aws.kinesis.kinsumers"
)
View Source
const ShardTimeout = time.Hour * 24 * 8

ShardTimeout describes the timeout until we remove records about shards from ddb. kinesis has a maximum retention time of 7 days, so we are safe to remove old data from ddb after 8 days (or 7 days and 1 second...). We need this to clean up the database if there are old shards for a stream (i.e., after a stream was scaled and the number of shards was reduced or a shard was replaced by different shards).

Variables

View Source
var (
	ErrCheckpointNoLongerOwned   = fmt.Errorf("can not persist checkpoint which is no longer owned")
	ErrCheckpointAlreadyReleased = fmt.Errorf("failed to release checkpoint, it was already released")
	ErrShardAlreadyFinished      = fmt.Errorf("shard was alread finished")
)

Functions

func NewClient

func NewClient(ctx context.Context, config cfg.Config, logger log.Logger, name string, optFns ...ClientOption) (*kinesis.Client, error)

func NewNoSuchStreamError

func NewNoSuchStreamError(stream Stream) error

func NewStreamBusyError

func NewStreamBusyError(stream Stream) error

func ProvideClient

func ProvideClient(ctx context.Context, config cfg.Config, logger log.Logger, name string, optFns ...ClientOption) (*kinesis.Client, error)

Types

type BackoffDelayer

type BackoffDelayer struct {
	*gosoAws.BackoffDelayer
	// contains filtered or unexported fields
}

func NewBackoffDelayer

func NewBackoffDelayer(initialInterval time.Duration, maxInterval time.Duration, readProvisionedThroughputDelay time.Duration) *BackoffDelayer

func (*BackoffDelayer) BackoffDelay

func (d *BackoffDelayer) BackoffDelay(attempt int, err error) (time.Duration, error)

type BaseRecord

type BaseRecord struct {
	Namespace string    `json:"namespace" ddb:"key=hash"`
	Resource  string    `json:"resource" ddb:"key=range"`
	UpdatedAt time.Time `json:"updatedAt"`
	Ttl       *int64    `json:"ttl,omitempty" ddb:"ttl=enabled"`
}

A BaseRecord contains all the fields shared between the main table and all LSIs.

type Checkpoint

type Checkpoint interface {
	CheckpointWithoutRelease
	// Release releases ownership over a shard. Do not use the Checkpoint afterward.
	Release(ctx context.Context) error
}

A Checkpoint describes our position in a shard of the stream.

type CheckpointRecord

type CheckpointRecord struct {
	BaseRecord
	OwningClientId    ClientId       `json:"owningClientId,omitempty"`
	SequenceNumber    SequenceNumber `json:"sequenceNumber,omitempty"`
	LastShardIterator ShardIterator  `json:"lastShardIterator,omitempty"`
	FinishedAt        *time.Time     `json:"finishedAt"`
}

type CheckpointWithoutRelease

type CheckpointWithoutRelease interface {
	GetSequenceNumber() SequenceNumber
	GetShardIterator() ShardIterator
	// Advance updates our Checkpoint to include all the sequence numbers up to (and including) the new sequence number.
	Advance(sequenceNumber SequenceNumber, shardIterator ShardIterator) error
	// Done marks a shard as completely consumed, i.e., there are no further records left to consume.
	Done(sequenceNumber SequenceNumber) error
	// Persist writes the current Checkpoint to the database and renews our lock on the shard. Thus, you have to call
	// Persist from time to time, otherwise you will lose your hold on that lock. If we finished the shard (by calling Done),
	// Persist will tell us to Release the Checkpoint (and shard).
	Persist(ctx context.Context) (shouldRelease bool, err error)
}

CheckpointWithoutRelease consists of the Checkpoint interface without the release method. We only use this internally to ensure Release can only be called when we have taken ownership of the Checkpoint.

type Client

type Client interface {
	AddTagsToStream(ctx context.Context, params *kinesis.AddTagsToStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.AddTagsToStreamOutput, error)
	CreateStream(ctx context.Context, params *kinesis.CreateStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.CreateStreamOutput, error)
	DecreaseStreamRetentionPeriod(ctx context.Context, params *kinesis.DecreaseStreamRetentionPeriodInput, optFns ...func(*kinesis.Options)) (*kinesis.DecreaseStreamRetentionPeriodOutput, error)
	DeleteStream(ctx context.Context, params *kinesis.DeleteStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DeleteStreamOutput, error)
	DeregisterStreamConsumer(ctx context.Context, params *kinesis.DeregisterStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.DeregisterStreamConsumerOutput, error)
	DescribeLimits(ctx context.Context, params *kinesis.DescribeLimitsInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeLimitsOutput, error)
	DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error)
	DescribeStreamConsumer(ctx context.Context, params *kinesis.DescribeStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamConsumerOutput, error)
	DescribeStreamSummary(ctx context.Context, params *kinesis.DescribeStreamSummaryInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamSummaryOutput, error)
	DisableEnhancedMonitoring(ctx context.Context, params *kinesis.DisableEnhancedMonitoringInput, optFns ...func(*kinesis.Options)) (*kinesis.DisableEnhancedMonitoringOutput, error)
	EnableEnhancedMonitoring(ctx context.Context, params *kinesis.EnableEnhancedMonitoringInput, optFns ...func(*kinesis.Options)) (*kinesis.EnableEnhancedMonitoringOutput, error)
	GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error)
	GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error)
	IncreaseStreamRetentionPeriod(ctx context.Context, params *kinesis.IncreaseStreamRetentionPeriodInput, optFns ...func(*kinesis.Options)) (*kinesis.IncreaseStreamRetentionPeriodOutput, error)
	ListShards(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error)
	ListStreamConsumers(ctx context.Context, params *kinesis.ListStreamConsumersInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamConsumersOutput, error)
	ListStreams(ctx context.Context, params *kinesis.ListStreamsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamsOutput, error)
	ListTagsForStream(ctx context.Context, params *kinesis.ListTagsForStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.ListTagsForStreamOutput, error)
	MergeShards(ctx context.Context, params *kinesis.MergeShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.MergeShardsOutput, error)
	PutRecord(ctx context.Context, params *kinesis.PutRecordInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordOutput, error)
	PutRecords(ctx context.Context, params *kinesis.PutRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordsOutput, error)
	RegisterStreamConsumer(ctx context.Context, params *kinesis.RegisterStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.RegisterStreamConsumerOutput, error)
	RemoveTagsFromStream(ctx context.Context, params *kinesis.RemoveTagsFromStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.RemoveTagsFromStreamOutput, error)
	SplitShard(ctx context.Context, params *kinesis.SplitShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SplitShardOutput, error)
	StartStreamEncryption(ctx context.Context, params *kinesis.StartStreamEncryptionInput, optFns ...func(*kinesis.Options)) (*kinesis.StartStreamEncryptionOutput, error)
	StopStreamEncryption(ctx context.Context, params *kinesis.StopStreamEncryptionInput, optFns ...func(*kinesis.Options)) (*kinesis.StopStreamEncryptionOutput, error)
	SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error)
	UpdateShardCount(ctx context.Context, params *kinesis.UpdateShardCountInput, optFns ...func(*kinesis.Options)) (*kinesis.UpdateShardCountOutput, error)
	UpdateStreamMode(ctx context.Context, params *kinesis.UpdateStreamModeInput, optFns ...func(*kinesis.Options)) (*kinesis.UpdateStreamModeOutput, error)
}

type ClientConfig

type ClientConfig struct {
	Settings     ClientSettings
	LoadOptions  []func(options *awsCfg.LoadOptions) error
	RetryOptions []func(*retry.StandardOptions)
}

func (ClientConfig) GetLoadOptions

func (c ClientConfig) GetLoadOptions() []func(options *awsCfg.LoadOptions) error

func (ClientConfig) GetRetryOptions

func (c ClientConfig) GetRetryOptions() []func(*retry.StandardOptions)

func (ClientConfig) GetSettings

func (c ClientConfig) GetSettings() gosoAws.ClientSettings

type ClientId

type ClientId string

type ClientOption

type ClientOption func(cfg *ClientConfig)

type ClientRecord

type ClientRecord struct {
	BaseRecord
}

type ClientSettings

type ClientSettings struct {
	gosoAws.ClientSettings
	ReadProvisionedThroughputDelay time.Duration `cfg:"read_provisioned_throughput_exceeded_delay" default:"1s"`
}

type FullRecord

type FullRecord struct {
	BaseRecord
	OwningClientId    ClientId       `json:"owningClientId,omitempty"`
	SequenceNumber    SequenceNumber `json:"sequenceNumber,omitempty"`
	LastShardIterator ShardIterator  `json:"lastShardIterator,omitempty"`
	FinishedAt        *time.Time     `json:"finishedAt"`
}

type Kinsumer

type Kinsumer interface {
	Run(ctx context.Context, handler MessageHandler) error
	Stop()
}

func NewKinsumer

func NewKinsumer(ctx context.Context, config cfg.Config, logger log.Logger, settings *Settings) (Kinsumer, error)

func NewKinsumerWithInterfaces

func NewKinsumerWithInterfaces(logger log.Logger, settings Settings, stream Stream, kinesisClient Client, metadataRepository MetadataRepository, metricWriter metric.Writer, clock clock.Clock, shardReaderFactory func(logger log.Logger, shardId ShardId) ShardReader) Kinsumer

type KinsumerMetadata

type KinsumerMetadata struct {
	AwsClientName  string    `json:"aws_client_name"`
	ClientId       ClientId  `json:"client_id"`
	Name           string    `json:"name"`
	OpenShardCount int       `json:"open_shard_count"`
	StreamAppId    cfg.AppId `json:"stream_app_id"`
	StreamArn      string    `json:"stream_arn"`
	StreamName     string    `json:"stream_name"`
	StreamNameFull Stream    `json:"stream_name_full"`
}

type MessageHandler

type MessageHandler interface {
	Handle(rawMessage []byte) error
	Done()
}

func NewChannelHandler

func NewChannelHandler(records chan []byte) MessageHandler

type MetadataRepository

type MetadataRepository interface {
	// RegisterClient either creates or refreshes our registration and returns our current index as well as the number of
	// clients working together on the stream.
	RegisterClient(ctx context.Context) (clientIndex int, totalClients int, err error)
	// DeregisterClient removes our client and thus indirectly informs the other clients to take over our work
	DeregisterClient(ctx context.Context) error
	// IsShardFinished checks if a shard has already been finished. It might cache this status as normally a shard, once
	// finished, should never change that status again.
	IsShardFinished(ctx context.Context, shardId ShardId) (bool, error)
	// AcquireShard locks a shard for us and ensures no other client is working on it. It might fail to do so and returns
	// nil in this case.
	AcquireShard(ctx context.Context, shardId ShardId) (Checkpoint, error)
}

func NewMetadataRepository

func NewMetadataRepository(ctx context.Context, config cfg.Config, logger log.Logger, stream Stream, clientId ClientId, settings Settings) (MetadataRepository, error)

func NewMetadataRepositoryWithInterfaces

func NewMetadataRepositoryWithInterfaces(logger log.Logger, stream Stream, clientId ClientId, repo ddb.Repository, settings Settings, appId cfg.AppId, clock clock.Clock) MetadataRepository

type NoSuchStreamError

type NoSuchStreamError struct {
	// contains filtered or unexported fields
}

func (*NoSuchStreamError) As

func (e *NoSuchStreamError) As(target interface{}) bool

func (*NoSuchStreamError) Error

func (e *NoSuchStreamError) Error() string

type Record

type Record struct {
	Data            []byte
	PartitionKey    *string
	ExplicitHashKey *string
}

type RecordWriter

type RecordWriter interface {
	PutRecord(ctx context.Context, record *Record) error
	PutRecords(ctx context.Context, batch []*Record) error
}

func NewRecordWriter

func NewRecordWriter(ctx context.Context, config cfg.Config, logger log.Logger, settings *RecordWriterSettings) (RecordWriter, error)

func NewRecordWriterWithInterfaces

func NewRecordWriterWithInterfaces(logger log.Logger, metricWriter metric.Writer, clock clock.Clock, uuidGen uuid.Uuid, client Client, settings *RecordWriterSettings) RecordWriter

type RecordWriterMetadata

type RecordWriterMetadata struct {
	AwsClientName  string `json:"aws_client_name"`
	OpenShardCount int    `json:"open_shard_count"`
	StreamArn      string `json:"stream_arn"`
	StreamName     string `json:"stream_name"`
}

type RecordWriterSettings

type RecordWriterSettings struct {
	ClientName string
	StreamName string
	Backoff    exec.BackoffSettings
}

type SequenceNumber

type SequenceNumber string

type Settings

type Settings struct {
	cfg.AppId
	// Name of the kinesis client to use
	ClientName string `cfg:"client_name" default:"default"`
	// Name of the kinsumer
	Name string
	// Name of the stream (before expanding with project, env, family & application prefix)
	StreamName string `cfg:"stream_name" validate:"required"`
	// The shard reader will sleep until the age of the record is older than this delay
	ConsumeDelay time.Duration `cfg:"consume_delay" default:"0"`
	// InitialPosition of a new kinsumer. Defines the starting position on the stream if no metadata is present.
	InitialPosition SettingsInitialPosition `cfg:"initial_position"`
	// How many records the shard reader should fetch in a single call
	MaxBatchSize int `cfg:"max_batch_size" default:"10000" validate:"gt=0,lte=10000"`
	// Time between reads from empty shards. This defines how fast the kinsumer begins its work. Min = 1ms
	WaitTime time.Duration `cfg:"wait_time" default:"1s" validate:"min=1000000"`
	// Time between writing checkpoints to ddb. This defines how much work you might lose. Min = 100ms
	PersistFrequency time.Duration `cfg:"persist_frequency" default:"5s" validate:"min=100000000"`
	// Time between checks for new shards. This defines how fast it reacts to shard changes. Min = 1s
	DiscoverFrequency time.Duration `cfg:"discover_frequency" default:"1m" validate:"min=1000000000"`
	// How long we extend the deadline of a context when releasing a shard or when deregistering a client. Min = 1s
	ReleaseDelay time.Duration `cfg:"release_delay" default:"5s" validate:"min=1000000000"`
	// Should we write how many milliseconds behind each shard is or only the whole stream?
	ShardLevelMetrics bool `cfg:"shard_level_metrics" default:"false"`
}

func (Settings) GetAppId

func (s Settings) GetAppId() cfg.AppId

func (Settings) GetClientName

func (s Settings) GetClientName() string

func (Settings) GetStreamName

func (s Settings) GetStreamName() string

type SettingsInitialPosition

type SettingsInitialPosition struct {
	Type      types.ShardIteratorType `cfg:"type" default:"TRIM_HORIZON"`
	Timestamp time.Time               `cfg:"timestamp"`
}

type ShardId

type ShardId string

type ShardIterator added in v0.21.0

type ShardIterator string

type ShardReader

type ShardReader interface {
	// Run reads records from this shard until we either run out of records to read (i.e., are done with the shard) or our context
	// is canceled (i.e., we should terminate, maybe, because shards got reassigned, so we need to restart all consumers)
	Run(ctx context.Context, handler func(record []byte) error) error
}

func NewShardReaderWithInterfaces

func NewShardReaderWithInterfaces(stream Stream, shardId ShardId, logger log.Logger, metricWriter metric.Writer, metadataRepository MetadataRepository, kinesisClient Client, settings Settings, clock clock.Clock) ShardReader

type Stream

type Stream string

func GetStreamName

func GetStreamName(config cfg.Config, settings StreamNameSettingsAware) (Stream, error)

type StreamBusyError

type StreamBusyError struct {
	// contains filtered or unexported fields
}

func (*StreamBusyError) As

func (e *StreamBusyError) As(target interface{}) bool

func (*StreamBusyError) Error

func (e *StreamBusyError) Error() string

type StreamDescription

type StreamDescription struct {
	StreamArn      string
	StreamName     string
	OpenShardCount int
}

func CreateKinesisStream

func CreateKinesisStream(ctx context.Context, config cfg.Config, logger log.Logger, client Client, streamName string) (*StreamDescription, error)

CreateKinesisStream ensures a kinesis stream exists if dx.auto_create is set to true.

type StreamNameSettingsAware

type StreamNameSettingsAware interface {
	GetAppId() cfg.AppId
	GetClientName() string
	GetStreamName() string
}

type StreamNamingSettings

type StreamNamingSettings struct {
	Pattern string `cfg:"pattern,nodecode" default:"{project}-{env}-{family}-{group}-{streamName}"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL