source

package
v0.0.0-...-41bdb7a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTypeExceptString = errors.New("type except string")
)

Functions

This section is empty.

Types

type BloomSourceImpl

type BloomSourceImpl struct {
	*DataSourceFromRedis
	// contains filtered or unexported fields
}

func NewBloomSourceImpl

func NewBloomSourceImpl(source *DataSourceFromRedis, channel string, log *logrus.Logger) *BloomSourceImpl

func (*BloomSourceImpl) Dump

func (bs *BloomSourceImpl) Dump(ctx context.Context, key interface{}, args ...interface{}) <-chan interface{}

func (*BloomSourceImpl) Get

func (bs *BloomSourceImpl) Get(ctx context.Context, key string, values ...interface{}) ([]interface{}, error)

func (*BloomSourceImpl) Set

func (bs *BloomSourceImpl) Set(ctx context.Context, key string, values ...interface{}) error

func (*BloomSourceImpl) UnWatch

func (bs *BloomSourceImpl) UnWatch(ctx context.Context) error

type CacheSourceImpl

type CacheSourceImpl struct {
	*DataSourceFromRedis
	// contains filtered or unexported fields
}

func NewCacheSourceImpl

func NewCacheSourceImpl(source *DataSourceFromRedis, channel string, log *logrus.Logger) *CacheSourceImpl

func (*CacheSourceImpl) Del

func (bs *CacheSourceImpl) Del(ctx context.Context, key interface{}, args ...interface{}) error

func (*CacheSourceImpl) Dump

func (bs *CacheSourceImpl) Dump(ctx context.Context, key interface{}, args ...interface{}) <-chan interface{}

func (*CacheSourceImpl) Get

func (bs *CacheSourceImpl) Get(ctx context.Context, key string, values ...interface{}) ([]interface{}, error)

func (*CacheSourceImpl) Set

func (bs *CacheSourceImpl) Set(ctx context.Context, key string, values ...interface{}) error

func (*CacheSourceImpl) UnWatch

func (bs *CacheSourceImpl) UnWatch(ctx context.Context) error

type CuckooSource

// CuckooSource extends FilterSource with a Del method.
//
// NOTE(review): CuckooSourceImpl.Del is declared as
// Del(ctx context.Context, key interface{}, args ...interface{}) error, which
// does not match the Del signature required here — confirm which concrete type
// is meant to implement this interface.
type CuckooSource interface {
	FilterSource
	// Del takes a key and a value and reports failure through the returned
	// error — presumably it removes value from the filter stored at key; verify
	// against the implementation.
	Del(ctx context.Context, key string, value []byte) error
}

type CuckooSourceImpl

type CuckooSourceImpl struct {
	*DataSourceFromRedis
	// contains filtered or unexported fields
}

func NewCuckooSourceImpl

func NewCuckooSourceImpl(source *DataSourceFromRedis, channel string, log *logrus.Logger) *CuckooSourceImpl

func (*CuckooSourceImpl) Del

func (bs *CuckooSourceImpl) Del(ctx context.Context, key interface{}, args ...interface{}) error

func (*CuckooSourceImpl) Dump

func (bs *CuckooSourceImpl) Dump(ctx context.Context, key interface{}, args ...interface{}) <-chan interface{}

func (*CuckooSourceImpl) Get

func (bs *CuckooSourceImpl) Get(ctx context.Context, key string, values ...interface{}) ([]interface{}, error)

func (*CuckooSourceImpl) Set

func (bs *CuckooSourceImpl) Set(ctx context.Context, key string, values ...interface{}) error

func (*CuckooSourceImpl) UnWatch

func (bs *CuckooSourceImpl) UnWatch(ctx context.Context) error

type DataSource

// DataSource is the base interface embedded by FilterSource and
// KeyValueCacheSource. It groups the Get, Set, Watch, Scan and Dump operations.
//
// NOTE(review): DataSourceFromRedis.Get returns (string, error) rather than
// ([]interface{}, error), so that type does not match Get as declared here —
// confirm which types are expected to satisfy this interface.
type DataSource interface {
	// Get takes a key plus optional args and returns a slice of values and an error.
	Get(ctx context.Context, key string, args ...interface{}) ([]interface{}, error)
	// Set takes a key plus optional args and returns an error.
	Set(ctx context.Context, key string, args ...interface{}) error
	// Watch takes an OnMessage callback and returns a receive-only error
	// channel together with a context.CancelFunc — presumably used to stop
	// watching; confirm against the implementation.
	Watch(ctx context.Context, onMessage OnMessage) (<-chan error, context.CancelFunc)
	// Scan takes a pattern and an OnScan callback and returns a receive-only
	// error channel.
	Scan(ctx context.Context, pattern string, onRecv OnScan) <-chan error
	// Dump takes a key plus optional args and returns a receive-only channel
	// of values.
	Dump(ctx context.Context, key interface{}, args ...interface{}) <-chan interface{}
}

type DataSourceFromRedis

type DataSourceFromRedis struct {
	// contains filtered or unexported fields
}

func NewDataSourceFromRedis

func NewDataSourceFromRedis(rdb *redis.Client, watcher *WatchOptions) *DataSourceFromRedis

func (*DataSourceFromRedis) BFScanDump

func (d *DataSourceFromRedis) BFScanDump(ctx context.Context, key string, iter int64) ([]byte, int64, error)

func (*DataSourceFromRedis) CFScanDump

func (d *DataSourceFromRedis) CFScanDump(ctx context.Context, key string, iter int64) ([]byte, int64, error)

func (*DataSourceFromRedis) DelExec

func (d *DataSourceFromRedis) DelExec(ctx context.Context, cmd string, args ...interface{}) error

func (*DataSourceFromRedis) Expiration

func (d *DataSourceFromRedis) Expiration(ctx context.Context, key string) (time.Duration, error)

func (*DataSourceFromRedis) Get

func (d *DataSourceFromRedis) Get(ctx context.Context, key string, args ...interface{}) (string, error)

func (*DataSourceFromRedis) GetExec

func (d *DataSourceFromRedis) GetExec(ctx context.Context, cmd string, args ...interface{}) ([]interface{}, error)
// Set is an unimplemented placeholder on DataSourceFromRedis.
//
// ctx, key and args are accepted but ignored. Set always returns a non-nil
// error whose message is "not implement".
func (d *DataSourceFromRedis) Set(ctx context.Context, key string, args ...interface{}) error {
	// The message has no formatting verbs, so errors.New is used rather than
	// fmt.Errorf; the resulting error text is unchanged.
	return errors.New("not implement")
}

func (*DataSourceFromRedis) PushMessages

func (d *DataSourceFromRedis) PushMessages(ctx context.Context, channel interface{}, message ...interface{}) error

func (*DataSourceFromRedis) Scan

func (d *DataSourceFromRedis) Scan(ctx context.Context, pattern string, onScan OnScan) <-chan error

func (*DataSourceFromRedis) SetExec

func (d *DataSourceFromRedis) SetExec(ctx context.Context, cmd string, args ...interface{}) error

func (*DataSourceFromRedis) TxWriteHandle

func (d *DataSourceFromRedis) TxWriteHandle(ctx context.Context, options *TxHandleKeysOptions) error

func (*DataSourceFromRedis) UnWatch

func (d *DataSourceFromRedis) UnWatch(ctx context.Context, cancel context.CancelFunc) error

func (*DataSourceFromRedis) Watch

func (d *DataSourceFromRedis) Watch(ctx context.Context, onMessage OnMessage) (<-chan error, context.CancelFunc)

Watch watches the Redis stream.

type FilterSource

// FilterSource extends DataSource with Check and Add. It is embedded by
// CuckooSource.
type FilterSource interface {
	DataSource
	// Check takes a key and a value and returns a bool — presumably whether
	// value is present in the filter stored at key; verify against the
	// implementation.
	Check(ctx context.Context, key string, value []byte) bool
	// Add takes a key and a value and returns an error.
	Add(ctx context.Context, key string, value []byte) error
}

type KeyValueCacheSource

// KeyValueCacheSource extends DataSource with byte-slice accessors keyed by a
// string.
type KeyValueCacheSource interface {
	DataSource
	// GetValue takes a key and returns a byte slice and an error.
	GetValue(ctx context.Context, key string) ([]byte, error)
	// SetValue takes a key and a byte-slice value and returns an error.
	SetValue(ctx context.Context, key string, value []byte) error
}

type OnMessage

// OnMessage is the callback type accepted by the Watch methods. It receives a
// context, a from string and a message value, and returns an error.
type OnMessage func(ctx context.Context, from string, message interface{}) error

type OnScan

// OnScan is the callback type accepted by the Scan methods. It receives a
// context and one value, and returns an error.
type OnScan func(context.Context, interface{}) error

type RedisDump

// RedisDump pairs a byte payload with an iterator value.
//
// NOTE(review): presumably one chunk produced by BFScanDump/CFScanDump; note
// that those methods use int64 for the iterator while Iter is uint64 — confirm.
type RedisDump struct {
	// Data is the raw byte payload.
	Data []byte
	// Iter is the iterator value associated with Data.
	Iter uint64
}

type TxHandleKeysOptions

// TxHandleKeysOptions is the options struct accepted by
// DataSourceFromRedis.TxWriteHandle.
type TxHandleKeysOptions struct {
	// Channel is a channel name — presumably where SendMessages are published;
	// verify against TxWriteHandle.
	Channel string
	// Cmd is a command name — presumably the Redis command to execute; verify.
	Cmd     string
	// CmdArgs holds the arguments that accompany Cmd.
	CmdArgs []interface{}

	// SendMessages holds the messages to send — presumably on Channel; verify.
	SendMessages []interface{}
}

type WatchOptions

// WatchOptions holds the stream-reading settings passed to
// NewDataSourceFromRedis.
type WatchOptions struct {
	// BatchSize is the maximum number of messages to read from the stream per call to XREAD.
	BatchSize int64
	// Block is the maximum amount of time to block for new messages.
	Block time.Duration

	// Channels is the list of streams to read from.
	Channels []interface{}
	// WatcheAt is the ID of the Redis stream message to start reading from.
	WatcheAt string
}

func NewWatchOptions

func NewWatchOptions(channels []interface{}) *WatchOptions

type WithWatchOptions

// WithWatchOptions is a function that receives a *WatchOptions. WithBatchSize,
// WithBlock, WithChannels and WithWatcheAt each return a value of this type.
type WithWatchOptions func(*WatchOptions)

func WithBatchSize

func WithBatchSize(batchSize int64) WithWatchOptions

func WithBlock

func WithBlock(block time.Duration) WithWatchOptions

func WithChannels

func WithChannels(channels []interface{}) WithWatchOptions

func WithWatcheAt

func WithWatcheAt(watcheAt string) WithWatchOptions

WithWatcheAt returns an option for WatcheAt, the ID of the Redis stream message to start reading from.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL