zookeeper

package
v4.2.1
Warning

This package is not in the latest version of its module.

Published: Feb 9, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

README

Overview

This package provides a ZooKeeper-backed, coarse-grained distributed lock. The lock path is determined at instantiation time. At request time, lock attempts are enqueued and block until the lock is acquired or the context deadline passes. Locks can be configured with an optional TTL.

Further implementation notes:

  • Locks are enqueued and granted in order as locks ahead are relinquished or timed out.
  • Session timeouts/disconnects are handled through ZooKeeper sessions with automatic cleanup; locks that fail to acquire before the context timeout are removed from the queue even if the lock session is still active.
  • Setting a ZooKeeperLockConfig.TTL value > 0 (in milliseconds) enables lock TTLs. Note that TTL expirations are enforced at request time by contending locks: if service A is not using TTLs and service B is, service B can forcibly abort service A's locks.

Examples

Example implementation in zookeeper-example:

package main

import (
	"context"
	"flag"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	zklocking "github.com/DataDog/kafka-kit/v4/cluster/zookeeper"
)

func main() {
	timeout := flag.Duration("timeout", 3*time.Second, "lock wait timeout")
	owner := flag.String("owner", "user1", "the lock owner ID")
	flag.Parse()

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	// Init a Lock.
	cfg := zklocking.ZooKeeperLockConfig{
		Address:  "localhost:2181",
		Path:     "/my/locks",
		TTL:      30000,
		OwnerKey: "owner",
	}

	lock, err := zklocking.NewZooKeeperLock(cfg)
	if err != nil {
		log.Fatal(err)
	}
	ctx, c := context.WithTimeout(context.WithValue(context.Background(), "owner", *owner), *timeout)
	defer c()

	// Try the lock.
	if err := lock.Lock(ctx); err != nil {
		log.Println(err)
	} else {
		log.Println("I've got the lock!")
		defer log.Println("I've released the lock")
		defer lock.Unlock(ctx)
	}

	<-sigs
}

Running the test in two terminals:

% ./zookeeper-example
2021/12/08 10:46:31 I've got the lock!
% ./zookeeper-example
2021/12/08 10:46:39 attempt to acquire lock timed out

Same test, exiting the first lock while the second lock is waiting:

% ./zookeeper-example
2021/12/08 10:46:58 I've got the lock!
^C2021/12/08 10:47:00 I've released the lock
% ./zookeeper-example
2021/12/08 10:47:00 I've got the lock!

Ephemeral lock znodes visible at the configured path:

[zk: localhost:2181(CONNECTED) 8] ls /my/locks
[_c_83c1bcf372c265e9ac7ee364e5d3bac5-lock-0000000027, _c_979cb11f40bb3dbc6908edeaac8f2de1-lock-0000000028]
[zk: localhost:2181(CONNECTED) 9] ls /my/locks
[_c_64c30aea1b15839542824a7b47d49ce3-lock-0000000029]
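
The trailing integer in each znode name is the sequence ID that determines queue order. As a rough sketch of how such an ID might be extracted, the hypothetical helper `seqID` below takes everything after the last `-` and parses it as an integer; `errNoSeqID` is a local stand-in for the package's ErrInvalidSeqNode, and the package's own parsing may differ.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// errNoSeqID is a local stand-in for the package's ErrInvalidSeqNode.
var errNoSeqID = errors.New("znode doesn't appear to be a sequential type")

// seqID is a hypothetical helper. It returns the integer that follows the
// last "-" in a znode name such as "_c_<guid>-lock-0000000027".
func seqID(znode string) (int, error) {
	i := strings.LastIndex(znode, "-")
	if i < 0 || i == len(znode)-1 {
		return 0, errNoSeqID
	}
	id, err := strconv.Atoi(znode[i+1:])
	if err != nil {
		return 0, errNoSeqID
	}
	return id, nil
}

func main() {
	names := []string{
		"_c_83c1bcf372c265e9ac7ee364e5d3bac5-lock-0000000027",
		"_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-0000000028",
	}
	for _, n := range names {
		id, err := seqID(n)
		fmt.Println(id, err)
	}
}
```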

Documentation

Overview

package zookeeper implements a ZooKeeper based Lock.

Index

Constants

This section is empty.

Variables

var (
	// ErrAlreadyOwnLock is returned if Lock is called with a context holding an
	// OwnerKey equal to that of an active lock.
	ErrAlreadyOwnLock = errors.New("requestor already has an active lock")
	// ErrLockingTimedOut is returned when a lock couldn't be acquired by the
	// context deadline.
	ErrLockingTimedOut = errors.New("attempt to acquire lock timed out")
	// ErrInvalidSeqNode is returned when sequential znodes are being parsed for
	// a trailing integer ID, but one isn't found.
	ErrInvalidSeqNode = errors.New("znode doesn't appear to be a sequential type")
	// ErrNotLockOwner is returned when Unlock is called by a requestor whose
	// OwnerKey value does not equal the current lock owner.
	ErrNotLockOwner = errors.New("non-owner attempted to call unlock")
	// ErrOwnerAlreadySet is returned when SetOwner is being called on a lock where
	// the owner field is non-nil.
	ErrOwnerAlreadySet = errors.New("attempt to set owner on a claimed lock")
)

Functions

This section is empty.

Types

type ErrExpireLockFailed

type ErrExpireLockFailed struct {
	// contains filtered or unexported fields
}

ErrExpireLockFailed is returned when a lock with an expired TTL fails to purge.

func (ErrExpireLockFailed) Error

func (err ErrExpireLockFailed) Error() string

Error returns an error string.

type ErrLockingFailed

type ErrLockingFailed struct {
	// contains filtered or unexported fields
}

ErrLockingFailed is a general failure.

func (ErrLockingFailed) Error

func (err ErrLockingFailed) Error() string

Error returns an error string.

type ErrUnlockingFailed

type ErrUnlockingFailed struct {
	// contains filtered or unexported fields
}

ErrUnlockingFailed is a general failure.

func (ErrUnlockingFailed) Error

func (err ErrUnlockingFailed) Error() string

Error returns an error string.

type LockEntries

type LockEntries struct {
	// contains filtered or unexported fields
}

LockEntries is a container of locks.

func (LockEntries) First

func (le LockEntries) First() (int, error)

First returns the ID with the lowest value.

func (LockEntries) IDs

func (le LockEntries) IDs() []int

IDs returns all held lock IDs in ascending order.

func (LockEntries) LockAhead

func (le LockEntries) LockAhead(id int) (int, error)

LockAhead returns the lock ahead of the ID provided.

func (LockEntries) LockPath

func (le LockEntries) LockPath(id int) (string, error)

LockPath takes a lock ID and returns the znode path.
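
LockEntries keeps its fields unexported, so the following is only one plausible reading of First and LockAhead. It uses a hypothetical `entries` map from lock ID to znode path, and assumes "the lock ahead" means the largest ID smaller than the one provided.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// entries is a hypothetical stand-in for LockEntries: lock ID -> znode path.
type entries map[int]string

var errNotFound = errors.New("no such lock")

// ids returns all lock IDs in ascending order.
func (e entries) ids() []int {
	ids := make([]int, 0, len(e))
	for id := range e {
		ids = append(ids, id)
	}
	sort.Ints(ids)
	return ids
}

// first returns the lowest ID, i.e. the head of the queue.
func (e entries) first() (int, error) {
	ids := e.ids()
	if len(ids) == 0 {
		return 0, errNotFound
	}
	return ids[0], nil
}

// lockAhead returns the largest ID that is smaller than id.
func (e entries) lockAhead(id int) (int, error) {
	ids := e.ids()
	i := sort.SearchInts(ids, id) // index of the first ID >= id
	if i == 0 {
		return 0, errNotFound
	}
	return ids[i-1], nil
}

func main() {
	e := entries{
		27: "/my/locks/_c_83c1bcf372c265e9ac7ee364e5d3bac5-lock-0000000027",
		28: "/my/locks/_c_979cb11f40bb3dbc6908edeaac8f2de1-lock-0000000028",
	}
	fmt.Println(e.first())
	fmt.Println(e.lockAhead(28))
	fmt.Println(e.lockAhead(27))
}
```

A waiter that watches only the lock directly ahead of it, rather than the whole directory, follows the usual ZooKeeper lock recipe and avoids waking every waiter on each release.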

type ZooKeeperClient

type ZooKeeperClient interface {
	Children(string) ([]string, *zk.Stat, error)
	Create(string, []byte, int32, []zk.ACL) (string, error)
	CreateProtectedEphemeralSequential(string, []byte, []zk.ACL) (string, error)
	Delete(string, int32) error
	Get(string) ([]byte, *zk.Stat, error)
	GetW(string) ([]byte, *zk.Stat, <-chan zk.Event, error)
}

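ZooKeeperClient is the interface of ZooKeeper operations used by ZooKeeperLock; NewZooKeeperLockWithClient accepts any implementation.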

type ZooKeeperLock

type ZooKeeperLock struct {
	Path     string
	OwnerKey string
	TTL      int
	// contains filtered or unexported fields
}

ZooKeeperLock implements a Lock.

func NewZooKeeperLock

func NewZooKeeperLock(c ZooKeeperLockConfig) (*ZooKeeperLock, error)

NewZooKeeperLock returns a ZooKeeperLock.

func NewZooKeeperLockWithClient

func NewZooKeeperLockWithClient(cfg ZooKeeperLockConfig, zkc ZooKeeperClient) (*ZooKeeperLock, error)

NewZooKeeperLockWithClient takes a ZooKeeperLockConfig and ZooKeeperClient and returns a ZooKeeperLock. Any initialization (such as path creation) should be performed outside of this initializer.

func (*ZooKeeperLock) Lock

func (z *ZooKeeperLock) Lock(ctx context.Context) error

Lock attempts to acquire a lock. If the lock cannot be acquired by the context deadline, the lock attempt times out.

func (*ZooKeeperLock) Owner

func (z *ZooKeeperLock) Owner() interface{}

Owner returns the current lock owner.

func (*ZooKeeperLock) Unlock

func (z *ZooKeeperLock) Unlock(ctx context.Context) error

Unlock releases a lock.

func (*ZooKeeperLock) UnlockLogError

func (z *ZooKeeperLock) UnlockLogError(ctx context.Context)

UnlockLogError releases a lock and logs, rather than returns, any errors encountered.

type ZooKeeperLockConfig

type ZooKeeperLockConfig struct {
	// The address of the ZooKeeper cluster.
	Address string
	// The locking path; this is the register that locks are attempting to acquire.
	Path string
	// A non-zero TTL sets a limit (in milliseconds) on how long a lock is possibly
	// valid for. Once this limit is exceeded, any new lock claims can destroy those
	// exceeding their TTL.
	TTL int
	// An optional lock ownership identifier. Context values can be inspected to
	// determine if a lock owner already has the lock. For example, if we specify
	// an OwnerKey configuration value of UserID, any successful lock claim will
	// set the lock owner as the value of UserID from the context received. Any
	// successive calls to Lock() with the same UserID context value will also
	// succeed. As a safety, this is not a distributed feature and is scoped to the
	// ZooKeeperLock instance; attempting to have two processes claim a lock
	// on the same path with the same OwnerKey/value will result in only one lock
	// being granted. This also prevents a concurrent program sharing a ZooKeeperLock
	// from allowing requestors to call Unlock on a lock that it doesn't own.
	OwnerKey string
}

ZooKeeperLockConfig holds ZooKeeperLock configurations.
