stores

package
v0.2.0
Published: Jul 8, 2016 License: MIT Imports: 15 Imported by: 0

Documentation


Constants

const (
	// TypeMemory is the store type name for memory based stores
	TypeMemory = "MEMORY"
	// TypeFile is the store type name for file based stores
	TypeFile = "FILE"
)
const (
	// AllChannels is the channel name used to get the state of all channels.
	AllChannels = "*"
)

Variables

var (
	ErrTooManyChannels = errors.New("too many channels")
	ErrTooManySubs     = errors.New("too many subscriptions per channel")
)

Errors returned when a channel or subscription limit is exceeded.

var DefaultChannelLimits = ChannelLimits{
	MaxChannels: 100,
	MaxNumMsgs:  1000000,
	MaxMsgBytes: 1000000 * 1024,
	MaxSubs:     1000,
}

DefaultChannelLimits are the channel limits that a Store must use when none are specified to the Store constructor. Store limits can be changed with the Store.SetChannelLimits() method.
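
As a hedged illustration of starting from the defaults and overriding a single limit, the sketch below uses a local `channelLimits` type that mirrors the documented fields. It is a stand-in rather than the package's own type, and `limitsOrDefault` is a hypothetical helper showing the "nil means defaults" convention.

```go
package main

import (
	"fmt"
	"time"
)

// channelLimits is a local stand-in mirroring the documented ChannelLimits fields.
type channelLimits struct {
	MaxChannels int
	MaxNumMsgs  int
	MaxMsgBytes uint64
	MaxMsgAge   time.Duration
	MaxSubs     int
}

// defaultLimits copies the values documented for DefaultChannelLimits.
var defaultLimits = channelLimits{
	MaxChannels: 100,
	MaxNumMsgs:  1000000,
	MaxMsgBytes: 1000000 * 1024,
	MaxSubs:     1000,
}

// limitsOrDefault is a hypothetical helper: it returns a copy of the
// defaults when the caller passes nil, otherwise a copy of the caller's limits.
func limitsOrDefault(l *channelLimits) channelLimits {
	if l == nil {
		return defaultLimits
	}
	return *l
}

func main() {
	// Start from the defaults and tighten only the channel count.
	custom := defaultLimits
	custom.MaxChannels = 10

	fmt.Println(limitsOrDefault(nil).MaxChannels)
	fmt.Println(limitsOrDefault(&custom).MaxChannels)
}
```

Copying the struct by value keeps the shared defaults untouched when a caller adjusts one field.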

var DefaultFileStoreOptions = FileStoreOptions{
	BufferSize:           2 * 1024 * 1024,
	CompactEnabled:       true,
	CompactInterval:      5 * 60,
	CompactFragmentation: 50,
	CompactMinFileSize:   1024 * 1024,
	DoCRC:                true,
	CRCPolynomial:        int64(crc32.IEEE),
}

DefaultFileStoreOptions defines the default options for a File Store.

Functions

func NewFileStore

func NewFileStore(rootDir string, limits *ChannelLimits, options ...FileStoreOption) (*FileStore, *RecoveredState, error)

NewFileStore returns a factory for stores backed by files, and recovers any state present. If no limits are provided, the store is created with DefaultChannelLimits.

func Noticef

func Noticef(format string, v ...interface{})

Noticef logs a notice statement.

Types

type ChannelLimits

type ChannelLimits struct {
	// How many channels are allowed.
	MaxChannels int
	// How many messages per channel are allowed.
	MaxNumMsgs int
	// How many bytes (messages payloads) per channel are allowed.
	MaxMsgBytes uint64
	// How old messages on a channel can be before being removed.
	MaxMsgAge time.Duration
	// How many subscriptions per channel are allowed.
	MaxSubs int
}

ChannelLimits defines some limits on the store interface

type ChannelStore

type ChannelStore struct {
	// UserData is set when the channel is created.
	UserData interface{}
	// Subs is the Subscriptions Store.
	Subs SubStore
	// Msgs is the Messages Store.
	Msgs MsgStore
}

ChannelStore contains a reference to both Subscription and Message stores.

type Client

type Client struct {
	spb.ClientInfo
	UserData interface{}
}

Client represents a client with its ID, heartbeat inbox, and the user data set when it was added to the store.

type FileMsgStore

type FileMsgStore struct {
	// contains filtered or unexported fields
}

FileMsgStore is a per channel message file store.

func (*FileMsgStore) Close

func (ms *FileMsgStore) Close() error

Close closes the store.

func (*FileMsgStore) FirstAndLastSequence

func (gms *FileMsgStore) FirstAndLastSequence() (uint64, uint64)

FirstAndLastSequence returns sequences for the first and last messages stored.

func (*FileMsgStore) FirstMsg

func (gms *FileMsgStore) FirstMsg() *pb.MsgProto

FirstMsg returns the first message stored.

func (*FileMsgStore) FirstSequence

func (gms *FileMsgStore) FirstSequence() uint64

FirstSequence returns sequence for first message stored.

func (*FileMsgStore) Flush added in v0.2.0

func (ms *FileMsgStore) Flush() error

Flush flushes outstanding data into the store.

func (*FileMsgStore) GetSequenceFromTimestamp

func (gms *FileMsgStore) GetSequenceFromTimestamp(timestamp int64) uint64

GetSequenceFromTimestamp returns the sequence of the first message whose timestamp is greater than or equal to the given timestamp.
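
One way to picture this lookup is a binary search over messages ordered by timestamp. The sketch below is illustrative only: `storedMsg` and `seqFromTimestamp` are hypothetical names, and returning 0 when no message qualifies is an assumption of the sketch, not documented behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// storedMsg is a local stand-in holding only the fields this sketch needs.
type storedMsg struct {
	Sequence  uint64
	Timestamp int64
}

// seqFromTimestamp returns the sequence of the first message whose timestamp
// is >= ts. msgs must be ordered by non-decreasing timestamp. Returning 0
// when nothing matches is an assumption of this sketch.
func seqFromTimestamp(msgs []storedMsg, ts int64) uint64 {
	i := sort.Search(len(msgs), func(i int) bool {
		return msgs[i].Timestamp >= ts
	})
	if i == len(msgs) {
		return 0
	}
	return msgs[i].Sequence
}

func main() {
	msgs := []storedMsg{{1, 100}, {2, 200}, {3, 300}}
	fmt.Println(seqFromTimestamp(msgs, 150))
	fmt.Println(seqFromTimestamp(msgs, 200))
}
```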

func (*FileMsgStore) LastMsg

func (gms *FileMsgStore) LastMsg() *pb.MsgProto

LastMsg returns the last message stored.

func (*FileMsgStore) LastSequence

func (gms *FileMsgStore) LastSequence() uint64

LastSequence returns sequence for last message stored.

func (*FileMsgStore) Lookup

func (gms *FileMsgStore) Lookup(seq uint64) *pb.MsgProto

Lookup returns the stored message with given sequence number.

func (*FileMsgStore) State

func (gms *FileMsgStore) State() (numMessages int, byteSize uint64, err error)

State returns some statistics related to this store.

func (*FileMsgStore) Store

func (ms *FileMsgStore) Store(reply string, data []byte) (*pb.MsgProto, error)

Store stores a given message.

type FileStore

type FileStore struct {
	// contains filtered or unexported fields
}

FileStore is the storage interface for STAN servers, backed by files.

func (*FileStore) AddClient

func (fs *FileStore) AddClient(clientID, hbInbox string, userData interface{}) (*Client, bool, error)

AddClient stores information about the client identified by `clientID`.

func (*FileStore) Close

func (fs *FileStore) Close() error

Close closes all stores.

func (*FileStore) CreateChannel

func (fs *FileStore) CreateChannel(channel string, userData interface{}) (*ChannelStore, bool, error)

CreateChannel creates a ChannelStore for the given channel, and returns `true` to indicate that the channel is new, false if it already exists.
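
The "new or already exists" return value can be sketched together with a channel-count limit. This is a minimal stand-in, not the FileStore implementation: `channelSet` is hypothetical, it assumes a positive limit, and it assumes the limit is checked only when a genuinely new channel would be added.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooManyChannels mirrors the message of the documented ErrTooManyChannels.
var errTooManyChannels = errors.New("too many channels")

// channelSet is a hypothetical registry of channel names with a maximum size.
type channelSet struct {
	maxChannels int
	channels    map[string]struct{}
}

func newChannelSet(maxChannels int) *channelSet {
	return &channelSet{maxChannels: maxChannels, channels: make(map[string]struct{})}
}

// CreateChannel reports true when the channel was added, false when it
// already existed, and an error when adding it would exceed the limit.
func (s *channelSet) CreateChannel(name string) (bool, error) {
	if _, ok := s.channels[name]; ok {
		return false, nil
	}
	if len(s.channels) >= s.maxChannels {
		return false, errTooManyChannels
	}
	s.channels[name] = struct{}{}
	return true, nil
}

func main() {
	s := newChannelSet(1)
	fmt.Println(s.CreateChannel("foo")) // first creation
	fmt.Println(s.CreateChannel("foo")) // already exists
	fmt.Println(s.CreateChannel("bar")) // over the limit
}
```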

func (*FileStore) DeleteClient

func (fs *FileStore) DeleteClient(clientID string) *Client

DeleteClient invalidates the client identified by `clientID`.

func (*FileStore) GetClient

func (gs *FileStore) GetClient(clientID string) *Client

GetClient returns the stored Client, or nil if it does not exist.

func (*FileStore) GetClients

func (gs *FileStore) GetClients() map[string]*Client

GetClients returns all stored Client objects, as a map keyed by client IDs.

func (*FileStore) GetClientsCount

func (gs *FileStore) GetClientsCount() int

GetClientsCount returns the number of registered clients.

func (*FileStore) HasChannel

func (gs *FileStore) HasChannel() bool

HasChannel returns true if this store has any channel.

func (*FileStore) Init

func (fs *FileStore) Init(info *spb.ServerInfo) error

Init is used to persist the server's information after the first start.

func (*FileStore) LookupChannel

func (gs *FileStore) LookupChannel(channel string) *ChannelStore

LookupChannel returns a ChannelStore for the given channel.

func (*FileStore) MsgsState

func (gs *FileStore) MsgsState(channel string) (numMessages int, byteSize uint64, err error)

MsgsState returns message store statistics for a given channel ('*' for all).

func (*FileStore) Name

func (gs *FileStore) Name() string

Name returns the type name of this store.

func (*FileStore) SetChannelLimits

func (gs *FileStore) SetChannelLimits(limits ChannelLimits)

SetChannelLimits sets the limit for the messages and subscriptions stores.

type FileStoreOption

type FileStoreOption func(*FileStoreOptions) error

FileStoreOption is a function that sets options for a File Store.
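
This type follows the functional options pattern. The sketch below shows the pattern in isolation with local stand-ins (`fileStoreOptions`, `bufferSize`, `compactEnabled`, `applyOptions` are illustrative names, not the package's own). Rejecting a negative buffer size is an assumption added to show how an option can return an error.

```go
package main

import (
	"errors"
	"fmt"
)

// fileStoreOptions is a local stand-in with two of the documented fields.
type fileStoreOptions struct {
	BufferSize     int
	CompactEnabled bool
}

// fileStoreOption mirrors the documented shape: a function that mutates options.
type fileStoreOption func(*fileStoreOptions) error

// bufferSize is an illustrative option; rejecting negative sizes is an
// assumption of this sketch, not documented behavior.
func bufferSize(size int) fileStoreOption {
	return func(o *fileStoreOptions) error {
		if size < 0 {
			return errors.New("buffer size must not be negative")
		}
		o.BufferSize = size
		return nil
	}
}

// compactEnabled is an illustrative option that toggles compaction.
func compactEnabled(enabled bool) fileStoreOption {
	return func(o *fileStoreOptions) error {
		o.CompactEnabled = enabled
		return nil
	}
}

// applyOptions starts from defaults and applies each option in order,
// stopping at the first error.
func applyOptions(opts ...fileStoreOption) (fileStoreOptions, error) {
	o := fileStoreOptions{BufferSize: 2 * 1024 * 1024, CompactEnabled: true}
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return fileStoreOptions{}, err
		}
	}
	return o, nil
}

func main() {
	o, err := applyOptions(bufferSize(4096), compactEnabled(false))
	fmt.Println(o.BufferSize, o.CompactEnabled, err)
}
```

A variadic list of such functions lets a constructor keep a stable signature while new settings are added over time.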

func AllOptions

func AllOptions(opts *FileStoreOptions) FileStoreOption

AllOptions is a convenient option to pass all options from a FileStoreOptions structure to the constructor.

func BufferSize added in v0.2.0

func BufferSize(size int) FileStoreOption

BufferSize is a FileStore option that sets the size of the buffer used during store writes. This can help improve write performance.

func CRCPolynomial added in v0.2.0

func CRCPolynomial(polynomial int64) FileStoreOption

CRCPolynomial is a FileStore option that defines the polynomial to use to create the table used for CRC-32 Checksum. See https://golang.org/pkg/hash/crc32/#MakeTable
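
For orientation, the sketch below builds a CRC-32 table from a polynomial with the standard library's `crc32.MakeTable` and uses it to detect a corrupted record. `checksumWith` and `verify` are hypothetical helpers; how the package converts the `int64` value internally is an assumption.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// checksumWith builds a CRC-32 table from the polynomial and checksums data.
// The int64-to-uint32 conversion mirrors the option's int64 parameter type;
// how the package converts it internally is an assumption.
func checksumWith(polynomial int64, data []byte) uint32 {
	table := crc32.MakeTable(uint32(polynomial))
	return crc32.Checksum(data, table)
}

// verify reports whether data still matches a previously stored checksum.
func verify(polynomial int64, data []byte, stored uint32) bool {
	return checksumWith(polynomial, data) == stored
}

func main() {
	record := []byte("hello")
	sum := checksumWith(int64(crc32.IEEE), record)

	fmt.Println(verify(int64(crc32.IEEE), record, sum))          // intact record
	fmt.Println(verify(int64(crc32.IEEE), []byte("hellp"), sum)) // corrupted record
}
```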

func CompactEnabled

func CompactEnabled(enabled bool) FileStoreOption

CompactEnabled is a FileStore option that enables or disables file compaction. The value false will disable compaction.

func CompactFragmentation

func CompactFragmentation(fragmentation int) FileStoreOption

CompactFragmentation is a FileStore option that defines the fragmentation ratio below which compaction would not occur. For instance, specifying 50 means that if other variables would allow for compaction, the compaction would occur only after 50% of the file has data that is no longer valid.

func CompactInterval

func CompactInterval(seconds int) FileStoreOption

CompactInterval is a FileStore option that defines the minimum compaction interval, in seconds. Compaction is not timer based; it is triggered when records get "deleted". This value prevents compaction from happening too often.

func CompactMinFileSize

func CompactMinFileSize(fileSize int64) FileStoreOption

CompactMinFileSize is a FileStore option that defines the minimum file size below which compaction would not occur. Specify `-1` if you don't want any minimum.
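
The four compaction options interact, so a rough sketch of how they might combine can help. The predicate below is hypothetical (`compactOpts` and `shouldCompact` are not part of the package), and it assumes that a fragmentation exactly equal to the threshold is enough to compact.

```go
package main

import "fmt"

// compactOpts holds the compaction-related settings described above.
type compactOpts struct {
	Enabled       bool
	IntervalSecs  int   // minimum seconds between compactions
	Fragmentation int   // minimum percentage of invalid data
	MinFileSize   int64 // minimum file size in bytes, -1 for no minimum
}

// shouldCompact is a hypothetical predicate combining the four settings.
// sinceLastSecs is the time elapsed since the previous compaction.
func shouldCompact(o compactOpts, fileSize, invalidBytes int64, sinceLastSecs int) bool {
	if !o.Enabled || fileSize <= 0 {
		return false
	}
	if o.MinFileSize >= 0 && fileSize < o.MinFileSize {
		return false
	}
	if sinceLastSecs < o.IntervalSecs {
		return false
	}
	// Percentage of the file that no longer holds valid data.
	frag := invalidBytes * 100 / fileSize
	return frag >= int64(o.Fragmentation)
}

func main() {
	// Values taken from the documented DefaultFileStoreOptions.
	o := compactOpts{Enabled: true, IntervalSecs: 5 * 60, Fragmentation: 50, MinFileSize: 1024 * 1024}

	// 2 MiB file, 75% invalid, 10 minutes since the last compaction.
	fmt.Println(shouldCompact(o, 2<<20, 3<<19, 600))
}
```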

func DoCRC added in v0.2.0

func DoCRC(enableCRC bool) FileStoreOption

DoCRC is a FileStore option that defines if a CRC checksum verification should be performed when records are read from disk.

type FileStoreOptions

type FileStoreOptions struct {
	// BufferSize is the size of the buffer used during store operations.
	BufferSize int

	// CompactEnabled enables or disables file compaction.
	CompactEnabled bool

	// CompactInterval indicates the minimum interval (in seconds) between compactions.
	CompactInterval int

	// CompactFragmentation indicates the minimum ratio of fragmentation
	// to trigger compaction. For instance, 50 means that compaction
	// would not happen until fragmentation is more than 50%.
	CompactFragmentation int

	// CompactMinFileSize indicates the minimum file size before compaction
	// can be performed, regardless of the current file fragmentation.
	CompactMinFileSize int64

	// DoCRC enables (or disables) CRC checksum verification on read operations.
	DoCRC bool

	// CRCPolynomial is the polynomial used to make the table used in CRC computation.
	CRCPolynomial int64
}

FileStoreOptions can be used to customize a File Store.

type FileSubStore

type FileSubStore struct {
	// contains filtered or unexported fields
}

FileSubStore is a subscription store in files.

func (*FileSubStore) AckSeqPending

func (ss *FileSubStore) AckSeqPending(subid, seqno uint64) error

AckSeqPending records that the given message seqno has been acknowledged by the given subscription.

func (*FileSubStore) AddSeqPending

func (ss *FileSubStore) AddSeqPending(subid, seqno uint64) error

AddSeqPending adds the given message seqno to the given subscription.

func (*FileSubStore) Close

func (ss *FileSubStore) Close() error

Close closes this store.

func (*FileSubStore) CreateSub

func (ss *FileSubStore) CreateSub(sub *spb.SubState) error

CreateSub records a new subscription represented by SubState. On success, it records the subscription's ID in SubState.ID. This ID is to be used by the other SubStore methods.

func (*FileSubStore) DeleteSub

func (ss *FileSubStore) DeleteSub(subid uint64)

DeleteSub invalidates this subscription.

func (*FileSubStore) Flush added in v0.2.0

func (ss *FileSubStore) Flush() error

Flush persists buffered operations to disk.

func (*FileSubStore) UpdateSub

func (ss *FileSubStore) UpdateSub(sub *spb.SubState) error

UpdateSub updates a given subscription represented by SubState.

type MemoryMsgStore

type MemoryMsgStore struct {
	// contains filtered or unexported fields
}

MemoryMsgStore is a per channel message store in memory.

func (*MemoryMsgStore) Close

func (gms *MemoryMsgStore) Close() error

Close closes this store.

func (*MemoryMsgStore) FirstAndLastSequence

func (gms *MemoryMsgStore) FirstAndLastSequence() (uint64, uint64)

FirstAndLastSequence returns sequences for the first and last messages stored.

func (*MemoryMsgStore) FirstMsg

func (gms *MemoryMsgStore) FirstMsg() *pb.MsgProto

FirstMsg returns the first message stored.

func (*MemoryMsgStore) FirstSequence

func (gms *MemoryMsgStore) FirstSequence() uint64

FirstSequence returns sequence for first message stored.

func (*MemoryMsgStore) Flush added in v0.2.0

func (gms *MemoryMsgStore) Flush() error

func (*MemoryMsgStore) GetSequenceFromTimestamp

func (gms *MemoryMsgStore) GetSequenceFromTimestamp(timestamp int64) uint64

GetSequenceFromTimestamp returns the sequence of the first message whose timestamp is greater than or equal to the given timestamp.

func (*MemoryMsgStore) LastMsg

func (gms *MemoryMsgStore) LastMsg() *pb.MsgProto

LastMsg returns the last message stored.

func (*MemoryMsgStore) LastSequence

func (gms *MemoryMsgStore) LastSequence() uint64

LastSequence returns sequence for last message stored.

func (*MemoryMsgStore) Lookup

func (gms *MemoryMsgStore) Lookup(seq uint64) *pb.MsgProto

Lookup returns the stored message with given sequence number.

func (*MemoryMsgStore) State

func (gms *MemoryMsgStore) State() (numMessages int, byteSize uint64, err error)

State returns some statistics related to this store.

func (*MemoryMsgStore) Store

func (ms *MemoryMsgStore) Store(reply string, data []byte) (*pb.MsgProto, error)

Store stores a given message.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is a factory for message and subscription stores.

func NewMemoryStore

func NewMemoryStore(limits *ChannelLimits) (*MemoryStore, error)

NewMemoryStore returns a factory for stores held in memory. If no limits are provided, the store is created with DefaultChannelLimits.

func (*MemoryStore) AddClient

func (gs *MemoryStore) AddClient(clientID, hbInbox string, userData interface{}) (*Client, bool, error)

AddClient stores information about the client identified by `clientID`.

func (*MemoryStore) Close

func (gs *MemoryStore) Close() error

Close closes all stores.

func (*MemoryStore) CreateChannel

func (ms *MemoryStore) CreateChannel(channel string, userData interface{}) (*ChannelStore, bool, error)

CreateChannel creates a ChannelStore for the given channel, and returns `true` to indicate that the channel is new, false if it already exists.

func (*MemoryStore) DeleteClient

func (gs *MemoryStore) DeleteClient(clientID string) *Client

DeleteClient deletes the client identified by `clientID`.

func (*MemoryStore) GetClient

func (gs *MemoryStore) GetClient(clientID string) *Client

GetClient returns the stored Client, or nil if it does not exist.

func (*MemoryStore) GetClients

func (gs *MemoryStore) GetClients() map[string]*Client

GetClients returns all stored Client objects, as a map keyed by client IDs.

func (*MemoryStore) GetClientsCount

func (gs *MemoryStore) GetClientsCount() int

GetClientsCount returns the number of registered clients.

func (*MemoryStore) HasChannel

func (gs *MemoryStore) HasChannel() bool

HasChannel returns true if this store has any channel.

func (*MemoryStore) Init

func (gs *MemoryStore) Init(info *spb.ServerInfo) error

Init can be used to initialize the store with server's information.

func (*MemoryStore) LookupChannel

func (gs *MemoryStore) LookupChannel(channel string) *ChannelStore

LookupChannel returns a ChannelStore for the given channel.

func (*MemoryStore) MsgsState

func (gs *MemoryStore) MsgsState(channel string) (numMessages int, byteSize uint64, err error)

MsgsState returns message store statistics for a given channel ('*' for all).

func (*MemoryStore) Name

func (gs *MemoryStore) Name() string

Name returns the type name of this store.

func (*MemoryStore) SetChannelLimits

func (gs *MemoryStore) SetChannelLimits(limits ChannelLimits)

SetChannelLimits sets the limit for the messages and subscriptions stores.

type MemorySubStore

type MemorySubStore struct {
	// contains filtered or unexported fields
}

MemorySubStore is a subscription store in memory.

func (*MemorySubStore) AckSeqPending

func (*MemorySubStore) AckSeqPending(subid, seqno uint64) error

AckSeqPending records that the given message seqno has been acknowledged by the given subscription.

func (*MemorySubStore) AddSeqPending

func (*MemorySubStore) AddSeqPending(subid, seqno uint64) error

AddSeqPending adds the given message seqno to the given subscription.

func (*MemorySubStore) Close

func (gss *MemorySubStore) Close() error

Close closes this store.

func (*MemorySubStore) CreateSub

func (gss *MemorySubStore) CreateSub(sub *spb.SubState) error

CreateSub records a new subscription represented by SubState. On success, it records the subscription's ID in SubState.ID. This ID is to be used by the other SubStore methods.

func (*MemorySubStore) DeleteSub

func (gss *MemorySubStore) DeleteSub(subid uint64)

DeleteSub invalidates this subscription.

func (*MemorySubStore) Flush added in v0.2.0

func (gss *MemorySubStore) Flush() error

Flush is for stores that may buffer operations and need them to be persisted.

func (*MemorySubStore) UpdateSub

func (gss *MemorySubStore) UpdateSub(sub *spb.SubState) error

UpdateSub updates a given subscription represented by SubState.

type MsgStore

type MsgStore interface {
	// State returns some statistics related to this store.
	State() (numMessages int, byteSize uint64, err error)

	// Store stores a message.
	Store(reply string, data []byte) (*pb.MsgProto, error)

	// Lookup returns the stored message with given sequence number.
	Lookup(seq uint64) *pb.MsgProto

	// FirstSequence returns sequence for first message stored, 0 if no
	// message is stored.
	FirstSequence() uint64

	// LastSequence returns sequence for last message stored, 0 if no
	// message is stored.
	LastSequence() uint64

	// FirstAndLastSequence returns sequences for the first and last messages stored,
	// 0 if no message is stored.
	FirstAndLastSequence() (uint64, uint64)

	// GetSequenceFromTimestamp returns the sequence of the first message whose
	// timestamp is greater or equal to given timestamp.
	GetSequenceFromTimestamp(timestamp int64) uint64

	// FirstMsg returns the first message stored.
	FirstMsg() *pb.MsgProto

	// LastMsg returns the last message stored.
	LastMsg() *pb.MsgProto

	// Flush is for stores that may buffer operations and need them to be persisted.
	Flush() error

	// Close closes the store.
	Close() error
}

MsgStore is the interface for storage of Messages on a given channel.
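
To make the sequence conventions concrete (0 when nothing is stored, nil for an unknown sequence), here is a minimal in-memory sketch of a subset of these methods. `memLog` and `msg` are hypothetical stand-ins; the sketch is not thread-safe, and it assumes that sequence numbers start at 1.

```go
package main

import "fmt"

// msg is a local stand-in for pb.MsgProto with only the fields used here.
type msg struct {
	Sequence uint64
	Reply    string
	Data     []byte
}

// memLog is a minimal, non-thread-safe message log for one channel.
type memLog struct {
	first, last uint64
	msgs        map[uint64]*msg
}

func newMemLog() *memLog {
	return &memLog{msgs: make(map[uint64]*msg)}
}

// Store assigns the next sequence number (starting at 1, an assumption of
// this sketch) and keeps the message.
func (l *memLog) Store(reply string, data []byte) *msg {
	l.last++
	if l.first == 0 {
		l.first = l.last
	}
	m := &msg{Sequence: l.last, Reply: reply, Data: data}
	l.msgs[m.Sequence] = m
	return m
}

// Lookup returns the message with the given sequence, or nil if absent.
func (l *memLog) Lookup(seq uint64) *msg {
	return l.msgs[seq]
}

// FirstAndLastSequence returns 0, 0 while the log is empty.
func (l *memLog) FirstAndLastSequence() (uint64, uint64) {
	return l.first, l.last
}

func main() {
	l := newMemLog()
	fmt.Println(l.FirstAndLastSequence())

	l.Store("", []byte("a"))
	l.Store("", []byte("b"))
	fmt.Println(l.FirstAndLastSequence())
	fmt.Println(string(l.Lookup(2).Data))
}
```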

type PendingAcks

type PendingAcks map[uint64]*pb.MsgProto

PendingAcks is a map of messages waiting to be acknowledged, keyed by message sequence number.

type RecoveredState

type RecoveredState struct {
	Info    *spb.ServerInfo
	Clients []*Client
	Subs    RecoveredSubscriptions
}

RecoveredState allows the server to reconstruct its state after a restart.

type RecoveredSubState

type RecoveredSubState struct {
	Sub     *spb.SubState
	Pending PendingAcks
}

RecoveredSubState represents a recovered Subscription with a map of pending messages.

type RecoveredSubscriptions

type RecoveredSubscriptions map[string][]*RecoveredSubState

RecoveredSubscriptions is a map of recovered subscriptions, keyed by channel name.
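
As a sketch of how a caller might walk the recovered state, the example below counts unacknowledged messages per channel. The lowercase types are local stand-ins that mirror the documented shapes, and `pendingPerChannel` is a hypothetical helper.

```go
package main

import "fmt"

// Local stand-ins for the documented recovery types.
type subState struct {
	ID uint64
}

type msg struct {
	Sequence uint64
}

type pendingAcks map[uint64]*msg

type recoveredSubState struct {
	Sub     *subState
	Pending pendingAcks
}

type recoveredSubscriptions map[string][]*recoveredSubState

// pendingPerChannel counts unacknowledged messages for each recovered channel.
func pendingPerChannel(subs recoveredSubscriptions) map[string]int {
	counts := make(map[string]int, len(subs))
	for channel, recovered := range subs {
		for _, rs := range recovered {
			counts[channel] += len(rs.Pending)
		}
	}
	return counts
}

func main() {
	subs := recoveredSubscriptions{
		"orders": {
			{Sub: &subState{ID: 1}, Pending: pendingAcks{5: {Sequence: 5}, 6: {Sequence: 6}}},
			{Sub: &subState{ID: 2}, Pending: pendingAcks{6: {Sequence: 6}}},
		},
	}
	fmt.Println(pendingPerChannel(subs)["orders"])
}
```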

type Store

type Store interface {
	// Init can be used to initialize the store with server's information.
	Init(info *spb.ServerInfo) error

	// Name returns the type name of this store (e.g. MEMORY, FILE).
	Name() string

	// SetChannelLimits sets limits per channel. The action is not expected
	// to be retroactive.
	SetChannelLimits(limits ChannelLimits)

	// CreateChannel creates a ChannelStore for the given channel, and returns
	// `true` to indicate that the channel is new, false if it already exists.
	CreateChannel(channel string, userData interface{}) (*ChannelStore, bool, error)

	// LookupChannel returns a ChannelStore for the given channel, nil if channel
	// does not exist.
	LookupChannel(channel string) *ChannelStore

	// HasChannel returns true if this store has any channel.
	HasChannel() bool

	// MsgsState returns message store statistics for a given channel, or all
	// if 'channel' is AllChannels.
	MsgsState(channel string) (numMessages int, byteSize uint64, err error)

	// AddClient stores information about the client identified by `clientID`.
	// If a Client is already registered, this call returns the currently
	// registered Client object, and the boolean set to false to indicate
	// that the client is not new.
	AddClient(clientID, hbInbox string, userData interface{}) (*Client, bool, error)

	// GetClient returns the stored Client, or nil if it does not exist.
	GetClient(clientID string) *Client

	// GetClients returns a map of all stored Client objects, keyed by client IDs.
	// The returned map is a copy of the state maintained by the store so that
	// it is safe for the caller to walk through the map while clients may be
	// added/deleted from the store.
	GetClients() map[string]*Client

	// GetClientsCount returns the number of registered clients.
	GetClientsCount() int

	// DeleteClient removes the client identified by `clientID` from the store
	// and returns it to the caller.
	DeleteClient(clientID string) *Client

	// Close closes all stores.
	Close() error
}

Store is the storage interface for STAN servers.

If an implementation has a Store constructor with ChannelLimits, it should be noted that the limits don't apply to any state being recovered, for Store implementations supporting recovery.
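
The client-related contract (AddClient returns the existing client with false, GetClients returns a copy, DeleteClient returns what it removed) can be sketched with a mutex-protected map. `clientRegistry` is a hypothetical stand-in that omits user data and error returns for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// client is a local stand-in for the documented Client type.
type client struct {
	ID      string
	HbInbox string
}

// clientRegistry is a hypothetical in-memory client table.
type clientRegistry struct {
	mu      sync.RWMutex
	clients map[string]*client
}

func newClientRegistry() *clientRegistry {
	return &clientRegistry{clients: make(map[string]*client)}
}

// AddClient returns the existing client and false when id is already registered.
func (r *clientRegistry) AddClient(id, hbInbox string) (*client, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if c, ok := r.clients[id]; ok {
		return c, false
	}
	c := &client{ID: id, HbInbox: hbInbox}
	r.clients[id] = c
	return c, true
}

// GetClients returns a copy of the map so callers can iterate safely.
func (r *clientRegistry) GetClients() map[string]*client {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make(map[string]*client, len(r.clients))
	for id, c := range r.clients {
		out[id] = c
	}
	return out
}

// DeleteClient removes and returns the client, or nil if it is unknown.
func (r *clientRegistry) DeleteClient(id string) *client {
	r.mu.Lock()
	defer r.mu.Unlock()
	c := r.clients[id]
	delete(r.clients, id)
	return c
}

func main() {
	r := newClientRegistry()
	_, isNew := r.AddClient("me", "_INBOX.hb")
	_, again := r.AddClient("me", "_INBOX.other")
	fmt.Println(isNew, again, len(r.GetClients()))
}
```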

type SubStore

type SubStore interface {
	// CreateSub records a new subscription represented by SubState. On success,
	// it records the subscription's ID in SubState.ID. This ID is to be used
	// by the other SubStore methods.
	CreateSub(*spb.SubState) error

	// UpdateSub updates a given subscription represented by SubState.
	UpdateSub(*spb.SubState) error

	// DeleteSub invalidates the subscription 'subid'.
	DeleteSub(subid uint64)

	// AddSeqPending adds the given message 'seqno' to the subscription 'subid'.
	AddSeqPending(subid, seqno uint64) error

	// AckSeqPending records that the given message 'seqno' has been acknowledged
	// by the subscription 'subid'.
	AckSeqPending(subid, seqno uint64) error

	// Flush is for stores that may buffer operations and need them to be persisted.
	Flush() error

	// Close closes the subscriptions store.
	Close() error
}

SubStore is the interface for storage of Subscriptions on a given channel.

Implementations of this interface should not attempt to validate that a subscription is valid (that is, has not been deleted) when processing updates.
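
A minimal sketch of the pending and acknowledged bookkeeping, under the constraint above that updates are not validated against live subscriptions: `pendingTracker` is a hypothetical in-memory stand-in, and its methods drop the error returns of the real interface for brevity.

```go
package main

import "fmt"

// pendingTracker is a hypothetical in-memory record of unacknowledged
// sequences, keyed by subscription ID.
type pendingTracker struct {
	pending map[uint64]map[uint64]struct{}
}

func newPendingTracker() *pendingTracker {
	return &pendingTracker{pending: make(map[uint64]map[uint64]struct{})}
}

// AddSeqPending marks seqno as delivered but not yet acknowledged for subid.
// It does not check that subid was created or is still live.
func (p *pendingTracker) AddSeqPending(subid, seqno uint64) {
	set, ok := p.pending[subid]
	if !ok {
		set = make(map[uint64]struct{})
		p.pending[subid] = set
	}
	set[seqno] = struct{}{}
}

// AckSeqPending removes seqno from the pending set of subid, if present.
func (p *pendingTracker) AckSeqPending(subid, seqno uint64) {
	delete(p.pending[subid], seqno)
}

// DeleteSub drops everything recorded for subid.
func (p *pendingTracker) DeleteSub(subid uint64) {
	delete(p.pending, subid)
}

// Pending reports how many sequences await acknowledgement for subid.
func (p *pendingTracker) Pending(subid uint64) int {
	return len(p.pending[subid])
}

func main() {
	p := newPendingTracker()
	p.AddSeqPending(1, 10)
	p.AddSeqPending(1, 11)
	p.AckSeqPending(1, 10)
	fmt.Println(p.Pending(1))
}
```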
