Documentation ¶
Index ¶
- Constants
- func InitalPositionInStreamToShardIteratorType(pos InitialPositionInStream) *string
- type InitialPositionInStream
- type InitialPositionInStreamExtended
- type KinesisClientLibConfiguration
- func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID string) *KinesisClientLibConfiguration
- func NewKinesisClientLibConfigWithCredential(applicationName, streamName, regionName, workerID string, ...) *KinesisClientLibConfiguration
- func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID string, ...) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithCallProcessRecordsEvenForEmptyRecordList(callProcessRecordsEvenForEmptyRecordList bool) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithDynamoDBEndpoint(dynamoDBEndpoint string) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumer(enable bool) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerARN(consumerARN string) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerName(consumerName string) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMillis int) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithInitialPositionInStream(initialPositionInStream InitialPositionInStream) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefreshPeriodMillis int) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithLeaseStealing(enableLeaseStealing bool) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithLeaseStealingIntervalMillis(leaseStealingIntervalMillis int) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithLeaseSyncingIntervalMillis(leaseSyncingIntervalMillis int) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithMaxLeasesForWorker(n int) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.MonitoringService) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithTaskBackoffTimeMillis(taskBackoffTimeMillis int) *KinesisClientLibConfiguration
- func (c *KinesisClientLibConfiguration) WithTimestampAtInitialPositionInStream(timestamp *time.Time) *KinesisClientLibConfiguration
Constants ¶
const ( // LATEST start after the most recent data record (fetch new data). LATEST InitialPositionInStream = iota + 1 // TRIM_HORIZON start from the oldest available data record TRIM_HORIZON // AT_TIMESTAMP start from the record at or after the specified server-side Timestamp. AT_TIMESTAMP // DefaultInitialPositionInStream The location in the shard from which the KinesisClientLibrary will start fetching records from // when the application starts for the first time and there is no checkpoint for the shard. DefaultInitialPositionInStream = LATEST // DefaultFailoverTimeMillis Fail over time in milliseconds. A worker which does not renew its lease within this time interval // will be regarded as having problems and its shards will be assigned to other workers. // For applications that have a large number of shards, this may be set to a higher number to reduce // the number of DynamoDB IOPS required for tracking leases. DefaultFailoverTimeMillis = 10000 // DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner. DefaultLeaseRefreshPeriodMillis = 5000 // DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call. DefaultMaxRecords = 10000 // DefaultIdleTimeBetweenReadsMillis The default value for how long the ShardConsumer // should sleep if no records are returned from the call to GetRecords. DefaultIdleTimeBetweenReadsMillis = 1000 // DefaultDontCallProcessRecordsForEmptyRecordList Don't call processRecords() on the record processor for empty record lists. DefaultDontCallProcessRecordsForEmptyRecordList = false // DefaultParentShardPollIntervalMillis Interval in milliseconds between polling to check for parent shard completion. // Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on // completion of parent shards). DefaultParentShardPollIntervalMillis = 10000 // DefaultShardSyncIntervalMillis Shard sync interval in milliseconds - e.g.
wait for this long between shard sync tasks. DefaultShardSyncIntervalMillis = 60000 // DefaultCleanupLeasesUponShardsCompletion Cleanup leases upon shards completion (don't wait until they expire in Kinesis). // Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by // default we try to delete the ones we don't need any longer. DefaultCleanupLeasesUponShardsCompletion = true // DefaultTaskBackoffTimeMillis Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). DefaultTaskBackoffTimeMillis = 500 // DefaultValidateSequenceNumberBeforeCheckpointing KCL will validate client provided sequence numbers with a call to Amazon Kinesis before // checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)} by default. DefaultValidateSequenceNumberBeforeCheckpointing = true // DefaultMaxLeasesForWorker The max number of leases (shards) this worker should process. // This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints // or during deployment. // NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the // stream due to the max limit. DefaultMaxLeasesForWorker = math.MaxInt16 // DefaultMaxLeasesToStealAtOneTime Max leases to steal from another worker at one time (for load balancing). // Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts), // but can cause higher churn in the system. DefaultMaxLeasesToStealAtOneTime = 1 // DefaultInitialLeaseTableReadCapacity The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity. DefaultInitialLeaseTableReadCapacity = 10 // DefaultInitialLeaseTableWriteCapacity The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity. 
DefaultInitialLeaseTableWriteCapacity = 10 // DefaultSkipShardSyncAtStartupIfLeasesExist The Worker will skip shard sync during initialization if there are one or more leases in the lease table. This // assumes that the shards and leases are in-sync. This enables customers to choose faster startup times (e.g. // during incremental deployments of an application). DefaultSkipShardSyncAtStartupIfLeasesExist = false // DefaultShutdownGraceMillis The amount of milliseconds to wait before graceful shutdown forcefully terminates. DefaultShutdownGraceMillis = 5000 // DefaultEnableLeaseStealing Lease stealing defaults to false for backwards compatibility. DefaultEnableLeaseStealing = false // DefaultLeaseStealingIntervalMillis Interval between rebalance tasks defaults to 5 seconds. DefaultLeaseStealingIntervalMillis = 5000 // DefaultLeaseStealingClaimTimeoutMillis Number of milliseconds to wait before another worker can acquire a claimed shard DefaultLeaseStealingClaimTimeoutMillis = 120000 // DefaultLeaseSyncingIntervalMillis Number of milliseconds to wait before syncing with lease table (DynamoDB) DefaultLeaseSyncingIntervalMillis = 60000 )
Variables ¶
This section is empty.
Functions ¶
func InitalPositionInStreamToShardIteratorType ¶
func InitalPositionInStreamToShardIteratorType(pos InitialPositionInStream) *string
Types ¶
type InitialPositionInStream ¶
type InitialPositionInStream int
InitialPositionInStream is used to specify the position in the stream where a new application should start from. This is used during initial application bootstrap (when a checkpoint doesn't exist for a shard or its parents).
type InitialPositionInStreamExtended ¶
type InitialPositionInStreamExtended struct { Position InitialPositionInStream // The time stamp of the data record from which to start reading. Used with // shard iterator type AT_TIMESTAMP. A time stamp is the Unix epoch date with // precision in milliseconds. For example, 2016-04-04T19:58:46.480-00:00 or // 1459799926.480. If a record with this exact time stamp does not exist, the // iterator returned is for the next (later) record. If the time stamp is older // than the current trim horizon, the iterator returned is for the oldest untrimmed // data record (TRIM_HORIZON). Timestamp *time.Time `type:"Timestamp" timestampFormat:"unix"` }
InitialPositionInStreamExtended houses the entities needed to specify the position in the stream from which a new application should start.
type KinesisClientLibConfiguration ¶
type KinesisClientLibConfiguration struct { // ApplicationName is name of application. Kinesis allows multiple applications to consume the same stream. ApplicationName string // DynamoDBEndpoint is an optional endpoint URL that overrides the default generated endpoint for a DynamoDB client. // If this is empty, the default generated endpoint will be used. DynamoDBEndpoint string // KinesisEndpoint is an optional endpoint URL that overrides the default generated endpoint for a Kinesis client. // If this is empty, the default generated endpoint will be used. KinesisEndpoint string // KinesisCredentials is used to access Kinesis KinesisCredentials aws.CredentialsProvider // DynamoDBCredentials is used to access DynamoDB DynamoDBCredentials aws.CredentialsProvider // TableName is name of the dynamo db table for managing kinesis stream default to ApplicationName TableName string // StreamName is the name of Kinesis stream StreamName string // EnableEnhancedFanOutConsumer enables enhanced fan-out consumer // See: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html // Either consumer name or consumer ARN must be specified when Enhanced Fan-Out is enabled. EnableEnhancedFanOutConsumer bool // EnhancedFanOutConsumerName is the name of the enhanced fan-out consumer to create. If this isn't set the ApplicationName will be used. 
EnhancedFanOutConsumerName string // EnhancedFanOutConsumerARN is the ARN of an already created enhanced fan-out consumer, if this is set no automatic consumer creation will be attempted EnhancedFanOutConsumerARN string // WorkerID used to distinguish different workers/processes of a Kinesis application WorkerID string // InitialPositionInStream specifies the Position in the stream where a new application should start from InitialPositionInStream InitialPositionInStream // InitialPositionInStreamExtended provides actual AT_TIMESTAMP value InitialPositionInStreamExtended InitialPositionInStreamExtended // FailoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) FailoverTimeMillis int // LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner. LeaseRefreshPeriodMillis int // MaxRecords Max records to read per Kinesis getRecords() call MaxRecords int // IdleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis IdleTimeBetweenReadsInMillis int // CallProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if // GetRecords returned an empty record list. 
CallProcessRecordsEvenForEmptyRecordList bool // ParentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done ParentShardPollIntervalMillis int // ShardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards ShardSyncIntervalMillis int // CleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration) CleanupTerminatedShardsBeforeExpiry bool // TaskBackoffTimeMillis Backoff period when tasks encounter an exception TaskBackoffTimeMillis int // ValidateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers ValidateSequenceNumberBeforeCheckpointing bool // RegionName The region name for the service RegionName string // ShutdownGraceMillis The number of milliseconds before graceful shutdown terminates forcefully ShutdownGraceMillis int // Max leases this Worker can handle at a time MaxLeasesForWorker int // Max leases to steal at one time (for load balancing) MaxLeasesToStealAtOneTime int // Read capacity to provision when creating the lease table (dynamoDB). InitialLeaseTableReadCapacity int // Write capacity to provision when creating the lease table. InitialLeaseTableWriteCapacity int // Worker should skip syncing shards and leases at startup if leases are present // This is useful for optimizing deployments to large fleets working on a stable stream. SkipShardSyncAtWorkerInitializationIfLeasesExist bool // Logger used to log message. Logger logger.Logger // MonitoringService publishes per worker-scoped metrics. 
MonitoringService metrics.MonitoringService // EnableLeaseStealing turns on lease stealing EnableLeaseStealing bool // LeaseStealingIntervalMillis The number of milliseconds between rebalance tasks LeaseStealingIntervalMillis int // LeaseStealingClaimTimeoutMillis The number of milliseconds to wait before another worker can acquire a claimed shard LeaseStealingClaimTimeoutMillis int // LeaseSyncingTimeIntervalMillis The number of milliseconds to wait before syncing with lease table (DynamoDB) LeaseSyncingTimeIntervalMillis int }
KinesisClientLibConfiguration holds the configuration for the Kinesis Client Library. Note: There is no need to configure a credentials provider; credentials can be obtained from the InstanceProfile.
func NewKinesisClientLibConfig ¶
func NewKinesisClientLibConfig(applicationName, streamName, regionName, workerID string) *KinesisClientLibConfiguration
NewKinesisClientLibConfig creates a default KinesisClientLibConfiguration based on the required fields.
func NewKinesisClientLibConfigWithCredential ¶
func NewKinesisClientLibConfigWithCredential(applicationName, streamName, regionName, workerID string, creds aws.CredentialsProvider) *KinesisClientLibConfiguration
NewKinesisClientLibConfigWithCredential creates a default KinesisClientLibConfiguration based on the required fields and unique credentials.
func NewKinesisClientLibConfigWithCredentials ¶
func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regionName, workerID string, kinesisCreds, dynamodbCreds aws.CredentialsProvider) *KinesisClientLibConfiguration
NewKinesisClientLibConfigWithCredentials creates a default KinesisClientLibConfiguration based on the required fields and specific credentials for each service.
func (*KinesisClientLibConfiguration) WithCallProcessRecordsEvenForEmptyRecordList ¶
func (c *KinesisClientLibConfiguration) WithCallProcessRecordsEvenForEmptyRecordList(callProcessRecordsEvenForEmptyRecordList bool) *KinesisClientLibConfiguration
func (*KinesisClientLibConfiguration) WithDynamoDBEndpoint ¶
func (c *KinesisClientLibConfiguration) WithDynamoDBEndpoint(dynamoDBEndpoint string) *KinesisClientLibConfiguration
WithDynamoDBEndpoint is used to provide an alternative DynamoDB endpoint
func (*KinesisClientLibConfiguration) WithEnhancedFanOutConsumer ¶
func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumer(enable bool) *KinesisClientLibConfiguration
WithEnhancedFanOutConsumer sets EnableEnhancedFanOutConsumer. If enhanced fan-out is enabled and ConsumerName is not specified, ApplicationName is used as ConsumerName. For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html Note: You can register up to twenty consumers per stream to use enhanced fan-out.
func (*KinesisClientLibConfiguration) WithEnhancedFanOutConsumerARN ¶
func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerARN(consumerARN string) *KinesisClientLibConfiguration
WithEnhancedFanOutConsumerARN enables enhanced fan-out consumer with the specified consumer ARN. For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html Note: You can register up to twenty consumers per stream to use enhanced fan-out.
func (*KinesisClientLibConfiguration) WithEnhancedFanOutConsumerName ¶
func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerName(consumerName string) *KinesisClientLibConfiguration
WithEnhancedFanOutConsumerName enables enhanced fan-out consumer with the specified name. For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html Note: You can register up to twenty consumers per stream to use enhanced fan-out.
func (*KinesisClientLibConfiguration) WithFailoverTimeMillis ¶
func (c *KinesisClientLibConfiguration) WithFailoverTimeMillis(failoverTimeMillis int) *KinesisClientLibConfiguration
func (*KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis ¶
func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration
WithIdleTimeBetweenReadsInMillis controls how long the KCL will sleep if no records are returned from Kinesis.
This value is only used when no records are returned; if records are returned, the com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask will immediately retrieve the next set of records after the call to com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords(ProcessRecordsInput) has returned. Setting this value too high may result in the KCL being unable to catch up. If you are changing this value it's recommended that you enable WithCallProcessRecordsEvenForEmptyRecordList, and monitor how far behind the records retrieved are by inspecting com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput#getMillisBehindLatest(), and the CloudWatch Metric: GetRecords.MillisBehindLatest (http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html#kinesis-metrics-stream).
The idleTimeBetweenReadsInMillis parameter is how long to sleep between GetRecords calls when no records are returned. Returns a *KinesisClientLibConfiguration.
func (*KinesisClientLibConfiguration) WithInitialPositionInStream ¶
func (c *KinesisClientLibConfiguration) WithInitialPositionInStream(initialPositionInStream InitialPositionInStream) *KinesisClientLibConfiguration
func (*KinesisClientLibConfiguration) WithKinesisEndpoint ¶
func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration
WithKinesisEndpoint is used to provide an alternative Kinesis endpoint
func (*KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis ¶
func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefreshPeriodMillis int) *KinesisClientLibConfiguration
func (*KinesisClientLibConfiguration) WithLeaseStealing ¶
func (c *KinesisClientLibConfiguration) WithLeaseStealing(enableLeaseStealing bool) *KinesisClientLibConfiguration
func (*KinesisClientLibConfiguration) WithLeaseStealingIntervalMillis ¶
func (c *KinesisClientLibConfiguration) WithLeaseStealingIntervalMillis(leaseStealingIntervalMillis int) *KinesisClientLibConfiguration
func (*KinesisClientLibConfiguration) WithLeaseSyncingIntervalMillis ¶
func (c *KinesisClientLibConfiguration) WithLeaseSyncingIntervalMillis(leaseSyncingIntervalMillis int) *KinesisClientLibConfiguration
func (*KinesisClientLibConfiguration) WithLogger ¶
func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *KinesisClientLibConfiguration
func (*KinesisClientLibConfiguration) WithMaxLeasesForWorker ¶
func (c *KinesisClientLibConfiguration) WithMaxLeasesForWorker(n int) *KinesisClientLibConfiguration
WithMaxLeasesForWorker configures the maximum number of leases this worker can handle. It determines the maximum number of shards this worker can handle.
func (*KinesisClientLibConfiguration) WithMaxRecords ¶
func (c *KinesisClientLibConfiguration) WithMaxRecords(maxRecords int) *KinesisClientLibConfiguration
func (*KinesisClientLibConfiguration) WithMonitoringService ¶
func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.MonitoringService) *KinesisClientLibConfiguration
WithMonitoringService sets the monitoring service to use to publish metrics.
func (*KinesisClientLibConfiguration) WithShardSyncIntervalMillis ¶
func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration
func (*KinesisClientLibConfiguration) WithTableName ¶
func (c *KinesisClientLibConfiguration) WithTableName(tableName string) *KinesisClientLibConfiguration
WithTableName is used to provide an alternative lease table in DynamoDB
func (*KinesisClientLibConfiguration) WithTaskBackoffTimeMillis ¶
func (c *KinesisClientLibConfiguration) WithTaskBackoffTimeMillis(taskBackoffTimeMillis int) *KinesisClientLibConfiguration
func (*KinesisClientLibConfiguration) WithTimestampAtInitialPositionInStream ¶
func (c *KinesisClientLibConfiguration) WithTimestampAtInitialPositionInStream(timestamp *time.Time) *KinesisClientLibConfiguration