Documentation ¶
Index ¶
- Constants
- Variables
- func Noticef(format string, v ...interface{})
- type ChannelLimits
- type ChannelStore
- type Client
- type DelegateStore
- func (ms *DelegateStore) AddClient(clientID, hbInbox string, userData interface{}) (*Client, bool, error)
- func (ms *DelegateStore) Close() error
- func (ms *DelegateStore) CreateChannel(channel string, userData interface{}) (*ChannelStore, bool, error)
- func (ms *DelegateStore) DeleteClient(clientID string) *Client
- func (ms *DelegateStore) GetClient(clientID string) *Client
- func (ms *DelegateStore) GetClients() map[string]*Client
- func (ms *DelegateStore) GetClientsCount() int
- func (ms *DelegateStore) GetExclusiveLock() (bool, error)
- func (ms *DelegateStore) HasChannel() bool
- func (ms *DelegateStore) Init(si *spb.ServerInfo) error
- func (ms *DelegateStore) LookupChannel(channel string) *ChannelStore
- func (ms *DelegateStore) MsgsState(channel string) (numMessages int, byteSize uint64, err error)
- func (ms *DelegateStore) Name() string
- func (ms *DelegateStore) Recover() (*RecoveredState, error)
- func (ms *DelegateStore) SetLimits(limits *StoreLimits) error
- type FileMsgStore
- func (ms *FileMsgStore) Close() error
- func (gms *FileMsgStore) FirstAndLastSequence() (uint64, uint64)
- func (ms *FileMsgStore) FirstMsg() *pb.MsgProto
- func (gms *FileMsgStore) FirstSequence() uint64
- func (ms *FileMsgStore) Flush() error
- func (ms *FileMsgStore) GetSequenceFromTimestamp(timestamp int64) uint64
- func (ms *FileMsgStore) LastMsg() *pb.MsgProto
- func (gms *FileMsgStore) LastSequence() uint64
- func (ms *FileMsgStore) Lookup(seq uint64) *pb.MsgProto
- func (gms *FileMsgStore) State() (numMessages int, byteSize uint64, err error)
- func (ms *FileMsgStore) Store(data []byte) (uint64, error)
- type FileStore
- func (fs *FileStore) AddClient(clientID, hbInbox string, userData interface{}) (*Client, bool, error)
- func (fs *FileStore) Close() error
- func (fs *FileStore) CreateChannel(channel string, userData interface{}) (*ChannelStore, bool, error)
- func (fs *FileStore) DeleteClient(clientID string) *Client
- func (gs *FileStore) GetClient(clientID string) *Client
- func (gs *FileStore) GetClients() map[string]*Client
- func (gs *FileStore) GetClientsCount() int
- func (fs *FileStore) GetExclusiveLock() (bool, error)
- func (gs *FileStore) HasChannel() bool
- func (fs *FileStore) Init(info *spb.ServerInfo) error
- func (gs *FileStore) LookupChannel(channel string) *ChannelStore
- func (gs *FileStore) MsgsState(channel string) (numMessages int, byteSize uint64, err error)
- func (gs *FileStore) Name() string
- func (fs *FileStore) Recover() (*RecoveredState, error)
- func (gs *FileStore) SetLimits(limits *StoreLimits) error
- type FileStoreOption
- func AllOptions(opts *FileStoreOptions) FileStoreOption
- func BufferSize(size int) FileStoreOption
- func CRCPolynomial(polynomial int64) FileStoreOption
- func CompactEnabled(enabled bool) FileStoreOption
- func CompactFragmentation(fragmentation int) FileStoreOption
- func CompactInterval(seconds int) FileStoreOption
- func CompactMinFileSize(fileSize int64) FileStoreOption
- func DoCRC(enableCRC bool) FileStoreOption
- func DoSync(enableFileSync bool) FileStoreOption
- func FileDescriptorsLimit(limit int64) FileStoreOption
- func SliceConfig(maxMsgs int, maxBytes int64, maxAge time.Duration, script string) FileStoreOption
- type FileStoreOptions
- type FileSubStore
- func (ss *FileSubStore) AckSeqPending(subid, seqno uint64) error
- func (ss *FileSubStore) AddSeqPending(subid, seqno uint64) error
- func (ss *FileSubStore) Close() error
- func (ss *FileSubStore) CreateSub(sub *spb.SubState) error
- func (ss *FileSubStore) DeleteSub(subid uint64)
- func (ss *FileSubStore) Flush() error
- func (ss *FileSubStore) UpdateSub(sub *spb.SubState) error
- type MemoryMsgStore
- func (ms *MemoryMsgStore) Close() error
- func (gms *MemoryMsgStore) FirstAndLastSequence() (uint64, uint64)
- func (ms *MemoryMsgStore) FirstMsg() *pb.MsgProto
- func (gms *MemoryMsgStore) FirstSequence() uint64
- func (gms *MemoryMsgStore) Flush() error
- func (ms *MemoryMsgStore) GetSequenceFromTimestamp(timestamp int64) uint64
- func (ms *MemoryMsgStore) LastMsg() *pb.MsgProto
- func (gms *MemoryMsgStore) LastSequence() uint64
- func (ms *MemoryMsgStore) Lookup(seq uint64) *pb.MsgProto
- func (gms *MemoryMsgStore) State() (numMessages int, byteSize uint64, err error)
- func (ms *MemoryMsgStore) Store(data []byte) (uint64, error)
- type MemoryStore
- func (gs *MemoryStore) AddClient(clientID, hbInbox string, userData interface{}) (*Client, bool, error)
- func (gs *MemoryStore) Close() error
- func (ms *MemoryStore) CreateChannel(channel string, userData interface{}) (*ChannelStore, bool, error)
- func (gs *MemoryStore) DeleteClient(clientID string) *Client
- func (gs *MemoryStore) GetClient(clientID string) *Client
- func (gs *MemoryStore) GetClients() map[string]*Client
- func (gs *MemoryStore) GetClientsCount() int
- func (gs *MemoryStore) GetExclusiveLock() (bool, error)
- func (gs *MemoryStore) HasChannel() bool
- func (gs *MemoryStore) Init(info *spb.ServerInfo) error
- func (gs *MemoryStore) LookupChannel(channel string) *ChannelStore
- func (gs *MemoryStore) MsgsState(channel string) (numMessages int, byteSize uint64, err error)
- func (gs *MemoryStore) Name() string
- func (gs *MemoryStore) Recover() (*RecoveredState, error)
- func (gs *MemoryStore) SetLimits(limits *StoreLimits) error
- type MemorySubStore
- func (*MemorySubStore) AckSeqPending(subid, seqno uint64) error
- func (*MemorySubStore) AddSeqPending(subid, seqno uint64) error
- func (gss *MemorySubStore) Close() error
- func (gss *MemorySubStore) CreateSub(sub *spb.SubState) error
- func (gss *MemorySubStore) DeleteSub(subid uint64)
- func (gss *MemorySubStore) Flush() error
- func (gss *MemorySubStore) UpdateSub(sub *spb.SubState) error
- type MsgStore
- type MsgStoreLimits
- type PendingAcks
- type RecoveredState
- type RecoveredSubState
- type RecoveredSubscriptions
- type Store
- type StoreLimits
- type SubStore
- type SubStoreLimits
Constants ¶
const ( // TypeMemory is the store type name for memory based stores TypeMemory = "MEMORY" // TypeFile is the store type name for file based stores TypeFile = "FILE" )
const (
// AllChannels allows to get state for all channels.
AllChannels = "*"
)
Variables ¶
var ( ErrTooManyChannels = errors.New("too many channels") ErrTooManySubs = errors.New("too many subscriptions per channel") ErrNotSupported = errors.New("not supported") )
Errors.
var DefaultFileStoreOptions = FileStoreOptions{ BufferSize: 2 * 1024 * 1024, CompactEnabled: true, CompactInterval: 5 * 60, CompactFragmentation: 50, CompactMinFileSize: 1024 * 1024, DoCRC: true, CRCPolynomial: int64(crc32.IEEE), DoSync: true, SliceMaxBytes: 64 * 1024 * 1024, }
DefaultFileStoreOptions defines the default options for a File Store.
var DefaultStoreLimits = StoreLimits{ 100, ChannelLimits{ MsgStoreLimits{ MaxMsgs: 1000000, MaxBytes: 1000000 * 1024, }, SubStoreLimits{ MaxSubscriptions: 1000, }, }, nil, }
DefaultStoreLimits are the limits that a Store must use when none are specified to the Store constructor. Store limits can be changed with the Store.SetLimits() method.
Functions ¶
Types ¶
type ChannelLimits ¶
type ChannelLimits struct { // Limits for message stores MsgStoreLimits // Limits for subscriptions stores SubStoreLimits }
ChannelLimits defines limits for a given channel
type ChannelStore ¶
type ChannelStore struct { // UserData is set when the channel is created. UserData interface{} // Subs is the Subscriptions Store. Subs SubStore // Msgs is the Messages Store. Msgs MsgStore }
ChannelStore contains a reference to both Subscription and Message stores.
type Client ¶
type Client struct { spb.ClientInfo UserData interface{} }
Client represents a client with ID, Heartbeat Inbox and user data sets when adding it to the store.
type DelegateStore ¶ added in v0.4.0
type DelegateStore struct {
S Store
}
DelegateStore delegates all the Store's APIs calls to the store it was given when created. This is used by tests programs trying to create a mocked store with the ability to override some of the APIs. To do so, they would create an object that embeds a DelegateStore and simply override the API of their choice.
func (*DelegateStore) AddClient ¶ added in v0.4.0
func (ms *DelegateStore) AddClient(clientID, hbInbox string, userData interface{}) (*Client, bool, error)
AddClient implements the Store interface
func (*DelegateStore) Close ¶ added in v0.4.0
func (ms *DelegateStore) Close() error
Close implements the Store interface
func (*DelegateStore) CreateChannel ¶ added in v0.4.0
func (ms *DelegateStore) CreateChannel(channel string, userData interface{}) (*ChannelStore, bool, error)
CreateChannel implements the Store interface
func (*DelegateStore) DeleteClient ¶ added in v0.4.0
func (ms *DelegateStore) DeleteClient(clientID string) *Client
DeleteClient implements the Store interface
func (*DelegateStore) GetClient ¶ added in v0.4.0
func (ms *DelegateStore) GetClient(clientID string) *Client
GetClient implements the Store interface
func (*DelegateStore) GetClients ¶ added in v0.4.0
func (ms *DelegateStore) GetClients() map[string]*Client
GetClients implements the Store interface
func (*DelegateStore) GetClientsCount ¶ added in v0.4.0
func (ms *DelegateStore) GetClientsCount() int
GetClientsCount implements the Store interface
func (*DelegateStore) GetExclusiveLock ¶ added in v0.4.0
func (ms *DelegateStore) GetExclusiveLock() (bool, error)
GetExclusiveLock implements the Store interface
func (*DelegateStore) HasChannel ¶ added in v0.4.0
func (ms *DelegateStore) HasChannel() bool
HasChannel implements the Store interface
func (*DelegateStore) Init ¶ added in v0.4.0
func (ms *DelegateStore) Init(si *spb.ServerInfo) error
Init implements the Store interface
func (*DelegateStore) LookupChannel ¶ added in v0.4.0
func (ms *DelegateStore) LookupChannel(channel string) *ChannelStore
LookupChannel implements the Store interface
func (*DelegateStore) MsgsState ¶ added in v0.4.0
func (ms *DelegateStore) MsgsState(channel string) (numMessages int, byteSize uint64, err error)
MsgsState implements the Store interface
func (*DelegateStore) Name ¶ added in v0.4.0
func (ms *DelegateStore) Name() string
Name implements the Store interface
func (*DelegateStore) Recover ¶ added in v0.4.0
func (ms *DelegateStore) Recover() (*RecoveredState, error)
Recover implements the Store interface
func (*DelegateStore) SetLimits ¶ added in v0.4.0
func (ms *DelegateStore) SetLimits(limits *StoreLimits) error
SetLimits implements the Store interface
type FileMsgStore ¶
type FileMsgStore struct {
// contains filtered or unexported fields
}
FileMsgStore is a per channel message file store.
func (*FileMsgStore) FirstAndLastSequence ¶
FirstAndLastSequence returns sequences for the first and last messages stored.
func (*FileMsgStore) FirstMsg ¶
func (ms *FileMsgStore) FirstMsg() *pb.MsgProto
FirstMsg returns the first message stored.
func (*FileMsgStore) FirstSequence ¶
func (gms *FileMsgStore) FirstSequence() uint64
FirstSequence returns sequence for first message stored.
func (*FileMsgStore) Flush ¶ added in v0.2.0
func (ms *FileMsgStore) Flush() error
Flush flushes outstanding data into the store.
func (*FileMsgStore) GetSequenceFromTimestamp ¶
func (ms *FileMsgStore) GetSequenceFromTimestamp(timestamp int64) uint64
GetSequenceFromTimestamp returns the sequence of the first message whose timestamp is greater or equal to given timestamp.
func (*FileMsgStore) LastMsg ¶
func (ms *FileMsgStore) LastMsg() *pb.MsgProto
LastMsg returns the last message stored.
func (*FileMsgStore) LastSequence ¶
func (gms *FileMsgStore) LastSequence() uint64
LastSequence returns sequence for last message stored.
func (*FileMsgStore) Lookup ¶
func (ms *FileMsgStore) Lookup(seq uint64) *pb.MsgProto
Lookup returns the stored message with given sequence number.
type FileStore ¶
type FileStore struct {
// contains filtered or unexported fields
}
FileStore is the storage interface for STAN servers, backed by files.
func NewFileStore ¶
func NewFileStore(rootDir string, limits *StoreLimits, options ...FileStoreOption) (*FileStore, error)
NewFileStore returns a factory for stores backed by files. If not limits are provided, the store will be created with DefaultStoreLimits.
func (*FileStore) AddClient ¶
func (fs *FileStore) AddClient(clientID, hbInbox string, userData interface{}) (*Client, bool, error)
AddClient stores information about the client identified by `clientID`.
func (*FileStore) CreateChannel ¶
func (fs *FileStore) CreateChannel(channel string, userData interface{}) (*ChannelStore, bool, error)
CreateChannel creates a ChannelStore for the given channel, and returns `true` to indicate that the channel is new, false if it already exists.
func (*FileStore) DeleteClient ¶
DeleteClient invalidates the client identified by `clientID`.
func (*FileStore) GetClients ¶
GetClients returns all stored Client objects, as a map keyed by client IDs.
func (*FileStore) GetClientsCount ¶
func (gs *FileStore) GetClientsCount() int
GetClientsCount returns the number of registered clients
func (*FileStore) GetExclusiveLock ¶ added in v0.4.0
GetExclusiveLock implements the Store interface
func (*FileStore) HasChannel ¶
func (gs *FileStore) HasChannel() bool
HasChannel returns true if this store has any channel
func (*FileStore) Init ¶
func (fs *FileStore) Init(info *spb.ServerInfo) error
Init is used to persist server's information after the first start
func (*FileStore) LookupChannel ¶
func (gs *FileStore) LookupChannel(channel string) *ChannelStore
LookupChannel returns a ChannelStore for the given channel.
func (*FileStore) MsgsState ¶
State returns message store statistics for a given channel ('*' for all)
func (*FileStore) Name ¶
func (gs *FileStore) Name() string
Name returns the type name of this store
func (*FileStore) Recover ¶ added in v0.4.0
func (fs *FileStore) Recover() (*RecoveredState, error)
Recover implements the Store interface
func (*FileStore) SetLimits ¶ added in v0.3.0
func (gs *FileStore) SetLimits(limits *StoreLimits) error
SetLimits sets limits for this store
type FileStoreOption ¶
type FileStoreOption func(*FileStoreOptions) error
FileStoreOption is a function on the options for a File Store
func AllOptions ¶
func AllOptions(opts *FileStoreOptions) FileStoreOption
AllOptions is a convenient option to pass all options from a FileStoreOptions structure to the constructor.
func BufferSize ¶ added in v0.2.0
func BufferSize(size int) FileStoreOption
BufferSize is a FileStore option that sets the size of the buffer used during store writes. This can help improve write performance.
func CRCPolynomial ¶ added in v0.2.0
func CRCPolynomial(polynomial int64) FileStoreOption
CRCPolynomial is a FileStore option that defines the polynomial to use to create the table used for CRC-32 Checksum. See https://golang.org/pkg/hash/crc32/#MakeTable
func CompactEnabled ¶
func CompactEnabled(enabled bool) FileStoreOption
CompactEnabled is a FileStore option that enables or disables file compaction. The value false will disable compaction.
func CompactFragmentation ¶
func CompactFragmentation(fragmentation int) FileStoreOption
CompactFragmentation is a FileStore option that defines the fragmentation ratio below which compaction would not occur. For instance, specifying 50 means that if other variables would allow for compaction, the compaction would occur only after 50% of the file has data that is no longer valid.
func CompactInterval ¶
func CompactInterval(seconds int) FileStoreOption
CompactInterval is a FileStore option that defines the minimum compaction interval. Compaction is not timer based, but instead when things get "deleted". This value prevents compaction to happen too often.
func CompactMinFileSize ¶
func CompactMinFileSize(fileSize int64) FileStoreOption
CompactMinFileSize is a FileStore option that defines the minimum file size below which compaction would not occur. Specify `-1` if you don't want any minimum.
func DoCRC ¶ added in v0.2.0
func DoCRC(enableCRC bool) FileStoreOption
DoCRC is a FileStore option that defines if a CRC checksum verification should be performed when records are read from disk.
func DoSync ¶ added in v0.2.2
func DoSync(enableFileSync bool) FileStoreOption
DoSync is a FileStore option that defines if `File.Sync()` should be called during a `Flush()` call.
func FileDescriptorsLimit ¶ added in v0.4.0
func FileDescriptorsLimit(limit int64) FileStoreOption
FileDescriptorsLimit is a soft limit hinting at FileStore to try to limit the number of concurrent opened files to that limit.
func SliceConfig ¶ added in v0.3.4
SliceConfig is a FileStore option that allows the configuration of file slice limits and optional archive script file name.
type FileStoreOptions ¶
type FileStoreOptions struct { // BufferSize is the size of the buffer used during store operations. BufferSize int // CompactEnabled allows to enable/disable files compaction. CompactEnabled bool // CompactInterval indicates the minimum interval (in seconds) between compactions. CompactInterval int // CompactFragmentation indicates the minimum ratio of fragmentation // to trigger compaction. For instance, 50 means that compaction // would not happen until fragmentation is more than 50%. CompactFragmentation int // CompactMinFileSize indicates the minimum file size before compaction // can be performed, regardless of the current file fragmentation. CompactMinFileSize int64 // DoCRC enables (or disables) CRC checksum verification on read operations. DoCRC bool // CRCPoly is a polynomial used to make the table used in CRC computation. CRCPolynomial int64 // DoSync indicates if `File.Sync()“ is called during a flush. DoSync bool // Regardless of channel limits, the options below allow to split a message // log in smaller file chunks. If all those options were to be set to 0, // some file slice limit will be selected automatically based on the channel // limits. // SliceMaxMsgs defines how many messages can fit in a file slice (0 means // count is not checked). SliceMaxMsgs int // SliceMaxBytes defines how many bytes can fit in a file slice, including // the corresponding index file (0 means size is not checked). SliceMaxBytes int64 // SliceMaxAge defines the period of time covered by a slice starting when // the first message is stored (0 means time is not checked). SliceMaxAge time.Duration // SliceArchiveScript is the path to a script to be invoked when a file // slice (and the corresponding index file) is going to be removed. // The script will be invoked with the channel name and names of data and // index files (which both have been previously renamed with a '.bak' // extension). It is the responsibility of the script to move/remove // those files. SliceArchiveScript string // FileDescriptorsLimit is a soft limit hinting at FileStore to try to // limit the number of concurrent opened files to that limit. FileDescriptorsLimit int64 }
FileStoreOptions can be used to customize a File Store
type FileSubStore ¶
type FileSubStore struct {
// contains filtered or unexported fields
}
FileSubStore is a subscription store in files.
func (*FileSubStore) AckSeqPending ¶
func (ss *FileSubStore) AckSeqPending(subid, seqno uint64) error
AckSeqPending records that the given message seqno has been acknowledged by the given subscription.
func (*FileSubStore) AddSeqPending ¶
func (ss *FileSubStore) AddSeqPending(subid, seqno uint64) error
AddSeqPending adds the given message seqno to the given subscription.
func (*FileSubStore) CreateSub ¶
func (ss *FileSubStore) CreateSub(sub *spb.SubState) error
CreateSub records a new subscription represented by SubState. On success, it returns an id that is used by the other methods.
func (*FileSubStore) DeleteSub ¶
func (ss *FileSubStore) DeleteSub(subid uint64)
DeleteSub invalidates this subscription.
func (*FileSubStore) Flush ¶ added in v0.2.0
func (ss *FileSubStore) Flush() error
Flush persists buffered operations to disk.
type MemoryMsgStore ¶
type MemoryMsgStore struct {
// contains filtered or unexported fields
}
MemoryMsgStore is a per channel message store in memory
func (*MemoryMsgStore) Close ¶
func (ms *MemoryMsgStore) Close() error
Close implements the MsgStore interface
func (*MemoryMsgStore) FirstAndLastSequence ¶
FirstAndLastSequence returns sequences for the first and last messages stored.
func (*MemoryMsgStore) FirstMsg ¶
func (ms *MemoryMsgStore) FirstMsg() *pb.MsgProto
FirstMsg returns the first message stored.
func (*MemoryMsgStore) FirstSequence ¶
func (gms *MemoryMsgStore) FirstSequence() uint64
FirstSequence returns sequence for first message stored.
func (*MemoryMsgStore) GetSequenceFromTimestamp ¶
func (ms *MemoryMsgStore) GetSequenceFromTimestamp(timestamp int64) uint64
GetSequenceFromTimestamp returns the sequence of the first message whose timestamp is greater or equal to given timestamp.
func (*MemoryMsgStore) LastMsg ¶
func (ms *MemoryMsgStore) LastMsg() *pb.MsgProto
LastMsg returns the last message stored.
func (*MemoryMsgStore) LastSequence ¶
func (gms *MemoryMsgStore) LastSequence() uint64
LastSequence returns sequence for last message stored.
func (*MemoryMsgStore) Lookup ¶
func (ms *MemoryMsgStore) Lookup(seq uint64) *pb.MsgProto
Lookup returns the stored message with given sequence number.
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is a factory for message and subscription stores.
func NewMemoryStore ¶
func NewMemoryStore(limits *StoreLimits) (*MemoryStore, error)
NewMemoryStore returns a factory for stores held in memory. If not limits are provided, the store will be created with DefaultStoreLimits.
func (*MemoryStore) AddClient ¶
func (gs *MemoryStore) AddClient(clientID, hbInbox string, userData interface{}) (*Client, bool, error)
AddClient stores information about the client identified by `clientID`.
func (*MemoryStore) CreateChannel ¶
func (ms *MemoryStore) CreateChannel(channel string, userData interface{}) (*ChannelStore, bool, error)
CreateChannel creates a ChannelStore for the given channel, and returns `true` to indicate that the channel is new, false if it already exists.
func (*MemoryStore) DeleteClient ¶
DeleteClient deletes the client identified by `clientID`.
func (*MemoryStore) GetClients ¶
GetClients returns all stored Client objects, as a map keyed by client IDs.
func (*MemoryStore) GetClientsCount ¶
func (gs *MemoryStore) GetClientsCount() int
GetClientsCount returns the number of registered clients
func (*MemoryStore) GetExclusiveLock ¶ added in v0.4.0
GetExclusiveLock implements the Store interface.
func (*MemoryStore) HasChannel ¶
func (gs *MemoryStore) HasChannel() bool
HasChannel returns true if this store has any channel
func (*MemoryStore) Init ¶
func (gs *MemoryStore) Init(info *spb.ServerInfo) error
Init can be used to initialize the store with server's information.
func (*MemoryStore) LookupChannel ¶
func (gs *MemoryStore) LookupChannel(channel string) *ChannelStore
LookupChannel returns a ChannelStore for the given channel.
func (*MemoryStore) MsgsState ¶
State returns message store statistics for a given channel ('*' for all)
func (*MemoryStore) Name ¶
func (gs *MemoryStore) Name() string
Name returns the type name of this store
func (*MemoryStore) Recover ¶ added in v0.4.0
func (gs *MemoryStore) Recover() (*RecoveredState, error)
Recover implements the Store interface.
func (*MemoryStore) SetLimits ¶ added in v0.3.0
func (gs *MemoryStore) SetLimits(limits *StoreLimits) error
SetLimits sets limits for this store
type MemorySubStore ¶
type MemorySubStore struct {
// contains filtered or unexported fields
}
MemorySubStore is a subscription store in memory
func (*MemorySubStore) AckSeqPending ¶
func (*MemorySubStore) AckSeqPending(subid, seqno uint64) error
AckSeqPending records that the given message seqno has been acknowledged by the given subscription.
func (*MemorySubStore) AddSeqPending ¶
func (*MemorySubStore) AddSeqPending(subid, seqno uint64) error
AddSeqPending adds the given message seqno to the given subscription.
func (*MemorySubStore) CreateSub ¶
CreateSub records a new subscription represented by SubState. On success, it records the subscription's ID in SubState.ID. This ID is to be used by the other SubStore methods.
func (*MemorySubStore) DeleteSub ¶
func (gss *MemorySubStore) DeleteSub(subid uint64)
DeleteSub invalidates this subscription.
type MsgStore ¶
type MsgStore interface { // State returns some statistics related to this store. State() (numMessages int, byteSize uint64, err error) // Store stores a message and returns the message sequence. Store(data []byte) (uint64, error) // Lookup returns the stored message with given sequence number. Lookup(seq uint64) *pb.MsgProto // FirstSequence returns sequence for first message stored, 0 if no // message is stored. FirstSequence() uint64 // LastSequence returns sequence for last message stored, 0 if no // message is stored. LastSequence() uint64 // FirstAndLastSequence returns sequences for the first and last messages stored, // 0 if no message is stored. FirstAndLastSequence() (uint64, uint64) // GetSequenceFromTimestamp returns the sequence of the first message whose // timestamp is greater or equal to given timestamp. GetSequenceFromTimestamp(timestamp int64) uint64 // FirstMsg returns the first message stored. FirstMsg() *pb.MsgProto // LastMsg returns the last message stored. LastMsg() *pb.MsgProto // Flush is for stores that may buffer operations and need them to be persisted. Flush() error // Close closes the store. Close() error }
MsgStore is the interface for storage of Messages on a given channel.
type MsgStoreLimits ¶ added in v0.3.0
type MsgStoreLimits struct { // How many messages are allowed. MaxMsgs int // How many bytes are allowed. MaxBytes int64 // How long messages are kept in the log (unit is seconds) MaxAge time.Duration }
MsgStoreLimits defines limits for a MsgStore. For global limits, a value of 0 means "unlimited". For per-channel limits, it means that the corresponding global limit is used.
type PendingAcks ¶
type PendingAcks map[uint64]struct{}
PendingAcks is a set of message sequences waiting to be acknowledged.
type RecoveredState ¶
type RecoveredState struct { Info *spb.ServerInfo Clients []*Client Subs RecoveredSubscriptions }
RecoveredState allows the server to reconstruct its state after a restart.
type RecoveredSubState ¶
type RecoveredSubState struct { Sub *spb.SubState Pending PendingAcks }
RecoveredSubState represents a recovered Subscription with a map of pending messages.
type RecoveredSubscriptions ¶
type RecoveredSubscriptions map[string][]*RecoveredSubState
RecoveredSubscriptions is a map of recovered subscriptions, keyed by channel name.
type Store ¶
type Store interface { // GetExclusiveLock is an advisory lock to prevent concurrent // access to the store from multiple instances. // This is not to protect individual API calls, instead, it // is meant to protect the store for the entire duration the // store is being used. This is why there is no `Unlock` API. // The lock should be released when the store is closed. // // If an exclusive lock can be immediately acquired (that is, // it should not block waiting for the lock to be acquired), // this call will return `true` with no error. Once a store // instance has acquired an exclusive lock, calling this // function has no effect and `true` with no error will again // be returned. // // If the lock cannot be acquired, this call will return // `false` with no error: the caller can try again later. // // If, however, the lock cannot be acquired due to a fatal // error, this call should return `false` and the error. // // It is important to note that the implementation should // make an effort to distinguish error conditions deemed // fatal (and therefore trying again would invariabily result // in the same error) and those deemed transient, in which // case no error should be returned to indicate that the // caller could try later. // // Implementations that do not support exclusive locks should // return `false` and `ErrNotSupported`. GetExclusiveLock() (bool, error) // Init can be used to initialize the store with server's information. Init(info *spb.ServerInfo) error // Name returns the name type of this store (e.g: MEMORY, FILESTORE, etc...). Name() string // Recover returns the recovered state. // Implementations that do not persist state and therefore cannot // recover from a previous run MUST return nil, not an error. // However, an error must be returned for implementions that are // attempting to recover the state but fail to do so. Recover() (*RecoveredState, error) // SetLimits sets limits for this store. The action is not expected // to be retroactive. // The store implementation should make a deep copy as to not change // the content of the structure passed by the caller. // This call may return an error due to limits validation errors. SetLimits(limits *StoreLimits) error // CreateChannel creates a ChannelStore for the given channel, and returns // `true` to indicate that the channel is new, false if it already exists. // Limits defined for this channel in StoreLimits.PeChannel map, if present, // will apply. Otherwise, the global limits in StoreLimits will apply. CreateChannel(channel string, userData interface{}) (*ChannelStore, bool, error) // LookupChannel returns a ChannelStore for the given channel, nil if channel // does not exist. LookupChannel(channel string) *ChannelStore // HasChannel returns true if this store has any channel. HasChannel() bool // MsgsState returns message store statistics for a given channel, or all // if 'channel' is AllChannels. MsgsState(channel string) (numMessages int, byteSize uint64, err error) // AddClient stores information about the client identified by `clientID`. // If a Client is already registered, this call returns the currently // registered Client object, and the boolean set to false to indicate // that the client is not new. AddClient(clientID, hbInbox string, userData interface{}) (*Client, bool, error) // GetClient returns the stored Client, or nil if it does not exist. GetClient(clientID string) *Client // GetClients returns a map of all stored Client objects, keyed by client IDs. // The returned map is a copy of the state maintained by the store so that // it is safe for the caller to walk through the map while clients may be // added/deleted from the store. GetClients() map[string]*Client // GetClientsCount returns the number of registered clients. GetClientsCount() int // DeleteClient removes the client identified by `clientID` from the store // and returns it to the caller. DeleteClient(clientID string) *Client // Close closes this store (including all MsgStore and SubStore). // If an exlusive lock was acquired, the lock shall be released. Close() error }
Store is the storage interface for NATS Streaming servers.
If an implementation has a Store constructor with StoreLimits, it should be noted that the limits don't apply to any state being recovered, for Store implementations supporting recovery.
type StoreLimits ¶ added in v0.3.0
type StoreLimits struct { // How many channels are allowed. MaxChannels int // Global limits. Any 0 value means that the limit is ignored (unlimited). ChannelLimits // Per-channel limits. If a limit for a channel in this map is 0, // the corresponding global limit (specified above) is used. PerChannel map[string]*ChannelLimits }
StoreLimits define limits for a store.
func (*StoreLimits) AddPerChannel ¶ added in v0.3.0
func (sl *StoreLimits) AddPerChannel(name string, cl *ChannelLimits)
AddPerChannel stores limits for the given channel `name` in the StoreLimits. Inheritance (that is, specifying 0 for a limit means that the global limit should be used) is not applied in this call. This is done in StoreLimits.Build along with some validation.
func (*StoreLimits) Build ¶ added in v0.3.0
func (sl *StoreLimits) Build() error
Build sets the global limits into per-channel limits that are set to zero. This call also validates the limits. An error is returned if: * any limit is set to a negative value. * the number of per-channel is higher than StoreLimits.MaxChannels. * any per-channel limit is higher than the corresponding global limit.
type SubStore ¶
type SubStore interface { // CreateSub records a new subscription represented by SubState. On success, // it records the subscription's ID in SubState.ID. This ID is to be used // by the other SubStore methods. CreateSub(*spb.SubState) error // UpdateSub updates a given subscription represented by SubState. UpdateSub(*spb.SubState) error // DeleteSub invalidates the subscription 'subid'. DeleteSub(subid uint64) // AddSeqPending adds the given message 'seqno' to the subscription 'subid'. AddSeqPending(subid, seqno uint64) error // AckSeqPending records that the given message 'seqno' has been acknowledged // by the subscription 'subid'. AckSeqPending(subid, seqno uint64) error // Flush is for stores that may buffer operations and need them to be persisted. Flush() error // Close closes the subscriptions store. Close() error }
SubStore is the interface for storage of Subscriptions on a given channel.
Implementations of this interface should not attempt to validate that a subscription is valid (that is, has not been deleted) when processing updates.
type SubStoreLimits ¶ added in v0.3.0
type SubStoreLimits struct { // How many subscriptions are allowed. MaxSubscriptions int }
SubStoreLimits defines limits for a SubStore