Documentation ¶
Index ¶
- Constants
- Variables
- type API
- type Actor
- type Client
- func (m *Client) CommitRecord(ctx context.Context, record *metamorphosisv1.Record) error
- func (m *Client) CurrentReservation() *Reservation
- func (m *Client) FetchRecord(ctx context.Context) (*metamorphosisv1.Record, error)
- func (m *Client) FetchRecords(ctx context.Context, max int32) ([]*metamorphosisv1.Record, error)
- func (c *Client) Init(ctx context.Context) error
- func (m *Client) IsReserved(reservations []Reservation, shard ktypes.Shard) bool
- func (m *Client) ListReservations(ctx context.Context) ([]Reservation, error)
- func (m *Client) PutRecords(ctx context.Context, req *PutRecordsRequest) error
- func (m *Client) ReleaseReservation(ctx context.Context) error
- func (m *Client) RenewReservation(ctx context.Context) error
- func (c *Client) ReserveShard(ctx context.Context) error
- type ClientContextKey
- type Config
- type DynamoDBAPI
- type KinesisAPI
- type LoggerContextKey
- type Manager
- func (m *Manager) AddActorID(id string)
- func (m *Manager) CheckForAvailableShards(ctx context.Context) ([]types.Shard, error)
- func (m *Manager) DecrementActorCount()
- func (m *Manager) IncrementActorCount()
- func (m *Manager) RefreshActorLoop(ctx context.Context) error
- func (m *Manager) RemoveActorID(id string)
- func (m *Manager) Start(ctx context.Context) error
- type Option
- func WithBatchSize(size int32) Option
- func WithDynamoClient(client DynamoDBAPI) Option
- func WithGroup(id string) Option
- func WithKinesisClient(client KinesisAPI) Option
- func WithLogger(l *slog.Logger) Option
- func WithManagerLoopWaitTime(d time.Duration) Option
- func WithMaxActorCount(actors int) Option
- func WithRecordProcessor(p RecordProcessor) Option
- func WithRenewTime(d time.Duration) Option
- func WithReservationTableName(name string) Option
- func WithReservationTimeout(d time.Duration) Option
- func WithSeed(seed int) Option
- func WithShardCacheDuration(d time.Duration) Option
- func WithShardID(id string) Option
- func WithStreamArn(arn string) Option
- func WithWorkerID(id string) Option
- func WithWorkerPrefix(id string) Option
- type PutRecordsRequest
- type RecordProcessor
- type Reservation
Constants ¶
View Source
const (
	GroupIDKey        = "groupID"
	ShardIDKey        = "shardID"
	WorkerIDKey       = "workerID"
	ExpiresAtKey      = "expiresAt"
	LatestSequenceKey = "latestSequence"
)
Variables ¶
View Source
var (
	ErrNotFound          = errors.New("reservation missing")
	ErrShardReserved     = errors.New("shard is already reserved")
	ErrAllShardsReserved = errors.New("all shards are reserved")
	Now                  = time.Now
)
View Source
var (
	ErrMissingReservation = errors.New("missing reservation")
	ErrStreamError        = errors.New("stream error")
)
View Source
var (
ErrInvalidConfiguration = errors.New("invalid metamorphosis config")
)
Functions ¶
This section is empty.
Types ¶
type API ¶
type API interface {
	CommitRecord(ctx context.Context, record *metamorphosisv1.Record) error
	FetchRecord(ctx context.Context) (*metamorphosisv1.Record, error)
	FetchRecords(ctx context.Context, max int32) ([]*metamorphosisv1.Record, error)
	Init(ctx context.Context) error
	PutRecords(ctx context.Context, request *PutRecordsRequest) error
	CurrentReservation() *Reservation
	ListReservations(ctx context.Context) ([]Reservation, error)
	IsReserved(reservations []Reservation, shard ktypes.Shard) bool
}
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) CommitRecord ¶
func (*Client) CurrentReservation ¶
func (m *Client) CurrentReservation() *Reservation
func (*Client) FetchRecord ¶
func (*Client) FetchRecords ¶
func (*Client) IsReserved ¶
func (m *Client) IsReserved(reservations []Reservation, shard ktypes.Shard) bool
func (*Client) ListReservations ¶
func (m *Client) ListReservations(ctx context.Context) ([]Reservation, error)
func (*Client) PutRecords ¶
func (m *Client) PutRecords(ctx context.Context, req *PutRecordsRequest) error
type ClientContextKey ¶
type ClientContextKey struct{}
type Config ¶
type Config struct {
	// required fields
	GroupID       string
	WorkerID      string
	StreamARN     string
	KinesisClient KinesisAPI
	DynamoClient  DynamoDBAPI

	// optional fields
	ReservationTableName string
	ShardID              string
	ReservationTimeout   time.Duration
	RenewTime            time.Duration
	ManagerLoopWaitTime  time.Duration
	RecordProcessor      RecordProcessor
	Logger               *slog.Logger
	ShardCacheDuration   time.Duration
	MaxActorCount        int
	WorkerPrefix         string
	SleepAfterProcessing time.Duration
	Seed                 int
	BatchSize            int32
}
Config contains the lower-level settings.
type DynamoDBAPI ¶
type DynamoDBAPI interface {
	Scan(ctx context.Context, params *dynamodb.ScanInput, optFns ...func(*dynamodb.Options)) (*dynamodb.ScanOutput, error)
	DescribeTable(ctx context.Context, params *dynamodb.DescribeTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DescribeTableOutput, error)
	PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
	GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)
	UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
	DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
	Query(ctx context.Context, params *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error)
	CreateTable(ctx context.Context, params *dynamodb.CreateTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error)
	DeleteTable(ctx context.Context, params *dynamodb.DeleteTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteTableOutput, error)
}
type KinesisAPI ¶
type KinesisAPI interface {
	GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error)
	GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error)
	PutRecords(ctx context.Context, params *kinesis.PutRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordsOutput, error)
	ListShards(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error)
	CreateStream(ctx context.Context, params *kinesis.CreateStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.CreateStreamOutput, error)
	DeleteStream(ctx context.Context, params *kinesis.DeleteStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DeleteStreamOutput, error)
	DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error)
}
type LoggerContextKey ¶
type LoggerContextKey struct{}
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func (*Manager) AddActorID ¶
func (*Manager) CheckForAvailableShards ¶
func (*Manager) DecrementActorCount ¶
func (m *Manager) DecrementActorCount()
func (*Manager) IncrementActorCount ¶
func (m *Manager) IncrementActorCount()
func (*Manager) RemoveActorID ¶
type Option ¶
type Option func(*Config)
func WithBatchSize ¶
func WithDynamoClient ¶
func WithDynamoClient(client DynamoDBAPI) Option
func WithKinesisClient ¶
func WithKinesisClient(client KinesisAPI) Option
func WithLogger ¶
func WithManagerLoopWaitTime ¶
func WithMaxActorCount ¶
func WithRecordProcessor ¶
func WithRecordProcessor(p RecordProcessor) Option
func WithRenewTime ¶
func WithReservationTimeout ¶
func WithShardCacheDuration ¶
func WithShardID ¶
func WithStreamArn ¶
func WithWorkerID ¶
func WithWorkerPrefix ¶
type PutRecordsRequest ¶
type PutRecordsRequest struct {
	Records    []*metamorphosisv1.Record
	StreamName *string
	StreamArn  *string
}
type RecordProcessor ¶
type RecordProcessor = func(context.Context, *metamorphosisv1.Record) error
type Reservation ¶
type Reservation struct {
	// primary key
	GroupID string `dynamodbav:"groupID"`
	// secondary key
	ShardID        string `dynamodbav:"shardID"`
	WorkerID       string `dynamodbav:"workerID"`
	ExpiresAt      int64  `dynamodbav:"expiresAt"`
	LatestSequence string `dynamodbav:"latestSequence"`
}
func (*Reservation) Expires ¶
func (r *Reservation) Expires() time.Time
Click to show internal directories.
Click to hide internal directories.