Documentation ¶
Overview ¶
Package dynamolock provides a simple utility that uses DynamoDB's consistent read/write feature to manage distributed locks.
In order to use this package, the client must create a table in DynamoDB, although the client provides a convenience method for creating that table (CreateTable).
Basic usage:
import (
	"context"
	"log"
	"time"

	"cirello.io/dynamolock/v3"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

//---

cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion("us-west-2"))
if err != nil {
	log.Fatal(err)
}
c, err := dynamolock.New(dynamodb.NewFromConfig(cfg),
	"locks",
	"key", // partition key attribute name required by New; "key" is an assumed value
	dynamolock.WithLeaseDuration(3*time.Second),
	dynamolock.WithHeartbeatPeriod(1*time.Second),
)
if err != nil {
	log.Fatal(err)
}
defer c.Close(context.Background())

log.Println("ensuring table exists")
c.CreateTable(context.Background(),
	dynamolock.WithProvisionedThroughput(&types.ProvisionedThroughput{
		ReadCapacityUnits:  aws.Int64(5),
		WriteCapacityUnits: aws.Int64(5),
	}),
)

//-- at this point you must wait for DynamoDB to complete the creation.

data := []byte("some content a")
lockedItem, err := c.AcquireLock(context.Background(), "spock",
	dynamolock.WithData(data),
	dynamolock.ReplaceData(),
)
if err != nil {
	log.Fatal(err)
}

log.Println("lock content:", string(lockedItem.Data()))
if got := string(lockedItem.Data()); string(data) != got {
	log.Println("losing information inside lock storage, wanted:", string(data), " got:", got)
}

log.Println("cleaning lock")
success, err := c.ReleaseLock(context.Background(), lockedItem)
if err != nil {
	log.Fatal("error releasing lock:", err)
}
if !success {
	log.Fatal("lost lock before release")
}
log.Println("done")
This package is covered by this SLA: https://github.com/cirello-io/public/blob/master/SLA.md
Index ¶
- Variables
- type AcquireLockOption
- func FailIfLocked() AcquireLockOption
- func ReplaceData() AcquireLockOption
- func WithAdditionalAttributes(attr map[string]types.AttributeValue) AcquireLockOption
- func WithAdditionalTimeToWaitForLock(d time.Duration) AcquireLockOption
- func WithData(b []byte) AcquireLockOption
- func WithDeleteLockOnRelease() AcquireLockOption
- func WithRefreshPeriod(d time.Duration) AcquireLockOption
- func WithSessionMonitor(safeTime time.Duration, callback func()) AcquireLockOption
- type Client
- func (c *Client) AcquireLock(ctx context.Context, partitionKey string, opts ...AcquireLockOption) (*Lock, error)
- func (c Client) Close(ctx context.Context) error
- func (c *Client) CreateTable(ctx context.Context, opts ...CreateTableOption) (*dynamodb.CreateTableOutput, error)
- func (c *Client) Get(ctx context.Context, partitionKey string) (*Lock, error)
- func (c Client) ReleaseLock(ctx context.Context, lockItem *Lock, opts ...ReleaseLockOption) (bool, error)
- func (c Client) SendHeartbeat(ctx context.Context, lockItem *Lock, opts ...SendHeartbeatOption) error
- type ClientOption
- func DisableHeartbeat() ClientOption
- func WithContextLeveledLogger(l ContextLeveledLogger) ClientOption
- func WithHeartbeatPeriod(d time.Duration) ClientOption
- func WithLeaseDuration(d time.Duration) ClientOption
- func WithLeveledLogger(l LeveledLogger) ClientOption
- func WithLogger(l Logger) ClientOption
- func WithOwnerName(s string) ClientOption
- type ClientWithSortKey
- func (c *ClientWithSortKey) AcquireLock(ctx context.Context, partitionKey, sortKey string, opts ...AcquireLockOption) (*Lock, error)
- func (c ClientWithSortKey) Close(ctx context.Context) error
- func (c *ClientWithSortKey) CreateTable(ctx context.Context, opts ...CreateTableOption) (*dynamodb.CreateTableOutput, error)
- func (c *ClientWithSortKey) Get(ctx context.Context, partitionKey, sortKey string) (*Lock, error)
- func (c ClientWithSortKey) ReleaseLock(ctx context.Context, lockItem *Lock, opts ...ReleaseLockOption) (bool, error)
- func (c ClientWithSortKey) SendHeartbeat(ctx context.Context, lockItem *Lock, opts ...SendHeartbeatOption) error
- type ContextLeveledLogger
- type CreateTableOption
- type DynamoDBClient
- type LeveledLogger
- type Lock
- type LockNotGrantedError
- type Logger
- type ReleaseLockOption
- type SendHeartbeatOption
- type TimeoutError
Constants ¶
This section is empty.
Variables ¶
var (
	ErrSessionMonitorNotSet  = errors.New("session monitor is not set")
	ErrLockAlreadyReleased   = errors.New("lock is already released")
	ErrCannotReleaseNullLock = errors.New("cannot release null lock item")
	ErrOwnerMismatched       = errors.New("lock owner mismatched")
)
Errors related to session manager life-cycle.
var ErrClientClosed = errors.New("client already closed")
ErrClientClosed reports the client cannot be used because it is already closed.
Functions ¶
This section is empty.
Types ¶
type AcquireLockOption ¶
type AcquireLockOption func(*acquireLockOptions)
AcquireLockOption changes how the lock is acquired and held by the client.
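Because AcquireLockOption is a function over an unexported options struct, it follows the functional options pattern. The sketch below illustrates that pattern with local stand-in types; the struct fields, the lower-case option names, and the default value are assumptions made for illustration and are not the package's internals.

```go
package main

import (
	"fmt"
	"time"
)

// acquireOptions is a hypothetical stand-in for the package's unexported
// acquireLockOptions struct; the field names are assumptions.
type acquireOptions struct {
	failIfLocked  bool
	data          []byte
	refreshPeriod time.Duration
}

// Option has the same shape as AcquireLockOption: a function that mutates
// the options struct.
type Option func(*acquireOptions)

func failIfLocked() Option { return func(o *acquireOptions) { o.failIfLocked = true } }

func withData(b []byte) Option { return func(o *acquireOptions) { o.data = b } }

func withRefreshPeriod(d time.Duration) Option {
	return func(o *acquireOptions) { o.refreshPeriod = d }
}

// apply starts from defaults and runs every option in order, so a later
// option overrides an earlier one that touches the same field.
func apply(opts ...Option) acquireOptions {
	o := acquireOptions{refreshPeriod: time.Second} // hypothetical default
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := apply(withData([]byte("payload")), failIfLocked())
	fmt.Println(o.failIfLocked, string(o.data), o.refreshPeriod)
}
```

The caller only names the settings it wants to change, which is why AcquireLock can accept a variadic list of options.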
func FailIfLocked ¶
func FailIfLocked() AcquireLockOption
FailIfLocked disables retries: if the lock cannot be acquired, AcquireLock returns immediately instead of trying again.
func ReplaceData ¶
func ReplaceData() AcquireLockOption
ReplaceData will force the new content to be stored in the key.
func WithAdditionalAttributes ¶
func WithAdditionalAttributes(attr map[string]types.AttributeValue) AcquireLockOption
WithAdditionalAttributes stores some additional attributes with each lock. This can be used to add any arbitrary parameters to each lock row.
func WithAdditionalTimeToWaitForLock ¶
func WithAdditionalTimeToWaitForLock(d time.Duration) AcquireLockOption
WithAdditionalTimeToWaitForLock defines how long to wait in addition to the lease duration (if set to 10 minutes, this will try to acquire a lock for at least 10 minutes before giving up and returning an error).
func WithData ¶
func WithData(b []byte) AcquireLockOption
WithData stores the content into the lock itself.
func WithDeleteLockOnRelease ¶
func WithDeleteLockOnRelease() AcquireLockOption
WithDeleteLockOnRelease defines whether or not the lock should be deleted when Close() is called on the resulting Lock.
func WithRefreshPeriod ¶
func WithRefreshPeriod(d time.Duration) AcquireLockOption
WithRefreshPeriod defines how long to wait before trying to get the lock again (if set to 10 seconds, for example, it would attempt to do so every 10 seconds).
func WithSessionMonitor ¶
func WithSessionMonitor(safeTime time.Duration, callback func()) AcquireLockOption
WithSessionMonitor registers a callback that is triggered if the lock is about to expire.
The purpose of this construct is to provide two abilities: determining whether the lock is about to expire, and running a user-provided callback when the lock is about to expire. The advantage is that you are notified before the lock actually expires; in the case of leader election, this helps prevent two leaders from being present simultaneously.
If due to any reason heartbeating is unsuccessful for a configurable period of time, your lock enters into a phase known as "danger zone." It is during this "danger zone" that the callback will be run.
Bear in mind that the callback may be nil. In this case, no callback will be run upon the lock entering the "danger zone"; yet, one can still make use of the Lock.IsAlmostExpired() call. Furthermore, non-nil callbacks can only ever be executed once in a lock's lifetime. Independent of whether or not a callback is run, the client will attempt to heartbeat the lock until the lock is released or obtained by someone else.
Consider an example which uses this mechanism for leader election. One way to make use of this SessionMonitor is to register a callback that kills the instance in case the leader's lock enters the danger zone.
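The following is a minimal sketch of the danger-zone idea, written against a local stand-in type rather than the package's own API. It assumes the danger zone begins once the time since the last successful heartbeat reaches the lease duration minus safeTime; the sessionMonitor type, its fields, and the stepDown callback are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sessionMonitor is a hypothetical stand-in, not the package's type.
type sessionMonitor struct {
	leaseDuration time.Duration
	safeTime      time.Duration
	callback      func()
	once          sync.Once
}

// almostExpired reports whether less than safeTime of the lease remains,
// given the time elapsed since the last successful heartbeat.
func (m *sessionMonitor) almostExpired(sinceHeartbeat time.Duration) bool {
	return sinceHeartbeat >= m.leaseDuration-m.safeTime
}

// check runs the callback at most once, and only inside the danger zone.
// A nil callback is allowed and simply never runs.
func (m *sessionMonitor) check(sinceHeartbeat time.Duration) {
	if m.callback == nil || !m.almostExpired(sinceHeartbeat) {
		return
	}
	m.once.Do(m.callback)
}

func main() {
	// In a real leader, the callback might terminate the process.
	stepDown := func() { fmt.Println("lock in danger zone: stepping down as leader") }
	m := &sessionMonitor{
		leaseDuration: 3 * time.Second,
		safeTime:      1 * time.Second,
		callback:      stepDown,
	}
	m.check(1 * time.Second)         // healthy: nothing happens
	m.check(2500 * time.Millisecond) // danger zone: callback fires
	m.check(2900 * time.Millisecond) // already fired: no second call
}
```

With the package itself, the equivalent registration would pass the safe time and the callback to WithSessionMonitor when calling AcquireLock.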
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a dynamoDB based distributed lock client.
func New ¶
func New(dynamoDB DynamoDBClient, tableName, partitionKeyName string, opts ...ClientOption) (*Client, error)
New creates a new dynamoDB based distributed lock client.
func (*Client) AcquireLock ¶
func (c *Client) AcquireLock(ctx context.Context, partitionKey string, opts ...AcquireLockOption) (*Lock, error)
AcquireLock holds the defined lock. The given context is passed down to the underlying dynamoDB call.
func (Client) Close ¶
func (c Client) Close(ctx context.Context) error
Close releases all of the locks. The given context is passed down to the underlying dynamoDB calls.
func (*Client) CreateTable ¶
func (c *Client) CreateTable(ctx context.Context, opts ...CreateTableOption) (*dynamodb.CreateTableOutput, error)
CreateTable prepares a DynamoDB table with the right schema for it to be used by this locking library. The table should be set up in advance, because it takes a few minutes for DynamoDB to provision a new instance. Also, if the table already exists, it will return an error. The given context is passed down to the underlying dynamoDB call.
func (*Client) Get ¶
func (c *Client) Get(ctx context.Context, partitionKey string) (*Lock, error)
Get finds out who owns the given lock, but does not acquire the lock. It returns the metadata currently associated with the given lock. If the client currently has the lock, it will return the lock, and operations such as ReleaseLock will work. However, if the client does not have the lock, then operations like ReleaseLock will not work (after calling Get, the caller should check Lock.IsExpired() to figure out if it currently has the lock). If the context is canceled, Get returns the context error, even when the lock is found in the local cache. The given context is passed down to the underlying dynamoDB call.
func (Client) ReleaseLock ¶
func (c Client) ReleaseLock(ctx context.Context, lockItem *Lock, opts ...ReleaseLockOption) (bool, error)
ReleaseLock releases the given lock if the current user still has it, returning true if the lock was successfully released, and false if someone else already stole the lock or a problem happened. It deletes the lock item if the lock is released and deletion on release was requested (see WithDeleteLockOnRelease).
func (Client) SendHeartbeat ¶
func (c Client) SendHeartbeat(ctx context.Context, lockItem *Lock, opts ...SendHeartbeatOption) error
SendHeartbeat indicates that the given lock is still being worked on. If the client was set up with WithHeartbeatPeriod > 0, calling this method is unnecessary, because a background goroutine periodically sends heartbeats. However, if WithHeartbeatPeriod is 0, this method must be called to tell DynamoDB that the lock should not expire. The given context is passed down to the underlying dynamoDB call.
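When automatic heartbeats are disabled, the caller has to drive the heartbeat cycle. The sketch below shows one possible shape for such a loop using only the standard library; heartbeatFunc is a hypothetical stand-in for a closure around SendHeartbeat, and errLockLost is an invented error used to end the demonstration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// heartbeatFunc is a hypothetical stand-in for a closure that calls
// SendHeartbeat on a real client and lock.
type heartbeatFunc func(ctx context.Context) error

// keepAlive calls beat once per period until ctx is done or beat fails.
// It returns the number of successful heartbeats and the reason it stopped.
func keepAlive(ctx context.Context, period time.Duration, beat heartbeatFunc) (int, error) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	sent := 0
	for {
		select {
		case <-ctx.Done():
			return sent, ctx.Err()
		case <-ticker.C:
			if err := beat(ctx); err != nil {
				return sent, err
			}
			sent++
		}
	}
}

// errLockLost is an invented error for this demonstration.
var errLockLost = errors.New("lock lost")

// demo runs keepAlive against a fake heartbeat that fails on its 4th call.
func demo() (int, error) {
	calls := 0
	beat := func(context.Context) error {
		calls++
		if calls > 3 {
			return errLockLost
		}
		return nil
	}
	return keepAlive(context.Background(), time.Millisecond, beat)
}

func main() {
	sent, err := demo()
	fmt.Println(sent, err)
}
```

In real use the period would be chosen well below the lease duration, and a heartbeat failure would normally be treated as a signal to stop working on the protected resource.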
type ClientOption ¶
type ClientOption func(*commonClient)
ClientOption reconfigures the lock client at creation.
func DisableHeartbeat ¶
func DisableHeartbeat() ClientOption
DisableHeartbeat disables automatic heartbeats. Use SendHeartbeat to freshen up the lock.
func WithContextLeveledLogger ¶
func WithContextLeveledLogger(l ContextLeveledLogger) ClientOption
WithContextLeveledLogger injects a logger into the client, so its internals can be recorded.
func WithHeartbeatPeriod ¶
func WithHeartbeatPeriod(d time.Duration) ClientOption
WithHeartbeatPeriod defines the frequency of the heartbeats. Set to zero to disable it. The heartbeat period should be no more than half of the lease duration.
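The relationship between the heartbeat period and the lease duration can be checked before a client is built. The helper below is a hypothetical sketch of such a check, not a function of this package; the name validateTiming and the error value are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errHeartbeatTooSlow is an invented error for this sketch.
var errHeartbeatTooSlow = errors.New("heartbeat period must be at most half of the lease duration")

// validateTiming applies the rule of thumb stated above. A zero heartbeat
// period means heartbeats are disabled, so there is nothing to validate.
func validateTiming(lease, heartbeat time.Duration) error {
	if heartbeat == 0 {
		return nil
	}
	if heartbeat > lease/2 {
		return errHeartbeatTooSlow
	}
	return nil
}

func main() {
	// The values from the basic usage example pass the check.
	fmt.Println(validateTiming(3*time.Second, 1*time.Second))
	// A 2s heartbeat against a 3s lease leaves too little margin.
	fmt.Println(validateTiming(3*time.Second, 2*time.Second))
}
```

Keeping the period at or below half the lease means one missed heartbeat can still be followed by another attempt before the lease runs out.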
func WithLeaseDuration ¶
func WithLeaseDuration(d time.Duration) ClientOption
WithLeaseDuration defines how long the lease should be held.
func WithLeveledLogger ¶
func WithLeveledLogger(l LeveledLogger) ClientOption
WithLeveledLogger injects a logger into the client, so its internals can be recorded.
func WithLogger ¶
func WithLogger(l Logger) ClientOption
WithLogger injects a logger into the client, so its internals can be recorded.
func WithOwnerName ¶
func WithOwnerName(s string) ClientOption
WithOwnerName changes the owner linked to the client, and by consequence to locks.
type ClientWithSortKey ¶
type ClientWithSortKey struct {
// contains filtered or unexported fields
}
ClientWithSortKey is a dynamoDB based distributed lock client, but with a required sort key.
func NewWithSortKey ¶
func NewWithSortKey(dynamoDB DynamoDBClient, tableName, partitionKeyName, sortKeyName string, opts ...ClientOption) (*ClientWithSortKey, error)
NewWithSortKey creates a new dynamoDB based distributed lock client.
func (*ClientWithSortKey) AcquireLock ¶
func (c *ClientWithSortKey) AcquireLock(ctx context.Context, partitionKey, sortKey string, opts ...AcquireLockOption) (*Lock, error)
AcquireLock holds the defined lock. The given context is passed down to the underlying dynamoDB call.
func (ClientWithSortKey) Close ¶
func (c ClientWithSortKey) Close(ctx context.Context) error
Close releases all of the locks. The given context is passed down to the underlying dynamoDB calls.
func (*ClientWithSortKey) CreateTable ¶
func (c *ClientWithSortKey) CreateTable(ctx context.Context, opts ...CreateTableOption) (*dynamodb.CreateTableOutput, error)
CreateTable prepares a DynamoDB table with the right schema for it to be used by this locking library. The table should be set up in advance, because it takes a few minutes for DynamoDB to provision a new instance. Also, if the table already exists, it will return an error. The given context is passed down to the underlying dynamoDB call.
func (*ClientWithSortKey) Get ¶
func (c *ClientWithSortKey) Get(ctx context.Context, partitionKey, sortKey string) (*Lock, error)
Get finds out who owns the given lock, but does not acquire the lock. It returns the metadata currently associated with the given lock. If the client currently has the lock, it will return the lock, and operations such as ReleaseLock will work. However, if the client does not have the lock, then operations like ReleaseLock will not work (after calling Get, the caller should check Lock.IsExpired() to figure out if it currently has the lock). If the context is canceled, Get returns the context error, even when the lock is found in the local cache. The given context is passed down to the underlying dynamoDB call.
func (ClientWithSortKey) ReleaseLock ¶
func (c ClientWithSortKey) ReleaseLock(ctx context.Context, lockItem *Lock, opts ...ReleaseLockOption) (bool, error)
ReleaseLock releases the given lock if the current user still has it, returning true if the lock was successfully released, and false if someone else already stole the lock or a problem happened. It deletes the lock item if the lock is released and deletion on release was requested (see WithDeleteLockOnRelease).
func (ClientWithSortKey) SendHeartbeat ¶
func (c ClientWithSortKey) SendHeartbeat(ctx context.Context, lockItem *Lock, opts ...SendHeartbeatOption) error
SendHeartbeat indicates that the given lock is still being worked on. If the client was set up with WithHeartbeatPeriod > 0, calling this method is unnecessary, because a background goroutine periodically sends heartbeats. However, if WithHeartbeatPeriod is 0, this method must be called to tell DynamoDB that the lock should not expire. The given context is passed down to the underlying dynamoDB call.
type ContextLeveledLogger ¶
type ContextLeveledLogger interface {
	Info(ctx context.Context, v ...interface{})
	Error(ctx context.Context, v ...interface{})
}
ContextLeveledLogger defines a logger interface that can be used to pass extra information to the implementation. For example, if you use zap, you may have extra fields you want to add to the log line. You can add those extra fields to the parent context of calls like AcquireLock, and then retrieve them in your implementation of ContextLeveledLogger.
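As an illustration of the idea described above, the sketch below implements the interface with a logger that reads a request ID from the context. The interface is restated locally so the example compiles on its own; requestIDKey, withRequestID, and ctxLogger are hypothetical names, and the log line format is an arbitrary choice.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// ContextLeveledLogger restates the documented interface locally.
type ContextLeveledLogger interface {
	Info(ctx context.Context, v ...interface{})
	Error(ctx context.Context, v ...interface{})
}

// requestIDKey is a hypothetical private context key.
type requestIDKey struct{}

// withRequestID stores a request ID in the context for later log calls.
func withRequestID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, requestIDKey{}, id)
}

// ctxLogger collects formatted lines in memory; a real implementation
// would forward them to a structured logger instead.
type ctxLogger struct{ lines []string }

func (l *ctxLogger) log(ctx context.Context, level string, v ...interface{}) {
	id, _ := ctx.Value(requestIDKey{}).(string) // empty when no ID was set
	msg := strings.TrimSuffix(fmt.Sprintln(v...), "\n")
	l.lines = append(l.lines, fmt.Sprintf("%s request_id=%q %s", level, id, msg))
}

func (l *ctxLogger) Info(ctx context.Context, v ...interface{})  { l.log(ctx, "INFO", v...) }
func (l *ctxLogger) Error(ctx context.Context, v ...interface{}) { l.log(ctx, "ERROR", v...) }

// Compile-time check that ctxLogger satisfies the interface.
var _ ContextLeveledLogger = (*ctxLogger)(nil)

// demo logs one line with a request ID attached to the context.
func demo() string {
	l := &ctxLogger{}
	ctx := withRequestID(context.Background(), "req-42")
	l.Info(ctx, "acquiring lock", "spock")
	return l.lines[0]
}

func main() {
	fmt.Println(demo())
}
```

A logger like this would be handed to the client through WithContextLeveledLogger, and the same context would then be passed to calls such as AcquireLock.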
type CreateTableOption ¶
type CreateTableOption func(*createDynamoDBTableOptions)
CreateTableOption is an options type for the CreateTable method in the lock client. This allows the user to create a DynamoDB table that is lock client-compatible and specify optional parameters such as the desired throughput and whether or not to use a sort key.
func WithProvisionedThroughput ¶
func WithProvisionedThroughput(provisionedThroughput *types.ProvisionedThroughput) CreateTableOption
WithProvisionedThroughput changes the billing mode of DynamoDB and tells DynamoDB to operate in provisioned throughput mode instead of pay-per-request.
func WithTags ¶
func WithTags(tags []types.Tag) CreateTableOption
WithTags changes the tags of the table. If not specified, the table will have empty tags.
type DynamoDBClient ¶
type DynamoDBClient interface {
	GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)
	PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
	UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
	DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
	CreateTable(ctx context.Context, params *dynamodb.CreateTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error)
}
DynamoDBClient defines the public interface that test doubles must fulfill.
type LeveledLogger ¶
type LeveledLogger interface {
	Info(v ...interface{})
	Error(v ...interface{})
}
LeveledLogger defines the minimum desired logger interface for the lock client.
type Lock ¶
type Lock struct {
// contains filtered or unexported fields
}
Lock item properly speaking.
func (*Lock) AdditionalAttributes ¶
func (l *Lock) AdditionalAttributes() map[string]types.AttributeValue
AdditionalAttributes returns the lock's additional data stored during acquisition.
func (*Lock) IsAlmostExpired ¶
IsAlmostExpired returns whether or not the lock is entering the "danger zone" time period.
It returns true if the lock has been released or the lock's lease has entered the "danger zone". It returns false if the lock has not been released and the lock has not yet entered the "danger zone".
type LockNotGrantedError ¶
type LockNotGrantedError struct {
// contains filtered or unexported fields
}
LockNotGrantedError indicates that an AcquireLock call has failed to establish a lock because of its current lifecycle state.
func (*LockNotGrantedError) Error ¶
func (e *LockNotGrantedError) Error() string
func (*LockNotGrantedError) Unwrap ¶
func (e *LockNotGrantedError) Unwrap() error
Unwrap reveals the underlying cause why the lock was not granted.
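Because the error type exposes Unwrap, callers can inspect failures with the standard errors package. The sketch below uses a local stand-in type with the same method set to show the idea; notGrantedError, errHeldByOther, tryAcquire, and classify are hypothetical names, and the cause shown is invented.

```go
package main

import (
	"errors"
	"fmt"
)

// notGrantedError is a local stand-in that mimics the documented shape of
// LockNotGrantedError: an error type with an Unwrap method.
type notGrantedError struct {
	msg   string
	cause error
}

func (e *notGrantedError) Error() string { return e.msg + ": " + e.cause.Error() }
func (e *notGrantedError) Unwrap() error { return e.cause }

// errHeldByOther is an invented underlying cause.
var errHeldByOther = errors.New("lock held by another owner")

// tryAcquire simulates a failed acquisition whose error is wrapped once
// more by the caller.
func tryAcquire() error {
	inner := &notGrantedError{msg: "lock not granted", cause: errHeldByOther}
	return fmt.Errorf("acquire %q: %w", "spock", inner)
}

// classify distinguishes "lock not granted" from other failures using
// errors.As, which walks the chain of wrapped errors.
func classify(err error) string {
	var ng *notGrantedError
	switch {
	case err == nil:
		return "acquired"
	case errors.As(err, &ng):
		return "not granted"
	default:
		return "other failure"
	}
}

func main() {
	err := tryAcquire()
	// errors.Is follows Unwrap to reach the underlying cause.
	fmt.Println(classify(err), errors.Is(err, errHeldByOther))
}
```

With the real package, the target of errors.As would be a *LockNotGrantedError variable.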
type Logger ¶
type Logger interface {
Println(v ...interface{})
}
Logger defines the minimum desired logger interface for the lock client.
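Since the interface only requires Println, the standard library's *log.Logger already satisfies it. The sketch below restates the interface locally and verifies that at compile time; newBufferLogger is a hypothetical helper that writes to an in-memory buffer so the output can be inspected.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger restates the documented interface so the sketch compiles alone.
type Logger interface {
	Println(v ...interface{})
}

// Compile-time check: *log.Logger has a matching Println method.
var _ Logger = (*log.Logger)(nil)

// newBufferLogger returns a Logger backed by an in-memory buffer, with a
// fixed prefix and no timestamp flags.
func newBufferLogger(prefix string) (Logger, *bytes.Buffer) {
	var buf bytes.Buffer
	return log.New(&buf, prefix, 0), &buf
}

func main() {
	l, buf := newBufferLogger("dynamolock: ")
	l.Println("lock acquired", "spock")
	fmt.Print(buf.String())
}
```

A logger built this way could be passed to the client with WithLogger.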
type ReleaseLockOption ¶
type ReleaseLockOption func(*releaseLockOptions)
ReleaseLockOption provides options for releasing a lock when calling the ReleaseLock method. These options may be configured at the moment a lock is released.
func WithDataAfterRelease ¶
func WithDataAfterRelease(data []byte) ReleaseLockOption
WithDataAfterRelease is the new data to persist to the lock (only used if deleteLock=false). If the data is nil, then the lock client will keep the data as-is and not change it.
func WithDeleteLock ¶
func WithDeleteLock(deleteLock bool) ReleaseLockOption
WithDeleteLock defines whether or not to delete the lock when releasing it. If set to false, the lock row will continue to be in DynamoDB, but it will be marked as released.
type SendHeartbeatOption ¶
type SendHeartbeatOption func(*sendHeartbeatOptions)
SendHeartbeatOption allows the Lock content to be changed during the heartbeat cycle.
func DeleteData ¶
func DeleteData() SendHeartbeatOption
DeleteData removes the Lock data on heartbeat.
func ReplaceHeartbeatData ¶
func ReplaceHeartbeatData(data []byte) SendHeartbeatOption
ReplaceHeartbeatData overrides the content of the Lock in the heartbeat cycle.
type TimeoutError ¶
TimeoutError indicates that the dynamolock gave up acquiring the lock. It holds the length of the attempt that resulted in the error.
func (*TimeoutError) Error ¶
func (e *TimeoutError) Error() string