Documentation
¶
Index ¶
- Constants
- func GetStreamCatalogFilepath(dataDirectory string) string
- func NewStorageProvider(logger *zap.Logger, conf *config.Config) (storageprovider.IStorageProvider, error)
- type FileStorage
- func (s *FileStorage) BuildIndex(streamUUID types.StreamUUID) (interface{}, error)
- func (s *FileStorage) CreateDataDirectory() error
- func (s *FileStorage) CreateStreamDirectory(streamUUID types.StreamUUID) error
- func (s *FileStorage) CreateStreamsDirectory() error
- func (s *FileStorage) DeleteStream(streamUUID types.StreamUUID) error
- func (s *FileStorage) GenerateNewStreamUuid() types.StreamUUID
- func (s *FileStorage) GetDataDirectory() string
- func (s *FileStorage) GetMetaDataFilePath(streamUUID types.StreamUUID) string
- func (s *FileStorage) GetStreamDataFilePath(streamUUID types.StreamUUID) string
- func (s *FileStorage) GetStreamDirectoryPath(streamUUID types.StreamUUID) string
- func (s *FileStorage) GetStreamIndexFilePath(streamUUID types.StreamUUID) string
- func (s *FileStorage) GetStreamsDirectoryPath() string
- func (s *FileStorage) Init() error
- func (s *FileStorage) LoadStreamFromUUID(streamUUID types.StreamUUID) (*types.StreamInfo, error)
- func (s *FileStorage) LoadStreams() (types.StreamInfoList, error)
- func (s *FileStorage) LoadStreamsFromUUIDs(streamUUIDs types.StreamUUIDList) (types.StreamInfoList, error)
- func (s *FileStorage) NewStreamIteratorHandler(streamUUID types.StreamUUID, iteratorUUID types.StreamIteratorUUID) (types.IStreamIteratorHandler, error)
- func (s *FileStorage) NewStreamWriter(info *types.StreamInfo) (buffering.IStreamWriter, error)
- func (s *FileStorage) OnCreateStream(info *types.StreamInfo) error
- func (s *FileStorage) SaveStreamCatalog(streamUUIDs types.StreamUUIDList) error
- func (s *FileStorage) Stop() error
- func (s *FileStorage) StreamExists(streamUUID types.StreamUUID) bool
- type MsgOffset
- type StreamCatalogFile
- func (s *StreamCatalogFile) CatalogFileExists() bool
- func (s *StreamCatalogFile) CreateEmptyCatalogFile() error
- func (s *StreamCatalogFile) EnsureCatalogFileExists() error
- func (s *StreamCatalogFile) Init() error
- func (s *StreamCatalogFile) LoadStreamCatalog() (types.StreamUUIDList, error)
- func (s *StreamCatalogFile) SaveStreamCatalog(streamUUIDs types.StreamUUIDList) error
- func (s *StreamCatalogFile) Stop() error
- type StreamIndexFile
- func (idx *StreamIndexFile) BuildIndex(dataFilePath string) (*StreamIndexStats, error)
- func (idx *StreamIndexFile) Close() error
- func (idx *StreamIndexFile) GetOffsetAfterLastMessage() (MessageId, MsgOffset, error)
- func (idx *StreamIndexFile) GetOffsetAfterMessageId(messageId MessageId) (MessageId, MsgOffset, error)
- func (idx *StreamIndexFile) GetOffsetAtMessageId(messageId MessageId) (MessageId, MsgOffset, error)
- func (idx *StreamIndexFile) GetOffsetAtTimestamp(timestamp *time.Time) (MessageId, MsgOffset, error)
- func (idx *StreamIndexFile) GetOffsetFirstMessage() (MessageId, MsgOffset, error)
- func (idx *StreamIndexFile) GetOffsetLastMessage() (MessageId, MsgOffset, error)
- func (idx *StreamIndexFile) Log()
- type StreamIndexStats
- type StreamIteratorHandlerFile
- func (h *StreamIteratorHandlerFile) Close() error
- func (h *StreamIteratorHandlerFile) GetNextRecord() (types.MessageId, interface{}, bool, bool, error)
- func (h *StreamIteratorHandlerFile) Open() error
- func (h *StreamIteratorHandlerFile) SaveSeek() error
- func (h *StreamIteratorHandlerFile) Seek(request *types.StreamIteratorRequest) error
- type StreamWriterFile
Constants ¶
View Source
const EOLChar = '\n'
View Source
const STREAM_WRITER_FILE_STATE_CLOSED = 2
View Source
const STREAM_WRITER_FILE_STATE_NONE = 0
View Source
const STREAM_WRITER_FILE_STATE_OPENED = 1
Variables ¶
This section is empty.
Functions ¶
func NewStorageProvider ¶
func NewStorageProvider(logger *zap.Logger, conf *config.Config) (storageprovider.IStorageProvider, error)
Types ¶
type FileStorage ¶
type FileStorage struct {
// contains filtered or unexported fields
}
func (*FileStorage) BuildIndex ¶
func (s *FileStorage) BuildIndex(streamUUID types.StreamUUID) (interface{}, error)
func (*FileStorage) CreateDataDirectory ¶
func (s *FileStorage) CreateDataDirectory() error
func (*FileStorage) CreateStreamDirectory ¶
func (s *FileStorage) CreateStreamDirectory(streamUUID types.StreamUUID) error
func (*FileStorage) CreateStreamsDirectory ¶
func (s *FileStorage) CreateStreamsDirectory() error
func (*FileStorage) DeleteStream ¶
func (s *FileStorage) DeleteStream(streamUUID types.StreamUUID) error
func (*FileStorage) GenerateNewStreamUuid ¶
func (s *FileStorage) GenerateNewStreamUuid() types.StreamUUID
func (*FileStorage) GetDataDirectory ¶
func (s *FileStorage) GetDataDirectory() string
func (*FileStorage) GetMetaDataFilePath ¶
func (s *FileStorage) GetMetaDataFilePath(streamUUID types.StreamUUID) string
func (*FileStorage) GetStreamDataFilePath ¶
func (s *FileStorage) GetStreamDataFilePath(streamUUID types.StreamUUID) string
func (*FileStorage) GetStreamDirectoryPath ¶
func (s *FileStorage) GetStreamDirectoryPath(streamUUID types.StreamUUID) string
func (*FileStorage) GetStreamIndexFilePath ¶
func (s *FileStorage) GetStreamIndexFilePath(streamUUID types.StreamUUID) string
func (*FileStorage) GetStreamsDirectoryPath ¶
func (s *FileStorage) GetStreamsDirectoryPath() string
func (*FileStorage) Init ¶
func (s *FileStorage) Init() error
func (*FileStorage) LoadStreamFromUUID ¶
func (s *FileStorage) LoadStreamFromUUID(streamUUID types.StreamUUID) (*types.StreamInfo, error)
func (*FileStorage) LoadStreams ¶
func (s *FileStorage) LoadStreams() (types.StreamInfoList, error)
func (*FileStorage) LoadStreamsFromUUIDs ¶
func (s *FileStorage) LoadStreamsFromUUIDs(streamUUIDs types.StreamUUIDList) (types.StreamInfoList, error)
func (*FileStorage) NewStreamIteratorHandler ¶
func (s *FileStorage) NewStreamIteratorHandler(streamUUID types.StreamUUID, iteratorUUID types.StreamIteratorUUID) (types.IStreamIteratorHandler, error)
func (*FileStorage) NewStreamWriter ¶
func (s *FileStorage) NewStreamWriter(info *types.StreamInfo) (buffering.IStreamWriter, error)
func (*FileStorage) OnCreateStream ¶
func (s *FileStorage) OnCreateStream(info *types.StreamInfo) error
func (*FileStorage) SaveStreamCatalog ¶
func (s *FileStorage) SaveStreamCatalog(streamUUIDs types.StreamUUIDList) error
func (*FileStorage) Stop ¶
func (s *FileStorage) Stop() error
func (*FileStorage) StreamExists ¶
func (s *FileStorage) StreamExists(streamUUID types.StreamUUID) bool
type StreamCatalogFile ¶
type StreamCatalogFile struct {
// contains filtered or unexported fields
}
func NewStreamCatalogFile ¶
func NewStreamCatalogFile(logger *zap.Logger, filepath string) *StreamCatalogFile
func (*StreamCatalogFile) CatalogFileExists ¶
func (s *StreamCatalogFile) CatalogFileExists() bool
func (*StreamCatalogFile) CreateEmptyCatalogFile ¶
func (s *StreamCatalogFile) CreateEmptyCatalogFile() error
func (*StreamCatalogFile) EnsureCatalogFileExists ¶
func (s *StreamCatalogFile) EnsureCatalogFileExists() error
func (*StreamCatalogFile) Init ¶
func (s *StreamCatalogFile) Init() error
func (*StreamCatalogFile) LoadStreamCatalog ¶
func (s *StreamCatalogFile) LoadStreamCatalog() (types.StreamUUIDList, error)
func (*StreamCatalogFile) SaveStreamCatalog ¶
func (s *StreamCatalogFile) SaveStreamCatalog(streamUUIDs types.StreamUUIDList) error
func (*StreamCatalogFile) Stop ¶
func (s *StreamCatalogFile) Stop() error
type StreamIndexFile ¶
type StreamIndexFile struct {
// contains filtered or unexported fields
}
func NewStreamIndex ¶
func (*StreamIndexFile) BuildIndex ¶
func (idx *StreamIndexFile) BuildIndex(dataFilePath string) (*StreamIndexStats, error)
func (*StreamIndexFile) Close ¶
func (idx *StreamIndexFile) Close() error
func (*StreamIndexFile) GetOffsetAfterLastMessage ¶
func (idx *StreamIndexFile) GetOffsetAfterLastMessage() (MessageId, MsgOffset, error)
func (*StreamIndexFile) GetOffsetAfterMessageId ¶
func (idx *StreamIndexFile) GetOffsetAfterMessageId(messageId MessageId) (MessageId, MsgOffset, error)
func (*StreamIndexFile) GetOffsetAtMessageId ¶
func (idx *StreamIndexFile) GetOffsetAtMessageId(messageId MessageId) (MessageId, MsgOffset, error)
func (*StreamIndexFile) GetOffsetAtTimestamp ¶
func (idx *StreamIndexFile) GetOffsetAtTimestamp(timestamp *time.Time) (MessageId, MsgOffset, error)
func (*StreamIndexFile) GetOffsetFirstMessage ¶
func (idx *StreamIndexFile) GetOffsetFirstMessage() (MessageId, MsgOffset, error)
func (*StreamIndexFile) GetOffsetLastMessage ¶
func (idx *StreamIndexFile) GetOffsetLastMessage() (MessageId, MsgOffset, error)
func (*StreamIndexFile) Log ¶
func (idx *StreamIndexFile) Log()
type StreamIndexStats ¶
type StreamIteratorHandlerFile ¶
type StreamIteratorHandlerFile struct { FileOffset int64 // contains filtered or unexported fields }
func NewStreamIteratorHandlerFile ¶
func NewStreamIteratorHandlerFile(streamUUID types.StreamUUID, iteratorUUID types.StreamIteratorUUID, filename string, idx *StreamIndexFile, logger *zap.Logger) *StreamIteratorHandlerFile
func (*StreamIteratorHandlerFile) Close ¶
func (h *StreamIteratorHandlerFile) Close() error
func (*StreamIteratorHandlerFile) GetNextRecord ¶
func (*StreamIteratorHandlerFile) Open ¶
func (h *StreamIteratorHandlerFile) Open() error
func (*StreamIteratorHandlerFile) SaveSeek ¶
func (h *StreamIteratorHandlerFile) SaveSeek() error
func (*StreamIteratorHandlerFile) Seek ¶
func (h *StreamIteratorHandlerFile) Seek(request *types.StreamIteratorRequest) error
type StreamWriterFile ¶
type StreamWriterFile struct {
// contains filtered or unexported fields
}
func NewStreamWriterFile ¶
func NewStreamWriterFile(info *types.StreamInfo, fileDataPath string, fileIndexPath string, fileMetaInfoPath string, logger *zap.Logger, logVerbosity int) *StreamWriterFile
func (*StreamWriterFile) Close ¶
func (w *StreamWriterFile) Close() error
func (*StreamWriterFile) Init ¶
func (w *StreamWriterFile) Init() error
func (*StreamWriterFile) Open ¶
func (w *StreamWriterFile) Open() error
func (*StreamWriterFile) SaveFileMetaInfo ¶
func (w *StreamWriterFile) SaveFileMetaInfo() error
func (*StreamWriterFile) Write ¶
func (w *StreamWriterFile) Write(records *[]types.DeferedStreamRecord) error
Click to show internal directories.
Click to hide internal directories.