Documentation ¶
Overview ¶
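Package interfaces defines the interfaces and input types that connect a record processor to the Kinesis Client Library (KCL). The implementation is derived from https://github.com/awslabs/amazon-kinesis-client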
Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at

http://aws.amazon.com/asl/

or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ShutdownReasonMessage ¶
func ShutdownReasonMessage(reason ShutdownReason) *string
Types ¶
type ExtendedSequenceNumber ¶
ExtendedSequenceNumber represents a two-part sequence number for records aggregated by the Kinesis Producer Library.
The KPL combines multiple user records into a single Kinesis record. Each user record therefore has an integer sub-sequence number, in addition to the regular sequence number of the Kinesis record. The sub-sequence number is used to checkpoint within an aggregated record.
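To illustrate how a two-part sequence number orders user records, the sketch below compares two values first by the Kinesis sequence number and then by the sub-sequence number. The `extSeq` type, its field names, and the `compare` helper are hypothetical stand-ins rather than this package's definitions. Kinesis sequence numbers are large decimal strings, so the sketch parses them with `math/big` instead of a fixed-width integer.

```go
package main

import (
	"fmt"
	"math/big"
)

// extSeq is a hypothetical stand-in for a two-part sequence number.
type extSeq struct {
	Sequence    string // sequence number of the Kinesis record (decimal string)
	SubSequence int64  // position of the user record inside an aggregated record
}

// compare returns -1, 0, or +1 depending on whether a sorts before, equal to,
// or after b. It returns an error if a sequence number is not a decimal integer.
func compare(a, b extSeq) (int, error) {
	x, ok := new(big.Int).SetString(a.Sequence, 10)
	if !ok {
		return 0, fmt.Errorf("invalid sequence number %q", a.Sequence)
	}
	y, ok := new(big.Int).SetString(b.Sequence, 10)
	if !ok {
		return 0, fmt.Errorf("invalid sequence number %q", b.Sequence)
	}
	// The Kinesis record's sequence number decides first.
	if c := x.Cmp(y); c != 0 {
		return c, nil
	}
	// Same Kinesis record: fall back to the sub-sequence number.
	switch {
	case a.SubSequence < b.SubSequence:
		return -1, nil
	case a.SubSequence > b.SubSequence:
		return 1, nil
	}
	return 0, nil
}

func main() {
	// Two user records from the same aggregated Kinesis record.
	a := extSeq{Sequence: "49590338271490256608559692538361571095921575989136588898", SubSequence: 0}
	b := extSeq{Sequence: "49590338271490256608559692538361571095921575989136588898", SubSequence: 3}
	c, err := compare(a, b)
	fmt.Println(c, err)
}
```

Checkpointing at `b` rather than `a` in this example would mark the first four user records of the aggregated record as processed, which is the within-record granularity the sub-sequence number exists to provide.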
type IPreparedCheckpointer ¶
type IPreparedCheckpointer interface {
	GetPendingCheckpoint() *ExtendedSequenceNumber

	// Checkpoint
	/*
	 * This method will record a pending checkpoint.
	 *
	 * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
	 *        Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
	 * @error ShutdownError The record processor instance has been shutdown. Another instance may have
	 *        started processing some of these records already.
	 *        The application should abort processing via this RecordProcessor instance.
	 * @error InvalidStateError Can't store checkpoint.
	 *        Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
	 * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
	 *        backoff and retry.
	 * @error IllegalArgumentError The sequence number being checkpointed is invalid because it is out of range,
	 *        i.e. it is smaller than the last check point value (prepared or committed), or larger than the greatest
	 *        sequence number seen by the associated record processor.
	 */
	Checkpoint() error
}
type IRecordProcessor ¶
type IRecordProcessor interface {
	// Initialize
	/*
	 * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
	 * (via processRecords).
	 *
	 * @param initializationInput Provides information related to initialization
	 */
	Initialize(initializationInput *InitializationInput)

	// ProcessRecords
	/*
	 * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
	 * application.
	 * Upon fail over, the new instance will get records with sequence number > checkpoint position
	 * for each partition key.
	 *
	 * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
	 *        to them (eg checkpointing).
	 */
	ProcessRecords(processRecordsInput *ProcessRecordsInput)

	// ResetShardIterator
	/*
	 * If true, resets the shard iterator to the last checkpointed sequence number for the shard.
	 */
	ResetShardIterator() bool

	// Shutdown
	/*
	 * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
	 * RecordProcessor instance.
	 *
	 * <h2><b>Warning</b></h2>
	 *
	 * When the value of {@link ShutdownInput#getShutdownReason()} is
	 * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you
	 * checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
	 *
	 * @param shutdownInput
	 *        Provides information and capabilities (eg checkpointing) related to shutdown of this record processor.
	 */
	Shutdown(shutdownInput *ShutdownInput)
}
IRecordProcessor is the interface for the callback functions invoked by the KCL. The main task when using the KCL is to provide an implementation of the IRecordProcessor interface. Note: This is exactly the same interface as Amazon KCL IRecordProcessor v2.
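As a rough illustration of the callback flow, the sketch below implements Initialize, ProcessRecords, and ResetShardIterator against small local stand-in types so that it compiles on its own; Shutdown is left out for brevity. The names `record`, `initializationInput`, `processRecordsInput`, `printingProcessor`, and `memCheckpointer` are hypothetical. In a real application the input types and the checkpointer come from this package and the record type from the AWS SDK. Checkpointing once per batch at the last record is an assumption of this sketch, not a requirement of the library.

```go
package main

import "fmt"

// Local stand-ins mirroring only the fields this sketch needs.
type checkpointer interface {
	Checkpoint(sequenceNumber *string) error
}

type record struct {
	SequenceNumber *string
	Data           []byte
}

type initializationInput struct{ ShardId string }

type processRecordsInput struct {
	Records      []record
	Checkpointer checkpointer
}

// printingProcessor is a hypothetical processor that prints each record.
type printingProcessor struct {
	shardID   string
	processed int
}

// Initialize remembers which shard this instance was assigned.
func (p *printingProcessor) Initialize(in *initializationInput) {
	p.shardID = in.ShardId
}

// ProcessRecords handles one batch, then checkpoints at its last record.
func (p *printingProcessor) ProcessRecords(in *processRecordsInput) {
	if len(in.Records) == 0 {
		return
	}
	for _, r := range in.Records {
		fmt.Printf("shard %s: %s\n", p.shardID, r.Data)
		p.processed++
	}
	last := in.Records[len(in.Records)-1].SequenceNumber
	if err := in.Checkpointer.Checkpoint(last); err != nil {
		fmt.Println("checkpoint failed:", err)
	}
}

// ResetShardIterator returns false: this sketch never asks for a rewind.
func (p *printingProcessor) ResetShardIterator() bool { return false }

// memCheckpointer keeps the last checkpoint in memory, for demonstration only.
type memCheckpointer struct{ last string }

func (m *memCheckpointer) Checkpoint(seq *string) error {
	if seq != nil {
		m.last = *seq
	}
	return nil
}

func main() {
	s1, s2 := "101", "102"
	cp := &memCheckpointer{}
	p := &printingProcessor{}
	p.Initialize(&initializationInput{ShardId: "shardId-000000000000"})
	p.ProcessRecords(&processRecordsInput{
		Records: []record{
			{SequenceNumber: &s1, Data: []byte("first")},
			{SequenceNumber: &s2, Data: []byte("second")},
		},
		Checkpointer: cp,
	})
	fmt.Println("checkpointed at", cp.last)
}
```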
type IRecordProcessorCheckpointer ¶
type IRecordProcessorCheckpointer interface {
	// Checkpoint
	/*
	 * This method will checkpoint the progress at the provided sequenceNumber. This method is analogous to
	 * {@link #checkpoint()} but provides the ability to specify the sequence number at which to
	 * checkpoint.
	 *
	 * @param sequenceNumber A sequence number at which to checkpoint in this shard. Upon failover,
	 *        the Kinesis Client Library will start fetching records after this sequence number.
	 * @error ThrottlingError Can't store checkpoint. Can be caused by checkpointing too frequently.
	 *        Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
	 * @error ShutdownError The record processor instance has been shutdown. Another instance may have
	 *        started processing some of these records already.
	 *        The application should abort processing via this RecordProcessor instance.
	 * @error InvalidStateError Can't store checkpoint.
	 *        Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
	 * @error KinesisClientLibDependencyError Encountered an issue when storing the checkpoint. The application can
	 *        backoff and retry.
	 * @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
	 *        1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
	 *        greatest sequence number seen by the associated record processor.
	 *        2.) It is not a valid sequence number for a record in this shard.
	 */
	Checkpoint(sequenceNumber *string) error

	// PrepareCheckpoint
	/**
	 * This method will record a pending checkpoint at the provided sequenceNumber.
	 *
	 * @param sequenceNumber A sequence number at which to prepare checkpoint in this shard.
	 * @return an IPreparedCheckpointer object that can be called later to persist the checkpoint.
	 *
	 * @error ThrottlingError Can't store pending checkpoint. Can be caused by checkpointing too frequently.
	 *        Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
	 * @error ShutdownError The record processor instance has been shutdown. Another instance may have
	 *        started processing some of these records already.
	 *        The application should abort processing via this RecordProcessor instance.
	 * @error InvalidStateError Can't store pending checkpoint.
	 *        Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
	 * @error KinesisClientLibDependencyError Encountered an issue when storing the pending checkpoint. The
	 *        application can backoff and retry.
	 * @error IllegalArgumentError The sequence number is invalid for one of the following reasons:
	 *        1.) It appears to be out of range, i.e. it is smaller than the last check point value, or larger than the
	 *        greatest sequence number seen by the associated record processor.
	 *        2.) It is not a valid sequence number for a record in this shard.
	 */
	PrepareCheckpoint(sequenceNumber *string) (IPreparedCheckpointer, error)
}
IRecordProcessorCheckpointer is used by RecordProcessors when they want to checkpoint their progress. The Kinesis Client Library passes an object implementing this interface to RecordProcessors so they can checkpoint their progress.
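The Checkpoint comments describe some failures (throttling, dependency errors) as ones where the application can back off and retry. One way to act on that is sketched below: retry with exponential backoff, and stop immediately on any other error. How this package represents those error kinds is not shown on this page, so the sketch takes a caller-supplied `retryable` predicate. The names `checkpointWithRetry`, `errThrottled`, `isThrottled`, and `flakyCheckpointer` are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// checkpointer is a local stand-in for the Checkpoint method documented above.
type checkpointer interface {
	Checkpoint(sequenceNumber *string) error
}

// errThrottled is a hypothetical sentinel standing in for a throttling error.
var errThrottled = errors.New("throttled")

// isThrottled is a hypothetical predicate that classifies retryable errors.
func isThrottled(err error) bool { return errors.Is(err, errThrottled) }

// checkpointWithRetry calls Checkpoint up to attempts times, doubling the
// delay after each retryable failure. Non-retryable errors are returned at once.
func checkpointWithRetry(cp checkpointer, seq *string, attempts int,
	base time.Duration, retryable func(error) bool) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = cp.Checkpoint(seq); err == nil {
			return nil
		}
		if !retryable(err) {
			return err
		}
		if i < attempts-1 {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return fmt.Errorf("checkpoint failed after %d attempts: %w", attempts, err)
}

// flakyCheckpointer fails with errThrottled a fixed number of times, then succeeds.
type flakyCheckpointer struct {
	failures int
	calls    int
}

func (f *flakyCheckpointer) Checkpoint(*string) error {
	f.calls++
	if f.calls <= f.failures {
		return errThrottled
	}
	return nil
}

func main() {
	seq := "100"
	cp := &flakyCheckpointer{failures: 2}
	err := checkpointWithRetry(cp, &seq, 5, 10*time.Millisecond, isThrottled)
	fmt.Println("calls:", cp.calls, "err:", err)
}
```

A ShutdownError should not be retried under this scheme, since the comments state that the application should abort processing through that RecordProcessor instance.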
type IRecordProcessorFactory ¶
type IRecordProcessorFactory interface {
	// CreateProcessor
	/*
	 * Returns a record processor to be used for processing data records for a (assigned) shard.
	 *
	 * @return Returns a processor object.
	 */
	CreateProcessor() IRecordProcessor
}
IRecordProcessorFactory is the interface for creating IRecordProcessor instances. Each Worker can have multiple threads for processing shards. A client can choose either to create one processor per shard or to share a processor among shards.
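The two choices can be sketched as two factories: one that returns a fresh processor on every call and one that hands out a single shared instance. The types below (`recordProcessor`, `countingProcessor`, `perShardFactory`, `sharedFactory`) are hypothetical stand-ins with a simplified method set, not this package's interfaces. The sketch assumes a shared processor may be called from several goroutines, so it guards its state with a mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// recordProcessor is a simplified stand-in for a record processor.
type recordProcessor interface {
	ProcessRecords(batch []string)
}

// countingProcessor counts records; the mutex makes it safe to share.
type countingProcessor struct {
	mu   sync.Mutex
	seen int
}

func (c *countingProcessor) ProcessRecords(batch []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.seen += len(batch)
}

// perShardFactory creates a new processor on every call, so per-shard state
// is never shared.
type perShardFactory struct{}

func (perShardFactory) CreateProcessor() recordProcessor {
	return &countingProcessor{}
}

// sharedFactory always returns the same processor, so every shard feeds
// the same state.
type sharedFactory struct{ shared *countingProcessor }

func (f sharedFactory) CreateProcessor() recordProcessor { return f.shared }

func main() {
	per := perShardFactory{}
	a, b := per.CreateProcessor(), per.CreateProcessor()
	fmt.Println("per-shard instances distinct:", a != b)

	sf := sharedFactory{shared: &countingProcessor{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sf.CreateProcessor().ProcessRecords([]string{"r1", "r2"})
		}()
	}
	wg.Wait()
	fmt.Println("records seen by shared processor:", sf.shared.seen)
}
```

The per-shard variant avoids locking altogether, while the shared variant suits processors that aggregate across shards.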
type InitializationInput ¶
type InitializationInput struct {
	// The shardId that the record processor is being initialized for.
	ShardId string

	// The last extended sequence number that was successfully checkpointed by the previous record processor.
	ExtendedSequenceNumber *ExtendedSequenceNumber
}
InitializationInput is the container for the parameters passed to IRecordProcessor.Initialize.
type ProcessRecordsInput ¶
type ProcessRecordsInput struct {
	// The time that this batch of records was received by the KCL.
	CacheEntryTime *time.Time

	// The time that this batch of records was prepared to be provided to the RecordProcessor.
	CacheExitTime *time.Time

	// The records received from Kinesis. These records may have been de-aggregated if they were published by the KPL.
	Records []types.Record

	// A checkpointer that the RecordProcessor can use to checkpoint its progress.
	Checkpointer IRecordProcessorCheckpointer

	// How far behind this batch of records was when received from Kinesis.
	MillisBehindLatest int64
}
ProcessRecordsInput is the container for the parameters passed to IRecordProcessor.ProcessRecords.
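The timing fields lend themselves to simple lag metrics. The sketch below derives how long a batch sat between being received by the KCL and being handed to the processor, and converts MillisBehindLatest into a `time.Duration`. The `batchTiming` struct is a local stand-in that copies only the three timing fields; `cacheDwell`, `lag`, and `sampleBatch` are hypothetical helpers. The nil checks are a defensive assumption, since the fields are pointers.

```go
package main

import (
	"fmt"
	"time"
)

// batchTiming mirrors only the timing-related fields of ProcessRecordsInput.
type batchTiming struct {
	CacheEntryTime     *time.Time
	CacheExitTime      *time.Time
	MillisBehindLatest int64
}

// cacheDwell reports the time between cache entry and cache exit.
// The boolean is false when either timestamp is missing.
func cacheDwell(b batchTiming) (time.Duration, bool) {
	if b.CacheEntryTime == nil || b.CacheExitTime == nil {
		return 0, false
	}
	return b.CacheExitTime.Sub(*b.CacheEntryTime), true
}

// lag converts MillisBehindLatest into a time.Duration.
func lag(b batchTiming) time.Duration {
	return time.Duration(b.MillisBehindLatest) * time.Millisecond
}

// sampleBatch builds an example batch that spent 250ms in the cache and was
// 1.5s behind the tip of the shard.
func sampleBatch() batchTiming {
	entry := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	exit := entry.Add(250 * time.Millisecond)
	return batchTiming{CacheEntryTime: &entry, CacheExitTime: &exit, MillisBehindLatest: 1500}
}

func main() {
	b := sampleBatch()
	if d, ok := cacheDwell(b); ok {
		fmt.Println("time in cache:", d)
	}
	fmt.Println("behind latest:", lag(b))
}
```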
type ShutdownInput ¶
type ShutdownInput struct {
	// ShutdownReason shows why RecordProcessor is going to be shutdown.
	ShutdownReason ShutdownReason

	// Checkpointer is used to record the current progress.
	Checkpointer IRecordProcessorCheckpointer
}
ShutdownInput is the container for the parameters passed to IRecordProcessor.Shutdown.
type ShutdownReason ¶
type ShutdownReason int
ShutdownReason is the reason the RecordProcessor is being shut down. It is used to distinguish between a fail-over and a termination (the shard is closed and all records have been delivered). In case of a fail-over, applications should NOT checkpoint as part of shutdown, since another record processor may have already started processing records for that shard. In case of termination (resharding use case), applications SHOULD checkpoint their progress to indicate that they have successfully processed all the records (processing of child shards can then begin).
const (
	/*
	 * REQUESTED Indicates that the entire application is being shutdown, and if desired the record processor will be given a
	 * final chance to checkpoint. This state will not trigger a direct call to
	 * {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#shutdown(ShutdownInput)}, but
	 * instead depend on a different interface for backward compatibility.
	 */
	REQUESTED ShutdownReason = iota + 1

	/*
	 * Terminate processing for this RecordProcessor (resharding use case).
	 * Indicates that the shard is closed and all records from the shard have been delivered to the application.
	 * Applications SHOULD checkpoint their progress to indicate that they have successfully processed all records
	 * from this shard and processing of child shards can be started.
	 */
	TERMINATE

	/*
	 * Processing will be moved to a different record processor (fail over, load balancing use cases).
	 * Applications SHOULD NOT checkpoint their progress (as another record processor may have already started
	 * processing data).
	 */
	ZOMBIE
)
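The checkpointing rules attached to each reason can be sketched as a small decision function used from a Shutdown callback. The block redeclares `ShutdownReason` and its constants locally so that it compiles on its own; `shouldCheckpoint`, `handleShutdown`, and `countingCheckpointer` are hypothetical helpers. Treating REQUESTED as a reason to checkpoint is a policy choice of this sketch (the comment above says only that a final chance is offered), and which sequence number to pass on TERMINATE is likewise an assumption.

```go
package main

import "fmt"

// Local mirror of the documented constants: REQUESTED=1, TERMINATE=2, ZOMBIE=3.
type ShutdownReason int

const (
	REQUESTED ShutdownReason = iota + 1
	TERMINATE
	ZOMBIE
)

// checkpointer is a local stand-in for the Checkpoint method.
type checkpointer interface {
	Checkpoint(sequenceNumber *string) error
}

// shouldCheckpoint encodes the guidance attached to each reason.
func shouldCheckpoint(reason ShutdownReason) bool {
	switch reason {
	case TERMINATE:
		return true // shard closed: checkpoint so child shards can start
	case REQUESTED:
		return true // optional final checkpoint; this sketch takes it
	default:
		return false // ZOMBIE or unknown: another processor may own the shard
	}
}

// handleShutdown checkpoints at lastSeq only when the reason allows it.
func handleShutdown(reason ShutdownReason, cp checkpointer, lastSeq *string) error {
	if !shouldCheckpoint(reason) {
		return nil
	}
	return cp.Checkpoint(lastSeq)
}

// countingCheckpointer counts Checkpoint calls, for demonstration only.
type countingCheckpointer struct{ calls int }

func (c *countingCheckpointer) Checkpoint(*string) error {
	c.calls++
	return nil
}

func main() {
	seq := "100"
	for _, r := range []ShutdownReason{REQUESTED, TERMINATE, ZOMBIE} {
		cp := &countingCheckpointer{}
		err := handleShutdown(r, cp, &seq)
		fmt.Printf("reason=%d checkpoints=%d err=%v\n", r, cp.calls, err)
	}
}
```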