Documentation ¶
Overview ¶
Package kinesumer is a generated GoMock package.
Index ¶
- Variables
- type CommitConfig
- type Config
- type Kinesumer
- func (k *Kinesumer) Close()
- func (k *Kinesumer) Commit()
- func (k *Kinesumer) Consume(ctx context.Context, streams []string) (<-chan *Record, error)
- func (k *Kinesumer) Errors() <-chan error
- func (k *Kinesumer) MarkRecord(record *Record)
- func (k *Kinesumer) Refresh(ctx context.Context, streams []string)
- type MockStateStore
- func (m *MockStateStore) DeregisterClient(ctx context.Context, clientID string) error
- func (m *MockStateStore) EXPECT() *MockStateStoreMockRecorder
- func (m *MockStateStore) GetShards(ctx context.Context, stream string) (Shards, error)
- func (m *MockStateStore) ListAllAliveClientIDs(ctx context.Context) ([]string, error)
- func (m *MockStateStore) ListCheckPoints(ctx context.Context, stream string, shardIDs []string) (map[string]string, error)
- func (m *MockStateStore) PingClientAliveness(ctx context.Context, clientID string) error
- func (m *MockStateStore) PruneClients(ctx context.Context) error
- func (m *MockStateStore) RegisterClient(ctx context.Context, clientID string) error
- func (m *MockStateStore) UpdateCheckPoints(ctx context.Context, checkpoints []*ShardCheckPoint) error
- func (m *MockStateStore) UpdateShards(ctx context.Context, stream string, shards Shards) error
- type MockStateStoreMockRecorder
- func (mr *MockStateStoreMockRecorder) DeregisterClient(ctx, clientID interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) GetShards(ctx, stream interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) ListAllAliveClientIDs(ctx interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) ListCheckPoints(ctx, stream, shardIDs interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) PingClientAliveness(ctx, clientID interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) PruneClients(ctx interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) RegisterClient(ctx, clientID interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) UpdateCheckPoints(ctx, checkpoints interface{}) *gomock.Call
- func (mr *MockStateStoreMockRecorder) UpdateShards(ctx, stream, shards interface{}) *gomock.Call
- type Record
- type Shard
- type ShardCheckPoint
- type Shards
- type StateStore
Constants ¶
This section is empty.
Variables ¶
var (
	ErrEmptySequenceNumber = errors.New("kinesumer: sequence number can't be empty")
	ErrInvalidStream       = errors.New("kinesumer: invalid stream")
)
Error codes.
var (
	ErrNoShardCache  = errors.New("kinesumer: shard cache not found")
	ErrEmptyShardIDs = errors.New("kinesumer: empty shard ids given")
)
Error codes.
Functions ¶
This section is empty.
Types ¶
type CommitConfig ¶
type CommitConfig struct {
	// Whether to auto-commit updated sequence number. (default is true)
	Auto bool
	// How frequently to commit updated sequence numbers. (default is 5s)
	Interval time.Duration
	// A Timeout config for commit per stream. (default is 2s)
	Timeout time.Duration
}
CommitConfig holds options for how offsets are handled.
func NewDefaultCommitConfig ¶
func NewDefaultCommitConfig() *CommitConfig
NewDefaultCommitConfig returns a new default offset management configuration.
type Config ¶
type Config struct {
	App      string // Application name.
	Region   string // Region name. (optional)
	ClientID string // Consumer group client id. (optional)

	// Kinesis configs.
	KinesisRegion   string
	KinesisEndpoint string // Only for local server.
	// If you want to consume messages from Kinesis in a different account,
	// you need to set up the IAM role to access the target account, and pass the role ARN here.
	// Reference: https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html.
	RoleARN string

	// State store configs.
	StateStore       *StateStore
	DynamoDBRegion   string
	DynamoDBTable    string
	DynamoDBEndpoint string // Only for local server.

	// These configs are not used in EFO mode.
	ScanLimit    int32
	ScanTimeout  time.Duration
	ScanInterval time.Duration

	EFOMode bool // On/off the Enhanced Fan-Out feature.

	// This config is used for how to manage sequence number.
	Commit *CommitConfig
}
Config defines configs for the Kinesumer client.
type Kinesumer ¶
type Kinesumer struct {
// contains filtered or unexported fields
}
Kinesumer implements an auto-rebalancing consumer group for Kinesis. TODO(mingrammer): export Prometheus metrics.
func NewKinesumer ¶
NewKinesumer initializes and returns a new Kinesumer client.
func (*Kinesumer) Commit ¶
func (k *Kinesumer) Commit()
Commit updates the check point using the current checkpoints.
func (*Kinesumer) MarkRecord ¶
func (k *Kinesumer) MarkRecord(record *Record)
MarkRecord marks the provided record as consumed.
type MockStateStore ¶
type MockStateStore struct {
// contains filtered or unexported fields
}
MockStateStore is a mock of StateStore interface.
func NewMockStateStore ¶
func NewMockStateStore(ctrl *gomock.Controller) *MockStateStore
NewMockStateStore creates a new mock instance.
func (*MockStateStore) DeregisterClient ¶
func (m *MockStateStore) DeregisterClient(ctx context.Context, clientID string) error
DeregisterClient mocks base method.
func (*MockStateStore) EXPECT ¶
func (m *MockStateStore) EXPECT() *MockStateStoreMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockStateStore) ListAllAliveClientIDs ¶
func (m *MockStateStore) ListAllAliveClientIDs(ctx context.Context) ([]string, error)
ListAllAliveClientIDs mocks base method.
func (*MockStateStore) ListCheckPoints ¶
func (m *MockStateStore) ListCheckPoints(ctx context.Context, stream string, shardIDs []string) (map[string]string, error)
ListCheckPoints mocks base method.
func (*MockStateStore) PingClientAliveness ¶
func (m *MockStateStore) PingClientAliveness(ctx context.Context, clientID string) error
PingClientAliveness mocks base method.
func (*MockStateStore) PruneClients ¶
func (m *MockStateStore) PruneClients(ctx context.Context) error
PruneClients mocks base method.
func (*MockStateStore) RegisterClient ¶
func (m *MockStateStore) RegisterClient(ctx context.Context, clientID string) error
RegisterClient mocks base method.
func (*MockStateStore) UpdateCheckPoints ¶
func (m *MockStateStore) UpdateCheckPoints(ctx context.Context, checkpoints []*ShardCheckPoint) error
UpdateCheckPoints mocks base method.
func (*MockStateStore) UpdateShards ¶
func (m *MockStateStore) UpdateShards(ctx context.Context, stream string, shards Shards) error
UpdateShards mocks base method.
type MockStateStoreMockRecorder ¶
type MockStateStoreMockRecorder struct {
// contains filtered or unexported fields
}
MockStateStoreMockRecorder is the mock recorder for MockStateStore.
func (*MockStateStoreMockRecorder) DeregisterClient ¶
func (mr *MockStateStoreMockRecorder) DeregisterClient(ctx, clientID interface{}) *gomock.Call
DeregisterClient indicates an expected call of DeregisterClient.
func (*MockStateStoreMockRecorder) GetShards ¶
func (mr *MockStateStoreMockRecorder) GetShards(ctx, stream interface{}) *gomock.Call
GetShards indicates an expected call of GetShards.
func (*MockStateStoreMockRecorder) ListAllAliveClientIDs ¶
func (mr *MockStateStoreMockRecorder) ListAllAliveClientIDs(ctx interface{}) *gomock.Call
ListAllAliveClientIDs indicates an expected call of ListAllAliveClientIDs.
func (*MockStateStoreMockRecorder) ListCheckPoints ¶
func (mr *MockStateStoreMockRecorder) ListCheckPoints(ctx, stream, shardIDs interface{}) *gomock.Call
ListCheckPoints indicates an expected call of ListCheckPoints.
func (*MockStateStoreMockRecorder) PingClientAliveness ¶
func (mr *MockStateStoreMockRecorder) PingClientAliveness(ctx, clientID interface{}) *gomock.Call
PingClientAliveness indicates an expected call of PingClientAliveness.
func (*MockStateStoreMockRecorder) PruneClients ¶
func (mr *MockStateStoreMockRecorder) PruneClients(ctx interface{}) *gomock.Call
PruneClients indicates an expected call of PruneClients.
func (*MockStateStoreMockRecorder) RegisterClient ¶
func (mr *MockStateStoreMockRecorder) RegisterClient(ctx, clientID interface{}) *gomock.Call
RegisterClient indicates an expected call of RegisterClient.
func (*MockStateStoreMockRecorder) UpdateCheckPoints ¶
func (mr *MockStateStoreMockRecorder) UpdateCheckPoints(ctx, checkpoints interface{}) *gomock.Call
UpdateCheckPoints indicates an expected call of UpdateCheckPoints.
func (*MockStateStoreMockRecorder) UpdateShards ¶
func (mr *MockStateStoreMockRecorder) UpdateShards(ctx, stream, shards interface{}) *gomock.Call
UpdateShards indicates an expected call of UpdateShards.
type ShardCheckPoint ¶
type ShardCheckPoint struct {
	Stream         string
	ShardID        string
	SequenceNumber string
	UpdatedAt      time.Time
}
ShardCheckPoint manages a shard check point.
type StateStore ¶
type StateStore interface {
	GetShards(ctx context.Context, stream string) (Shards, error)
	UpdateShards(ctx context.Context, stream string, shards Shards) error
	ListAllAliveClientIDs(ctx context.Context) ([]string, error)
	RegisterClient(ctx context.Context, clientID string) error
	DeregisterClient(ctx context.Context, clientID string) error
	PingClientAliveness(ctx context.Context, clientID string) error
	PruneClients(ctx context.Context) error
	ListCheckPoints(ctx context.Context, stream string, shardIDs []string) (map[string]string, error)
	UpdateCheckPoints(ctx context.Context, checkpoints []*ShardCheckPoint) error
}
StateStore is a distributed key-value store for managing states.