statestorage

package
v0.9.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterStateStorage

func RegisterStateStorage(
	name config.StateStorageType, storageProvider StorageProvider,
) bool

RegisterStateStorage registers a config.StateStorageType to a Provider implementation which creates the Storage when requested

Types

type Manager added in v0.4.0

type Manager interface {
	Start() error
	Stop() error
	Get() (map[string]*Offset, error)
	Set(
		key string, value *Offset,
	) error
	StateEncoder(
		name string, encoder encoding.BinaryMarshaler,
	) error
	StateDecoder(
		name string, decoder encoding.BinaryUnmarshaler,
	) (present bool, err error)
	SetEncodedState(
		name string, encodedState []byte,
	)
	EncodedState(
		name string,
	) (encodedState []byte, present bool)
	SnapshotContext() (*watermark.SnapshotContext, error)
	SnapshotContextTransaction(
		snapshotName string, createIfNotExists bool, transaction func(snapshotContext *watermark.SnapshotContext) error,
	) error
}

func NewStateStorageManager added in v0.4.0

func NewStateStorageManager(
	stateStorage Storage,
) Manager

type Offset

type Offset struct {
	Timestamp      time.Time   `json:"timestamp"`
	Snapshot       bool        `json:"snapshot"`
	SnapshotName   *string     `json:"snapshot_name,omitempty"`
	SnapshotOffset int         `json:"snapshot_offset"`
	LSN            pgtypes.LSN `json:"lsn"`
}

func (*Offset) Equal

func (o *Offset) Equal(
	other *Offset,
) bool

func (*Offset) MarshalBinary

func (o *Offset) MarshalBinary() ([]byte, error)

func (*Offset) UnmarshalBinary

func (o *Offset) UnmarshalBinary(
	data []byte,
) error

type StateEncoderFunc

type StateEncoderFunc func() (data []byte, err error)

func (StateEncoderFunc) MarshalBinary

func (sef StateEncoderFunc) MarshalBinary() (data []byte, err error)

type Storage

type Storage interface {
	Start() error
	Stop() error
	Save() error
	Load() error
	Get() (map[string]*Offset, error)
	Set(
		key string, value *Offset,
	) error
	StateEncoder(
		name string, encoder encoding.BinaryMarshaler,
	) error
	StateDecoder(
		name string, decoder encoding.BinaryUnmarshaler,
	) (present bool, err error)
	EncodedState(
		name string,
	) (encodedState []byte, present bool)
	SetEncodedState(
		name string, encodedState []byte,
	)
}

func NewDummyStateStorage added in v0.3.1

func NewDummyStateStorage() Storage

func NewFileStateStorage added in v0.3.1

func NewFileStateStorage(
	path string,
) (Storage, error)

func NewStateStorage

func NewStateStorage(
	name config.StateStorageType, config *config.Config,
) (Storage, error)

NewStateStorage instantiates a new instance of the requested Storage when available, otherwise returns an error.

type StorageProvider added in v0.4.0

type StorageProvider = func(config *config.Config) (Storage, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL