store

package
v0.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2023 License: AGPL-3.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrBatchFull = errors.New("batch is full")

ErrBatchFull indicates that the batch is full

View Source
var ErrNotConnected = errors.New("not connected to target server/service")

ErrNotConnected - indicates that the target connection is not active.

Functions

func StreamItems

func StreamItems[I any](store Store[I], target Target, doneCh <-chan struct{}, logger logger)

StreamItems reads the keys from the store and replays the corresponding item to the target.

Types

type Batch

type Batch[K key, T any] struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Batch represents an ordered batch

func NewBatch

func NewBatch[K key, T any](limit uint32) *Batch[K, T]

NewBatch creates a new batch

func (*Batch[K, T]) Add

func (b *Batch[K, T]) Add(key K, item T) error

Add adds the item to the batch

func (*Batch[K, T]) GetAll

func (b *Batch[K, T]) GetAll() (orderedKeys []K, orderedItems []T, err error)

GetAll fetches the items and resets the batch Returned items are not referenced by the batch

func (*Batch[K, T]) GetByKey

func (b *Batch[K, T]) GetByKey(key K) (T, bool)

GetByKey will get the batch item by the provided key

func (*Batch[K, T]) IsFull

func (b *Batch[K, T]) IsFull() bool

IsFull checks if the batch is full or not

func (*Batch[K, T]) Len

func (b *Batch[K, T]) Len() int

Len returns the no of items in the batch

type Key

type Key struct {
	Name   string
	IsLast bool
}

Key denotes the key present in the store.

type QueueStore

type QueueStore[_ any] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

QueueStore - Filestore for persisting items.

func NewQueueStore

func NewQueueStore[I any](directory string, limit uint64, ext string) *QueueStore[I]

NewQueueStore - Creates an instance for QueueStore.

func (*QueueStore[_]) Del

func (store *QueueStore[_]) Del(key string) error

Del - Deletes an entry from the store.

func (*QueueStore[_]) DelList

func (store *QueueStore[_]) DelList(keys []string) error

DelList - Deletes a list of entries from the store. Returns an error even if one key fails to be deleted.

func (*QueueStore[_]) Extension

func (store *QueueStore[_]) Extension() string

Extension will return the file extension used for the items stored in the queue.

func (*QueueStore[I]) Get

func (store *QueueStore[I]) Get(key string) (item I, err error)

Get - gets an item from the store.

func (*QueueStore[_]) Len

func (store *QueueStore[_]) Len() int

Len returns the entry count.

func (*QueueStore[_]) List

func (store *QueueStore[_]) List() ([]string, error)

List - lists all files registered in the store.

func (*QueueStore[_]) Open

func (store *QueueStore[_]) Open() error

Open - Creates the directory if not present.

func (*QueueStore[I]) Put

func (store *QueueStore[I]) Put(item I) error

Put - puts an item to the store.

type Store

type Store[I any] interface {
	Put(item I) error
	Get(key string) (I, error)
	Len() int
	List() ([]string, error)
	Del(key string) error
	DelList(key []string) error
	Open() error
	Extension() string
}

Store - Used to persist items.

type Target

type Target interface {
	Name() string
	SendFromStore(key Key) error
}

Target - store target interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL