stores

package
v0.25.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2023 License: Apache-2.0 Imports: 27 Imported by: 32

Documentation

Index

Constants

View Source
const (
	// CryptoStoreEnvKeyName is the environment variable name
	// that the CryptoStore looks up if no key is passed as
	// a parameter.
	CryptoStoreEnvKeyName = "NATS_STREAMING_ENCRYPTION_KEY"

	// CryptoCipherAES is the name of the AES cipher to use for encryption
	CryptoCipherAES = "AES"

	// CryptoCipherChaChaPoly is the name of the ChaChaPoly cipher to use for encryption
	CryptoCipherChaChaPoly = "CHACHA"

	// CryptoCipherAutoSelect if passed to NewCryptoStore() will cause the cipher to
	// be auto-selected based on the platform the executable is built for.
	CryptoCipherAutoSelect = ""
)
View Source
const (
	CryptoCodeAES    = byte(1)
	CryptoCodeChaCha = byte(2)
)

These constants define a code for each of the supported ciphers

View Source
const (
	// TypeMemory is the store type name for memory based stores
	TypeMemory = "MEMORY"
	// TypeFile is the store type name for file based stores
	TypeFile = "FILE"
	// TypeSQL is the store type name for sql based stores
	TypeSQL = "SQL"
	// TypeRaft is the store type name for the raft stores
	TypeRaft = "RAFT"
)

Variables

View Source
var (
	ErrCryptoStoreRequiresKey = errors.New("encryption key required")
	ErrCipherNotSupported     = errors.New("encryption cipher not supported")
)

CryptStore specific errors

View Source
var (
	ErrTooManyChannels  = errors.New("too many channels")
	ErrTooManySubs      = errors.New("too many subscriptions per channel")
	ErrNotSupported     = errors.New("not supported")
	ErrAlreadyExists    = errors.New("already exists")
	ErrNotFound         = errors.New("not found")
	ErrNoSrvButChannels = errors.New("no server state recovered but channels present")
)

Errors.

View Source
var DefaultFileStoreOptions = FileStoreOptions{
	BufferSize:           2 * 1024 * 1024,
	CompactEnabled:       true,
	CompactInterval:      5 * 60,
	CompactFragmentation: 50,
	CompactMinFileSize:   1024 * 1024,
	DoCRC:                true,
	CRCPolynomial:        int64(crc32.IEEE),
	DoSync:               true,
	SliceMaxBytes:        64 * 1024 * 1024,
	ParallelRecovery:     1,
	ReadBufferSize:       2 * 1024 * 1024,
	AutoSync:             time.Minute,
}

DefaultFileStoreOptions defines the default options for a File Store.

View Source
var DefaultStoreLimits = StoreLimits{
	100,
	ChannelLimits{
		MsgStoreLimits{
			MaxMsgs:  1000000,
			MaxBytes: 1000000 * 1024,
		},
		SubStoreLimits{
			MaxSubscriptions: 1000,
		},
		0,
	},
	nil,
}

DefaultStoreLimits are the limits that a Store must use when none are specified to the Store constructor. Store limits can be changed with the Store.SetLimits() method.

Functions

func FileStoreTestSetBackgroundTaskInterval added in v0.6.0

func FileStoreTestSetBackgroundTaskInterval(wait time.Duration)

FileStoreTestSetBackgroundTaskInterval is used by tests to reduce the interval at which some tasks are performed in the background

Types

type Channel added in v0.6.0

type Channel struct {
	// Subs is the Subscriptions Store.
	Subs SubStore
	// Msgs is the Messages Store.
	Msgs MsgStore
}

Channel contains a reference to both Subscription and Message stores.

type ChannelLimits

type ChannelLimits struct {
	// Limits for message stores
	MsgStoreLimits
	// Limits for subscriptions stores
	SubStoreLimits
	// How long without any active subscription and no new message
	// before this channel can be deleted.
	MaxInactivity time.Duration `json:"max_inactivity"`
}

ChannelLimits defines limits for a given channel

type Client

type Client struct {
	spb.ClientInfo
}

Client represents a client with ID and Heartbeat Inbox.

type CryptoMsgStore added in v0.12.0

type CryptoMsgStore struct {
	sync.Mutex
	MsgStore
	// contains filtered or unexported fields
}

CryptoMsgStore is a store wrappeing a SubStore implementation and adds encryption support.

func (*CryptoMsgStore) FirstMsg added in v0.12.0

func (cms *CryptoMsgStore) FirstMsg() (*pb.MsgProto, error)

FirstMsg implements the MsgStore interface

func (*CryptoMsgStore) LastMsg added in v0.12.0

func (cms *CryptoMsgStore) LastMsg() (*pb.MsgProto, error)

LastMsg implements the MsgStore interface

func (*CryptoMsgStore) Lookup added in v0.12.0

func (cms *CryptoMsgStore) Lookup(seq uint64) (*pb.MsgProto, error)

Lookup implements the MsgStore interface

func (*CryptoMsgStore) Store added in v0.12.0

func (cms *CryptoMsgStore) Store(msg *pb.MsgProto) (uint64, error)

Store implements the MsgStore interface

type CryptoStore added in v0.12.0

type CryptoStore struct {
	sync.Mutex
	Store
	// contains filtered or unexported fields
}

CryptoStore is a store wrapping a store implementation and adds encryption support.

func NewCryptoStore added in v0.12.0

func NewCryptoStore(s Store, encryptionCipher string, encryptionKey []byte) (*CryptoStore, error)

NewCryptoStore returns a CryptoStore instance with given underlying store.

func (*CryptoStore) CreateChannel added in v0.12.0

func (cs *CryptoStore) CreateChannel(channel string) (*Channel, error)

CreateChannel implements the Store interface

func (*CryptoStore) Recover added in v0.12.0

func (cs *CryptoStore) Recover() (*RecoveredState, error)

Recover implements the Store interface

type EDStore added in v0.12.0

type EDStore struct {
	// contains filtered or unexported fields
}

EDStore provides encryption and decryption of data

func NewEDStore added in v0.12.0

func NewEDStore(encryptionCipher string, encryptionKey []byte, idx uint64) (*EDStore, error)

NewEDStore returns an instance of EDStore that adds Encrypt/Decrypt capabilities.

func (*EDStore) Decrypt added in v0.12.0

func (s *EDStore) Decrypt(dst []byte, cipherText []byte) ([]byte, error)

Decrypt returns the decrypted data or an error

func (*EDStore) Encrypt added in v0.12.0

func (s *EDStore) Encrypt(pbuf *[]byte, data []byte) ([]byte, error)

Encrypt returns the encrypted data or an error

func (*EDStore) EncryptionOffset added in v0.12.0

func (s *EDStore) EncryptionOffset() int

EncryptionOffset returns the encrypted data actually starts in an encrypted buffer.

type FileMsgStore

type FileMsgStore struct {
	// contains filtered or unexported fields
}

FileMsgStore is a per channel message file store.

func (*FileMsgStore) Close

func (ms *FileMsgStore) Close() error

Close closes the store.

func (*FileMsgStore) Empty added in v0.9.0

func (ms *FileMsgStore) Empty() error

Empty implements the MsgStore interface

func (*FileMsgStore) FirstAndLastSequence

func (gms *FileMsgStore) FirstAndLastSequence() (uint64, uint64, error)

FirstAndLastSequence returns sequences for the first and last messages stored.

func (*FileMsgStore) FirstMsg

func (ms *FileMsgStore) FirstMsg() (*pb.MsgProto, error)

FirstMsg returns the first message stored.

func (*FileMsgStore) FirstSequence

func (gms *FileMsgStore) FirstSequence() (uint64, error)

FirstSequence returns sequence for first message stored.

func (*FileMsgStore) Flush added in v0.2.0

func (ms *FileMsgStore) Flush() error

Flush flushes outstanding data into the store.

func (*FileMsgStore) GetSequenceFromTimestamp

func (ms *FileMsgStore) GetSequenceFromTimestamp(timestamp int64) (uint64, error)

GetSequenceFromTimestamp returns the sequence of the first message whose timestamp is greater or equal to given timestamp.

func (*FileMsgStore) LastMsg

func (ms *FileMsgStore) LastMsg() (*pb.MsgProto, error)

LastMsg returns the last message stored.

func (*FileMsgStore) LastSequence

func (gms *FileMsgStore) LastSequence() (uint64, error)

LastSequence returns sequence for last message stored.

func (*FileMsgStore) Lookup

func (ms *FileMsgStore) Lookup(seq uint64) (*pb.MsgProto, error)

Lookup returns the stored message with given sequence number.

func (*FileMsgStore) State

func (gms *FileMsgStore) State() (numMessages int, byteSize uint64, err error)

State returns some statistics related to this store

func (*FileMsgStore) Store

func (ms *FileMsgStore) Store(m *pb.MsgProto) (uint64, error)

Store a given message.

type FileStore

type FileStore struct {
	// contains filtered or unexported fields
}

FileStore is the storage interface for STAN servers, backed by files.

func NewFileStore

func NewFileStore(log logger.Logger, rootDir string, limits *StoreLimits, options ...FileStoreOption) (*FileStore, error)

NewFileStore returns a factory for stores backed by files. If not limits are provided, the store will be created with DefaultStoreLimits.

func (*FileStore) AddClient

func (fs *FileStore) AddClient(info *spb.ClientInfo) (*Client, error)

AddClient implements the Store interface

func (*FileStore) Close

func (fs *FileStore) Close() error

Close closes all stores.

func (*FileStore) CreateChannel

func (fs *FileStore) CreateChannel(channel string) (*Channel, error)

CreateChannel implements the Store interface

func (*FileStore) DeleteChannel added in v0.9.0

func (fs *FileStore) DeleteChannel(channel string) error

DeleteChannel implements the Store interface

func (*FileStore) DeleteClient

func (fs *FileStore) DeleteClient(clientID string) error

DeleteClient implements the Store interface

func (*FileStore) GetChannelLimits added in v0.9.0

func (gs *FileStore) GetChannelLimits(channel string) *ChannelLimits

GetChannelLimits implements the Store interface

func (*FileStore) GetExclusiveLock added in v0.4.0

func (fs *FileStore) GetExclusiveLock() (bool, error)

GetExclusiveLock implements the Store interface

func (*FileStore) Init

func (fs *FileStore) Init(info *spb.ServerInfo) error

Init is used to persist server's information after the first start

func (*FileStore) Name

func (gs *FileStore) Name() string

Name returns the type name of this store

func (*FileStore) Recover added in v0.4.0

func (fs *FileStore) Recover() (*RecoveredState, error)

Recover implements the Store interface

func (*FileStore) SetLimits added in v0.3.0

func (gs *FileStore) SetLimits(limits *StoreLimits) error

SetLimits sets limits for this store

type FileStoreOption

type FileStoreOption func(*FileStoreOptions) error

FileStoreOption is a function on the options for a File Store

func AllOptions

func AllOptions(opts *FileStoreOptions) FileStoreOption

AllOptions is a convenient option to pass all options from a FileStoreOptions structure to the constructor.

func AutoSync added in v0.16.0

func AutoSync(dur time.Duration) FileStoreOption

AutoSync is a FileStore option that defines how often each store is sync'ed on disk. Any value <= 0 will disable this feature.

func BufferSize added in v0.2.0

func BufferSize(size int) FileStoreOption

BufferSize is a FileStore option that sets the size of the buffer used during store writes. This can help improve write performance.

func CRCPolynomial added in v0.2.0

func CRCPolynomial(polynomial int64) FileStoreOption

CRCPolynomial is a FileStore option that defines the polynomial to use to create the table used for CRC-32 Checksum. See https://golang.org/pkg/hash/crc32/#MakeTable

func CompactEnabled

func CompactEnabled(enabled bool) FileStoreOption

CompactEnabled is a FileStore option that enables or disables file compaction. The value false will disable compaction.

func CompactFragmentation

func CompactFragmentation(fragmentation int) FileStoreOption

CompactFragmentation is a FileStore option that defines the fragmentation ratio below which compaction would not occur. For instance, specifying 50 means that if other variables would allow for compaction, the compaction would occur only after 50% of the file has data that is no longer valid.

func CompactInterval

func CompactInterval(seconds int) FileStoreOption

CompactInterval is a FileStore option that defines the minimum compaction interval. Compaction is not timer based, but instead when things get "deleted". This value prevents compaction to happen too often.

func CompactMinFileSize

func CompactMinFileSize(fileSize int64) FileStoreOption

CompactMinFileSize is a FileStore option that defines the minimum file size below which compaction would not occur. Specify `0` if you don't want any minimum.

func DoCRC added in v0.2.0

func DoCRC(enableCRC bool) FileStoreOption

DoCRC is a FileStore option that defines if a CRC checksum verification should be performed when records are read from disk.

func DoSync added in v0.2.2

func DoSync(enableFileSync bool) FileStoreOption

DoSync is a FileStore option that defines if `File.Sync()` should be called during a `Flush()` call.

func FileDescriptorsLimit added in v0.4.0

func FileDescriptorsLimit(limit int64) FileStoreOption

FileDescriptorsLimit is a soft limit hinting at FileStore to try to limit the number of concurrent opened files to that limit.

func ParallelRecovery added in v0.5.0

func ParallelRecovery(count int) FileStoreOption

ParallelRecovery is a FileStore option that allows the parallel recovery of channels. When running with SSDs, try to use a higher value than the default number of 1. When running with HDDs, performance may be better if it stays at 1.

func ReadBufferSize added in v0.16.0

func ReadBufferSize(size int) FileStoreOption

ReadBufferSize is a FileStore option that sets the size of the buffer used during store reads. This can help improve read performance.

func SliceConfig added in v0.3.4

func SliceConfig(maxMsgs int, maxBytes int64, maxAge time.Duration, script string) FileStoreOption

SliceConfig is a FileStore option that allows the configuration of file slice limits and optional archive script file name.

func TruncateUnexpectedEOF added in v0.10.0

func TruncateUnexpectedEOF(truncate bool) FileStoreOption

TruncateUnexpectedEOF indicates if on recovery the store should truncate a file that reports an unexpected end-of-file (EOF) on recovery. If set to true, the invalid record byte content is printed but the store will truncate the file prior to this bad record and proceed with recovery. Dataloss may occur.

type FileStoreOptions

type FileStoreOptions struct {
	// BufferSize is the size of the buffer used during store operations.
	BufferSize int

	// CompactEnabled allows to enable/disable files compaction.
	CompactEnabled bool

	// CompactInterval indicates the minimum interval (in seconds) between compactions.
	CompactInterval int

	// CompactFragmentation indicates the minimum ratio of fragmentation
	// to trigger compaction. For instance, 50 means that compaction
	// would not happen until fragmentation is more than 50%.
	CompactFragmentation int

	// CompactMinFileSize indicates the minimum file size before compaction
	// can be performed, regardless of the current file fragmentation.
	CompactMinFileSize int64

	// DoCRC enables (or disables) CRC checksum verification on read operations.
	DoCRC bool

	// CRCPoly is a polynomial used to make the table used in CRC computation.
	CRCPolynomial int64

	// DoSync indicates if `File.Sync()“ is called during a flush.
	DoSync bool

	// Regardless of channel limits, the options below allow to split a message
	// log in smaller file chunks. If all those options were to be set to 0,
	// some file slice limit will be selected automatically based on the channel
	// limits.
	// SliceMaxMsgs defines how many messages can fit in a file slice (0 means
	// count is not checked).
	SliceMaxMsgs int
	// SliceMaxBytes defines how many bytes can fit in a file slice, including
	// the corresponding index file (0 means size is not checked).
	SliceMaxBytes int64
	// SliceMaxAge defines the period of time covered by a slice starting when
	// the first message is stored (0 means time is not checked).
	SliceMaxAge time.Duration
	// SliceArchiveScript is the path to a script to be invoked when a file
	// slice (and the corresponding index file) is going to be removed.
	// The script will be invoked with the channel name and names of data and
	// index files (which both have been previously renamed with a '.bak'
	// extension). It is the responsibility of the script to move/remove
	// those files.
	SliceArchiveScript string

	// FileDescriptorsLimit is a soft limit hinting at FileStore to try to
	// limit the number of concurrent opened files to that limit.
	FileDescriptorsLimit int64

	// Number of channels recovered in parallel (default is 1).
	ParallelRecovery int

	// TruncateUnexpectedEOF is set to true means that if recovery reports
	// an error about unexpected end of file, the last bad record will be
	// removed (the file is truncated at the beginning of the first incomplete
	// record). Dataloss may occur.
	TruncateUnexpectedEOF bool

	// ReadBufferSize, if non zero, will cause the store to preload messages
	// (up to this total size) when looking up a message. We expect that the
	// client will be asking for the following sequential messages, so this
	// is a read ahead optimization.
	ReadBufferSize int

	// AutoSync defines how often the store will flush and sync the files in
	// the background. The default is set to 60 seconds.
	// This is useful when a file sync is not desired for each Flush() call
	// by setting DoSync to false.
	// Setting AutoSync to any value <= 0 will disable auto sync.
	AutoSync time.Duration
}

FileStoreOptions can be used to customize a File Store

type FileSubStore

type FileSubStore struct {
	// contains filtered or unexported fields
}

FileSubStore is a subscription store in files.

func (*FileSubStore) AckSeqPending

func (ss *FileSubStore) AckSeqPending(subid, seqno uint64) error

AckSeqPending records that the given message seqno has been acknowledged by the given subscription.

func (*FileSubStore) AddSeqPending

func (ss *FileSubStore) AddSeqPending(subid, seqno uint64) error

AddSeqPending adds the given message seqno to the given subscription.

func (*FileSubStore) Close

func (ss *FileSubStore) Close() error

Close closes this store

func (*FileSubStore) CreateSub

func (ss *FileSubStore) CreateSub(sub *spb.SubState) error

CreateSub records a new subscription represented by SubState. On success, it returns an id that is used by the other methods.

func (*FileSubStore) DeleteSub

func (ss *FileSubStore) DeleteSub(subid uint64) error

DeleteSub invalidates this subscription.

func (*FileSubStore) Flush added in v0.2.0

func (ss *FileSubStore) Flush() error

Flush persists buffered operations to disk.

func (*FileSubStore) UpdateSub

func (ss *FileSubStore) UpdateSub(sub *spb.SubState) error

UpdateSub updates a given subscription represented by SubState.

type MemoryMsgStore

type MemoryMsgStore struct {
	// contains filtered or unexported fields
}

MemoryMsgStore is a per channel message store in memory

func (*MemoryMsgStore) Close

func (ms *MemoryMsgStore) Close() error

Close implements the MsgStore interface

func (*MemoryMsgStore) Empty added in v0.9.0

func (ms *MemoryMsgStore) Empty() error

Empty implements the MsgStore interface

func (*MemoryMsgStore) FirstAndLastSequence

func (gms *MemoryMsgStore) FirstAndLastSequence() (uint64, uint64, error)

FirstAndLastSequence returns sequences for the first and last messages stored.

func (*MemoryMsgStore) FirstMsg

func (ms *MemoryMsgStore) FirstMsg() (*pb.MsgProto, error)

FirstMsg returns the first message stored.

func (*MemoryMsgStore) FirstSequence

func (gms *MemoryMsgStore) FirstSequence() (uint64, error)

FirstSequence returns sequence for first message stored.

func (*MemoryMsgStore) Flush added in v0.2.0

func (gms *MemoryMsgStore) Flush() error

func (*MemoryMsgStore) GetSequenceFromTimestamp

func (ms *MemoryMsgStore) GetSequenceFromTimestamp(timestamp int64) (uint64, error)

GetSequenceFromTimestamp returns the sequence of the first message whose timestamp is greater or equal to given timestamp.

func (*MemoryMsgStore) LastMsg

func (ms *MemoryMsgStore) LastMsg() (*pb.MsgProto, error)

LastMsg returns the last message stored.

func (*MemoryMsgStore) LastSequence

func (gms *MemoryMsgStore) LastSequence() (uint64, error)

LastSequence returns sequence for last message stored.

func (*MemoryMsgStore) Lookup

func (ms *MemoryMsgStore) Lookup(seq uint64) (*pb.MsgProto, error)

Lookup returns the stored message with given sequence number.

func (*MemoryMsgStore) State

func (gms *MemoryMsgStore) State() (numMessages int, byteSize uint64, err error)

State returns some statistics related to this store

func (*MemoryMsgStore) Store

func (ms *MemoryMsgStore) Store(m *pb.MsgProto) (uint64, error)

Store a given message.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is a factory for message and subscription stores.

func NewMemoryStore

func NewMemoryStore(log logger.Logger, limits *StoreLimits) (*MemoryStore, error)

NewMemoryStore returns a factory for stores held in memory. If not limits are provided, the store will be created with DefaultStoreLimits.

func (*MemoryStore) AddClient

func (gs *MemoryStore) AddClient(info *spb.ClientInfo) (*Client, error)

AddClient implements the Store interface

func (*MemoryStore) Close

func (gs *MemoryStore) Close() error

Close closes all stores

func (*MemoryStore) CreateChannel

func (ms *MemoryStore) CreateChannel(channel string) (*Channel, error)

CreateChannel implements the Store interface

func (*MemoryStore) DeleteChannel added in v0.9.0

func (gs *MemoryStore) DeleteChannel(channel string) error

DeleteChannel implements the Store interface

func (*MemoryStore) DeleteClient

func (gs *MemoryStore) DeleteClient(clientID string) error

DeleteClient implements the Store interface

func (*MemoryStore) GetChannelLimits added in v0.9.0

func (gs *MemoryStore) GetChannelLimits(channel string) *ChannelLimits

GetChannelLimits implements the Store interface

func (*MemoryStore) GetExclusiveLock added in v0.4.0

func (gs *MemoryStore) GetExclusiveLock() (bool, error)

GetExclusiveLock implements the Store interface.

func (*MemoryStore) Init

func (gs *MemoryStore) Init(info *spb.ServerInfo) error

Init can be used to initialize the store with server's information.

func (*MemoryStore) Name

func (gs *MemoryStore) Name() string

Name returns the type name of this store

func (*MemoryStore) Recover added in v0.4.0

func (gs *MemoryStore) Recover() (*RecoveredState, error)

Recover implements the Store interface.

func (*MemoryStore) SetLimits added in v0.3.0

func (gs *MemoryStore) SetLimits(limits *StoreLimits) error

SetLimits sets limits for this store

type MemorySubStore

type MemorySubStore struct {
	// contains filtered or unexported fields
}

MemorySubStore is a subscription store in memory

func (*MemorySubStore) AckSeqPending

func (*MemorySubStore) AckSeqPending(subid, seqno uint64) error

AckSeqPending records that the given message seqno has been acknowledged by the given subscription.

func (*MemorySubStore) AddSeqPending

func (*MemorySubStore) AddSeqPending(subid, seqno uint64) error

AddSeqPending adds the given message seqno to the given subscription.

func (*MemorySubStore) Close

func (gss *MemorySubStore) Close() error

Close closes this store

func (*MemorySubStore) CreateSub

func (gss *MemorySubStore) CreateSub(sub *spb.SubState) error

CreateSub records a new subscription represented by SubState. On success, it records the subscription's ID in SubState.ID. This ID is to be used by the other SubStore methods.

func (*MemorySubStore) DeleteSub

func (gss *MemorySubStore) DeleteSub(subid uint64) error

DeleteSub invalidates this subscription.

func (*MemorySubStore) Flush added in v0.2.0

func (gss *MemorySubStore) Flush() error

Flush is for stores that may buffer operations and need them to be persisted.

func (*MemorySubStore) UpdateSub

func (gss *MemorySubStore) UpdateSub(sub *spb.SubState) error

UpdateSub updates a given subscription represented by SubState.

type MsgStore

type MsgStore interface {
	// State returns some statistics related to this store.
	State() (numMessages int, byteSize uint64, err error)

	// Store stores a message and returns the message sequence.
	Store(msg *pb.MsgProto) (uint64, error)

	// Lookup returns the stored message with given sequence number.
	Lookup(seq uint64) (*pb.MsgProto, error)

	// FirstSequence returns sequence for first message stored, 0 if no
	// message is stored.
	FirstSequence() (uint64, error)

	// LastSequence returns sequence for last message stored, 0 if no
	// message is stored.
	LastSequence() (uint64, error)

	// FirstAndLastSequence returns sequences for the first and last messages stored,
	// 0 if no message is stored.
	FirstAndLastSequence() (uint64, uint64, error)

	// GetSequenceFromTimestamp returns the sequence of the first message whose
	// timestamp is greater or equal to given timestamp.
	GetSequenceFromTimestamp(timestamp int64) (uint64, error)

	// FirstMsg returns the first message stored.
	FirstMsg() (*pb.MsgProto, error)

	// LastMsg returns the last message stored.
	LastMsg() (*pb.MsgProto, error)

	// Flush is for stores that may buffer operations and need them to be persisted.
	Flush() error

	// Empty removes all messages from the store
	Empty() error

	// Close closes the store.
	Close() error
}

MsgStore is the interface for storage of Messages on a given channel.

type MsgStoreLimits added in v0.3.0

type MsgStoreLimits struct {
	// How many messages are allowed.
	MaxMsgs int `json:"max_msgs"`
	// How many bytes are allowed.
	MaxBytes int64 `json:"max_bytes"`
	// How long messages are kept in the log (unit is seconds)
	MaxAge time.Duration `json:"max_age"`
}

MsgStoreLimits defines limits for a MsgStore. For global limits, a value of 0 means "unlimited". For per-channel limits, it means that the corresponding global limit is used.

type PendingAcks

type PendingAcks map[uint64]struct{}

PendingAcks is a set of message sequences waiting to be acknowledged.

type RaftStore added in v0.9.0

type RaftStore struct {
	sync.Mutex
	Store
	// contains filtered or unexported fields
}

RaftStore is an hybrid store for server running in clustering mode. This store persists/recovers ServerInfo and messages, but is a no-op for clients and subscriptions since we rely on raft log for that. It still creates/deletes subscriptions so that we on recovery we can ensure that we don't reuse any subscription ID.

func NewRaftStore added in v0.9.0

func NewRaftStore(log logger.Logger, s Store, limits *StoreLimits) *RaftStore

NewRaftStore returns an instarce of a RaftStore

func (*RaftStore) AddClient added in v0.9.0

func (s *RaftStore) AddClient(info *spb.ClientInfo) (*Client, error)

AddClient implements the Store interface

func (*RaftStore) CreateChannel added in v0.9.0

func (s *RaftStore) CreateChannel(channel string) (*Channel, error)

CreateChannel implements the Store interface

func (*RaftStore) DeleteClient added in v0.9.0

func (s *RaftStore) DeleteClient(clientID string) error

DeleteClient implements the Store interface

func (*RaftStore) Name added in v0.9.0

func (s *RaftStore) Name() string

Name implements the Store interface

func (*RaftStore) Recover added in v0.9.0

func (s *RaftStore) Recover() (*RecoveredState, error)

Recover implements the Store interface

type RaftSubStore added in v0.9.0

type RaftSubStore struct {
	// contains filtered or unexported fields
}

RaftSubStore implements the SubStore interface

func (*RaftSubStore) AckSeqPending added in v0.9.0

func (ss *RaftSubStore) AckSeqPending(subid, seqno uint64) error

AckSeqPending records that the given message 'seqno' has been acknowledged by the subscription 'subid'.

func (*RaftSubStore) AddSeqPending added in v0.9.0

func (ss *RaftSubStore) AddSeqPending(subid, seqno uint64) error

AddSeqPending adds the given message 'seqno' to the subscription 'subid'.

func (*RaftSubStore) Close added in v0.14.1

func (gss *RaftSubStore) Close() error

Close closes this store

func (*RaftSubStore) CreateSub added in v0.14.1

func (ss *RaftSubStore) CreateSub(sub *spb.SubState) error

CreateSub implements the SubStore interface

func (*RaftSubStore) DeleteSub added in v0.14.1

func (gss *RaftSubStore) DeleteSub(subid uint64) error

DeleteSub invalidates this subscription.

func (*RaftSubStore) Flush added in v0.14.1

func (gss *RaftSubStore) Flush() error

Flush is for stores that may buffer operations and need them to be persisted.

func (*RaftSubStore) UpdateSub added in v0.9.0

func (ss *RaftSubStore) UpdateSub(*spb.SubState) error

UpdateSub implements the SubStore interface

type RecoveredChannel added in v0.6.0

type RecoveredChannel struct {
	Channel       *Channel
	Subscriptions []*RecoveredSubscription
}

RecoveredChannel represents a channel that has been recovered, with all its subscriptions

type RecoveredState

type RecoveredState struct {
	Info     *spb.ServerInfo
	Clients  []*Client
	Channels map[string]*RecoveredChannel
}

RecoveredState allows the server to reconstruct its state after a restart.

type RecoveredSubscription added in v0.6.0

type RecoveredSubscription struct {
	Sub     *spb.SubState
	Pending PendingAcks
}

RecoveredSubscription represents a recovered Subscription with a map of pending messages.

type SQLMsgStore added in v0.7.0

type SQLMsgStore struct {
	// contains filtered or unexported fields
}

SQLMsgStore is a per channel message store backed by an SQL Database

func (*SQLMsgStore) Close added in v0.7.0

func (ms *SQLMsgStore) Close() error

Close implements the MsgStore interface

func (*SQLMsgStore) Empty added in v0.9.0

func (ms *SQLMsgStore) Empty() error

Empty implements the MsgStore interface

func (*SQLMsgStore) FirstAndLastSequence added in v0.7.0

func (gms *SQLMsgStore) FirstAndLastSequence() (uint64, uint64, error)

FirstAndLastSequence returns sequences for the first and last messages stored.

func (*SQLMsgStore) FirstMsg added in v0.7.0

func (ms *SQLMsgStore) FirstMsg() (*pb.MsgProto, error)

FirstMsg implements the MsgStore interface

func (*SQLMsgStore) FirstSequence added in v0.7.0

func (gms *SQLMsgStore) FirstSequence() (uint64, error)

FirstSequence returns sequence for first message stored.

func (*SQLMsgStore) Flush added in v0.7.0

func (ms *SQLMsgStore) Flush() error

Flush implements the MsgStore interface

func (*SQLMsgStore) GetSequenceFromTimestamp added in v0.7.0

func (ms *SQLMsgStore) GetSequenceFromTimestamp(timestamp int64) (uint64, error)

GetSequenceFromTimestamp implements the MsgStore interface

func (*SQLMsgStore) LastMsg added in v0.7.0

func (ms *SQLMsgStore) LastMsg() (*pb.MsgProto, error)

LastMsg implements the MsgStore interface

func (*SQLMsgStore) LastSequence added in v0.7.0

func (gms *SQLMsgStore) LastSequence() (uint64, error)

LastSequence returns sequence for last message stored.

func (*SQLMsgStore) Lookup added in v0.7.0

func (ms *SQLMsgStore) Lookup(seq uint64) (*pb.MsgProto, error)

Lookup implements the MsgStore interface

func (*SQLMsgStore) State added in v0.7.0

func (gms *SQLMsgStore) State() (numMessages int, byteSize uint64, err error)

State returns some statistics related to this store

func (*SQLMsgStore) Store added in v0.7.0

func (ms *SQLMsgStore) Store(m *pb.MsgProto) (uint64, error)

Store implements the MsgStore interface

type SQLStore added in v0.7.0

type SQLStore struct {
	// contains filtered or unexported fields
}

SQLStore is a factory for message and subscription stores backed by a SQL Database.

func NewSQLStore added in v0.7.0

func NewSQLStore(log logger.Logger, driver, source string, limits *StoreLimits, options ...SQLStoreOption) (*SQLStore, error)

NewSQLStore returns a factory for stores held in memory. If not limits are provided, the store will be created with DefaultStoreLimits.

func (*SQLStore) AddClient added in v0.7.0

func (s *SQLStore) AddClient(info *spb.ClientInfo) (*Client, error)

AddClient implements the Store interface

func (*SQLStore) Close added in v0.7.0

func (s *SQLStore) Close() error

Close implements the Store interface

func (*SQLStore) CreateChannel added in v0.7.0

func (s *SQLStore) CreateChannel(channel string) (*Channel, error)

CreateChannel implements the Store interface

func (*SQLStore) DeleteChannel added in v0.9.0

func (s *SQLStore) DeleteChannel(channel string) error

DeleteChannel implements the Store interface

func (*SQLStore) DeleteClient added in v0.7.0

func (s *SQLStore) DeleteClient(clientID string) error

DeleteClient implements the Store interface

func (*SQLStore) GetChannelLimits added in v0.9.0

func (gs *SQLStore) GetChannelLimits(channel string) *ChannelLimits

GetChannelLimits implements the Store interface

func (*SQLStore) GetExclusiveLock added in v0.7.0

func (s *SQLStore) GetExclusiveLock() (bool, error)

GetExclusiveLock implements the Store interface

func (*SQLStore) Init added in v0.7.0

func (s *SQLStore) Init(info *spb.ServerInfo) error

Init implements the Store interface

func (*SQLStore) Name added in v0.7.0

func (gs *SQLStore) Name() string

Name returns the type name of this store

func (*SQLStore) Recover added in v0.7.0

func (s *SQLStore) Recover() (*RecoveredState, error)

Recover implements the Store interface

func (*SQLStore) SetLimits added in v0.7.0

func (gs *SQLStore) SetLimits(limits *StoreLimits) error

SetLimits sets limits for this store

type SQLStoreOption added in v0.7.0

type SQLStoreOption func(*SQLStoreOptions) error

SQLStoreOption is a function on the options for a SQL Store

func SQLAllOptions added in v0.7.0

func SQLAllOptions(opts *SQLStoreOptions) SQLStoreOption

SQLAllOptions is a convenient option to pass all options from a SQLStoreOptions structure to the constructor.

func SQLBulkInsertLimit added in v0.20.0

func SQLBulkInsertLimit(limit int) SQLStoreOption

SQLBulkInsertLimit sets the BulkInsertLimit option

func SQLMaxOpenConns added in v0.7.0

func SQLMaxOpenConns(max int) SQLStoreOption

SQLMaxOpenConns sets the MaxOpenConns option

func SQLNoCaching added in v0.7.0

func SQLNoCaching(noCaching bool) SQLStoreOption

SQLNoCaching sets the NoCaching option

type SQLStoreOptions added in v0.7.0

type SQLStoreOptions struct {
	Driver string
	Source string

	// By default, MsgStore.Store(), SubStore.AddSeqPending() and
	// SubStore.AckSeqPending() are storing the actions in memory, and
	// actual SQL statements are executed only when MsgStore.Flush()
	// and SubStore.Flush() are called.
	// If this option is set to `true`, each call to aforementioned
	// APIs will cause execution of their respective SQL statements.
	NoCaching bool

	// If this is non 0, and NoCaching is not enabled, the server will perform
	// bulk insert of messages. This is the limit of values added to the SQL statement
	// "INSERT INTO Messages (..) VALUES (..)[,(..)*]".
	BulkInsertLimit int

	// Maximum number of open connections to the database.
	// If <= 0, then there is no limit on the number of open connections.
	// The default is 0 (unlimited).
	MaxOpenConns int
}

SQLStoreOptions are used to configure the SQL Store.

func DefaultSQLStoreOptions added in v0.7.0

func DefaultSQLStoreOptions() *SQLStoreOptions

DefaultSQLStoreOptions returns default store options for an SQL Store

type SQLSubStore added in v0.7.0

type SQLSubStore struct {
	// contains filtered or unexported fields
}

SQLSubStore is a subscription store backed by an SQL Database

func (*SQLSubStore) AckSeqPending added in v0.7.0

func (ss *SQLSubStore) AckSeqPending(subid, seqno uint64) error

AckSeqPending implements the SubStore interface

func (*SQLSubStore) AddSeqPending added in v0.7.0

func (ss *SQLSubStore) AddSeqPending(subid, seqno uint64) error

AddSeqPending implements the SubStore interface

func (*SQLSubStore) Close added in v0.7.0

func (ss *SQLSubStore) Close() error

Close implements the SubStore interface

func (*SQLSubStore) CreateSub added in v0.7.0

func (ss *SQLSubStore) CreateSub(sub *spb.SubState) error

CreateSub implements the SubStore interface

func (*SQLSubStore) DeleteSub added in v0.7.0

func (ss *SQLSubStore) DeleteSub(subid uint64) error

DeleteSub implements the SubStore interface

func (*SQLSubStore) Flush added in v0.7.0

func (ss *SQLSubStore) Flush() error

Flush implements the SubStore interface

func (*SQLSubStore) UpdateSub added in v0.7.0

func (ss *SQLSubStore) UpdateSub(sub *spb.SubState) error

UpdateSub implements the SubStore interface

type Store

type Store interface {
	// GetExclusiveLock is an advisory lock to prevent concurrent
	// access to the store from multiple instances.
	// This is not to protect individual API calls, instead, it
	// is meant to protect the store for the entire duration the
	// store is being used. This is why there is no `Unlock` API.
	// The lock should be released when the store is closed.
	//
	// If an exclusive lock can be immediately acquired (that is,
	// it should not block waiting for the lock to be acquired),
	// this call will return `true` with no error. Once a store
	// instance has acquired an exclusive lock, calling this
	// function has no effect and `true` with no error will again
	// be returned.
	//
	// If the lock cannot be acquired, this call will return
	// `false` with no error: the caller can try again later.
	//
	// If, however, the lock cannot be acquired due to a fatal
	// error, this call should return `false` and the error.
	//
	// It is important to note that the implementation should
	// make an effort to distinguish error conditions deemed
	// fatal (and therefore trying again would invariably result
	// in the same error) and those deemed transient, in which
	// case no error should be returned to indicate that the
	// caller could try later.
	//
	// Implementations that do not support exclusive locks should
	// return `false` and `ErrNotSupported`.
	GetExclusiveLock() (bool, error)

	// Init can be used to initialize the store with server's information.
	Init(info *spb.ServerInfo) error

	// Name returns the name type of this store (e.g: MEMORY, FILESTORE, etc...).
	Name() string

	// Recover returns the recovered state.
	// Implementations that do not persist state and therefore cannot
	// recover from a previous run MUST return nil, not an error.
	// However, an error must be returned for implementations that are
	// attempting to recover the state but fail to do so.
	Recover() (*RecoveredState, error)

	// SetLimits sets limits for this store. The action is not expected
	// to be retroactive.
	// The store implementation should make a deep copy as to not change
	// the content of the structure passed by the caller.
	// This call may return an error due to limits validation errors.
	SetLimits(limits *StoreLimits) error

	// GetChannelLimits returns the limit for this channel. If the channel
	// does not exist, returns nil.
	GetChannelLimits(name string) *ChannelLimits

	// CreateChannel creates a Channel.
	// Implementations should return ErrAlreadyExists if the channel was
	// already created.
	// Limits defined for this channel in StoreLimits.PeChannel map, if present,
	// will apply. Otherwise, the global limits in StoreLimits will apply.
	CreateChannel(channel string) (*Channel, error)

	// DeleteChannel deletes a Channel.
	// Implementations should make sure that if no error is returned, the
	// channel would not be recovered after a restart, unless CreateChannel()
	// with the same channel is invoked.
	// If processing is expecting to be time consuming, work should be done
	// in the background as long as the above condition is guaranteed.
	// It is also acceptable for an implementation to have CreateChannel()
	// return an error if background deletion is still happening for a
	// channel of the same name.
	DeleteChannel(channel string) error

	// AddClient stores information about the client identified by `clientID`.
	AddClient(info *spb.ClientInfo) (*Client, error)

	// DeleteClient removes the client identified by `clientID` from the store.
	DeleteClient(clientID string) error

	// Close closes this store (including all MsgStore and SubStore).
	// If an exclusive lock was acquired, the lock shall be released.
	Close() error
}

Store is the storage interface for NATS Streaming servers.

If an implementation has a Store constructor with StoreLimits, it should be noted that the limits don't apply to any state being recovered, for Store implementations supporting recovery.

type StoreLimits added in v0.3.0

type StoreLimits struct {
	// How many channels are allowed.
	MaxChannels int `json:"max_channels"`
	// Global limits. Any 0 value means that the limit is ignored (unlimited).
	ChannelLimits
	// Per-channel limits. Special values for limits in this map:
	// - == 0 means that the corresponding global limit is used.
	// -  < 0 means that limit is ignored (unlimited).
	PerChannel map[string]*ChannelLimits `json:"channels,omitempty"`
}

StoreLimits define limits for a store.

func (*StoreLimits) AddPerChannel added in v0.3.0

func (sl *StoreLimits) AddPerChannel(name string, cl *ChannelLimits)

AddPerChannel stores limits for the given channel `name` in the StoreLimits. Inheritance (that is, specifying 0 for a limit means that the global limit should be used) is not applied in this call. This is done in StoreLimits.Build along with some validation.

func (*StoreLimits) Build added in v0.3.0

func (sl *StoreLimits) Build() error

Build sets the global limits into per-channel limits that are set to zero. This call also validates the limits. An error is returned if: * any global limit is set to a negative value. * the number of per-channel is higher than StoreLimits.MaxChannels. * a per-channel name is invalid

func (*StoreLimits) Clone added in v0.5.0

func (sl *StoreLimits) Clone() *StoreLimits

Clone returns a copy of the store limits

func (*StoreLimits) ClonePerChannelMap added in v0.6.0

func (sl *StoreLimits) ClonePerChannelMap() map[string]*ChannelLimits

ClonePerChannelMap returns a deep copy of the StoreLimits's PerChannel map

func (*StoreLimits) Print added in v0.5.0

func (sl *StoreLimits) Print() []string

Print returns an array of strings suitable for printing the store limits.

type SubStore

type SubStore interface {
	// CreateSub records a new subscription represented by SubState. On success,
	// it records the subscription's ID in SubState.ID. This ID is to be used
	// by the other SubStore methods.
	CreateSub(*spb.SubState) error

	// UpdateSub updates a given subscription represented by SubState.
	UpdateSub(*spb.SubState) error

	// DeleteSub invalidates the subscription 'subid'.
	DeleteSub(subid uint64) error

	// AddSeqPending adds the given message 'seqno' to the subscription 'subid'.
	AddSeqPending(subid, seqno uint64) error

	// AckSeqPending records that the given message 'seqno' has been acknowledged
	// by the subscription 'subid'.
	AckSeqPending(subid, seqno uint64) error

	// Flush is for stores that may buffer operations and need them to be persisted.
	Flush() error

	// Close closes the subscriptions store.
	Close() error
}

SubStore is the interface for storage of Subscriptions on a given channel.

Implementations of this interface should not attempt to validate that a subscription is valid (that is, has not been deleted) when processing updates.

type SubStoreLimits added in v0.3.0

type SubStoreLimits struct {
	// How many subscriptions are allowed.
	MaxSubscriptions int `json:"max_subscriptions"`
}

SubStoreLimits defines limits for a SubStore

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL