Documentation ¶
Overview ¶
Package dynalock provides a locking store powered by AWS DynamoDB.
Index ¶
- Constants
- Variables
- func MarshalStruct(in interface{}) (*dynamodb.AttributeValue, error)
- func UnmarshalStruct(val *dynamodb.AttributeValue, out interface{}) error
- type Dynalock
- func (ddb *Dynalock) AtomicDelete(ctx context.Context, key string, previous *KVPair) (bool, error)
- func (ddb *Dynalock) AtomicPut(ctx context.Context, key string, options ...WriteOption) (bool, *KVPair, error)
- func (ddb *Dynalock) Delete(ctx context.Context, key string) error
- func (ddb *Dynalock) Exists(ctx context.Context, key string, options ...ReadOption) (bool, error)
- func (ddb *Dynalock) Get(ctx context.Context, key string, options ...ReadOption) (*KVPair, error)
- func (ddb *Dynalock) List(ctx context.Context, prefix string, options ...ReadOption) ([]*KVPair, error)
- func (ddb *Dynalock) NewLock(ctx context.Context, key string, options ...LockOption) (Locker, error)
- func (ddb *Dynalock) Put(ctx context.Context, key string, options ...WriteOption) error
- type DynamodbLock
- type KVPair
- type LockOption
- type LockOptions
- type Locker
- type ReadOption
- type ReadOptions
- type Store
- type WriteOption
- type WriteOptions
Examples ¶
Constants ¶
const ( // DefaultLockTTL default duration for locks DefaultLockTTL = 20 * time.Second )
Variables ¶
var ( // ErrKeyNotFound record not found in the table ErrKeyNotFound = errors.New("key not found in table") // ErrKeyExists record already exists in table ErrKeyExists = errors.New("key already exists in table") // ErrKeyModified record has been modified, this probably means someone beat you to the change/lock ErrKeyModified = errors.New("key has been modified") // ErrLockAcquireCancelled lock acquire was cancelled ErrLockAcquireCancelled = errors.New("lock acquire was cancelled") )
var ( // DefaultLockBackOff if locing is unsuccessful then this backoff will be used DefaultLockBackOff = 3 * time.Second )
Functions ¶
func MarshalStruct ¶
func MarshalStruct(in interface{}) (*dynamodb.AttributeValue, error)
MarshalStruct this helper method marshals a struct into an *dynamodb.AttributeValue which contains a map in the format required to provide to WriteWithAttributeValue.
func UnmarshalStruct ¶
func UnmarshalStruct(val *dynamodb.AttributeValue, out interface{}) error
UnmarshalStruct this helper method un-marshals a struct from an *dynamodb.AttributeValue returned by KVPair.AttributeValue.
Types ¶
type Dynalock ¶
type Dynalock struct {
// contains filtered or unexported fields
}
Dynalock lock store which is backed by AWS DynamoDB
func (*Dynalock) AtomicDelete ¶
AtomicDelete delete of a single value
This supports two different operations: * if previous is supplied assert it exists with the version supplied * if previous is nil then assert that the key doesn't exist
FIXME: should the second case just return false, nil?
func (*Dynalock) AtomicPut ¶
func (ddb *Dynalock) AtomicPut(ctx context.Context, key string, options ...WriteOption) (bool, *KVPair, error)
AtomicPut Atomic CAS operation on a single value.
func (*Dynalock) Get ¶
Get a value given its key
Example ¶
type message struct{ Message string } cfg, err := external.LoadDefaultAWSConfig() if err != nil { panic("unable to load SDK config, " + err.Error()) } dbSvc := dynamodb.New(cfg) dl := dynalock.New(dbSvc, "testing-locks", "agent") kv, _ := dl.Get(context.Background(), "agents/123") msg := &message{} av := kv.AttributeValue() dynalock.UnmarshalStruct(av, msg) fmt.Println("Message:", msg.Message)
Output:
func (*Dynalock) List ¶
func (ddb *Dynalock) List(ctx context.Context, prefix string, options ...ReadOption) ([]*KVPair, error)
List the content of a given prefix
func (*Dynalock) NewLock ¶
func (ddb *Dynalock) NewLock(ctx context.Context, key string, options ...LockOption) (Locker, error)
NewLock has to implemented at the library level since its not supported by DynamoDB
Example ¶
cfg, err := external.LoadDefaultAWSConfig() if err != nil { panic("unable to load SDK config, " + err.Error()) } dbSvc := dynamodb.New(cfg) dl := dynalock.New(dbSvc, "testing-locks", "agent") lock, _ := dl.NewLock( context.Background(), "agents/123", dynalock.LockWithTTL(2*time.Second), dynalock.LockWithBytes([]byte(`{"agent": "testing"}`)), ) lock.Lock(context.Background(), nil) defer lock.Unlock(context.Background())
Output:
func (*Dynalock) Put ¶
Put a value at the specified key
Example ¶
cfg, err := external.LoadDefaultAWSConfig() if err != nil { panic("unable to load SDK config, " + err.Error()) } dbSvc := dynamodb.New(cfg) dl := dynalock.New(dbSvc, "testing-locks", "agent") message := struct{ Message string }{Message: "hello"} attrVal, _ := dynalock.MarshalStruct(&message) dl.Put( context.Background(), "agents/123", dynalock.WriteWithAttributeValue(attrVal), )
Output:
type DynamodbLock ¶
type DynamodbLock struct {
// contains filtered or unexported fields
}
type KVPair ¶
type KVPair struct { Partition string `dynamodbav:"id"` Key string `dynamodbav:"name"` Version int64 `dynamodbav:"version"` Expires int64 `dynamodbav:"expires"` // contains filtered or unexported fields }
KVPair represents {Key, Value, Version} tuple, internally this uses a *dynamodb.AttributeValue which can be used to store strings, slices or structs
func (*KVPair) AttributeValue ¶
func (kv *KVPair) AttributeValue() *dynamodb.AttributeValue
AttributeValue return the current dynamodb attribute value, may be nil
func (*KVPair) BytesValue ¶
BytesValue use the attribute to return a slice of bytes, a nil will be returned if it is empty or nil
type LockOption ¶
type LockOption func(opts *LockOptions)
LockOption assign various settings to the lock options
func LockWithBytes ¶
func LockWithBytes(val []byte) LockOption
LockWithBytes byte slice to the key which is written when the lock is acquired
func LockWithRenewLock ¶
func LockWithRenewLock(renewLockChan chan struct{}) LockOption
LockWithRenewLock renewal channel to the lock
func LockWithTTL ¶
func LockWithTTL(ttl time.Duration) LockOption
LockWithTTL time to live (TTL) to the key which is written when the lock is acquired
type LockOptions ¶
type LockOptions struct {
// contains filtered or unexported fields
}
LockOptions contains optional request parameters
func NewLockOptions ¶
func NewLockOptions(opts ...LockOption) *LockOptions
NewLockOptions create lock options, assign defaults then accept overrides
type Locker ¶
type Locker interface { // Lock attempt to lock the store record, this will BLOCK and retry at a rate of once every 3 seconds Lock(ctx context.Context, stopChan chan struct{}) (<-chan struct{}, error) // Unlock this will unlock and perfom a DELETE to remove the store record Unlock(ctx context.Context) error }
Locker provides locking mechanism on top of the store. Similar to `sync.Lock` except it may return errors.
type ReadOption ¶
type ReadOption func(opts *ReadOptions)
ReadOption assign various settings to the read options
func ReadConsistentDisable ¶
func ReadConsistentDisable() ReadOption
ReadConsistentDisable disable consistent reads
type ReadOptions ¶
type ReadOptions struct {
// contains filtered or unexported fields
}
ReadOptions contains optional request parameters
func NewReadOptions ¶
func NewReadOptions(opts ...ReadOption) *ReadOptions
NewReadOptions create read options, assign defaults then accept overrides enable the read consistent flag by default
type Store ¶
type Store interface { // Put a value at the specified key Put(ctx context.Context, key string, options ...WriteOption) error // Get a value given its key Get(ctx context.Context, key string, options ...ReadOption) (*KVPair, error) // List the content of a given prefix List(ctx context.Context, prefix string, options ...ReadOption) ([]*KVPair, error) // Delete the value at the specified key Delete(ctx context.Context, key string) error // Verify if a Key exists in the store Exists(ctx context.Context, key string, options ...ReadOption) (bool, error) // NewLock creates a lock for a given key. // The returned Locker is not held and must be acquired // with `.Lock`. The Value is optional. NewLock(ctx context.Context, key string, options ...LockOption) (Locker, error) // Atomic CAS operation on a single value. // Pass previous = nil to create a new key. // Pass previous = kv to update an existing value. AtomicPut(ctx context.Context, key string, options ...WriteOption) (bool, *KVPair, error) // Atomic delete of a single value AtomicDelete(ctx context.Context, key string, previous *KVPair) (bool, error) }
Store represents the backend K/V storage
type WriteOption ¶
type WriteOption func(opts *WriteOptions)
WriteOption assign various settings to the write options
func WriteWithAttributeValue ¶
func WriteWithAttributeValue(av *dynamodb.AttributeValue) WriteOption
WriteWithAttributeValue dynamodb attribute value which is written
func WriteWithBytes ¶
func WriteWithBytes(val []byte) WriteOption
WriteWithBytes byte slice to the key which is written
func WriteWithNoExpires ¶
func WriteWithNoExpires() WriteOption
WriteWithNoExpires time to live (TTL) is set not set so it never expires
func WriteWithPreviousKV ¶
func WriteWithPreviousKV(previous *KVPair) WriteOption
WriteWithPreviousKV previous KV which will be checked prior to update
func WriteWithTTL ¶
func WriteWithTTL(ttl time.Duration) WriteOption
WriteWithTTL time to live (TTL) to the key which is written
type WriteOptions ¶
type WriteOptions struct {
// contains filtered or unexported fields
}
WriteOptions contains optional request parameters
func NewWriteOptions ¶
func NewWriteOptions(opts ...WriteOption) *WriteOptions
NewWriteOptions create write options, assign defaults then accept overrides
Directories ¶
Path | Synopsis |
---|---|
examples
|
|
competing-consumers
We have a table containing some entries, for each one we need to lock, do some work, then unlock the entry.
|
We have a table containing some entries, for each one we need to lock, do some work, then unlock the entry. |