concurrent

package
v0.1.94 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2024 License: LGPL-2.1 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CacheDefaultCleanNum      = 64
	CacheDefaultCleanInterval = 30 * time.Second
)

Variables

View Source
var (
	ErrFuturesClosed           = errors.New("futures already closed")                   // Future控制器已关闭
	ErrFutureNotFound          = errors.New("future not found")                         // Future未找到
	ErrFutureCanceled          = errors.New("future canceled")                          // Future被取消
	ErrFutureTimeout           = errors.New("future timeout")                           // Future超时
	ErrFutureRespIncorrectType = errors.New("future response has incorrect value type") // Future响应的返回值类型错误
)

Functions

func MakeFutureRespAsyncRet

func MakeFutureRespAsyncRet(fs IFutures, ctx context.Context, timeout ...time.Duration) (Future, RespAsyncRet)

MakeFutureRespAsyncRet 创建future与接收响应返回值的异步调用结果

func MakeFutureRespAsyncRetT added in v0.1.77

func MakeFutureRespAsyncRetT[T any](fs IFutures, ctx context.Context, timeout ...time.Duration) (Future, RespAsyncRetT[T])

MakeFutureRespAsyncRetT 创建future与接收响应返回值的异步调用结果

Types

type Cache added in v0.1.93

type Cache[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func NewCache added in v0.1.93

func NewCache[K comparable, V any]() *Cache[K, V]

func (*Cache[K, V]) AutoClean added in v0.1.93

func (c *Cache[K, V]) AutoClean(ctx context.Context, interval time.Duration, num int)

func (*Cache[K, V]) Clean added in v0.1.93

func (c *Cache[K, V]) Clean(num int)

func (*Cache[K, V]) Del added in v0.1.93

func (c *Cache[K, V]) Del(k K, revision int64)

func (*Cache[K, V]) Get added in v0.1.93

func (c *Cache[K, V]) Get(k K) (V, bool)

func (*Cache[K, V]) OnAdd added in v0.1.93

func (c *Cache[K, V]) OnAdd(cb generic.Action2[K, V])

func (*Cache[K, V]) OnDel added in v0.1.93

func (c *Cache[K, V]) OnDel(cb generic.Action2[K, V])

func (*Cache[K, V]) RefreshTTL added in v0.1.93

func (c *Cache[K, V]) RefreshTTL(k K)

func (*Cache[K, V]) Set added in v0.1.93

func (c *Cache[K, V]) Set(k K, v V, revision int64, ttl time.Duration) V

func (*Cache[K, V]) Snapshot added in v0.1.93

func (c *Cache[K, V]) Snapshot() generic.UnorderedSliceMap[K, V]

type Deduplication

type Deduplication struct {
	// contains filtered or unexported fields
}

Deduplication 去重器,用于保持幂等性

func MakeDeduplication

func MakeDeduplication() Deduplication

MakeDeduplication 创建去重器

func (*Deduplication) Make added in v0.1.33

func (d *Deduplication) Make() int64

Make 创建序号

func (*Deduplication) Remove

func (d *Deduplication) Remove(remote string)

Remove 删除对端

func (*Deduplication) Validate added in v0.1.33

func (d *Deduplication) Validate(remote string, seq int64) (passed bool)

Validate 验证序号

type Future

type Future struct {
	Finish context.Context // 上下文
	Id     int64           // Id
	// contains filtered or unexported fields
}

Future 异步模型Future

func MakeFuture

func MakeFuture[T Resp](fs IFutures, ctx context.Context, resp T, timeout ...time.Duration) Future

MakeFuture 创建Future

func (Future) Cancel

func (f Future) Cancel(err error)

Cancel 取消

func (Future) Wait

func (f Future) Wait(ctx context.Context)

Wait 等待

type Futures

type Futures struct {
	Ctx     context.Context // 上下文
	Id      int64           // 请求id生成器
	Timeout time.Duration   // 请求超时时间
	// contains filtered or unexported fields
}

Futures Future控制器

func MakeFutures

func MakeFutures(ctx context.Context, timeout time.Duration) Futures

MakeFutures 创建Future控制器

func (*Futures) Make

func (fs *Futures) Make(ctx context.Context, resp Resp, timeout ...time.Duration) Future

Make 创建Future

func (*Futures) Request

func (fs *Futures) Request(ctx context.Context, handler RequestHandler, timeout ...time.Duration) async.AsyncRet

Request 请求

func (*Futures) Resolve

func (fs *Futures) Resolve(id int64, ret async.Ret) error

Resolve 解决

type IDeduplication

type IDeduplication interface {
	// Make 创建序号
	Make() int64
	// Validate 验证序号
	Validate(remote string, seq int64) bool
	// Remove 删除对端
	Remove(remote string)
}

IDeduplication 去重器接口

type IFutures

type IFutures interface {

	// Make 创建Future
	Make(ctx context.Context, resp Resp, timeout ...time.Duration) Future
	// Request 请求
	Request(ctx context.Context, handler RequestHandler, timeout ...time.Duration) async.AsyncRet
	// Resolve 解决
	Resolve(id int64, ret async.Ret) error
	// contains filtered or unexported methods
}

IFutures Future控制器接口

type Locked

type Locked[T any] struct {
	// contains filtered or unexported fields
}

func MakeLocked

func MakeLocked[T any](obj T) Locked[T]

func NewLocked

func NewLocked[T any](obj T) *Locked[T]

func (*Locked[T]) AutoLock

func (l *Locked[T]) AutoLock(fun generic.Action1[*T])

type LockedMap

type LockedMap[K comparable, V any] struct {
	RWLocked[map[K]V]
}

func MakeLockedMap

func MakeLockedMap[K comparable, V any](size int) LockedMap[K, V]

func NewLockedMap

func NewLockedMap[K comparable, V any](size int) *LockedMap[K, V]

func (*LockedMap[K, V]) Add added in v0.1.93

func (lm *LockedMap[K, V]) Add(k K, v V)

func (*LockedMap[K, V]) Delete

func (lm *LockedMap[K, V]) Delete(k K)

func (*LockedMap[K, V]) Each added in v0.1.32

func (lm *LockedMap[K, V]) Each(fun generic.Action2[K, V])

func (*LockedMap[K, V]) Exist added in v0.1.93

func (lm *LockedMap[K, V]) Exist(k K) (b bool)

func (*LockedMap[K, V]) Get

func (lm *LockedMap[K, V]) Get(k K) (v V, ok bool)

func (*LockedMap[K, V]) Len

func (lm *LockedMap[K, V]) Len() (l int)

func (*LockedMap[K, V]) Range added in v0.1.33

func (lm *LockedMap[K, V]) Range(fun generic.Func2[K, V, bool])

func (*LockedMap[K, V]) TryAdd added in v0.1.93

func (lm *LockedMap[K, V]) TryAdd(k K, v V)

func (*LockedMap[K, V]) Value added in v0.1.93

func (lm *LockedMap[K, V]) Value(k K) (v V)

type LockedSlice

type LockedSlice[T any] struct {
	RWLocked[[]T]
}

func MakeLockedSlice

func MakeLockedSlice[T any](len, cap int) LockedSlice[T]

func NewLockedSlice

func NewLockedSlice[T any](len, cap int) *LockedSlice[T]

func (*LockedSlice[T]) Append

func (ls *LockedSlice[T]) Append(values ...T)

func (*LockedSlice[T]) Delete

func (ls *LockedSlice[T]) Delete(idx ...int)

func (*LockedSlice[T]) Each added in v0.1.32

func (ls *LockedSlice[T]) Each(fun generic.Action1[T])

func (*LockedSlice[T]) Insert

func (ls *LockedSlice[T]) Insert(idx int, values ...T)

func (*LockedSlice[T]) Len

func (ls *LockedSlice[T]) Len() (l int)

func (*LockedSlice[T]) Range added in v0.1.33

func (ls *LockedSlice[T]) Range(fun generic.Func1[T, bool])

type RWLocked

type RWLocked[T any] struct {
	// contains filtered or unexported fields
}

func MakeRWLocked

func MakeRWLocked[T any](obj T) RWLocked[T]

func NewRWLocked

func NewRWLocked[T any](obj T) *RWLocked[T]

func (*RWLocked[T]) AutoLock

func (l *RWLocked[T]) AutoLock(fun generic.Action1[*T])

func (*RWLocked[T]) AutoRLock

func (l *RWLocked[T]) AutoRLock(fun generic.Action1[*T])

type RequestHandler

type RequestHandler = generic.Action1[Future] // Future请求处理器

type Resp

type Resp interface {
	// Push 填入返回结果
	Push(ret async.Ret) error
}

Resp 响应接口

type RespAsyncRet

type RespAsyncRet chan async.Ret

RespAsyncRet 接收响应返回值的异步调用结果

func MakeRespAsyncRet

func MakeRespAsyncRet() RespAsyncRet

MakeRespAsyncRet 创建接收响应返回值的异步调用结果

func (RespAsyncRet) CastAsyncRet

func (ch RespAsyncRet) CastAsyncRet() async.AsyncRet

CastAsyncRet 转换为异步调用结果

func (RespAsyncRet) Push

func (ch RespAsyncRet) Push(ret async.Ret) error

Push 填入返回结果

type RespAsyncRetT added in v0.1.77

type RespAsyncRetT[T any] chan async.RetT[T]

RespAsyncRetT 接收响应返回值的channel

func MakeRespAsyncRetT added in v0.1.77

func MakeRespAsyncRetT[T any]() RespAsyncRetT[T]

MakeRespAsyncRetT 创建接收响应返回值的异步调用结果

func (RespAsyncRetT[T]) CastAsyncRetT added in v0.1.77

func (ch RespAsyncRetT[T]) CastAsyncRetT() async.AsyncRetT[T]

CastAsyncRetT 转换为异步调用结果

func (RespAsyncRetT[T]) Push added in v0.1.77

func (ch RespAsyncRetT[T]) Push(ret async.Ret) error

Push 填入返回结果

type RespDelegate

type RespDelegate[T any] generic.DelegateAction1[async.RetT[T]]

RespDelegate 接收响应返回值的委托

func (RespDelegate[T]) Push

func (dlg RespDelegate[T]) Push(ret async.RetT[any]) error

Push 填入返回结果

type RespFunc

type RespFunc[T any] generic.Action1[async.RetT[T]]

RespFunc 接收响应返回值的函数

func (RespFunc[T]) Push

func (fun RespFunc[T]) Push(ret async.RetT[any]) error

Push 填入返回结果

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL