redlock

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2021 License: MIT Imports: 13 Imported by: 0

README

Redlock 实现一个基于redis的分布式锁

Go Reference

redlock

注意: 因为使用了embed包, 所以至少要求Go版本为1.16

示例:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/gomodule/redigo/redis"
	"github.com/qingtao/redlock"
	"github.com/qingtao/redlock/driver/redigo"
)

func main() {
	// 初始化redigo的连接
	p := &redis.Pool{
		DialContext: func(ctx context.Context) (redis.Conn, error) {
			return redis.DialContext(ctx, "tcp", "127.0.0.1:6379",
				redis.DialDatabase(2),
				redis.DialPassword("redis"),
			)
		},
		MaxIdle:     2,
		IdleTimeout: 2 * time.Minute,
	}
	taskName := func(id string) string {
		return fmt.Sprintf("task:%s", id)
	}
	// 创建一个工作任务锁
	workers := redlock.New(redigo.NewClient(p))
	lock := workers.NewMutex(taskName("exec"),
		// 请求超时时间
		redlock.WithTimeout(30*time.Second),
		// 锁的有效期
		redlock.WithExpires(60*time.Second),
		// 重试次数, 因此最大尝试次数是2+1
		redlock.WithRetries(2),
	)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := lock.Exec(ctx, func(c context.Context) {
		fmt.Println("ok")
	}); err != nil {
		fmt.Println(err.Error())
	}

	// Output: ok
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetLogger

func SetLogger(l *log.Logger)

SetLogger 设置日志, 如果l==nil,则不打印日志

Types

type Conn

type Conn interface {
	// SetNX 存储锁的key和value, 如果需要设置连接超时,可以使用context.WithTimeout,
	// 如果超时时间比过期时间(expires)大,使用expires作为超时时间
	SetNX(ctx context.Context, key, value string, expires time.Duration) error
	// Delete 删除锁的key, 如果需要设置连接超时,可以使用context.WithTimeout
	Delete(ctx context.Context, key, value string) error
	// Extend 存储锁的key的延长过期时间, 如果需要设置连接超时,可以使用context.WithTimeout,
	// 如果超时时间比过期时间(expires)大,使用expires作为超时时间
	Extend(ctx context.Context, key, value string, expires time.Duration) error
	// Close 关闭redis的连接
	Close() error
}

Conn redis连接

type Lock

type Lock struct {
	// contains filtered or unexported fields
}

Lock 锁的提供者

func New

func New(clients ...Conn) *Lock

New 使用初始化完成的客户端列表, 创建锁的提供者

func (*Lock) NewMutex

func (l *Lock) NewMutex(name string, opts ...Option) *Mutex

NewMutex 创建互斥锁 提供者已经初始化了客户端连接, 调用者不需要使用WithPool选项

type MultiError

type MultiError []error

MultiError 包装多个错误

func (MultiError) Error

func (m MultiError) Error() string

Error 遍历多个错误, 以字符串格式返回

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

Mutex 互斥锁

func NewMutex

func NewMutex(name string, opts ...Option) (*Mutex, error)

newMutex 创建一个新的互斥锁,参数name是认证的唯一标识,如:

userid:051c6b48-7a40-4826-850d-ea500adb7ebc

func (*Mutex) Exec

func (m *Mutex) Exec(ctx context.Context, f WorkerFunc) (err error)

Exec 加锁并执行 传入的f函数如果执行时间过长, 会尝试延续锁的有效期, 延长有效期失败就退出,至于执行的操作是否有中断方法需要在f函数内部使用context

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gomodule/redigo/redis"
	"github.com/qingtao/redlock"
	"github.com/qingtao/redlock/driver/redigo"
)

func main() {
	// 初始化redigo的连接
	p := &redis.Pool{
		DialContext: func(ctx context.Context) (redis.Conn, error) {
			return redis.DialContext(ctx, "tcp", "127.0.0.1:6379",
				redis.DialDatabase(2),
				redis.DialPassword("redis"),
			)
		},
		MaxIdle:     2,
		IdleTimeout: 2 * time.Minute,
	}
	taskName := func(id string) string {
		return fmt.Sprintf("task:%s", id)
	}
	// 创建一个工作任务锁
	workers := redlock.New(redigo.NewClient(p))
	lock := workers.NewMutex(taskName("exec"),
		// 请求超时时间
		redlock.WithTimeout(30*time.Second),
		// 锁的有效期
		redlock.WithExpires(60*time.Second),
		// 重试次数, 因此最大尝试次数是2+1
		redlock.WithRetries(2),
	)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := lock.Exec(ctx, func(c context.Context) {
		fmt.Println("ok")
	}); err != nil {
		fmt.Println(err.Error())
	}

}
Output:

ok
Example (ExtendCanceled)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gomodule/redigo/redis"
	"github.com/qingtao/redlock"
	"github.com/qingtao/redlock/driver/redigo"
)

func main() {
	// 初始化redigo的连接
	p := &redis.Pool{
		DialContext: func(ctx context.Context) (redis.Conn, error) {
			return redis.DialContext(ctx, "tcp", "127.0.0.1:6379",
				redis.DialDatabase(2),
				redis.DialPassword("redis"),
			)
		},
		MaxIdle:     2,
		IdleTimeout: 2 * time.Minute,
	}
	taskName := func(id string) string {
		return fmt.Sprintf("task:%s", id)
	}
	// 创建一个工作任务锁
	workers := redlock.New(redigo.NewClient(p))
	lock := workers.NewMutex(taskName("exec:extend:canceled"),
		// 锁的有效期
		redlock.WithExpires(500*time.Millisecond),
		// 重试次数, 因此最大尝试次数是2+1
		redlock.WithRetries(2),
	)
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	exit := make(chan struct{})
	if err := lock.Exec(ctx, func(c context.Context) {
		defer close(exit)
		for i := 0; i < 5; i++ {
			select {
			case <-ctx.Done():
				fmt.Println("cancel")
				return
			default:
				time.Sleep(100 * time.Millisecond)
			}
		}
		fmt.Println("ok")
	}); err != nil {
		fmt.Println(err.Error())
	}
	<-exit

}
Output:

context deadline exceeded
cancel
Example (ExtendOk)
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/gomodule/redigo/redis"
	"github.com/qingtao/redlock"
	"github.com/qingtao/redlock/driver/redigo"
)

func main() {
	// 初始化redigo的连接
	redlock.SetLogger(log.Default())
	p := &redis.Pool{
		DialContext: func(ctx context.Context) (redis.Conn, error) {
			return redis.DialContext(ctx, "tcp", "127.0.0.1:6379",
				redis.DialDatabase(2),
				redis.DialPassword("redis"),
			)
		},
		MaxIdle:     2,
		IdleTimeout: 2 * time.Minute,
	}
	taskName := func(id string) string {
		return fmt.Sprintf("task:%s", id)
	}
	// 创建一个工作任务锁
	workers := redlock.New(redigo.NewClient(p))
	lock := workers.NewMutex(taskName("exec:extend:ok"),
		// 请求超时时间
		redlock.WithTimeout(2*time.Second),
		// 锁的有效期
		redlock.WithExpires(3*time.Second),
		// 重试次数, 因此最大尝试次数是2+1
		redlock.WithRetries(2),
	)
	ctx := context.Background()
	exit := make(chan struct{})
	if err := lock.Exec(ctx, func(ctx context.Context) {
		defer close(exit)
		for i := 0; i < 2; i++ {
			select {
			case <-ctx.Done():
				fmt.Println("cancel")
				return
			default:
				time.Sleep(1 * time.Second)
			}
		}
		fmt.Println("ok")
	}); err != nil {
		fmt.Println(err.Error())
	}
	<-exit

}
Output:

ok

func (*Mutex) Extend

func (m *Mutex) Extend(ctx context.Context) error

lock 延长锁的有效期, 只有在加锁成功后调用才能成功

func (*Mutex) LastTime

func (m *Mutex) LastTime() time.Time

LastTime 返回加锁成功的时间

func (*Mutex) Lock

func (m *Mutex) Lock(ctx context.Context) error

Lock 执行加锁

func (*Mutex) Unlock

func (m *Mutex) Unlock(ctx context.Context) error

Unlock 执行解锁

type Option

type Option func(*Options)

Option 选项函数

func WithConnections

func WithConnections(conns ...Conn) Option

WithConnections 设置客户端连接, 并不检查是否是nil, 传入的数量就是所有的redis实例数

func WithExpires

func WithExpires(d time.Duration) Option

WithExpires 设置过期时间

func WithRetries

func WithRetries(n int) Option

WithRetries 设置重试次数

func WithTimeout

func WithTimeout(d time.Duration) Option

WithTimeout 设置请求超时时间

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options 请求锁时的选项

type WorkerFunc

type WorkerFunc func(context.Context)

WorkerFunc 工作函数, 任务可以选择接收ctx.Done() 由更上层的上下文取消时, 执行退出操作, 也可以忽略此取消事件;

Directories

Path Synopsis
driver

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL