dynamolock

package module
v3.0.0-...-c58a494 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2022 License: Apache-2.0 Imports: 13 Imported by: 0

README

DynamoDB Lock Client for Go v3

SLA

This repository is covered by this SLA.

The dymanoDB Lock Client for Go is a general purpose distributed locking library built for DynamoDB. The dynamoDB Lock Client for Go supports both fine-grained and coarse-grained locking as the lock keys can be any arbitrary string, up to a certain length. Please create issues in the GitHub repository with questions, pull request are very much welcome.

It is a port in Go of Amazon's original dynamodb-lock-client using the AWS's latest Go SDK.

Use cases

A common use case for this lock client is: let's say you have a distributed system that needs to periodically do work on a given campaign (or a given customer, or any other object) and you want to make sure that two boxes don't work on the same campaign/customer at the same time. An easy way to fix this is to write a system that takes a lock on a customer, but fine-grained locking is a tough problem. This library attempts to simplify this locking problem on top of DynamoDB.

Another use case is leader election. If you only want one host to be the leader, then this lock client is a great way to pick one. When the leader fails, it will fail over to another host within a customizable leaseDuration that you set.

Getting Started

To use the DynamoDB Lock Client for Go, you must make it sure it is present in $GOPATH or in your vendor directory.

$ go get -u cirello.io/dynamolock/v3

This package has the go.mod file to be used with Go's module system.

Then, you need to set up a DynamoDB table that has a hash key on a key with the name key. For your convenience, there is a function in the package called CreateTable that you can use to set up your table, but it is also possible to set up the table in the AWS Console. The table should be created in advance, since it takes a couple minutes for DynamoDB to provision your table for you. The package level documentation comment has an example of how to use this package.

First you have to create the table and wait for DynamoDB to complete:

package main

import (
	"context"
	"log"

	"cirello.io/dynamolock/v3"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion("us-west-2"))
	if err != nil {
		log.Fatal(err)
	}
	c, err := dynamolock.New(dynamodb.NewFromConfig(cfg),
		"locks",
		dynamolock.WithLeaseDuration(3*time.Second),
		dynamolock.WithHeartbeatPeriod(1*time.Second),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close(context.Background())

	log.Println("ensuring table exists")
	_, err := c.CreateTable(context.Background(),
		dynamolock.WithProvisionedThroughput(&types.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(5),
			WriteCapacityUnits: aws.Int64(5),
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
}

Once you see the table is created in the DynamoDB console, you should be ready to run:

package main

import (
	"context"
	"log"

	"cirello.io/dynamolock/v3"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
)

func main() {
	cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion("us-west-2"))
	if err != nil {
		log.Fatal(err)
	}
	c, err := dynamolock.New(dynamodb.NewFromConfig(cfg),
		"locks",
		dynamolock.WithLeaseDuration(3*time.Second),
		dynamolock.WithHeartbeatPeriod(1*time.Second),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close(context.Background())

	data := []byte("some content a")
	lockedItem, err := c.AcquireLock(context.Background(), "spock",
		dynamolock.WithData(data),
		dynamolock.ReplaceData(),
	)
	if err != nil {
		log.Fatal(err)
	}

	log.Println("lock content:", string(lockedItem.Data()))
	if got := string(lockedItem.Data()); string(data) != got {
		log.Println("losing information inside lock storage, wanted:", string(data), " got:", got)
	}

	log.Println("cleaning lock")
	success, err := c.ReleaseLock(context.Background(), lockedItem)
	if !success {
		log.Fatal("lost lock before release")
	}
	if err != nil {
		log.Fatal("error releasing lock:", err)
	}
	log.Println("done")
}

Selected Features

Send Automatic Heartbeats

When you create the lock client, you can specify WithHeartbeatPeriod(time.Duration) like in the above example, and it will spawn a background goroutine that continually updates the record version number on your locks to prevent them from expiring (it does this by calling the SendHeartbeat() method in the lock client.) This will ensure that as long as your application is running, your locks will not expire until you call ReleaseLock() or lockItem.Close()

Read the data in a lock without acquiring it

You can read the data in the lock without acquiring it, and find out who owns the lock. Here's how:

ctx := context.Background() // or some other context
lock, err := lockClient.Get(ctx, "kirk");

Logic to avoid problems with clock skew

The lock client never stores absolute times in DynamoDB -- only the relative "lease duration" time is stored in DynamoDB. The way locks are expired is that a call to acquireLock reads in the current lock, checks the RecordVersionNumber of the lock (which is a GUID) and starts a timer. If the lock still has the same GUID after the lease duration time has passed, the client will determine that the lock is stale and expire it.

What this means is that, even if two different machines disagree about what time it is, they will still avoid clobbering each other's locks.

Required DynamoDB Actions

For an IAM role to take full advantage of dynamolock/**v2**, it must be allowed to perform all of the following actions on the DynamoDB table containing the locks:

  • GetItem
  • PutItem
  • UpdateItem
  • DeleteItem
  • CreateTable

Documentation

Overview

Package dynamolock provides a simple utility for using DynamoDB's consistent read/write feature to use it for managing distributed locks.

In order to use this package, the client must create a table in DynamoDB, although the client provides a convenience method for creating that table (CreateTable).

Basic usage:

	import (
		"context"
		"log"

		"cirello.io/dynamolock/v3"
		"github.com/aws/aws-sdk-go-v2/aws"
		"github.com/aws/aws-sdk-go-v2/config"
		"github.com/aws/aws-sdk-go-v2/service/dynamodb"
		"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
	)

	//---

	cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion("us-west-2"))
	if err != nil {
		log.Fatal(err)
	}
	c, err := dynamolock.New(dynamodb.NewFromConfig(cfg),
		"locks",
		dynamolock.WithLeaseDuration(3*time.Second),
		dynamolock.WithHeartbeatPeriod(1*time.Second),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close(context.Background())

	log.Println("ensuring table exists")
	c.CreateTable(context.Background(),
		dynamolock.WithProvisionedThroughput(&types.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(5),
			WriteCapacityUnits: aws.Int64(5),
		}),
	)

 //-- at this point you must wait for DynamoDB to complete the creation.

	data := []byte("some content a")
	lockedItem, err := c.AcquireLock(context.Background(), "spock",
		dynamolock.WithData(data),
		dynamolock.ReplaceData(),
	)
	if err != nil {
		log.Fatal(err)
	}

	log.Println("lock content:", string(lockedItem.Data()))
	if got := string(lockedItem.Data()); string(data) != got {
		log.Println("losing information inside lock storage, wanted:", string(data), " got:", got)
	}

	log.Println("cleaning lock")
	success, err := c.ReleaseLock(context.Background(), lockedItem)
	if !success {
		log.Fatal("lost lock before release")
	}
	if err != nil {
		log.Fatal("error releasing lock:", err)
	}
	log.Println("done")

This package is covered by this SLA: https://github.com/cirello-io/public/blob/master/SLA.md

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrSessionMonitorNotSet  = errors.New("session monitor is not set")
	ErrLockAlreadyReleased   = errors.New("lock is already released")
	ErrCannotReleaseNullLock = errors.New("cannot release null lock item")
	ErrOwnerMismatched       = errors.New("lock owner mismatched")
)

Errors related to session manager life-cycle.

View Source
var ErrClientClosed = errors.New("client already closed")

ErrClientClosed reports the client cannot be used because it is already closed.

Functions

This section is empty.

Types

type AcquireLockOption

type AcquireLockOption func(*acquireLockOptions)

AcquireLockOption allows to change how the lock is actually held by the client.

func FailIfLocked

func FailIfLocked() AcquireLockOption

FailIfLocked will not retry to acquire the lock, instead returning.

func ReplaceData

func ReplaceData() AcquireLockOption

ReplaceData will force the new content to be stored in the key.

func WithAdditionalAttributes

func WithAdditionalAttributes(attr map[string]types.AttributeValue) AcquireLockOption

WithAdditionalAttributes stores some additional attributes with each lock. This can be used to add any arbitrary parameters to each lock row.

func WithAdditionalTimeToWaitForLock

func WithAdditionalTimeToWaitForLock(d time.Duration) AcquireLockOption

WithAdditionalTimeToWaitForLock defines how long to wait in addition to the lease duration (if set to 10 minutes, this will try to acquire a lock for at least 10 minutes before giving up and returning an error).

func WithData

func WithData(b []byte) AcquireLockOption

WithData stores the content into the lock itself.

func WithDeleteLockOnRelease

func WithDeleteLockOnRelease() AcquireLockOption

WithDeleteLockOnRelease defines whether or not the lock should be deleted when Close() is called on the resulting LockItem will force the new content to be stored in the key.

func WithRefreshPeriod

func WithRefreshPeriod(d time.Duration) AcquireLockOption

WithRefreshPeriod defines how long to wait before trying to get the lock again (if set to 10 seconds, for example, it would attempt to do so every 10 seconds).

func WithSessionMonitor

func WithSessionMonitor(safeTime time.Duration, callback func()) AcquireLockOption

WithSessionMonitor registers a callback that is triggered if the lock is about to expire.

The purpose of this construct is to provide two abilities: provide the ability to determine if the lock is about to expire, and run a user-provided callback when the lock is about to expire. The advantage this provides is notification that your lock is about to expire before it is actually expired, and in case of leader election will help in preventing that there are no two leaders present simultaneously.

If due to any reason heartbeating is unsuccessful for a configurable period of time, your lock enters into a phase known as "danger zone." It is during this "danger zone" that the callback will be run.

Bear in mind that the callback may be null. In this case, no callback will be run upon the lock entering the "danger zone"; yet, one can still make use of the Lock.IsAlmostExpired() call. Furthermore, non-null callbacks can only ever be executed once in a lock's lifetime. Independent of whether or not a callback is run, the client will attempt to heartbeat the lock until the lock is released or obtained by someone else.

Consider an example which uses this mechanism for leader election. One way to make use of this SessionMonitor is to register a callback that kills the instance in case the leader's lock enters the danger zone:

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a dynamoDB based distributed lock client.

func New

func New(dynamoDB DynamoDBClient, tableName, partitionKeyName string, opts ...ClientOption) (*Client, error)

New creates a new dynamoDB based distributed lock client.

func (*Client) AcquireLock

func (c *Client) AcquireLock(ctx context.Context, partitionKey string, opts ...AcquireLockOption) (*Lock, error)

AcquireLock holds the defined lock. The given context is passed down to the underlying dynamoDB call.

func (Client) Close

func (c Client) Close(ctx context.Context) error

Close releases all of the locks. The given context is passed down to the underlying dynamoDB calls.

func (*Client) CreateTable

func (c *Client) CreateTable(ctx context.Context, opts ...CreateTableOption) (*dynamodb.CreateTableOutput, error)

CreateTable prepares a DynamoDB table with the right schema for it to be used by this locking library. The table should be set up in advance, because it takes a few minutes for DynamoDB to provision a new instance. Also, if the table already exists, it will return an error. The given context is passed down to the underlying dynamoDB call.

func (*Client) Get

func (c *Client) Get(ctx context.Context, partitionKey string) (*Lock, error)

Get finds out who owns the given lock, but does not acquire the lock. It returns the metadata currently associated with the given lock. If the client currently has the lock, it will return the lock, and operations such as releaseLock will work. However, if the client does not have the lock, then operations like releaseLock will not work (after calling Get, the caller should check lockItem.isExpired() to figure out if it currently has the lock.) If the context is canceled, it is going to return the context error on local cache hit. The given context is passed down to the underlying dynamoDB call.

func (Client) ReleaseLock

func (c Client) ReleaseLock(ctx context.Context, lockItem *Lock, opts ...ReleaseLockOption) (bool, error)

ReleaseLock releases the given lock if the current user still has it, returning true if the lock was successfully released, and false if someone else already stole the lock or a problem happened. Deletes the lock item if it is released and deleteLockItemOnClose is set.

func (Client) SendHeartbeat

func (c Client) SendHeartbeat(ctx context.Context, lockItem *Lock, opts ...SendHeartbeatOption) error

SendHeartbeat indicates that the given lock is still being worked on. If using WithHeartbeatPeriod > 0 when setting up this object, then this method is unnecessary, because the background thread will be periodically calling it and sending heartbeats. However, if WithHeartbeatPeriod = 0, then this method must be called to instruct DynamoDB that the lock should not be expired. The given context is passed down to the underlying dynamoDB call.

type ClientOption

type ClientOption func(*commonClient)

ClientOption reconfigure the lock client creation.

func DisableHeartbeat

func DisableHeartbeat() ClientOption

DisableHeartbeat disables automatic hearbeats. Use SendHeartbeat to freshen up the lock.

func WithContextLeveledLogger

func WithContextLeveledLogger(l ContextLeveledLogger) ClientOption

WithContextLeveledLogger injects a logger into the client, so its internals can be recorded.

func WithHeartbeatPeriod

func WithHeartbeatPeriod(d time.Duration) ClientOption

WithHeartbeatPeriod defines the frequency of the heartbeats. Set to zero to disable it. Heartbeats should have no more than half of the duration of the lease.

func WithLeaseDuration

func WithLeaseDuration(d time.Duration) ClientOption

WithLeaseDuration defines how long should the lease be held.

func WithLeveledLogger

func WithLeveledLogger(l LeveledLogger) ClientOption

WithLeveledLogger injects a logger into the client, so its internals can be recorded.

func WithLogger

func WithLogger(l Logger) ClientOption

WithLogger injects a logger into the client, so its internals can be recorded.

func WithOwnerName

func WithOwnerName(s string) ClientOption

WithOwnerName changes the owner linked to the client, and by consequence to locks.

type ClientWithSortKey

type ClientWithSortKey struct {
	// contains filtered or unexported fields
}

ClientWithSortKey is a dynamoDB based distributed lock client, but with a required sort key.

func NewWithSortKey

func NewWithSortKey(dynamoDB DynamoDBClient, tableName, partitionKeyName, sortKeyName string, opts ...ClientOption) (*ClientWithSortKey, error)

NewWithSortKey creates a new dynamoDB based distributed lock client.

func (*ClientWithSortKey) AcquireLock

func (c *ClientWithSortKey) AcquireLock(ctx context.Context, partitionKey, sortKey string, opts ...AcquireLockOption) (*Lock, error)

AcquireLock holds the defined lock. The given context is passed down to the underlying dynamoDB call.

func (ClientWithSortKey) Close

func (c ClientWithSortKey) Close(ctx context.Context) error

Close releases all of the locks. The given context is passed down to the underlying dynamoDB calls.

func (*ClientWithSortKey) CreateTable

CreateTable prepares a DynamoDB table with the right schema for it to be used by this locking library. The table should be set up in advance, because it takes a few minutes for DynamoDB to provision a new instance. Also, if the table already exists, it will return an error. The given context is passed down to the underlying dynamoDB call.

func (*ClientWithSortKey) Get

func (c *ClientWithSortKey) Get(ctx context.Context, partitionKey, sortKey string) (*Lock, error)

Get finds out who owns the given lock, but does not acquire the lock. It returns the metadata currently associated with the given lock. If the client currently has the lock, it will return the lock, and operations such as releaseLock will work. However, if the client does not have the lock, then operations like releaseLock will not work (after calling Get, the caller should check lockItem.isExpired() to figure out if it currently has the lock.) If the context is canceled, it is going to return the context error on local cache hit. The given context is passed down to the underlying dynamoDB call.

func (ClientWithSortKey) ReleaseLock

func (c ClientWithSortKey) ReleaseLock(ctx context.Context, lockItem *Lock, opts ...ReleaseLockOption) (bool, error)

ReleaseLock releases the given lock if the current user still has it, returning true if the lock was successfully released, and false if someone else already stole the lock or a problem happened. Deletes the lock item if it is released and deleteLockItemOnClose is set.

func (ClientWithSortKey) SendHeartbeat

func (c ClientWithSortKey) SendHeartbeat(ctx context.Context, lockItem *Lock, opts ...SendHeartbeatOption) error

SendHeartbeat indicates that the given lock is still being worked on. If using WithHeartbeatPeriod > 0 when setting up this object, then this method is unnecessary, because the background thread will be periodically calling it and sending heartbeats. However, if WithHeartbeatPeriod = 0, then this method must be called to instruct DynamoDB that the lock should not be expired. The given context is passed down to the underlying dynamoDB call.

type ContextLeveledLogger

type ContextLeveledLogger interface {
	Info(ctx context.Context, v ...interface{})
	Error(ctx context.Context, v ...interface{})
}

ContextLeveledLogger defines a logger interface that can be used to pass extra information to the implementation. For example, if you use zap, you may have extra fields you want to add to the log line. You can add those extra fields to the parent context of calls like AcquireLock, and then retrieve them in your implementation of ContextLeveledLogger.

type CreateTableOption

type CreateTableOption func(*createDynamoDBTableOptions)

CreateTableOption is an options type for the CreateTable method in the lock client. This allows the user to create a DynamoDB table that is lock client-compatible and specify optional parameters such as the desired throughput and whether or not to use a sort key.

func WithProvisionedThroughput

func WithProvisionedThroughput(provisionedThroughput *types.ProvisionedThroughput) CreateTableOption

WithProvisionedThroughput changes the billing mode of DynamoDB and tells DynamoDB to operate in a provisioned throughput mode instead of pay-per-request

func WithTags

func WithTags(tags []types.Tag) CreateTableOption

WithTags changes the tags of the table. If not specified, the table will have empty tags.

type DynamoDBClient

type DynamoDBClient interface {
	GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)
	PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
	UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
	DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
	CreateTable(ctx context.Context, params *dynamodb.CreateTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error)
}

DynamoDBClient defines the public interface that must be fulfilled for testing doubles.

type LeveledLogger

type LeveledLogger interface {
	Info(v ...interface{})
	Error(v ...interface{})
}

LeveledLogger defines the minimum desired logger interface for the lock client.

type Lock

type Lock struct {
	// contains filtered or unexported fields
}

Lock item properly speaking.

func (*Lock) AdditionalAttributes

func (l *Lock) AdditionalAttributes() map[string]types.AttributeValue

AdditionalAttributes returns the lock's additional data stored during acquisition.

func (*Lock) Close

func (l *Lock) Close() error

Close releases the lock.

func (*Lock) Data

func (l *Lock) Data() []byte

Data returns the content of the lock, if any is available.

func (*Lock) IsAlmostExpired

func (l *Lock) IsAlmostExpired() (bool, error)

IsAlmostExpired returns whether or not the lock is entering the "danger zone" time period.

It returns if the lock has been released or the lock's lease has entered the "danger zone". It returns false if the lock has not been released and the lock has not yet entered the "danger zone"

func (*Lock) IsExpired

func (l *Lock) IsExpired() bool

IsExpired returns if the lock is expired, released, or neither.

func (*Lock) OwnerName

func (l *Lock) OwnerName() string

OwnerName returns the lock's owner.

type LockNotGrantedError

type LockNotGrantedError struct {
	// contains filtered or unexported fields
}

LockNotGrantedError indicates that an AcquireLock call has failed to establish a lock because of its current lifecycle state.

func (*LockNotGrantedError) Error

func (e *LockNotGrantedError) Error() string

func (*LockNotGrantedError) Unwrap

func (e *LockNotGrantedError) Unwrap() error

Unwrap reveals the underlying cause why the lock was not granted.

type Logger

type Logger interface {
	Println(v ...interface{})
}

Logger defines the minimum desired logger interface for the lock client.

type ReleaseLockOption

type ReleaseLockOption func(*releaseLockOptions)

ReleaseLockOption provides options for releasing a lock when calling the releaseLock() method. This class contains the options that may be configured during the act of releasing a lock.

func WithDataAfterRelease

func WithDataAfterRelease(data []byte) ReleaseLockOption

WithDataAfterRelease is the new data to persist to the lock (only used if deleteLock=false.) If the data is null, then the lock client will keep the data as-is and not change it.

func WithDeleteLock

func WithDeleteLock(deleteLock bool) ReleaseLockOption

WithDeleteLock defines whether or not to delete the lock when releasing it. If set to false, the lock row will continue to be in DynamoDB, but it will be marked as released.

type SendHeartbeatOption

type SendHeartbeatOption func(*sendHeartbeatOptions)

SendHeartbeatOption allows to proceed with Lock content changes in the heartbeat cycle.

func DeleteData

func DeleteData() SendHeartbeatOption

DeleteData removes the Lock data on heartbeat.

func ReplaceHeartbeatData

func ReplaceHeartbeatData(data []byte) SendHeartbeatOption

ReplaceHeartbeatData overrides the content of the Lock in the heartbeat cycle.

type TimeoutError

type TimeoutError struct {
	Age time.Duration
}

TimeoutError indicates that the dynamolock gave up acquiring the lock. It holds the length of the attempt that resulted in the error.

func (*TimeoutError) Error

func (e *TimeoutError) Error() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL