lease

package module
v0.0.0-...-9bb2630 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2017 License: MIT Imports: 15 Imported by: 1

README

Lease Build status License GoDoc

A generic leasing implementation of the Amazon-KCL.lease package. ideal for manage and partition work across a fleet of workers.

Screenshot

What is a Lease ?

Lease type contains data pertianing to a lease.
Distributed systems may use leases to partition work across a fleet of workers.
Each unit of work (identified by a leaseKey) has a corresponding Lease.
Every worker will contend for all leases - only one worker will successfully take each one.
The worker should hold the lease until it is ready to stop processing the corresponding unit of work, or until it fails.
When the worker stops holding the lease, another worker will take and hold the lease.

To get started, see the examples

License

MIT

Documentation

Index

Constants

View Source
const (
	// Table schema
	LeaseKeyKey     = "leaseKey"
	LeaseOwnerKey   = "leaseOwner"
	LeaseCounterKey = "leaseCounter"

	// AWS exception
	AlreadyExist      = "ResourceInUseException"
	ConditionalFailed = "ConditionalCheckFailedException"
)

Variables

View Source
var (
	// ErrTokenNotMatch and ErrLeaseNotHeld could be return only on the Update() call.
	//
	// If the concurrency token of the passed-in lease doesn't match the
	// concurrency token of the authoritative lease, it means the lease was
	// lost and regained between when the caller acquired his concurrency
	// token and when the caller called update.
	ErrTokenNotMatch = errors.New("leaser: concurrency token doesn't match the authoritative lease")
	// ErrLeaseNotHeld error will be returns only if the passed-in lease object
	// does not held be this  worker.
	ErrLeaseNotHeld = errors.New("leaser: worker does not hold the passed-in lease object")
	// ErrValueNotMatch error will be returns only if you tring to set an extra field on
	// a lease object using the SetAs method and the field value does not match the field
	// type.
	// for example: StringSet type excepts only []string{...}
	ErrValueNotMatch = errors.New("leaser: field value does not match the field type")
)

Functions

This section is empty.

Types

type AttributeType

type AttributeType int

AttributeType used to explicitly set the DynamoDB data type when setting an extra field on a lease object.

const (
	// StringSet is a string set data type
	StringSet AttributeType = iota
	// NumberSet is a  number set data type
	NumberSet
	// BinarySet is a binary set data type
	BinarySet
)

type Backoff

type Backoff struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Backoff is the default thread-safe implemtation for Backofface

func (*Backoff) Attempt

func (b *Backoff) Attempt() float64

func (*Backoff) Duration

func (b *Backoff) Duration() time.Duration

func (*Backoff) Reset

func (b *Backoff) Reset()

type Backofface

type Backofface interface {
	Reset()
	Attempt() float64
	Duration() time.Duration
}

Backofface is the interface that holds the backoff strategy

type Clientface

Clientface is a thin methods set of DynamoDB.

type Config

type Config struct {
	// Client is a Clientface implemetation.
	Client Clientface

	// Logger is the logger used. defaults to log.Log
	Logger Logger

	// Backoff determines the backoff strategy for http failures.
	// Defaults to lease.Backoff with min value of time.Second and jitter
	// set to true.
	Backoff Backofface

	// The Amazon DynamoDB table name used for tracking leases.
	LeaseTable string

	// WorkerId used as a lease-owner.
	WorkerId string

	// ExpireAfter indicate how long lease unit can live without renovation
	// before expiration.
	// A worker which does not renew it's lease, will be regarded as having problems
	// and it's shards will be assigned to other workers. defaults to 10s.
	ExpireAfter time.Duration

	// Max leases to steal from another worker at one time (for load balancing).
	// Setting this to a higher number allow faster load convergence (e.g. during deployments, cold starts),
	// but can cause higher churn in the system. defaults to 1.
	MaxLeasesToStealAtOneTime int

	// The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
	// Defaults to 10.
	LeaseTableReadCap int

	// The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
	// Defaults to 10.
	LeaseTableWriteCap int
	// contains filtered or unexported fields
}

Config is the representation of Coordinator settings.

type Coordinator

type Coordinator struct {
	*Config
	Manager Manager
	Renewer Renewer
	Taker   Taker
	// contains filtered or unexported fields
}

Coordinator is the implemtation of the Leaser interface. It's abstracts away LeaseTaker and LeaseRenewer from the application code that using leasing and it owns the scheduling of two previously mentioned components.

func (*Coordinator) Create

func (c *Coordinator) Create(lease Lease) (Lease, error)

Create a new lease. Conditional on a lease not already existing with different owner and counter.

func (*Coordinator) Delete

func (c *Coordinator) Delete(l Lease) error

Delete the given lease from DB. does nothing when passed a lease that does not exist in the DB. The deletion is conditional on the fact that the lease is being held by this worker.

func (*Coordinator) ForceUpdate

func (c *Coordinator) ForceUpdate(lease Lease) (Lease, error)

ForceUpdate used to update the lease object without checking if the concurrency token is valid or if we already lost this lease.

Unlike Update, this method allows you to update the task status, or any other fields even if you lost the lease.

for example: {"status": "done", "last_update": "unix seconds"} To add extra fields on a Lease, use Lease.Set(key, val)

func (*Coordinator) GetHeldLeases

func (c *Coordinator) GetHeldLeases() []Lease

GetHeldLeases returns the currently held leases. A lease is currently held if we successfully renewed it on the last run of Renewer.Renew(). Lease objects returned are copies and their counters will not tick.

func (*Coordinator) Start

func (c *Coordinator) Start() error

Start create the leases table if it's not exist and then start background leaseHolder and leaseTaker handling.

func (*Coordinator) Stop

func (c *Coordinator) Stop()

Stop the coordinator gracefully. wait for background tasks to complete.

func (*Coordinator) Update

func (c *Coordinator) Update(lease Lease) (Lease, error)

Update used to update only the extra fields on the Lease object and it cannot be used to update internal fields such as leaseCounter, leaseOwner.

Fails if we do not hold the lease, or if the concurrency token does not match the concurrency token on the internal authoritative copy of the lease (ie, if we lost and re-acquired the lease).

With this method you will be able to update the task status, or any other fields. for example: {"status": "done", "last_update": "unix seconds"} To add extra fields on a Lease, use Lease.Set(key, val)

type Lease

type Lease struct {
	Key     string `dynamodbav:"leaseKey"`
	Owner   string `dynamodbav:"leaseOwner"`
	Counter int    `dynamodbav:"leaseCounter"`
	// contains filtered or unexported fields
}

Lease type contains data pertianing to a Lease. Distributed systems may use leases to partition work across a fleet of workers. Each unit of work/task identified by a leaseKey and has a corresponding Lease. Every worker will contend for all leases - only one worker will successfully take each one. The worker should hold the lease until it is ready to stop processing the corresponding unit of work, or until it fails. When the worker stops holding the lease, another worker will take and hold the lease.

func NewLease

func NewLease(key string) Lease

NewLease gets a key(represents the lease key/name) and returns a new Lease object.

func (*Lease) Del

func (l *Lease) Del(key string)

Del deletes extra field(metadata) of the lease object.

func (*Lease) Get

func (l *Lease) Get(key string) (interface{}, bool)

Get extra field(metadata) from the Lease object that not belongs to this package.

func (*Lease) Set

func (l *Lease) Set(key string, val interface{})

Set extra field(metadata) to the Lease object before you create or update it using the Leaser.

Use this method to add meta-data on the lease. for example:

lease.Set("success", true)
lease.Set("checkpoint", 35465786912)

func (*Lease) SetAs

func (l *Lease) SetAs(key string, val interface{}, typ AttributeType) error

SetAs is like the Set method, but with another argument "typ" that explicitly sets the DynamoDB data type.

For example:

Set("key", []string{"foo", "bar"})               // add this field as a list
SetAs("key", []string{"foo", "bar"}, StringSet)  // add this field as a string set

Error will be returns only if the field value does not match the field type.

type LeaseManager

type LeaseManager struct {
	*Config
	Serializer Serializer
}

LeaseManager is the default implemntation of Manager that uses DynamoDB.

func (*LeaseManager) CreateLease

func (l *LeaseManager) CreateLease(lease *Lease) (*Lease, error)

Create a new lease. conditional on a lease not already existing with different owner and counter.

func (*LeaseManager) CreateLeaseTable

func (l *LeaseManager) CreateLeaseTable() (err error)

CreateLeaseTable creates the table that will store the leases. succeeds if it's already exists.

