mux

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultMuxSize is the default mux (slot) size.
	// It is a prime number.
	DefaultMuxSize = 127
	// DefaultDeepSize is the default queue depth.
	DefaultDeepSize = 1024 * 8
)

Variables

View Source
var (
	// ErrClosed reports that the queue is closed (status code Unavailable).
	ErrClosed = status.Error(codes.Unavailable, "q.closed")

	// ErrQFull reports that the request queue is full (status code ResourceExhausted).
	ErrQFull = status.Error(codes.ResourceExhausted, "q.full")

	// ErrSync is an error that is never expected to happen.
	ErrSync = errors.New("never.gonna.happen.crazy")
)
View Source
var (
	// ErrDupKey reports a duplicated key (status code AlreadyExists).
	ErrDupKey = status.Error(codes.AlreadyExists, "gas.duplicated.key")
)

Functions

This section is empty.

Types

type AsyncC

type AsyncC struct {
	// contains filtered or unexported fields
}

AsyncC : async cell

func NewAsync

func NewAsync(ctx context.Context, op OpCode) *AsyncC

NewAsync : new async call

func (*AsyncC) R

func (a *AsyncC) R() (interface{}, error)

R : get result

func (*AsyncC) SetR

func (a *AsyncC) SetR(r interface{}, err error)

SetR : set op result

type Byte

type Byte byte

Byte : byte implement HashedInt

func (Byte) HashedInt

func (v Byte) HashedInt() int

type Bytes

type Bytes []byte

Bytes : bytes implement HashedInt, use crc32 to hash

func (Bytes) HashedInt

func (v Bytes) HashedInt() int

type CacheFacade

type CacheFacade interface {
	// Peek looks up key without updating the LRU order; it is only useful for an LRU cache.
	Peek(key interface{}) (interface{}, bool)
	// Get looks up key in the cache; in an LRU cache the key's order is also updated.
	Get(key interface{}) (interface{}, bool)
	// Set stores value under key in the cache.
	Set(key interface{}, value interface{})
	// Delete removes key from the cache.
	Delete(key interface{})
}

CacheFacade : cache facade, could be lru cache, map, or both mixed.

func NewFacadeLRU

func NewFacadeLRU(capacity int64) CacheFacade

NewFacadeLRU : creates a new empty cache with the given capacity.

func NewFacadeMap

func NewFacadeMap() CacheFacade

type CacheGen

type CacheGen func() CacheFacade

CacheGen : cache facade generator

type DeleteFn

type DeleteFn func(ctx context.Context, d interface{}) error

DeleteFn : delete data function. Input: ctx -> context (can be ignored if not needed); d -> input data. Output: error -> non-nil if the deletion failed.

type FacadeLRU

type FacadeLRU struct {
	cache.LRUCache
}

FacadeLRU : extend LRUCache to interface CacheFacade

func (*FacadeLRU) Delete

func (m *FacadeLRU) Delete(key interface{})

Delete : delete key from cache

func (*FacadeLRU) Get

func (m *FacadeLRU) Get(key interface{}) (interface{}, bool)

Get : get from cache, in lru cache, key order also be updated.

func (*FacadeLRU) Peek

func (m *FacadeLRU) Peek(key interface{}) (interface{}, bool)

Peek : only useful in lru cache, peek means no update LRU order.

func (*FacadeLRU) Set

func (m *FacadeLRU) Set(key interface{}, value interface{})

Set : set to cache

type FacadeMap

type FacadeMap struct {
	cache.Map
}

FacadeMap : extend cache.Map to interface CacheFacade

func (*FacadeMap) Peek

func (m *FacadeMap) Peek(key interface{}) (interface{}, bool)

type Hashed2Int

type Hashed2Int interface {
	// HashedInt returns the hashed int of the value.
	HashedInt() int
}

Hashed2Int : can be hashed to int

type Int

type Int int

Int : int implement HashedInt

func (Int) HashedInt

func (v Int) HashedInt() int

type Int16

type Int16 int16

Int16 : int16 implement HashedInt

func (Int16) HashedInt

func (v Int16) HashedInt() int

type Int32

type Int32 int32

Int32 : int32 implement HashedInt, use crc32 to hash

func (Int32) HashedInt

func (v Int32) HashedInt() int

type Int32CRC

type Int32CRC int32

Int32CRC : int32 implement HashedInt, use crc32 to hash

func (Int32CRC) HashedInt

func (v Int32CRC) HashedInt() int

type Int64

type Int64 int64

Int64 : int64 implement HashedInt, use crc32 to hash

func (Int64) HashedInt

func (v Int64) HashedInt() int

type Int64CRC

type Int64CRC int64

Int64CRC : int64 implement HashedInt, use crc32 to hash

func (Int64CRC) HashedInt

func (v Int64CRC) HashedInt() int

type Int8

type Int8 int8

Int8 : int8 implements HashedInt

func (Int8) HashedInt

func (v Int8) HashedInt() int

type IntCRC

type IntCRC int

IntCRC : int implement HashedInt, use crc32 to hash

func (IntCRC) HashedInt

func (v IntCRC) HashedInt() int

type IsNotFoundFn

type IsNotFoundFn func(err error) bool

IsNotFoundFn : detects whether an error is a "not found" error or not.

type OpAdd

type OpAdd struct {
	// contains filtered or unexported fields
}

OpAdd : wrapped add command

func (*OpAdd) GetK

func (o *OpAdd) GetK() interface{}

type OpCode

type OpCode interface {
	// GetK returns the key of the operation.
	// NOTE(review): presumably the k passed to the New* constructors — confirm.
	GetK() interface{}
}

func NewAdd

func NewAdd(a RenewDataFn, k interface{}, data interface{}) OpCode

func NewDelete

func NewDelete(d DeleteFn, k interface{}) OpCode

func NewLoad

func NewLoad(l RenewDataFn, k interface{}) OpCode

func NewMixUpdOrAddIfNull

func NewMixUpdOrAddIfNull(l RenewDataFn, u UpdateDataFn, a RenewDataFn, i IsNotFoundFn,
	k interface{}, data interface{}) OpCode

func NewMixUpsertThenLoad

func NewMixUpsertThenLoad(p UpdateDataFn, l RenewDataFn, k interface{}, data interface{}) OpCode

func NewMixUpsertThenRenewInCache

func NewMixUpsertThenRenewInCache(p UpdateDataFn, k interface{}, data interface{}) OpCode

func NewUpdate

func NewUpdate(l RenewDataFn, u UpdateDataFn, k interface{}, data interface{}) OpCode

type OpDelete

type OpDelete struct {
	// contains filtered or unexported fields
}

OpDelete : wrapped delete command

func (*OpDelete) GetK

func (o *OpDelete) GetK() interface{}

type OpLoad

type OpLoad struct {
	// contains filtered or unexported fields
}

OpLoad : wrapped load command

func (*OpLoad) GetK

func (o *OpLoad) GetK() interface{}

type OpMixUpdOrAddIfNull

type OpMixUpdOrAddIfNull struct {
	// contains filtered or unexported fields
}

OpMixUpdOrAddIfNull : 1.load. 2. update it if existed. 3. add it if not existed.

load; if not found -> add. else update.

func (*OpMixUpdOrAddIfNull) GetK

func (o *OpMixUpdOrAddIfNull) GetK() interface{}

type OpMixUpsertThenLoad

type OpMixUpsertThenLoad struct {
	// contains filtered or unexported fields
}

OpMixUpsertThenLoad : upsert it first, then load it if not in cache.

1. upsert. 2. if in cache, refresh cache. 3. if not in cache, load it into the cache with the load function.

func (*OpMixUpsertThenLoad) GetK

func (o *OpMixUpsertThenLoad) GetK() interface{}

type OpMixUpsertThenRenewInCache

type OpMixUpsertThenRenewInCache struct {
	// contains filtered or unexported fields
}

OpMixUpsertThenRenewInCache : upsert it first, then renew in cache item.

1. upsert. 2. if in cache, renew cache. 3. if not in cache, do nothing.

func (*OpMixUpsertThenRenewInCache) GetK

func (o *OpMixUpsertThenRenewInCache) GetK() interface{}

type OpType

type OpType int

OpType : op type

type OpUpdate

type OpUpdate struct {
	// contains filtered or unexported fields
}

OpUpdate : wrapped update command. Note: the update should happen after the data has been loaded.

func (*OpUpdate) GetK

func (o *OpUpdate) GetK() interface{}

type Option

type Option func(o *_Option)

Option mux option function

func WithDeep

func WithDeep(deepSize int) Option

WithDeep sets the queue depth.

func WithSize

func WithSize(muxSize int) Option

WithSize sets the mux size.

type Q

type Q struct {
	// contains filtered or unexported fields
}

Q defines the actor queue structure.

func NewQ

func NewQ(reqMaxNum int) *Q

NewQ new queue

func (*Q) AddPriorReq

func (a *Q) AddPriorReq(req interface{}) error

AddPriorReq adds a request at the front of the normal queue.

func (*Q) AddReq

func (a *Q) AddReq(req interface{}) error

AddReq adds a normal request at the end of the normal queue.

func (*Q) AddReqAnyway

func (a *Q) AddReqAnyway(req interface{}, ts time.Duration) error

AddReqAnyway adds a normal request at the end of the normal queue anyway: if the queue is full, it sleeps and then tries again.

func (*Q) Close

func (a *Q) Close()

Close : close the queue

func (*Q) IsClosed

func (a *Q) IsClosed() bool

IsClosed is closed or not

func (*Q) Pop

func (a *Q) Pop() (interface{}, error)

Pop consumes an item; if the queue is empty, it blocks.

func (*Q) PopAnyway

func (a *Q) PopAnyway() (interface{}, error)

PopAnyway consumes an item like Pop, but it can consume even when the queue is closed.

func (*Q) WaitClose

func (a *Q) WaitClose(ctx context.Context) error

WaitClose waits for the queue to close; it must be called in another goroutine.

type R

type R struct {
	// contains filtered or unexported fields
}

R : combine interface and error, for return result

type RenewDataFn

type RenewDataFn func(ctx context.Context, d interface{}) (v interface{}, err error)

RenewDataFn : renew data function, including load/add, excluding delete. Input: ctx -> context (can be ignored if not needed); d -> input data. Output: v -> the returned cache value; err -> non-nil if the operation failed.

type String

type String string

String : string implement HashedInt, use crc32 to hash

func (String) HashedInt

func (v String) HashedInt() int

type UInt

type UInt uint

UInt : uint implement HashedInt

func (UInt) HashedInt

func (v UInt) HashedInt() int

type UInt16

type UInt16 uint16

UInt16 : uint16 implement HashedInt

func (UInt16) HashedInt

func (v UInt16) HashedInt() int

type UInt32

type UInt32 uint32

UInt32 : uint32 implement HashedInt, use crc32 to hash

func (UInt32) HashedInt

func (v UInt32) HashedInt() int

type UInt32CRC

type UInt32CRC uint32

UInt32CRC : uint32 implement HashedInt, use crc32 to hash

func (UInt32CRC) HashedInt

func (v UInt32CRC) HashedInt() int

type UInt64

type UInt64 uint64

UInt64 : uint64 implement HashedInt, use crc32 to hash

func (UInt64) HashedInt

func (v UInt64) HashedInt() int

type UInt64CRC

type UInt64CRC uint64

UInt64CRC : uint64 implement HashedInt, use crc32 to hash

func (UInt64CRC) HashedInt

func (v UInt64CRC) HashedInt() int

type UIntCRC

type UIntCRC uint

UIntCRC : uint implement HashedInt, use crc32 to hash

func (UIntCRC) HashedInt

func (v UIntCRC) HashedInt() int

type UpdateDataFn

type UpdateDataFn func(ctx context.Context, d interface{}, e interface{}) (v interface{}, err error)

UpdateDataFn : update data function. Input: ctx -> context (can be ignored if not needed); d -> input data; e -> the existing item. Output: v -> the returned cache value; err -> non-nil if the operation failed. Note that e comes from the load function if the item is not in the cache.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker : a goroutine that handles queue work

func NewWorker

func NewWorker(qSize int, wg *sync.WaitGroup, ca CacheFacade) *Worker

func (*Worker) DoAdd

func (w *Worker) DoAdd(ctx context.Context, addFn RenewDataFn, k interface{}, data interface{}) (interface{}, error)

DoAdd : add item

func (*Worker) DoDelete

func (w *Worker) DoDelete(ctx context.Context, deleteFn DeleteFn, k interface{}) (interface{}, error)

DoDelete : delete item

func (*Worker) DoGet

func (w *Worker) DoGet(ctx context.Context, loadFn RenewDataFn, k interface{}) (interface{}, error)

DoGet : get from the cache first; if not found, load from the db

func (*Worker) DoUpdOrAddIfNull

func (w *Worker) DoUpdOrAddIfNull(ctx context.Context,
	loadFn RenewDataFn, updFn UpdateDataFn, addFn RenewDataFn, isNotFoundFn IsNotFoundFn,
	k interface{}, data interface{}) (interface{}, error)

DoUpdOrAddIfNull : 1. load. 2. update if existed. 3. add if not existed.

func (*Worker) DoUpdate

func (w *Worker) DoUpdate(ctx context.Context,
	loadFn RenewDataFn, updFn UpdateDataFn, k interface{}, data interface{}) (interface{}, error)

DoUpdate : update item

func (*Worker) DoUpsertThenLoad

func (w *Worker) DoUpsertThenLoad(ctx context.Context,
	upsertFn UpdateDataFn, loadFn RenewDataFn, k interface{}, data interface{}) (interface{}, error)

DoUpsertThenLoad : 1. upsert. 2. update cache if cache hit. 3. load cache if cache miss.

func (*Worker) DoUpsertThenRenewInCache

func (w *Worker) DoUpsertThenRenewInCache(ctx context.Context,
	upsertFn UpdateDataFn, k interface{}, data interface{}) (interface{}, error)

DoUpsertThenRenewInCache : 1. upsert. 2. update cache if cache hit.

func (*Worker) Start

func (w *Worker) Start()

Start : start the handler goroutine

func (*Worker) Stop

func (w *Worker) Stop()

Stop : close the queue; no further input is accepted.

type WorkerGrp

type WorkerGrp struct {
	// contains filtered or unexported fields
}

WorkerGrp : worker group

func NewWorkGrp

func NewWorkGrp(cg CacheGen, opts ...Option) *WorkerGrp

NewWorkGrp : new work group

func NewWorkGrpWithLRU

func NewWorkGrpWithLRU(lruCap int64, opts ...Option) *WorkerGrp

NewWorkGrpWithLRU : new work group bind with lru cache.

func NewWorkGrpWithMapCache

func NewWorkGrpWithMapCache(opts ...Option) *WorkerGrp

NewWorkGrpWithMapCache : new work group bind with map cache.

func (*WorkerGrp) DeepSize

func (w *WorkerGrp) DeepSize() int

DeepSize : get deep size

func (*WorkerGrp) DoAdd

func (w *WorkerGrp) DoAdd(ctx context.Context, addFn RenewDataFn, k Hashed2Int, data interface{}) (interface{}, error)

DoAdd : add item

func (*WorkerGrp) DoDelete

func (w *WorkerGrp) DoDelete(ctx context.Context, deleteFn DeleteFn, k Hashed2Int) (interface{}, error)

DoDelete : delete item

func (*WorkerGrp) DoGet

func (w *WorkerGrp) DoGet(ctx context.Context, loadFn RenewDataFn, k Hashed2Int) (interface{}, error)

DoGet : get from the cache first; if not found, load from the db

func (*WorkerGrp) DoUpdOrAddIfNull

func (w *WorkerGrp) DoUpdOrAddIfNull(ctx context.Context,
	loadFn RenewDataFn, updFn UpdateDataFn, addFn RenewDataFn, isNotFoundFn IsNotFoundFn,
	k Hashed2Int, data interface{}) (interface{}, error)

DoUpdOrAddIfNull : 1. load. 2. update if existed. 3. add if not existed.

func (*WorkerGrp) DoUpdate

func (w *WorkerGrp) DoUpdate(ctx context.Context,
	loadFn RenewDataFn, updFn UpdateDataFn, k Hashed2Int, data interface{}) (interface{}, error)

DoUpdate : update item

func (*WorkerGrp) DoUpsertThenLoad

func (w *WorkerGrp) DoUpsertThenLoad(ctx context.Context,
	upsertFn UpdateDataFn, loadFn RenewDataFn, k Hashed2Int, data interface{}) (interface{}, error)

DoUpsertThenLoad : 1. upsert. 2. update cache if cache hit. 3. load cache if cache miss.

func (*WorkerGrp) DoUpsertThenRenewInCache

func (w *WorkerGrp) DoUpsertThenRenewInCache(ctx context.Context,
	upsertFn UpdateDataFn, k Hashed2Int, data interface{}) (interface{}, error)

DoUpsertThenRenewInCache : 1. upsert. 2. update cache if cache hit.

func (*WorkerGrp) MuxSize

func (w *WorkerGrp) MuxSize() int

MuxSize : get mux size

func (*WorkerGrp) Start

func (w *WorkerGrp) Start()

Start : start all worker goroutines

func (*WorkerGrp) Stop

func (w *WorkerGrp) Stop()

Stop : stop

func (*WorkerGrp) WaitStop

func (w *WorkerGrp) WaitStop(ctx context.Context) error

WaitStop : wait stop

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL