config

package
v0.0.5
Published: Jun 17, 2023 License: MIT Imports: 8 Imported by: 0

Documentation

Constants

const (
	// LATEST start after the most recent data record (fetch new data).
	LATEST InitialPositionInStream = iota + 1
	// TRIM_HORIZON start from the oldest available data record
	TRIM_HORIZON
	// AT_TIMESTAMP start from the record at or after the specified server-side Timestamp.
	AT_TIMESTAMP

	// DefaultInitialPositionInStream The location in the shard from which the KinesisClientLibrary will start fetching records
	// when the application starts for the first time and there is no checkpoint for the shard.
	DefaultInitialPositionInStream = LATEST

	// DefaultFailoverTimeMillis Failover time in milliseconds. A worker which does not renew its lease within this time interval
	// will be regarded as having problems and its shards will be assigned to other workers.
	// For applications that have a large number of shards, this may be set to a higher number to reduce
	// the number of DynamoDB IOPS required for tracking leases.
	DefaultFailoverTimeMillis = 10000

	// DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner.
	DefaultLeaseRefreshPeriodMillis = 5000

	// DefaultLeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
	DefaultLeaseRefreshWaitTime = 2500

	// DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call.
	DefaultMaxRecords = 10000

	// DefaultIdleTimeBetweenReadsMillis The default value, in milliseconds, for how long the ShardConsumer
	// should sleep if no records are returned from the call to GetRecords.
	DefaultIdleTimeBetweenReadsMillis = 1000

	// DefaultDontCallProcessRecordsForEmptyRecordList Don't call processRecords() on the record processor for empty record lists.
	DefaultDontCallProcessRecordsForEmptyRecordList = false

	// DefaultParentShardPollIntervalMillis Interval in milliseconds between polling to check for parent shard completion.
	// Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
	// completion of parent shards).
	DefaultParentShardPollIntervalMillis = 10000

	// DefaultShardSyncIntervalMillis Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
	DefaultShardSyncIntervalMillis = 60000

	// DefaultCleanupLeasesUponShardsCompletion Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
	// Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by
	// default we try to delete the ones we don't need any longer.
	DefaultCleanupLeasesUponShardsCompletion = true

	// DefaultTaskBackoffTimeMillis Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
	DefaultTaskBackoffTimeMillis = 500

	// DefaultValidateSequenceNumberBeforeCheckpointing By default, KCL validates client-provided sequence numbers with a call to Amazon Kinesis
	// before checkpointing (RecordProcessorCheckpointer#checkpoint(String) in the Java KCL).
	DefaultValidateSequenceNumberBeforeCheckpointing = true

	// DefaultMaxLeasesForWorker The max number of leases (shards) this worker should process.
	// This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
	// or during deployment.
	// NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the
	// stream due to the max limit.
	DefaultMaxLeasesForWorker = math.MaxInt16

	// DefaultMaxLeasesToStealAtOneTime Max leases to steal from another worker at one time (for load balancing).
	// Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
	// but can cause higher churn in the system.
	DefaultMaxLeasesToStealAtOneTime = 1

	// DefaultInitialLeaseTableReadCapacity The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
	DefaultInitialLeaseTableReadCapacity = 10

	// DefaultInitialLeaseTableWriteCapacity The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
	DefaultInitialLeaseTableWriteCapacity = 10

	// DefaultSkipShardSyncAtStartupIfLeasesExist The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This
	// assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g.
	// during incremental deployments of an application).
	DefaultSkipShardSyncAtStartupIfLeasesExist = false

	// DefaultShutdownGraceMillis The number of milliseconds to wait before graceful shutdown forcefully terminates.
	DefaultShutdownGraceMillis = 5000

	// DefaultEnableLeaseStealing Lease stealing defaults to false for backwards compatibility.
	DefaultEnableLeaseStealing = false

	// DefaultLeaseStealingIntervalMillis Interval between rebalance tasks defaults to 5 seconds.
	DefaultLeaseStealingIntervalMillis = 5000

	// DefaultLeaseStealingClaimTimeoutMillis Number of milliseconds to wait before another worker can acquire a claimed shard.
	DefaultLeaseStealingClaimTimeoutMillis = 120000

	// DefaultLeaseSyncingIntervalMillis Number of milliseconds to wait before syncing with the lease table (DynamoDB).
	DefaultLeaseSyncingIntervalMillis = 60000

	// DefaultMaxRetryCount The default maximum number of retries in case of error
	DefaultMaxRetryCount = 5
)
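
Because the position constants start at iota + 1, the zero value of InitialPositionInStream does not match any named position. The following minimal sketch re-declares the type and constants locally, so that it runs without the package, to show the resulting values. The isSet helper is hypothetical and not part of the package.

```go
package main

import "fmt"

// InitialPositionInStream mirrors the package's type for illustration only.
type InitialPositionInStream int

const (
	LATEST InitialPositionInStream = iota + 1 // 1
	TRIM_HORIZON                              // 2
	AT_TIMESTAMP                              // 3
)

// isSet is a hypothetical helper: it reports whether pos is one of the
// named positions rather than the zero value or an out-of-range number.
func isSet(pos InitialPositionInStream) bool {
	return pos >= LATEST && pos <= AT_TIMESTAMP
}

func main() {
	var unset InitialPositionInStream // zero value, not a named position
	fmt.Println(LATEST, TRIM_HORIZON, AT_TIMESTAMP) // prints "1 2 3"
	fmt.Println(isSet(unset), isSet(TRIM_HORIZON))  // prints "false true"
}
```

A check of this kind can help distinguish "position never configured" from an explicit choice.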

Variables

This section is empty.

Functions

func InitalPositionInStreamToShardIteratorType

func InitalPositionInStreamToShardIteratorType(pos InitialPositionInStream) *string
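
The *string return type suggests the result is meant for the ShardIteratorType parameter of the Kinesis GetShardIterator call. The sketch below is a hypothetical local re-implementation, not the package's code: toShardIteratorType is an invented name, the strings are the Kinesis API's shard iterator type names, and returning nil for an unknown position is an assumption.

```go
package main

import "fmt"

// InitialPositionInStream mirrors the package's type for illustration only.
type InitialPositionInStream int

const (
	LATEST InitialPositionInStream = iota + 1
	TRIM_HORIZON
	AT_TIMESTAMP
)

// toShardIteratorType is a hypothetical stand-in for
// InitalPositionInStreamToShardIteratorType. It maps a position to the
// matching Kinesis shard iterator type name.
func toShardIteratorType(pos InitialPositionInStream) *string {
	var name string
	switch pos {
	case LATEST:
		name = "LATEST"
	case TRIM_HORIZON:
		name = "TRIM_HORIZON"
	case AT_TIMESTAMP:
		name = "AT_TIMESTAMP"
	default:
		return nil // assumption: unknown positions yield nil
	}
	return &name
}

func main() {
	if t := toShardIteratorType(TRIM_HORIZON); t != nil {
		fmt.Println(*t) // prints "TRIM_HORIZON"
	}
}
```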

Types

type InitialPositionInStream

type InitialPositionInStream int

InitialPositionInStream is used to specify the position in the stream where a new application should start from. This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents).

type InitialPositionInStreamExtended

type InitialPositionInStreamExtended struct {
	Position InitialPositionInStream

	// The time stamp of the data record from which to start reading. Used with
	// shard iterator type AT_TIMESTAMP. A time stamp is the Unix epoch date with
	// precision in milliseconds. For example, 2016-04-04T19:58:46.480-00:00 or
	// 1459799926.480. If a record with this exact time stamp does not exist, the
	// iterator returned is for the next (later) record. If the time stamp is older
	// than the current trim horizon, the iterator returned is for the oldest untrimmed
	// data record (TRIM_HORIZON).
	Timestamp *time.Time `type:"Timestamp" timestampFormat:"unix"`
}

InitialPositionInStreamExtended is a struct that holds the values needed to specify the position in the stream from where a new application should start.
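
The Timestamp comment gives 1459799926.480 and 2016-04-04T19:58:46.480-00:00 as the same instant. As a minimal sketch, the code below builds that value with the standard time package and pairs it with AT_TIMESTAMP. The types are re-declared locally (without the struct tags) so the example runs on its own, and atTimestamp is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Local mirrors of the package's types, for illustration only.
type InitialPositionInStream int

const (
	LATEST InitialPositionInStream = iota + 1
	TRIM_HORIZON
	AT_TIMESTAMP
)

type InitialPositionInStreamExtended struct {
	Position  InitialPositionInStream
	Timestamp *time.Time
}

// atTimestamp is a hypothetical helper that builds an AT_TIMESTAMP
// position from a Unix epoch value in milliseconds.
func atTimestamp(unixMillis int64) InitialPositionInStreamExtended {
	ts := time.UnixMilli(unixMillis).UTC()
	return InitialPositionInStreamExtended{Position: AT_TIMESTAMP, Timestamp: &ts}
}

func main() {
	pos := atTimestamp(1459799926480) // 1459799926.480 seconds
	fmt.Println(pos.Timestamp.Format("2006-01-02T15:04:05.000Z07:00"))
	// prints "2016-04-04T19:58:46.480Z"
}
```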

type KinesisClientLibConfiguration

type KinesisClientLibConfiguration struct {
	// ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream.
	ApplicationName string

	// DynamoDBEndpoint is an optional endpoint URL that overrides the default generated endpoint for a DynamoDB client.
	// If this is empty, the default generated endpoint will be used.
	DynamoDBEndpoint string

	// KinesisEndpoint is an optional endpoint URL that overrides the default generated endpoint for a Kinesis client.
	// If this is empty, the default generated endpoint will be used.
	KinesisEndpoint string

	// KinesisCredentials is used to access Kinesis
	KinesisCredentials aws.CredentialsProvider

	// DynamoDBCredentials is used to access DynamoDB
	DynamoDBCredentials aws.CredentialsProvider

	// TableName is the name of the DynamoDB table used for managing the Kinesis stream; it defaults to ApplicationName.
	TableName string

	// StreamName is the name of Kinesis stream
	StreamName string

	// EnableEnhancedFanOutConsumer enables enhanced fan-out consumer
	// See: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
	// Either consumer name or consumer ARN must be specified when Enhanced Fan-Out is enabled.
	EnableEnhancedFanOutConsumer bool

	// EnhancedFanOutConsumerName is the name of the enhanced fan-out consumer to create. If this isn't set the ApplicationName will be used.
	EnhancedFanOutConsumerName string

	// EnhancedFanOutConsumerARN is the ARN of an already created enhanced fan-out consumer. If this is set, no automatic consumer creation will be attempted.
	EnhancedFanOutConsumerARN string

	// WorkerID used to distinguish different workers/processes of a Kinesis application
	WorkerID string

	// InitialPositionInStream specifies the Position in the stream where a new application should start from
	InitialPositionInStream InitialPositionInStream

	// InitialPositionInStreamExtended provides actual AT_TIMESTAMP value
	InitialPositionInStreamExtended InitialPositionInStreamExtended

	// FailoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
	FailoverTimeMillis int

	// LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner.
	LeaseRefreshPeriodMillis int

	// LeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt
	LeaseRefreshWaitTime int

	// MaxRecords Max records to read per Kinesis getRecords() call
	MaxRecords int

	// IdleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
	IdleTimeBetweenReadsInMillis int

	// CallProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
	// GetRecords returned an empty record list.
	CallProcessRecordsEvenForEmptyRecordList bool

	// ParentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
	ParentShardPollIntervalMillis int

	// ShardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
	ShardSyncIntervalMillis int

	// CleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration)
	CleanupTerminatedShardsBeforeExpiry bool

	// TaskBackoffTimeMillis Backoff period when tasks encounter an exception
	TaskBackoffTimeMillis int

	// ValidateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
	ValidateSequenceNumberBeforeCheckpointing bool

	// RegionName The region name for the service
	RegionName string

	// ShutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully
	ShutdownGraceMillis int

	// Max leases this Worker can handle at a time
	MaxLeasesForWorker int

	// Max leases to steal at one time (for load balancing)
	MaxLeasesToStealAtOneTime int

	// Read capacity to provision when creating the lease table (DynamoDB).
	InitialLeaseTableReadCapacity int

	// Write capacity to provision when creating the lease table.
	InitialLeaseTableWriteCapacity int

	// Worker should skip syncing shards and leases at startup if leases are present
	// This is useful for optimizing deployments to large fleets working on a stable stream.
	SkipShardSyncAtWorkerInitializationIfLeasesExist bool

	// Logger used to log message.
	Logger logger.Logger

	// MonitoringService publishes per worker-scoped metrics.
	MonitoringService metrics.MonitoringService

	// EnableLeaseStealing turns on lease stealing
	EnableLeaseStealing bool

	// LeaseStealingIntervalMillis The number of milliseconds between rebalance tasks
	LeaseStealingIntervalMillis int

	// LeaseStealingClaimTimeoutMillis The number of milliseconds to wait before another worker can acquire a claimed shard
	LeaseStealingClaimTimeoutMillis int

	// LeaseSyncingTimeIntervalMillis The number of milliseconds to wait before syncing with the lease table (DynamoDB)
	LeaseSyncingTimeIntervalMillis int

	// MaxRetryCount The maximum number of retries in case of error
	MaxRetryCount int
}

KinesisClientLibConfiguration is the configuration for the Kinesis Client Library. Note: There is no need to configure a credentials provider. Credentials can be obtained from the InstanceProfile.

func NewKinesisClientLibConfig

func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID string) *KinesisClientLibConfiguration

NewKinesisClientLibConfig creates a default KinesisClientLibConfiguration based on the required fields.

func NewKinesisClientLibConfigWithCredential

func NewKinesisClientLibConfigWithCredential(applicationName, streamName, regionName, workerID string,
	creds aws.CredentialsProvider) *KinesisClientLibConfiguration

NewKinesisClientLibConfigWithCredential creates a default KinesisClientLibConfiguration based on the required fields and unique credentials.

func NewKinesisClientLibConfigWithCredentials

func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID string,
	kinesisCreds, dynamodbCreds aws.CredentialsProvider) *KinesisClientLibConfiguration

NewKinesisClientLibConfigWithCredentials creates a default KinesisClientLibConfiguration based on the required fields and specific credentials for each service.

func (*KinesisClientLibConfiguration) WithCallProcessRecordsEvenForEmptyRecordList

func (c *KinesisClientLibConfiguration) WithCallProcessRecordsEvenForEmptyRecordList(callProcessRecordsEvenForEmptyRecordList bool) *KinesisClientLibConfiguration

func (*KinesisClientLibConfiguration) WithDynamoDBEndpoint

func (c *KinesisClientLibConfiguration) WithDynamoDBEndpoint(dynamoDBEndpoint string) *KinesisClientLibConfiguration

WithDynamoDBEndpoint is used to provide an alternative DynamoDB endpoint

func (*KinesisClientLibConfiguration) WithEnhancedFanOutConsumer

func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumer(enable bool) *KinesisClientLibConfiguration

WithEnhancedFanOutConsumer sets EnableEnhancedFanOutConsumer. If enhanced fan-out is enabled and ConsumerName is not specified, ApplicationName is used as ConsumerName. For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html. Note: You can register up to twenty consumers per stream to use enhanced fan-out.

func (*KinesisClientLibConfiguration) WithEnhancedFanOutConsumerARN

func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerARN(consumerARN string) *KinesisClientLibConfiguration

WithEnhancedFanOutConsumerARN enables the enhanced fan-out consumer with the specified consumer ARN. For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html. Note: You can register up to twenty consumers per stream to use enhanced fan-out.

func (*KinesisClientLibConfiguration) WithEnhancedFanOutConsumerName

func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerName(consumerName string) *KinesisClientLibConfiguration

WithEnhancedFanOutConsumerName enables the enhanced fan-out consumer with the specified name. For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html. Note: You can register up to twenty consumers per stream to use enhanced fan-out.
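
The field comments describe two rules for enhanced fan-out: the consumer name falls back to ApplicationName when it is not set, and a configured ARN means no automatic consumer creation is attempted. The sketch below restates those rules in code under the assumption that "not set" means the empty string. The fanOutSettings type and both functions are hypothetical and do not reflect how the package implements this.

```go
package main

import "fmt"

// fanOutSettings is a hypothetical subset of the configuration fields
// that relate to enhanced fan-out.
type fanOutSettings struct {
	ApplicationName            string
	EnhancedFanOutConsumerName string
	EnhancedFanOutConsumerARN  string
}

// effectiveConsumerName returns the configured consumer name, falling
// back to ApplicationName when none is set.
func effectiveConsumerName(s fanOutSettings) string {
	if s.EnhancedFanOutConsumerName != "" {
		return s.EnhancedFanOutConsumerName
	}
	return s.ApplicationName
}

// needsConsumerCreation reports whether a consumer would have to be
// created: only when no existing consumer ARN was supplied.
func needsConsumerCreation(s fanOutSettings) bool {
	return s.EnhancedFanOutConsumerARN == ""
}

func main() {
	s := fanOutSettings{ApplicationName: "my-app"}
	fmt.Println(effectiveConsumerName(s), needsConsumerCreation(s)) // prints "my-app true"
}
```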

func (*KinesisClientLibConfiguration) WithFailoverTimeMillis

func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMillis int) *KinesisClientLibConfiguration

func (*KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis

func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration

WithIdleTimeBetweenReadsInMillis controls how long the KCL will sleep if no records are returned from Kinesis.

This value is only used when no records are returned; if records are returned, the next set of records is retrieved immediately after the record processor's processRecords call has returned. Setting this value too high may result in the KCL being unable to catch up. If you are changing this value, it's recommended that you enable WithCallProcessRecordsEvenForEmptyRecordList and monitor how far behind the retrieved records are by inspecting the millis-behind-latest value reported in ProcessRecordsInput (getMillisBehindLatest() in the Java KCL) and the CloudWatch metric GetRecords.MillisBehindLatest (http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html#kinesis-metrics-stream).

Parameter idleTimeBetweenReadsInMillis: how long to sleep, in milliseconds, between GetRecords calls when no records are returned. Returns the KinesisClientLibConfiguration.

func (*KinesisClientLibConfiguration) WithInitialPositionInStream

func (c *KinesisClientLibConfiguration) WithInitialPositionInStream(initialPositionInStream InitialPositionInStream) *KinesisClientLibConfiguration

func (*KinesisClientLibConfiguration) WithKinesisEndpoint

func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration

WithKinesisEndpoint is used to provide an alternative Kinesis endpoint

func (*KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis

func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefreshPeriodMillis int) *KinesisClientLibConfiguration

func (*KinesisClientLibConfiguration) WithLeaseRefreshWaitTime added in v0.0.5

func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration

func (*KinesisClientLibConfiguration) WithLeaseStealing

func (c *KinesisClientLibConfiguration) WithLeaseStealing(enableLeaseStealing bool) *KinesisClientLibConfiguration

func (*KinesisClientLibConfiguration) WithLeaseStealingIntervalMillis

func (c *KinesisClientLibConfiguration) WithLeaseStealingIntervalMillis(leaseStealingIntervalMillis int) *KinesisClientLibConfiguration

func (*KinesisClientLibConfiguration) WithLeaseSyncingIntervalMillis

func (c *KinesisClientLibConfiguration) WithLeaseSyncingIntervalMillis(leaseSyncingIntervalMillis int) *KinesisClientLibConfiguration

func (*KinesisClientLibConfiguration) WithLogger

func (*KinesisClientLibConfiguration) WithMaxLeasesForWorker

func (c *KinesisClientLibConfiguration) WithMaxLeasesForWorker(n int) *KinesisClientLibConfiguration

WithMaxLeasesForWorker configures the maximum number of leases this worker can handle. It determines the maximum number of shards this worker can process.

func (*KinesisClientLibConfiguration) WithMaxRecords

func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisClientLibConfiguration

func (*KinesisClientLibConfiguration) WithMaxRetryCount added in v0.0.5

func (c *KinesisClientLibConfiguration) WithMaxRetryCount(maxRetryCount int) *KinesisClientLibConfiguration

WithMaxRetryCount sets the max retry count in case of error.

func (*KinesisClientLibConfiguration) WithMonitoringService

WithMonitoringService sets the monitoring service to use to publish metrics.

func (*KinesisClientLibConfiguration) WithShardSyncIntervalMillis

func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration

func (*KinesisClientLibConfiguration) WithTableName

WithTableName is used to provide an alternative lease table in DynamoDB.

func (*KinesisClientLibConfiguration) WithTaskBackoffTimeMillis

func (c *KinesisClientLibConfiguration) WithTaskBackoffTimeMillis(taskBackoffTimeMillis int) *KinesisClientLibConfiguration

func (*KinesisClientLibConfiguration) WithTimestampAtInitialPositionInStream

func (c *KinesisClientLibConfiguration) WithTimestampAtInitialPositionInStream(timestamp *time.Time) *KinesisClientLibConfiguration
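
Every With* method takes a pointer receiver and returns *KinesisClientLibConfiguration, which allows calls to be chained after a constructor. The sketch below demonstrates that pattern on a small local stand-in so it runs without the package. The config type and newConfig are hypothetical, and the assumption that each setter mutates the receiver and returns it is not confirmed by this page.

```go
package main

import "fmt"

// config is a hypothetical, reduced stand-in for
// KinesisClientLibConfiguration.
type config struct {
	ApplicationName    string
	StreamName         string
	MaxRecords         int
	FailoverTimeMillis int
}

// newConfig fills in the required fields and applies default values
// copied from the Constants section.
func newConfig(applicationName, streamName string) *config {
	return &config{
		ApplicationName:    applicationName,
		StreamName:         streamName,
		MaxRecords:         10000, // value of DefaultMaxRecords
		FailoverTimeMillis: 10000, // value of DefaultFailoverTimeMillis
	}
}

// WithMaxRecords sets MaxRecords and returns the receiver for chaining.
func (c *config) WithMaxRecords(maxRecords int) *config {
	c.MaxRecords = maxRecords
	return c
}

// WithFailoverTimeMillis sets FailoverTimeMillis and returns the receiver.
func (c *config) WithFailoverTimeMillis(failoverTimeMillis int) *config {
	c.FailoverTimeMillis = failoverTimeMillis
	return c
}

func main() {
	cfg := newConfig("my-app", "my-stream").
		WithMaxRecords(500).
		WithFailoverTimeMillis(30000)
	fmt.Println(cfg.MaxRecords, cfg.FailoverTimeMillis) // prints "500 30000"
}
```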