func (*LeaseManager) DeleteLease

func (l *LeaseManager) DeleteLease(lease *Lease) (err error)

Delete the given lease from DynamoDB. does nothing when passed a lease that does not exist in DynamoDB.

func (*LeaseManager) EvictLease

func (l *LeaseManager) EvictLease(lease *Lease) (err error)

Evict the current owner of lease by setting owner to null Conditional on the owner in DynamoDB matching the owner of the input. Mutates the lease owner of the passed-in lease object after updating the record in DynamoDB.

func (*LeaseManager) ListLeases

func (l *LeaseManager) ListLeases() (list []*Lease, err error)

ListLeasses returns all the lease units stored in the table.

func (*LeaseManager) RenewLease

func (l *LeaseManager) RenewLease(lease *Lease) (err error)

Renew a lease by incrementing the lease counter. Conditional on the leaseCounter in DynamoDB matching the leaseCounter of the input Mutates the leaseCounter of the passed-in lease object after updating the record in DynamoDB.

func (*LeaseManager) TakeLease

func (l *LeaseManager) TakeLease(lease *Lease) (err error)

Take a lease by incrementing its leaseCounter and setting its owner field. Conditional on the leaseCounter in DynamoDB matching the leaseCounter of the input Mutates the lease counter and owner of the passed-in lease object after updating the record in DynamoDB.

func (*LeaseManager) UpdateLease

func (l *LeaseManager) UpdateLease(lease *Lease) (*Lease, error)

UpdateLease used to update only the extra fields on the Lease object. With this method you will be able to update the task status, or any other fields. for example: {"status": "done", "last_update": "unix seconds"} To add extra fields on a Lease, use Lease.Set(key, val)

type Leaser

type Leaser interface {
	Stop()
	Start() error
	Delete(Lease) error
	Create(Lease) (Lease, error)
	Update(Lease) (Lease, error)
	ForceUpdate(Lease) (Lease, error)
	GetHeldLeases() []Lease
}

Leaser is the interface that wraps the Coordinator methods.

func New

func New(config *Config) Leaser

New create new Coordinator with the given config.

type Logger

type Logger interface {
	WithFields(logrus.Fields) *logrus.Entry
	WithField(string, interface{}) *logrus.Entry
	WithError(error) *logrus.Entry
	Debug(...interface{})
	Info(...interface{})
	Error(...interface{})
	Fatal(...interface{})
	Debugf(string, ...interface{})
	Infof(string, ...interface{})
	Warnf(string, ...interface{})
}

Logger represents the desired API of both Logger and Entry.

type Manager

type Manager interface {
	// Creates the table that will store leases if it's not already exists.
	CreateLeaseTable() error

	// List all leases(objects) in table.
	ListLeases() ([]*Lease, error)

	// Renew a lease
	RenewLease(*Lease) error

	// Take a lease
	TakeLease(*Lease) error

	// Evict a lease
	EvictLease(*Lease) error

	// Delete a lease
	DeleteLease(*Lease) error

	// Create a lease
	CreateLease(*Lease) (*Lease, error)

	// Update a lease
	UpdateLease(*Lease) (*Lease, error)
}

Manager wrap the basic operations for leases.

type Renewer

type Renewer interface {
	Renew() error
	GetHeldLeases() []Lease
}

Renewer used by the LeaseCoordinator to renew leases held by the system. Each LeaseCoordinator instance corresponds to one worker and uses exactly one LeaseRenewer to manage lease renewal for that worker.

type Serializer

type Serializer interface {
	// Decode convert the provided dynamodb item to Lease object.
	Decode(map[string]*dynamodb.AttributeValue) (*Lease, error)
	// Encode serializes the provided Lease object to dynamodb item.
	Encode(*Lease) (map[string]*dynamodb.AttributeValue, error)
}

Serializer used to encode and decode lease objects to DynamoDB records and vice versa.

type Taker

type Taker interface {
	Take() error
}

Taker is the interface that wraps the Take method. It used by Coordinator to take new leases, or leases that other workers fail to renew. Each Coordinator instance corresponds to one worker and uses exactly one Taker to take leases for that worker.

Directories

Path Synopsis
simulate distributed system that uses leases to partition work across a fleet of workers.
simulate distributed system that uses leases to partition work across a fleet of workers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL