Documentation ¶
Index ¶
- Constants
- Variables
- type AttributeType
- type Backoff
- type Backofface
- type Clientface
- type Config
- type Coordinator
- func (c *Coordinator) Create(lease Lease) (Lease, error)
- func (c *Coordinator) Delete(l Lease) error
- func (c *Coordinator) ForceUpdate(lease Lease) (Lease, error)
- func (c *Coordinator) GetHeldLeases() []Lease
- func (c *Coordinator) Start() error
- func (c *Coordinator) Stop()
- func (c *Coordinator) Update(lease Lease) (Lease, error)
- type Lease
- type LeaseManager
- func (l *LeaseManager) CreateLease(lease *Lease) (*Lease, error)
- func (l *LeaseManager) CreateLeaseTable() (err error)
- func (l *LeaseManager) DeleteLease(lease *Lease) (err error)
- func (l *LeaseManager) EvictLease(lease *Lease) (err error)
- func (l *LeaseManager) ListLeases() (list []*Lease, err error)
- func (l *LeaseManager) RenewLease(lease *Lease) (err error)
- func (l *LeaseManager) TakeLease(lease *Lease) (err error)
- func (l *LeaseManager) UpdateLease(lease *Lease) (*Lease, error)
- type Leaser
- type Logger
- type Manager
- type Renewer
- type Serializer
- type Taker
Constants ¶
const (
	// Table schema
	LeaseKeyKey     = "leaseKey"
	LeaseOwnerKey   = "leaseOwner"
	LeaseCounterKey = "leaseCounter"

	// AWS exception
	AlreadyExist      = "ResourceInUseException"
	ConditionalFailed = "ConditionalCheckFailedException"
)
Variables ¶
var (
	// ErrTokenNotMatch and ErrLeaseNotHeld can be returned only by the Update() call.
	//
	// If the concurrency token of the passed-in lease doesn't match the
	// concurrency token of the authoritative lease, it means the lease was
	// lost and regained between when the caller acquired its concurrency
	// token and when the caller called Update.
	ErrTokenNotMatch = errors.New("leaser: concurrency token doesn't match the authoritative lease")

	// ErrLeaseNotHeld is returned only if the passed-in lease object
	// is not held by this worker.
	ErrLeaseNotHeld = errors.New("leaser: worker does not hold the passed-in lease object")

	// ErrValueNotMatch is returned only if you try to set an extra field on
	// a lease object using the SetAs method and the field value does not match the field
	// type.
	// For example: the StringSet type expects only []string{...}
	ErrValueNotMatch = errors.New("leaser: field value does not match the field type")
)
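The two Update errors call for different reactions from the caller. The following is a minimal sketch of how a caller might tell them apart with errors.Is. The sentinels are redeclared locally only so the example compiles on its own, and classifyUpdateError and its return values are hypothetical names, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's exported sentinel errors, redeclared
// here only so that the sketch is self-contained.
var (
	ErrTokenNotMatch = errors.New("leaser: concurrency token doesn't match the authoritative lease")
	ErrLeaseNotHeld  = errors.New("leaser: worker does not hold the passed-in lease object")
)

// classifyUpdateError is a hypothetical helper that maps an error returned
// by Update to the action a caller might take next.
func classifyUpdateError(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrTokenNotMatch):
		// The lease was lost and regained, so the caller's copy is stale.
		return "refresh"
	case errors.Is(err, ErrLeaseNotHeld):
		// This worker no longer holds the lease; stop working on it.
		return "abandon"
	default:
		// Anything else (for example a transport failure) may be retried.
		return "retry"
	}
}

func main() {
	wrapped := fmt.Errorf("update failed: %w", ErrLeaseNotHeld)
	fmt.Println(classifyUpdateError(nil))
	fmt.Println(classifyUpdateError(ErrTokenNotMatch))
	fmt.Println(classifyUpdateError(wrapped))
}
```

Using errors.Is rather than == keeps the check working if the error is wrapped on its way up the call stack.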
Functions ¶
This section is empty.
Types ¶
type AttributeType ¶
type AttributeType int
AttributeType is used to explicitly set the DynamoDB data type when setting an extra field on a lease object.
const (
	// StringSet is a string set data type
	StringSet AttributeType = iota
	// NumberSet is a number set data type
	NumberSet
	// BinarySet is a binary set data type
	BinarySet
)
type Backofface ¶
Backofface is the interface that holds the backoff strategy.
type Clientface ¶
type Clientface interface {
	Scan(*dynamodb.ScanInput) (*dynamodb.ScanOutput, error)
	PutItem(*dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error)
	UpdateItem(*dynamodb.UpdateItemInput) (*dynamodb.UpdateItemOutput, error)
	DeleteItem(*dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error)
	CreateTable(*dynamodb.CreateTableInput) (*dynamodb.CreateTableOutput, error)
	DescribeTable(*dynamodb.DescribeTableInput) (*dynamodb.DescribeTableOutput, error)
}
Clientface is a thin method set of the DynamoDB client.
type Config ¶
type Config struct {
	// Client is a Clientface implementation.
	Client Clientface

	// Logger is the logger used. Defaults to log.Log.
	Logger Logger

	// Backoff determines the backoff strategy for HTTP failures.
	// Defaults to lease.Backoff with min value of time.Second and jitter
	// set to true.
	Backoff Backofface

	// The Amazon DynamoDB table name used for tracking leases.
	LeaseTable string

	// WorkerId is used as the lease owner.
	WorkerId string

	// ExpireAfter indicates how long a lease can live without renewal
	// before it expires.
	// A worker that does not renew its lease is regarded as having problems,
	// and its shards are assigned to other workers. Defaults to 10s.
	ExpireAfter time.Duration

	// Max leases to steal from another worker at one time (for load balancing).
	// Setting this to a higher number allows faster load convergence (e.g. during deployments, cold starts),
	// but can cause higher churn in the system. Defaults to 1.
	MaxLeasesToStealAtOneTime int

	// The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
	// Defaults to 10.
	LeaseTableReadCap int

	// The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
	// Defaults to 10.
	LeaseTableWriteCap int
	// contains filtered or unexported fields
}
Config is the representation of Coordinator settings.
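The documented defaults can be summarized in code. The sketch below shows one way zero-valued fields could be filled in. The settings type and withDefaults function are hypothetical stand-ins, and treating a zero value as "unset" is an assumption about how the package applies its defaults.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for the numeric Config fields.
type settings struct {
	ExpireAfter               time.Duration
	MaxLeasesToStealAtOneTime int
	LeaseTableReadCap         int
	LeaseTableWriteCap        int
}

// withDefaults fills zero-valued fields with the defaults stated in the
// Config field comments. Treating zero as "unset" is an assumption.
func withDefaults(s settings) settings {
	if s.ExpireAfter == 0 {
		s.ExpireAfter = 10 * time.Second
	}
	if s.MaxLeasesToStealAtOneTime == 0 {
		s.MaxLeasesToStealAtOneTime = 1
	}
	if s.LeaseTableReadCap == 0 {
		s.LeaseTableReadCap = 10
	}
	if s.LeaseTableWriteCap == 0 {
		s.LeaseTableWriteCap = 10
	}
	return s
}

func main() {
	// Only ExpireAfter is overridden; the other fields fall back to defaults.
	s := withDefaults(settings{ExpireAfter: 30 * time.Second})
	fmt.Println(s.ExpireAfter, s.MaxLeasesToStealAtOneTime, s.LeaseTableReadCap, s.LeaseTableWriteCap)
}
```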
type Coordinator ¶
type Coordinator struct {
	*Config
	Manager Manager
	Renewer Renewer
	Taker   Taker
	// contains filtered or unexported fields
}
Coordinator is the implementation of the Leaser interface. It abstracts away LeaseTaker and LeaseRenewer from the application code that uses leasing, and it owns the scheduling of these two components.
func (*Coordinator) Create ¶
func (c *Coordinator) Create(lease Lease) (Lease, error)
Create a new lease. Conditional on a lease not already existing with different owner and counter.
func (*Coordinator) Delete ¶
func (c *Coordinator) Delete(l Lease) error
Delete removes the given lease from the DB. It does nothing when passed a lease that does not exist in the DB. The deletion is conditional on the lease being held by this worker.
func (*Coordinator) ForceUpdate ¶
func (c *Coordinator) ForceUpdate(lease Lease) (Lease, error)
ForceUpdate updates the lease object without checking whether the concurrency token is valid or whether we have already lost this lease.
Unlike Update, this method allows you to update the task status, or any other fields, even if you lost the lease.
For example: {"status": "done", "last_update": "unix seconds"}. To add extra fields to a Lease, use Lease.Set(key, val).
func (*Coordinator) GetHeldLeases ¶
func (c *Coordinator) GetHeldLeases() []Lease
GetHeldLeases returns the currently held leases. A lease is currently held if we successfully renewed it on the last run of Renewer.Renew(). Lease objects returned are copies and their counters will not tick.
func (*Coordinator) Start ¶
func (c *Coordinator) Start() error
Start creates the leases table if it does not exist and then starts the background leaseHolder and leaseTaker handling.
func (*Coordinator) Stop ¶
func (c *Coordinator) Stop()
Stop stops the coordinator gracefully, waiting for background tasks to complete.
func (*Coordinator) Update ¶
func (c *Coordinator) Update(lease Lease) (Lease, error)
Update updates only the extra fields on the Lease object; it cannot be used to update internal fields such as leaseCounter or leaseOwner.
Fails if we do not hold the lease, or if the concurrency token does not match the concurrency token on the internal authoritative copy of the lease (i.e., if we lost and re-acquired the lease).
With this method you can update the task status, or any other fields. For example: {"status": "done", "last_update": "unix seconds"}. To add extra fields to a Lease, use Lease.Set(key, val).
type Lease ¶
type Lease struct {
	Key     string `dynamodbav:"leaseKey"`
	Owner   string `dynamodbav:"leaseOwner"`
	Counter int    `dynamodbav:"leaseCounter"`
	// contains filtered or unexported fields
}
Lease contains data pertaining to a lease. Distributed systems may use leases to partition work across a fleet of workers. Each unit of work (task) is identified by a leaseKey and has a corresponding Lease. Every worker will contend for all leases, but only one worker will successfully take each one. The worker should hold the lease until it is ready to stop processing the corresponding unit of work, or until it fails. When the worker stops holding the lease, another worker will take and hold the lease.
func (*Lease) Get ¶
Get returns an extra field (metadata) from the Lease object, that is, a field that does not belong to this package.
func (*Lease) Set ¶
Set adds an extra field (metadata) to the Lease object before you create or update it using the Leaser.
Use this method to add metadata to the lease. For example:
	lease.Set("success", true)
	lease.Set("checkpoint", 35465786912)
func (*Lease) SetAs ¶
func (l *Lease) SetAs(key string, val interface{}, typ AttributeType) error
SetAs is like the Set method, but with another argument "typ" that explicitly sets the DynamoDB data type.
For example:
	Set("key", []string{"foo", "bar"})              // add this field as a list
	SetAs("key", []string{"foo", "bar"}, StringSet) // add this field as a string set
An error is returned only if the field value does not match the field type.
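To illustrate the kind of check that could produce ErrValueNotMatch, here is a minimal sketch. Only the StringSet to []string pairing comes from the documentation above. The Go types accepted here for NumberSet and BinarySet are assumptions, the types and sentinel are redeclared locally, and checkSetValue is a hypothetical helper rather than the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented declarations so the sketch compiles alone.
type AttributeType int

const (
	StringSet AttributeType = iota
	NumberSet
	BinarySet
)

var ErrValueNotMatch = errors.New("leaser: field value does not match the field type")

// checkSetValue is a hypothetical validation step for a SetAs-like method.
// StringSet expecting []string is documented; the slice types accepted for
// NumberSet and BinarySet are assumptions made for illustration.
func checkSetValue(val interface{}, typ AttributeType) error {
	ok := false
	switch typ {
	case StringSet:
		_, ok = val.([]string)
	case NumberSet:
		switch val.(type) {
		case []int, []int64, []float64:
			ok = true
		}
	case BinarySet:
		_, ok = val.([][]byte)
	}
	if !ok {
		return ErrValueNotMatch
	}
	return nil
}

func main() {
	fmt.Println(checkSetValue([]string{"foo", "bar"}, StringSet))
	fmt.Println(checkSetValue([]int{1, 2}, StringSet))
}
```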
type LeaseManager ¶
type LeaseManager struct {
	*Config
	Serializer Serializer
}
LeaseManager is the default implementation of Manager that uses DynamoDB.
func (*LeaseManager) CreateLease ¶
func (l *LeaseManager) CreateLease(lease *Lease) (*Lease, error)
Create a new lease. Conditional on a lease not already existing with different owner and counter.
func (*LeaseManager) CreateLeaseTable ¶
func (l *LeaseManager) CreateLeaseTable() (err error)
CreateLeaseTable creates the table that will store the leases. It succeeds if the table already exists.
func (*LeaseManager) DeleteLease ¶
func (l *LeaseManager) DeleteLease(lease *Lease) (err error)
Delete the given lease from DynamoDB. Does nothing when passed a lease that does not exist in DynamoDB.
func (*LeaseManager) EvictLease ¶
func (l *LeaseManager) EvictLease(lease *Lease) (err error)
Evict the current owner of the lease by setting the owner to null. Conditional on the owner in DynamoDB matching the owner of the input. Mutates the lease owner of the passed-in lease object after updating the record in DynamoDB.
func (*LeaseManager) ListLeases ¶
func (l *LeaseManager) ListLeases() (list []*Lease, err error)
ListLeases returns all the lease units stored in the table.
func (*LeaseManager) RenewLease ¶
func (l *LeaseManager) RenewLease(lease *Lease) (err error)
Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB.
func (*LeaseManager) TakeLease ¶
func (l *LeaseManager) TakeLease(lease *Lease) (err error)
Take a lease by incrementing its leaseCounter and setting its owner field. Conditional on the leaseCounter in DynamoDB matching the leaseCounter of the input. Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB.
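The conditional semantics of renewing and taking a lease can be sketched against an in-memory map instead of DynamoDB. Everything below is a hypothetical stand-in: the lease and table types, the renew and take methods, and errConditionalFailed are illustrative names, and passing the new owner as an argument is a simplification of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// lease mirrors the three documented Lease fields.
type lease struct {
	Key     string
	Owner   string
	Counter int
}

// errConditionalFailed plays the role of a failed conditional write.
var errConditionalFailed = errors.New("conditional check failed")

// table is an in-memory stand-in for the DynamoDB lease table.
type table map[string]lease

// renew increments the stored counter only if it still equals l.Counter,
// then copies the new counter back into l.
func (tb table) renew(l *lease) error {
	stored, ok := tb[l.Key]
	if !ok || stored.Counter != l.Counter {
		return errConditionalFailed
	}
	stored.Counter++
	tb[l.Key] = stored
	l.Counter = stored.Counter
	return nil
}

// take applies the same counter condition as renew and also sets the owner,
// then copies the new counter and owner back into l.
func (tb table) take(l *lease, newOwner string) error {
	stored, ok := tb[l.Key]
	if !ok || stored.Counter != l.Counter {
		return errConditionalFailed
	}
	stored.Counter++
	stored.Owner = newOwner
	tb[l.Key] = stored
	l.Counter, l.Owner = stored.Counter, stored.Owner
	return nil
}

func main() {
	tbl := table{"shard-0": {Key: "shard-0", Owner: "worker-a", Counter: 7}}
	mine := lease{Key: "shard-0", Owner: "worker-a", Counter: 7}
	stale := mine // another worker's older view of the same lease

	err := tbl.renew(&mine)
	fmt.Println(err, mine.Counter) // the renewal succeeds and bumps the counter

	err = tbl.take(&stale, "worker-b")
	fmt.Println(err) // the stale counter no longer matches, so the take is rejected
}
```

Because every successful renewal changes the counter, a worker holding an outdated copy cannot take or renew the lease until it rereads the table.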
func (*LeaseManager) UpdateLease ¶
func (l *LeaseManager) UpdateLease(lease *Lease) (*Lease, error)
UpdateLease updates only the extra fields on the Lease object. With this method you can update the task status, or any other fields. For example: {"status": "done", "last_update": "unix seconds"}. To add extra fields to a Lease, use Lease.Set(key, val).
type Leaser ¶
type Leaser interface {
	Stop()
	Start() error
	Delete(Lease) error
	Create(Lease) (Lease, error)
	Update(Lease) (Lease, error)
	ForceUpdate(Lease) (Lease, error)
	GetHeldLeases() []Lease
}
Leaser is the interface that wraps the Coordinator methods.
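Because application code can depend on Leaser rather than on Coordinator, it can be tested with an in-memory double. The sketch below assumes a cut-down local Lease with only the documented exported fields and repeats the interface against it. fakeLeaser, newFakeLeaser, and runOnce are hypothetical names, and the fake's behavior (for example, holding every lease it creates) is a simplification, not a description of Coordinator.

```go
package main

import "fmt"

// Lease is a local stand-in with only the documented exported fields.
type Lease struct {
	Key     string
	Owner   string
	Counter int
}

// Leaser repeats the documented interface against the stand-in Lease.
type Leaser interface {
	Stop()
	Start() error
	Delete(Lease) error
	Create(Lease) (Lease, error)
	Update(Lease) (Lease, error)
	ForceUpdate(Lease) (Lease, error)
	GetHeldLeases() []Lease
}

// fakeLeaser is a hypothetical in-memory test double. As a simplification,
// it immediately holds every lease it creates.
type fakeLeaser struct {
	workerID string
	held     map[string]Lease
}

func newFakeLeaser(workerID string) *fakeLeaser {
	return &fakeLeaser{workerID: workerID, held: map[string]Lease{}}
}

func (f *fakeLeaser) Start() error { return nil }
func (f *fakeLeaser) Stop()        {}

func (f *fakeLeaser) Create(l Lease) (Lease, error) {
	if _, exists := f.held[l.Key]; exists {
		return Lease{}, fmt.Errorf("lease %q already exists", l.Key)
	}
	l.Owner = f.workerID
	f.held[l.Key] = l
	return l, nil
}

func (f *fakeLeaser) Update(l Lease) (Lease, error) {
	if _, ok := f.held[l.Key]; !ok {
		return Lease{}, fmt.Errorf("lease %q is not held", l.Key)
	}
	f.held[l.Key] = l
	return l, nil
}

// ForceUpdate skips the "held" check, mirroring the documented contrast
// with Update.
func (f *fakeLeaser) ForceUpdate(l Lease) (Lease, error) { return l, nil }

func (f *fakeLeaser) Delete(l Lease) error {
	delete(f.held, l.Key)
	return nil
}

func (f *fakeLeaser) GetHeldLeases() []Lease {
	out := make([]Lease, 0, len(f.held))
	for _, l := range f.held {
		out = append(out, l)
	}
	return out
}

// runOnce is application code written against the interface only.
func runOnce(l Leaser, key string) (int, error) {
	if err := l.Start(); err != nil {
		return 0, err
	}
	defer l.Stop()
	if _, err := l.Create(Lease{Key: key}); err != nil {
		return 0, err
	}
	return len(l.GetHeldLeases()), nil
}

func main() {
	n, err := runOnce(newFakeLeaser("worker-a"), "shard-0")
	fmt.Println(n, err)
}
```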
type Logger ¶
type Logger interface {
	WithFields(logrus.Fields) *logrus.Entry
	WithField(string, interface{}) *logrus.Entry
	WithError(error) *logrus.Entry
	Debug(...interface{})
	Info(...interface{})
	Error(...interface{})
	Fatal(...interface{})
	Debugf(string, ...interface{})
	Infof(string, ...interface{})
	Warnf(string, ...interface{})
}
Logger represents the desired API of both Logger and Entry.
type Manager ¶
type Manager interface {
	// Creates the table that will store leases if it does not already exist.
	CreateLeaseTable() error
	// List all leases (objects) in the table.
	ListLeases() ([]*Lease, error)
	// Renew a lease
	RenewLease(*Lease) error
	// Take a lease
	TakeLease(*Lease) error
	// Evict a lease
	EvictLease(*Lease) error
	// Delete a lease
	DeleteLease(*Lease) error
	// Create a lease
	CreateLease(*Lease) (*Lease, error)
	// Update a lease
	UpdateLease(*Lease) (*Lease, error)
}
Manager wraps the basic operations for leases.
type Renewer ¶
Renewer is used by the LeaseCoordinator to renew leases held by the system. Each LeaseCoordinator instance corresponds to one worker and uses exactly one LeaseRenewer to manage lease renewal for that worker.
type Serializer ¶
type Serializer interface {
	// Decode converts the provided DynamoDB item to a Lease object.
	Decode(map[string]*dynamodb.AttributeValue) (*Lease, error)
	// Encode serializes the provided Lease object to a DynamoDB item.
	Encode(*Lease) (map[string]*dynamodb.AttributeValue, error)
}
Serializer is used to encode lease objects to DynamoDB records and to decode records back into lease objects.