jsonfileprovider

package
v1.7.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2024 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

const EOLChar = '\n'
const STREAM_WRITER_FILE_STATE_CLOSED = 2
const STREAM_WRITER_FILE_STATE_NONE = 0
const STREAM_WRITER_FILE_STATE_OPENED = 1

Variables

This section is empty.

Functions

func GetStreamCatalogFilepath

func GetStreamCatalogFilepath(dataDirectory string) string

func NewStorageProvider

func NewStorageProvider(logger *zap.Logger, conf *config.Config) (storageprovider.IStorageProvider, error)

Types

type FileStorage

type FileStorage struct {
	// contains filtered or unexported fields
}

func (*FileStorage) BuildIndex

func (s *FileStorage) BuildIndex(streamUUID types.StreamUUID) (interface{}, error)

func (*FileStorage) CreateDataDirectory

func (s *FileStorage) CreateDataDirectory() error

func (*FileStorage) CreateStreamDirectory

func (s *FileStorage) CreateStreamDirectory(streamUUID types.StreamUUID) error

func (*FileStorage) CreateStreamsDirectory

func (s *FileStorage) CreateStreamsDirectory() error

func (*FileStorage) DeleteStream

func (s *FileStorage) DeleteStream(streamUUID types.StreamUUID) error

func (*FileStorage) GenerateNewStreamUuid

func (s *FileStorage) GenerateNewStreamUuid() types.StreamUUID

func (*FileStorage) GetDataDirectory

func (s *FileStorage) GetDataDirectory() string

func (*FileStorage) GetMetaDataFilePath

func (s *FileStorage) GetMetaDataFilePath(streamUUID types.StreamUUID) string

func (*FileStorage) GetStreamDataFilePath

func (s *FileStorage) GetStreamDataFilePath(streamUUID types.StreamUUID) string

func (*FileStorage) GetStreamDirectoryPath

func (s *FileStorage) GetStreamDirectoryPath(streamUUID types.StreamUUID) string

func (*FileStorage) GetStreamIndexFilePath

func (s *FileStorage) GetStreamIndexFilePath(streamUUID types.StreamUUID) string

func (*FileStorage) GetStreamsDirectoryPath

func (s *FileStorage) GetStreamsDirectoryPath() string

func (*FileStorage) Init

func (s *FileStorage) Init() error

func (*FileStorage) LoadStreamFromUUID

func (s *FileStorage) LoadStreamFromUUID(streamUUID types.StreamUUID) (*types.StreamInfo, error)

func (*FileStorage) LoadStreams

func (s *FileStorage) LoadStreams() (types.StreamInfoList, error)

func (*FileStorage) LoadStreamsFromUUIDs

func (s *FileStorage) LoadStreamsFromUUIDs(streamUUIDs types.StreamUUIDList) (types.StreamInfoList, error)

func (*FileStorage) NewStreamIteratorHandler

func (s *FileStorage) NewStreamIteratorHandler(streamUUID types.StreamUUID, iteratorUUID types.StreamIteratorUUID) (types.IStreamIteratorHandler, error)

func (*FileStorage) NewStreamWriter

func (s *FileStorage) NewStreamWriter(info *types.StreamInfo) (buffering.IStreamWriter, error)

func (*FileStorage) OnCreateStream

func (s *FileStorage) OnCreateStream(info *types.StreamInfo) error

func (*FileStorage) SaveStreamCatalog

func (s *FileStorage) SaveStreamCatalog(streamUUIDs types.StreamUUIDList) error

func (*FileStorage) Stop

func (s *FileStorage) Stop() error

func (*FileStorage) StreamExists

func (s *FileStorage) StreamExists(streamUUID types.StreamUUID) bool

type MsgOffset

type MsgOffset = int64

type StreamCatalogFile

type StreamCatalogFile struct {
	// contains filtered or unexported fields
}

func NewStreamCatalogFile

func NewStreamCatalogFile(logger *zap.Logger, filepath string) *StreamCatalogFile

func (*StreamCatalogFile) CatalogFileExists

func (s *StreamCatalogFile) CatalogFileExists() bool

func (*StreamCatalogFile) CreateEmptyCatalogFile

func (s *StreamCatalogFile) CreateEmptyCatalogFile() error

func (*StreamCatalogFile) EnsureCatalogFileExists

func (s *StreamCatalogFile) EnsureCatalogFileExists() error

func (*StreamCatalogFile) Init

func (s *StreamCatalogFile) Init() error

func (*StreamCatalogFile) LoadStreamCatalog

func (s *StreamCatalogFile) LoadStreamCatalog() (types.StreamUUIDList, error)

func (*StreamCatalogFile) SaveStreamCatalog

func (s *StreamCatalogFile) SaveStreamCatalog(streamUUIDs types.StreamUUIDList) error

func (*StreamCatalogFile) Stop

func (s *StreamCatalogFile) Stop() error

type StreamIndexFile

type StreamIndexFile struct {
	// contains filtered or unexported fields
}

func NewStreamIndex

func NewStreamIndex(streamUUID uuid.UUID, filename string, logger *zap.Logger) *StreamIndexFile

func (*StreamIndexFile) BuildIndex

func (idx *StreamIndexFile) BuildIndex(dataFilePath string) (*StreamIndexStats, error)

func (*StreamIndexFile) Close

func (idx *StreamIndexFile) Close() error

func (*StreamIndexFile) GetOffsetAfterLastMessage

func (idx *StreamIndexFile) GetOffsetAfterLastMessage() (MessageId, MsgOffset, error)

func (*StreamIndexFile) GetOffsetAfterMessageId

func (idx *StreamIndexFile) GetOffsetAfterMessageId(messageId MessageId) (MessageId, MsgOffset, error)

func (*StreamIndexFile) GetOffsetAtMessageId

func (idx *StreamIndexFile) GetOffsetAtMessageId(messageId MessageId) (MessageId, MsgOffset, error)

func (*StreamIndexFile) GetOffsetAtTimestamp

func (idx *StreamIndexFile) GetOffsetAtTimestamp(timestamp *time.Time) (MessageId, MsgOffset, error)

func (*StreamIndexFile) GetOffsetFirstMessage

func (idx *StreamIndexFile) GetOffsetFirstMessage() (MessageId, MsgOffset, error)

func (*StreamIndexFile) GetOffsetLastMessage

func (idx *StreamIndexFile) GetOffsetLastMessage() (MessageId, MsgOffset, error)

func (*StreamIndexFile) Log

func (idx *StreamIndexFile) Log()

type StreamIndexStats

type StreamIndexStats struct {
	CptMessages       int64
	FileSize          int64
	FirstMsgId        MessageId
	LastMsgId         MessageId
	FirstMsgTimestamp time.Time
	LastMsgTimestamp  time.Time
}

type StreamIteratorHandlerFile

type StreamIteratorHandlerFile struct {
	FileOffset int64
	// contains filtered or unexported fields
}

func NewStreamIteratorHandlerFile

func NewStreamIteratorHandlerFile(streamUUID types.StreamUUID, iteratorUUID types.StreamIteratorUUID, filename string, idx *StreamIndexFile, logger *zap.Logger) *StreamIteratorHandlerFile

func (*StreamIteratorHandlerFile) Close

func (h *StreamIteratorHandlerFile) Close() error

func (*StreamIteratorHandlerFile) GetNextRecord

func (h *StreamIteratorHandlerFile) GetNextRecord() (types.MessageId, interface{}, bool, bool, error)

func (*StreamIteratorHandlerFile) Open

func (h *StreamIteratorHandlerFile) Open() error

func (*StreamIteratorHandlerFile) SaveSeek

func (h *StreamIteratorHandlerFile) SaveSeek() error

func (*StreamIteratorHandlerFile) Seek

type StreamWriterFile

type StreamWriterFile struct {
	// contains filtered or unexported fields
}

func NewStreamWriterFile

func NewStreamWriterFile(info *types.StreamInfo, fileDataPath string, fileIndexPath string, fileMetaInfoPath string, logger *zap.Logger, logVerbosity int) *StreamWriterFile

func (*StreamWriterFile) Close

func (w *StreamWriterFile) Close() error

func (*StreamWriterFile) Init

func (w *StreamWriterFile) Init() error

func (*StreamWriterFile) Open

func (w *StreamWriterFile) Open() error

func (*StreamWriterFile) SaveFileMetaInfo

func (w *StreamWriterFile) SaveFileMetaInfo() error

func (*StreamWriterFile) Write

func (w *StreamWriterFile) Write(records *[]types.DeferedStreamRecord) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL