concurrent

package
v0.2.4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 24, 2024 License: LGPL-2.1 Imports: 10 Imported by: 1

Documentation

Index

Constants

View Source
// Default cleaning parameters for Cache.
// NOTE(review): presumably these are the defaults for the num and interval
// arguments of Cache.Clean / Cache.AutoClean — confirm against the implementation.
const (
	// CacheDefaultCleanNum is the default clean count (64).
	CacheDefaultCleanNum      = 64
	// CacheDefaultCleanInterval is the default clean interval (30 seconds).
	CacheDefaultCleanInterval = 30 * time.Second
)

Variables

View Source
// Sentinel errors reported by the Future controller.
var (
	ErrFuturesClosed           = errors.New("futures already closed")                   // The Future controller has already been closed
	ErrFutureNotFound          = errors.New("future not found")                         // The Future was not found
	ErrFutureCanceled          = errors.New("future canceled")                          // The Future was canceled
	ErrFutureTimeout           = errors.New("future timeout")                           // The Future timed out
	ErrFutureRespIncorrectType = errors.New("future response has incorrect value type") // The return value of the Future's response has the wrong type
)

Functions

func MakeFutureRespAsyncRet

func MakeFutureRespAsyncRet(fs IFutures, ctx context.Context, timeout ...time.Duration) (Future, RespAsyncRet)

MakeFutureRespAsyncRet 创建future与接收响应返回值的异步调用结果

func MakeFutureRespAsyncRetT

func MakeFutureRespAsyncRetT[T any](fs IFutures, ctx context.Context, timeout ...time.Duration) (Future, RespAsyncRetT[T])

MakeFutureRespAsyncRetT 创建future与接收响应返回值的异步调用结果

Types

type Cache

type Cache[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func NewCache

func NewCache[K comparable, V any]() *Cache[K, V]

func (*Cache[K, V]) AutoClean

func (c *Cache[K, V]) AutoClean(ctx context.Context, interval time.Duration, num int)

func (*Cache[K, V]) Clean

func (c *Cache[K, V]) Clean(num int)

func (*Cache[K, V]) Del

func (c *Cache[K, V]) Del(k K, revision int64)

func (*Cache[K, V]) Get

func (c *Cache[K, V]) Get(k K) (V, bool)

func (*Cache[K, V]) OnAdd

func (c *Cache[K, V]) OnAdd(cb generic.Action2[K, V])

func (*Cache[K, V]) OnDel

func (c *Cache[K, V]) OnDel(cb generic.Action2[K, V])

func (*Cache[K, V]) RefreshTTL

func (c *Cache[K, V]) RefreshTTL(k K)

func (*Cache[K, V]) Set

func (c *Cache[K, V]) Set(k K, v V, revision int64, ttl time.Duration) V

func (*Cache[K, V]) Snapshot

func (c *Cache[K, V]) Snapshot() generic.UnorderedSliceMap[K, V]

type Deduplication

type Deduplication struct {
	// contains filtered or unexported fields
}

Deduplication 去重器,用于保持幂等性

func MakeDeduplication

func MakeDeduplication() Deduplication

MakeDeduplication 创建去重器

func (*Deduplication) Make

func (d *Deduplication) Make() int64

Make 创建序号

func (*Deduplication) Remove

func (d *Deduplication) Remove(remote string)

Remove 删除对端

func (*Deduplication) Validate

func (d *Deduplication) Validate(remote string, seq int64) (passed bool)

Validate 验证序号

type Future

// Future is the Future of the asynchronous model.
type Future struct {
	Finish context.Context // Context (NOTE(review): presumably done once the Future has finished — confirm)
	Id     int64           // Id of the Future
	// contains filtered or unexported fields
}

Future 异步模型Future

func MakeFuture

func MakeFuture[T Resp](fs IFutures, ctx context.Context, resp T, timeout ...time.Duration) Future

MakeFuture 创建Future

func (Future) Cancel

func (f Future) Cancel(err error)

Cancel 取消

func (Future) Wait

func (f Future) Wait(ctx context.Context)

Wait 等待

type Futures

// Futures is the Future controller.
type Futures struct {
	Ctx     context.Context // Context
	Id      int64           // Request id generator
	Timeout time.Duration   // Request timeout
	// contains filtered or unexported fields
}

Futures Future控制器

func MakeFutures

func MakeFutures(ctx context.Context, timeout time.Duration) Futures

MakeFutures 创建Future控制器

func (*Futures) Make

func (fs *Futures) Make(ctx context.Context, resp Resp, timeout ...time.Duration) Future

Make 创建Future

func (*Futures) Request

func (fs *Futures) Request(ctx context.Context, handler RequestHandler, timeout ...time.Duration) async.AsyncRet

Request 请求

func (*Futures) Resolve

func (fs *Futures) Resolve(id int64, ret async.Ret) error

Resolve 解决

type IDeduplication

// IDeduplication is the deduplicator interface.
type IDeduplication interface {
	// Make creates a sequence number.
	Make() int64
	// Validate validates a sequence number for the given remote peer.
	Validate(remote string, seq int64) bool
	// Remove removes a remote peer.
	Remove(remote string)
}

IDeduplication 去重器接口

type IFutures

// IFutures is the Future controller interface.
type IFutures interface {

	// Make creates a Future.
	Make(ctx context.Context, resp Resp, timeout ...time.Duration) Future
	// Request issues a request.
	Request(ctx context.Context, handler RequestHandler, timeout ...time.Duration) async.AsyncRet
	// Resolve resolves a Future (NOTE(review): presumably the one identified by id, using ret as its result — confirm).
	Resolve(id int64, ret async.Ret) error
	// contains filtered or unexported methods
}

IFutures Future控制器接口

type Locked

type Locked[T any] struct {
	// contains filtered or unexported fields
}

func MakeLocked

func MakeLocked[T any](obj T) Locked[T]

func NewLocked

func NewLocked[T any](obj T) *Locked[T]

func (*Locked[T]) AutoLock

func (l *Locked[T]) AutoLock(fun generic.Action1[*T])

type LockedMap

// LockedMap is a map[K]V wrapped in an embedded RWLocked.
// NOTE(review): presumably all access to the map goes through the embedded lock — confirm.
type LockedMap[K comparable, V any] struct {
	RWLocked[map[K]V]
}

func MakeLockedMap

func MakeLockedMap[K comparable, V any](size int) LockedMap[K, V]

func NewLockedMap

func NewLockedMap[K comparable, V any](size int) *LockedMap[K, V]

func (*LockedMap[K, V]) Add

func (lm *LockedMap[K, V]) Add(k K, v V)

func (*LockedMap[K, V]) Delete

func (lm *LockedMap[K, V]) Delete(k K)

func (*LockedMap[K, V]) Each

func (lm *LockedMap[K, V]) Each(fun generic.Action2[K, V])

func (*LockedMap[K, V]) Exist

func (lm *LockedMap[K, V]) Exist(k K) (b bool)

func (*LockedMap[K, V]) Get

func (lm *LockedMap[K, V]) Get(k K) (v V, ok bool)

func (*LockedMap[K, V]) Len

func (lm *LockedMap[K, V]) Len() (l int)

func (*LockedMap[K, V]) Range

func (lm *LockedMap[K, V]) Range(fun generic.Func2[K, V, bool])

func (*LockedMap[K, V]) TryAdd

func (lm *LockedMap[K, V]) TryAdd(k K, v V)

func (*LockedMap[K, V]) Value

func (lm *LockedMap[K, V]) Value(k K) (v V)

type LockedSlice

// LockedSlice is a []T wrapped in an embedded RWLocked.
// NOTE(review): presumably all access to the slice goes through the embedded lock — confirm.
type LockedSlice[T any] struct {
	RWLocked[[]T]
}

func MakeLockedSlice

func MakeLockedSlice[T any](len, cap int) LockedSlice[T]

func NewLockedSlice

func NewLockedSlice[T any](len, cap int) *LockedSlice[T]

func (*LockedSlice[T]) Append

func (ls *LockedSlice[T]) Append(values ...T)

func (*LockedSlice[T]) Delete

func (ls *LockedSlice[T]) Delete(idx ...int)

func (*LockedSlice[T]) Each

func (ls *LockedSlice[T]) Each(fun generic.Action1[T])

func (*LockedSlice[T]) Insert

func (ls *LockedSlice[T]) Insert(idx int, values ...T)

func (*LockedSlice[T]) Len

func (ls *LockedSlice[T]) Len() (l int)

func (*LockedSlice[T]) Range

func (ls *LockedSlice[T]) Range(fun generic.Func1[T, bool])

type RWLocked

type RWLocked[T any] struct {
	// contains filtered or unexported fields
}

func MakeRWLocked

func MakeRWLocked[T any](obj T) RWLocked[T]

func NewRWLocked

func NewRWLocked[T any](obj T) *RWLocked[T]

func (*RWLocked[T]) AutoLock

func (l *RWLocked[T]) AutoLock(fun generic.Action1[*T])

func (*RWLocked[T]) AutoRLock

func (l *RWLocked[T]) AutoRLock(fun generic.Action1[*T])

type RequestHandler

type RequestHandler = generic.Action1[Future] // Future request handler

type Resp

// Resp is the response interface.
type Resp interface {
	// Push fills in the return result.
	Push(ret async.Ret) error
}

Resp 响应接口

type RespAsyncRet

type RespAsyncRet chan async.Ret

RespAsyncRet 接收响应返回值的异步调用结果

func MakeRespAsyncRet

func MakeRespAsyncRet() RespAsyncRet

MakeRespAsyncRet 创建接收响应返回值的异步调用结果

func (RespAsyncRet) Push

func (ch RespAsyncRet) Push(ret async.Ret) error

Push 填入返回结果

func (RespAsyncRet) ToAsyncRet added in v0.1.96

func (ch RespAsyncRet) ToAsyncRet() async.AsyncRet

ToAsyncRet 转换为异步调用结果

type RespAsyncRetT

type RespAsyncRetT[T any] chan async.RetT[T]

RespAsyncRetT 接收响应返回值的channel

func MakeRespAsyncRetT

func MakeRespAsyncRetT[T any]() RespAsyncRetT[T]

MakeRespAsyncRetT 创建接收响应返回值的异步调用结果

func (RespAsyncRetT[T]) Push

func (ch RespAsyncRetT[T]) Push(ret async.Ret) error

Push 填入返回结果

func (RespAsyncRetT[T]) ToAsyncRetT added in v0.1.96

func (ch RespAsyncRetT[T]) ToAsyncRetT() async.AsyncRetT[T]

ToAsyncRetT 转换为异步调用结果

type RespDelegate

type RespDelegate[T any] generic.DelegateAction1[async.RetT[T]]

RespDelegate 接收响应返回值的委托

func (RespDelegate[T]) Push

func (dlg RespDelegate[T]) Push(ret async.RetT[any]) error

Push 填入返回结果

type RespFunc

type RespFunc[T any] generic.Action1[async.RetT[T]]

RespFunc 接收响应返回值的函数

func (RespFunc[T]) Push

func (fun RespFunc[T]) Push(ret async.RetT[any]) error

Push 填入返回结果

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL