distlock

package
v0.2.0-alpha.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2024 License: MIT Imports: 18 Imported by: 0

README

分布式锁

支持以下类型的分布式锁:

  • Noop(假的,勿使用)
  • MySQL
  • PostgreSQL
  • Redis
  • Etcd
  • Zookeeper
  • Consul
  • Memcached
  • MongoDB

测试情况

  • 已测试:MySQL、PostgreSQL、Redis
  • 未测试(使用前建议你自己充分测试下):Etcd、Zookeeper、Consul、Memcached、MongoDB

GPT Prompt

package distlock

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/go-sql-driver/mysql"
    "github.com/jackc/pgx/v5/pgconn"
    "gorm.io/gorm"

    "github.com/superproj/onex/pkg/logger"
)

// GORMLocker provides a distributed locking mechanism using GORM.
type GORMLocker struct {
    db          *gorm.DB
    lockName    string
    lockTimeout time.Duration
    renewTicker *time.Ticker
    stopChan    chan struct{}
    mu          sync.Mutex
    ownerID     string
    logger      logger.Logger
}

// Lock represents a database record for a distributed lock.
type Lock struct {
    ID        uint   `gorm:"primarykey"`
    Name      string `gorm:"unique"`
    OwnerID   string
    ExpiredAt time.Time
    CreatedAt time.Time
    UpdatedAt time.Time
}

// Ensure GORMLocker implements the Locker interface.
var _ Locker = (*GORMLocker)(nil)

// NewGORMLocker initializes a new GORMLocker instance.
func NewGORMLocker(db *gorm.DB, opts ...Option) (*GORMLocker, error) {
    o := ApplyOptions(opts...)

    if err := db.AutoMigrate(&Lock{}); err != nil {
        return nil, err
    }

    locker := &GORMLocker{
        db:          db,
        ownerID:     o.ownerID,
        lockName:    o.lockName,
        lockTimeout: o.lockTimeout,
        stopChan:    make(chan struct{}),
        logger:      o.logger,
    }

    locker.logger.Info("GORMLocker initialized", "lockName", locker.lockName, "ownerID", locker.ownerID)

    return locker, nil
}

// Lock acquires the distributed lock.
func (l *GORMLocker) Lock(ctx context.Context) error {
    l.mu.Lock()
    defer l.mu.Unlock()

    now := time.Now()
    expiredAt := now.Add(l.lockTimeout)

    err := l.db.Transaction(func(tx *gorm.DB) error {
        if err := tx.Create(&Lock{Name: l.lockName, OwnerID: l.ownerID, ExpiredAt: expiredAt}).Error; err != nil {
            if !isDuplicateEntry(err) {
                l.logger.Error("failed to create lock", "error", err)
                return err
            }

            var lock Lock
            if err := tx.First(&lock, "name = ?", l.lockName).Error; err != nil {
                l.logger.Error("failed to fetch existing lock", "error", err)
                return err
            }

            if !lock.ExpiredAt.Before(now) {
                l.logger.Warn("lock is already held by another owner", "ownerID", lock.OwnerID)
                return fmt.Errorf("lock is already held by %s", lock.OwnerID)
            }

            lock.OwnerID = l.ownerID
            lock.ExpiredAt = expiredAt
            if err := tx.Save(&lock).Error; err != nil {
                l.logger.Error("failed to update expired lock", "error", err)
                return err
            }
            l.logger.Info("Lock expired, updated owner", "lockName", l.lockName, "newOwnerID", l.ownerID)
        }

        l.renewTicker = time.NewTicker(l.lockTimeout / 2)
        go l.renewLock(ctx)

        l.logger.Info("Lock acquired", "lockName", l.lockName, "ownerID", l.ownerID)
        return nil
    })

    return err
}

// Unlock releases the distributed lock.
func (l *GORMLocker) Unlock(ctx context.Context) error {
    l.mu.Lock()
    defer l.mu.Unlock()

    if l.renewTicker != nil {
        l.renewTicker.Stop()
        l.renewTicker = nil
        l.logger.Info("Stopped renewing lock", "lockName", l.lockName)
    }

    err := l.db.Delete(&Lock{}, "name = ?", l.lockName).Error
    if err != nil {
        l.logger.Error("failed to delete lock", "error", err)
        return err
    }

    l.logger.Info("Lock released", "lockName", l.lockName)
    return nil
}

// Renew refreshes the lease for the distributed lock.
func (l *GORMLocker) Renew(ctx context.Context) error {
    l.mu.Lock()
    defer l.mu.Unlock()

    now := time.Now()
    expiredAt := now.Add(l.lockTimeout)

    err := l.db.Model(&Lock{}).Where("name = ?", l.lockName).Update("expired_at", expiredAt).Error
    if err != nil {
        l.logger.Error("failed to renew lock", "error", err)
        return err
    }

    l.logger.Info("Lock renewed", "lockName", l.lockName, "newExpiration", expiredAt)
    return nil
}

// renewLock periodically renews the lock lease.
func (l *GORMLocker) renewLock(ctx context.Context) {
    for {
        select {
        case <-l.stopChan:
            return
        case <-l.renewTicker.C:
            if err := l.Renew(ctx); err != nil {
                l.logger.Error("failed to renew lock", "error", err)
            }
        }
    }
}

// isDuplicateEntry checks if the error is a duplicate entry error for MySQL and PostgreSQL.
func isDuplicateEntry(err error) bool {
    if err == nil {
        return false
    }

    if mysqlErr, ok := err.(*mysql.MySQLError); ok {
        return mysqlErr.Number == 1062 // MySQL error code for duplicate entry
    }

    if pgErr, ok := err.(*pgconn.PgError); ok {
        return pgErr.Code == "23505" // PostgreSQL error code for unique violation
    }

    return false
}

参考上述基于MySQL的分布式锁实现,使用Consul实现一个分布式锁,该分布式锁同样实现了以下接口:

type Locker interface {
    Lock(ctx context.Context) error
    Unlock(ctx context.Context) error
    Renew(ctx context.Context) error
}

并且需要满足以下要求:
1. 分布式锁启动后,会有一个异步的协程序,根据创建时的参数,定期续锁;
2. 如果代码需要创建数据库表,需要使用gorm的AutoMigrate方法自动创建表
3. 使用logger记录必要的日志

Documentation

Overview

Package distlock provides an interface for distributed locking mechanisms.

Index

Constants

View Source
const DefaultLockName = "onex-distributed-lock"

DefaultLockName is the default name used for the distributed lock.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsulLocker

type ConsulLocker struct {
	// contains filtered or unexported fields
}

ConsulLocker is a structure that implements distributed locking using Consul.

func NewConsulLocker

func NewConsulLocker(consulAddr string, opts ...Option) (*ConsulLocker, error)

NewConsulLocker creates a new ConsulLocker instance.

func (*ConsulLocker) Lock

func (l *ConsulLocker) Lock(ctx context.Context) error

Lock attempts to acquire the distributed lock.

func (*ConsulLocker) Renew

func (l *ConsulLocker) Renew(ctx context.Context) error

Renew refreshes the lock's expiration time.

func (*ConsulLocker) Unlock

func (l *ConsulLocker) Unlock(ctx context.Context) error

Unlock releases the distributed lock.

type EtcdLocker

type EtcdLocker struct {
	// contains filtered or unexported fields
}

EtcdLocker provides a distributed locking mechanism using etcd.

func NewEtcdLocker

func NewEtcdLocker(endpoints []string, opts ...Option) (*EtcdLocker, error)

NewEtcdLocker initializes a new EtcdLocker instance.

func (*EtcdLocker) Lock

func (l *EtcdLocker) Lock(ctx context.Context) error

Lock acquires the distributed lock.

func (*EtcdLocker) Renew

func (l *EtcdLocker) Renew(ctx context.Context) error

Renew refreshes the lease for the distributed lock.

func (*EtcdLocker) Unlock

func (l *EtcdLocker) Unlock(ctx context.Context) error

Unlock releases the distributed lock.

type GORMLocker

type GORMLocker struct {
	// contains filtered or unexported fields
}

GORMLocker provides a distributed locking mechanism using GORM.

func NewGORMLocker

func NewGORMLocker(db *gorm.DB, opts ...Option) (*GORMLocker, error)

NewGORMLocker initializes a new GORMLocker instance.

func (*GORMLocker) Lock

func (l *GORMLocker) Lock(ctx context.Context) error

Lock acquires the distributed lock.

func (*GORMLocker) Renew

func (l *GORMLocker) Renew(ctx context.Context) error

Renew refreshes the lease for the distributed lock.

func (*GORMLocker) Unlock

func (l *GORMLocker) Unlock(ctx context.Context) error

Unlock releases the distributed lock.

type Lock

type Lock struct {
	ID        uint   `gorm:"primarykey"`
	Name      string `gorm:"unique"`
	OwnerID   string
	ExpiredAt time.Time
	CreatedAt time.Time
	UpdatedAt time.Time
}

Lock represents a database record for a distributed lock.

type Locker

type Locker interface {
	// Lock attempts to acquire the lock.
	Lock(ctx context.Context) error

	// Unlock releases the previously acquired lock.
	Unlock(ctx context.Context) error

	// Renew updates the expiration time of the lock.
	// It should be called periodically to keep the lock active.
	Renew(ctx context.Context) error
}

Locker is an interface that defines the methods for a distributed lock. It provides methods to acquire, release, and renew a lock in a distributed system.

type MemcachedLocker

type MemcachedLocker struct {
	// contains filtered or unexported fields
}

MemcachedLocker provides a distributed locking mechanism using Memcached.

func NewMemcachedLocker

func NewMemcachedLocker(memcachedAddr string, opts ...Option) *MemcachedLocker

NewMemcachedLocker creates a new MemcachedLocker instance.

func (*MemcachedLocker) Lock

func (l *MemcachedLocker) Lock(ctx context.Context) error

Lock attempts to acquire the distributed lock.

func (*MemcachedLocker) Renew

func (l *MemcachedLocker) Renew(ctx context.Context) error

Renew refreshes the expiration time of the lock.

func (*MemcachedLocker) Unlock

func (l *MemcachedLocker) Unlock(ctx context.Context) error

Unlock releases the distributed lock.

type MongoLocker

type MongoLocker struct {
	// contains filtered or unexported fields
}

MongoLocker provides a distributed locking mechanism using MongoDB.

func NewMongoLocker

func NewMongoLocker(mongoURI string, dbName string, opts ...Option) (*MongoLocker, error)

NewMongoLocker creates a new MongoLocker instance.

func (*MongoLocker) Lock

func (l *MongoLocker) Lock(ctx context.Context) error

Lock attempts to acquire the distributed lock.

func (*MongoLocker) Renew

func (l *MongoLocker) Renew(ctx context.Context) error

Renew refreshes the lock's expiration time.

func (*MongoLocker) Unlock

func (l *MongoLocker) Unlock(ctx context.Context) error

Unlock releases the distributed lock.

type NoopLocker

type NoopLocker struct {
	// contains filtered or unexported fields
}

NoopLocker provides a no-operation implementation of a distributed lock.

func NewNoopLocker

func NewNoopLocker(opts ...Option) *NoopLocker

NewNoopLocker creates a new NoopLocker instance.

func (*NoopLocker) Lock

func (l *NoopLocker) Lock(ctx context.Context) error

Lock simulates acquiring a distributed lock.

func (*NoopLocker) Renew

func (l *NoopLocker) Renew(ctx context.Context) error

Renew simulates refreshing the lock's expiration time.

func (*NoopLocker) Unlock

func (l *NoopLocker) Unlock(ctx context.Context) error

Unlock simulates releasing a distributed lock.

type Option

type Option func(o *Options)

Option is a function that modifies Options.

func WithLockName

func WithLockName(name string) Option

WithLockName sets the lock name in Options.

func WithLockTimeout

func WithLockTimeout(timeout time.Duration) Option

WithLockTimeout sets the lock timeout in Options.

func WithLogger

func WithLogger(logger logger.Logger) Option

WithLogger sets the logger in Options.

func WithOwnerID

func WithOwnerID(ownerID string) Option

WithOwnerID sets the owner ID in Options.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options holds the configuration for the distributed lock.

func ApplyOptions

func ApplyOptions(opts ...Option) *Options

ApplyOptions applies a series of Option functions to configure Options.

func NewOptions

func NewOptions() *Options

NewOptions initializes Options with default values.

type RedisLocker

type RedisLocker struct {
	// contains filtered or unexported fields
}

RedisLocker provides a distributed locking mechanism using Redis.

func NewRedisLocker

func NewRedisLocker(client *redis.Client, opts ...Option) *RedisLocker

NewRedisLocker creates a new RedisLocker instance.

func (*RedisLocker) Lock

func (l *RedisLocker) Lock(ctx context.Context) error

Lock attempts to acquire the distributed lock.

func (*RedisLocker) Renew

func (l *RedisLocker) Renew(ctx context.Context) error

Renew refreshes the lock's expiration time.

func (*RedisLocker) Unlock

func (l *RedisLocker) Unlock(ctx context.Context) error

Unlock releases the distributed lock.

type ZookeeperLocker

type ZookeeperLocker struct {
	// contains filtered or unexported fields
}

ZookeeperLocker provides a distributed locking mechanism using Zookeeper.

func NewZookeeperLocker

func NewZookeeperLocker(zkServers []string, opts ...Option) (*ZookeeperLocker, error)

NewZookeeperLocker creates a new ZookeeperLocker instance.

func (*ZookeeperLocker) Lock

func (l *ZookeeperLocker) Lock(ctx context.Context) error

Lock attempts to acquire the distributed lock.

func (*ZookeeperLocker) Renew

func (l *ZookeeperLocker) Renew(ctx context.Context) error

Renew refreshes the lock's expiration time.

func (*ZookeeperLocker) Unlock

func (l *ZookeeperLocker) Unlock(ctx context.Context) error

Unlock releases the distributed lock.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL