Documentation ¶
Index ¶
- Constants
- Variables
- func PrintHeaderEntry(e HeaderEntry, title string)
- func PrintReceivedEntry(e *FileEntry, c *StreamClient, s *StreamServer) error
- func PrintResultEntry(e ResultEntry)
- func TimeoutWrite(client *client, data []byte, timeout time.Duration) (int, error)
- type AOStatus
- type ClientStatus
- type Command
- type CommandError
- type Config
- type EntryType
- type FileEntry
- type HeaderEntry
- type ProcessEntryFunc
- type ResultEntry
- type StreamBookmark
- type StreamClient
- func (c *StreamClient) ExecCommandGetBookmark(fromBookmark []byte) (FileEntry, error)
- func (c *StreamClient) ExecCommandGetEntry(fromEntry uint64) (FileEntry, error)
- func (c *StreamClient) ExecCommandGetHeader() (HeaderEntry, error)
- func (c *StreamClient) ExecCommandStart(fromEntry uint64) error
- func (c *StreamClient) ExecCommandStartBookmark(fromBookmark []byte) error
- func (c *StreamClient) ExecCommandStop() error
- func (c *StreamClient) GetFromStream() uint64
- func (c *StreamClient) GetTotalEntries() uint64
- func (c *StreamClient) IsStarted() bool
- func (c *StreamClient) ResetProcessEntryFunc()
- func (c *StreamClient) SetProcessEntryFunc(f ProcessEntryFunc)
- func (c *StreamClient) Start() error
- type StreamFile
- type StreamRelay
- type StreamServer
- func (s *StreamServer) AddStreamBookmark(bookmark []byte) (uint64, error)
- func (s *StreamServer) AddStreamEntry(etype EntryType, data []byte) (uint64, error)
- func (s *StreamServer) BookmarkPrintDump()
- func (s *StreamServer) CommitAtomicOp() error
- func (s *StreamServer) GetBookmark(bookmark []byte) (uint64, error)
- func (s *StreamServer) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error)
- func (s *StreamServer) GetEntry(entryNum uint64) (FileEntry, error)
- func (s *StreamServer) GetFirstEventAfterBookmark(bookmark []byte) (FileEntry, error)
- func (s *StreamServer) GetHeader() HeaderEntry
- func (s *StreamServer) RollbackAtomicOp() error
- func (s *StreamServer) Start() error
- func (s *StreamServer) StartAtomicOp() error
- func (s *StreamServer) TruncateFile(entryNum uint64) error
- func (s *StreamServer) UpdateEntryData(entryNum uint64, etype EntryType, data []byte) error
- type StreamType
Constants ¶
const ( PageHeaderSize = 4096 // PageHeaderSize is the size of header page (4 KB) PageDataSize = 1024 * 1024 // PageDataSize is the size of one data page (1 MB) PtPadding = 0 // PtPadding is packet type for pad PtHeader = 1 // PtHeader is packet type just for the header page PtData = 2 // PtData is packet type for data entry PtDataRsp = 0xfe // PtDataRsp is packet type for command response with data PtResult = 0xff // PtResult is packet type not stored/present in file (just for client command result) EtBookmark = 0xb0 // EtBookmark is entry type for bookmarks FixedSizeFileEntry = 17 // FixedSizeFileEntry is the fixed size in bytes for a data file entry (1+4+4+8) FixedSizeResultEntry = 9 // FixedSizeResultEntry is the fixed size in bytes for a result entry (1+4+4) )
const EntryTypeNotFound = math.MaxUint32
EntryTypeNotFound is the entry type value for CmdEntry/CmdBookmark when entry/bookmark not found
Variables ¶
var ( // ErrInvalidCommand is returned when the command is invalid ErrInvalidCommand = fmt.Errorf("invalid command") // ErrResultCommandError is returned when the command is invalid ErrResultCommandError = fmt.Errorf("result command error") // ErrNilConnection is returned when the connection is nil ErrNilConnection = fmt.Errorf("nil connection") // ErrReadingDataEntry is returned when there is an error reading data entry ErrReadingDataEntry = fmt.Errorf("error reading data entry") // ErrReadingResultEntry is returned when there is an error reading result entry ErrReadingResultEntry = fmt.Errorf("error reading result entry") // ErrGettingHeaderInfo is returned when there is an error getting header info ErrGettingHeaderInfo = fmt.Errorf("error getting header info") // ErrInvalidBinaryHeader is returned when the binary header is invalid ErrInvalidBinaryHeader = fmt.Errorf("invalid binary header info") // ErrInvalidFileMissingHeaderPage is returned when the file is invalid, missing header page ErrInvalidFileMissingHeaderPage = fmt.Errorf("invalid file, missing header page") // ErrBadFileSizeCutDataPage is returned when the file size is bad, cut data page ErrBadFileSizeCutDataPage = fmt.Errorf("bad file size, cut data page") // ErrBadFileFormat is returned when the file format is bad ErrBadFileFormat = fmt.Errorf("bad file format") // ErrInvalidHeaderBadPacketType is returned when the header is invalid, bad packet type ErrInvalidHeaderBadPacketType = fmt.Errorf("invalid header, bad packet type") // ErrInvalidHeaderBadHeaderLength is returned when the header is invalid, bad header length ErrInvalidHeaderBadHeaderLength = fmt.Errorf("invalid header, bad header length") // ErrInvalidHeaderBadStreamType is returned when the header is invalid, bad stream type ErrInvalidHeaderBadStreamType = fmt.Errorf("invalid header, bad stream type") // ErrInvalidBinaryEntry is returned when the binary entry is invalid ErrInvalidBinaryEntry = fmt.Errorf("invalid binary entry") // ErrDecodingBinaryDataEntry is returned when there is an error decoding binary data entry ErrDecodingBinaryDataEntry = fmt.Errorf("error decoding binary data entry") // ErrExpectingPacketTypeData is returned when there is an error expecting packet type data ErrExpectingPacketTypeData = fmt.Errorf("expecting packet type data") // ErrDecodingLengthDataEntry is returned when there is an error decoding length data entry ErrDecodingLengthDataEntry = fmt.Errorf("error decoding length data entry") // ErrPageNotStartingWithEntryData is returned when the page is not starting with entry data ErrPageNotStartingWithEntryData = fmt.Errorf("page not starting with entry data") // ErrCurrentPositionOutsideDataPage is returned when the current position is outside data page ErrCurrentPositionOutsideDataPage = fmt.Errorf("current position outside data page") // ErrEntryNotFound is returned when the entry is not found ErrEntryNotFound = fmt.Errorf("entry not found") // ErrInvalidEntryNumberNotCommittedInFile is returned when the entry number is invalid, not committed in the file ErrInvalidEntryNumberNotCommittedInFile = fmt.Errorf("invalid entry number, not committed in the file") // ErrEntryNumberMismatch is returned when the entry number doesn't match ErrEntryNumberMismatch = fmt.Errorf("entry number doesn't match") // ErrUpdateEntryTypeNotAllowed is returned when the update entry type is not allowed ErrUpdateEntryTypeNotAllowed = fmt.Errorf("update entry to a different entry type not allowed") // ErrUpdateEntryDifferentSize is returned when the update entry is a different size ErrUpdateEntryDifferentSize = fmt.Errorf("update entry to a different size not allowed") // ErrAtomicOpNotAllowed is returned when the atomic operation is not allowed ErrAtomicOpNotAllowed = fmt.Errorf("atomicop not allowed, server is not started") // ErrStartAtomicOpNotAllowed is returned when the start atomic operation is not allowed ErrStartAtomicOpNotAllowed = fmt.Errorf("start atomicop not allowed, atomicop already started") // ErrAddEntryNotAllowed is returned when the add entry is not allowed ErrAddEntryNotAllowed = fmt.Errorf("add entry not allowed, atomicop is not started") // ErrCommitNotAllowed is returned when the commit is not allowed ErrCommitNotAllowed = fmt.Errorf("commit not allowed, atomicop not in started state") // ErrRollbackNotAllowed is returned when the rollback is not allowed ErrRollbackNotAllowed = fmt.Errorf("rollback not allowed, atomicop not in started state") // ErrInvalidEntryNumber is returned when the entry number is invalid ErrInvalidEntryNumber = fmt.Errorf("invalid entry number, doesn't exist") // ErrUpdateNotAllowed is returned when the update is not allowed ErrUpdateNotAllowed = fmt.Errorf("update not allowed, it's in current atomic operation") // ErrClientAlreadyStarted is returned when the client is already started ErrClientAlreadyStarted = fmt.Errorf("client already started") // ErrClientAlreadyStopped is returned when the client is already stopped ErrClientAlreadyStopped = fmt.Errorf("client already stopped") // ErrHeaderCommandNotAllowed is returned when the header command is not allowed ErrHeaderCommandNotAllowed = fmt.Errorf("header command not allowed") // ErrEntryCommandNotAllowed is returned when the entry command is not allowed ErrEntryCommandNotAllowed = fmt.Errorf("entry command not allowed") // ErrStartCommandInvalidParamFromEntry is returned when the start command is invalid, param from entry ErrStartCommandInvalidParamFromEntry = fmt.Errorf("start command invalid param from entry") // ErrStartBookmarkInvalidParamFromBookmark is returned when the start bookmark is invalid, param from bookmark ErrStartBookmarkInvalidParamFromBookmark = fmt.Errorf("start bookmark invalid param from bookmark") // ErrInvalidBinaryResultEntry is returned when the binary result entry is invalid ErrInvalidBinaryResultEntry = fmt.Errorf("invalid binary result entry") // ErrDecodingBinaryResultEntry is returned when there is an error decoding binary result entry ErrDecodingBinaryResultEntry = fmt.Errorf("error decoding binary result entry") // ErrTruncateNotAllowed is returned when there is an atomic operation in progress ErrTruncateNotAllowed = fmt.Errorf("truncate not allowed, atomic operation in progress") // ErrBookmarkCommandNotAllowed is returned when the bookmark command is not allowed ErrBookmarkCommandNotAllowed = fmt.Errorf("bookmark command not allowed") // ErrExecCommandNotAllowed is returned when execute TCP command is not allowed ErrExecCommandNotAllowed = fmt.Errorf("execute command not allowed, client is not started") // ErrBookmarkNotFound is returned when the bookmark is not found ErrBookmarkNotFound = fmt.Errorf("bookmark not found") // ErrBookmarkMaxLength is returned when the bookmark length exceeds maximum length ErrBookmarkMaxLength = fmt.Errorf("bookmark max length") // ErrInvalidBookmarkRange is returned when the bookmark range is invalid ErrInvalidBookmarkRange = fmt.Errorf("invalid bookmark range") )
var ( // StrClientStatus for client status description StrClientStatus = map[ClientStatus]string{ // contains filtered or unexported fields } // StrCommand for TCP commands description StrCommand = map[Command]string{ CmdStart: "Start", CmdStop: "Stop", CmdHeader: "Header", CmdStartBookmark: "StartBookmark", CmdEntry: "Entry", CmdBookmark: "Bookmark", } // StrCommandErrors for TCP command errors description StrCommandErrors = map[CommandError]string{ CmdErrOK: "OK", CmdErrAlreadyStarted: "Already started", CmdErrAlreadyStopped: "Already stopped", CmdErrBadFromEntry: "Bad from entry", CmdErrBadFromBookmark: "Bad from bookmark", CmdErrInvalidCommand: "Invalid command", } )
Functions ¶
func PrintHeaderEntry ¶ added in v0.1.9
func PrintHeaderEntry(e HeaderEntry, title string)
PrintHeaderEntry prints file header information
func PrintReceivedEntry ¶ added in v0.1.0
func PrintReceivedEntry(e *FileEntry, c *StreamClient, s *StreamServer) error
PrintReceivedEntry prints received entry (default callback function)
func PrintResultEntry ¶
func PrintResultEntry(e ResultEntry)
PrintResultEntry prints result entry type
Types ¶
type Command ¶
type Command uint64
Command type for the TCP client commands
const ( CmdStart Command = iota + 1 // CmdStart for the start from entry TCP client command CmdStop // CmdStop for the stop TCP client command CmdHeader // CmdHeader for the header TCP client command CmdStartBookmark // CmdStartBookmark for the start from bookmark TCP client command CmdEntry // CmdEntry for the get entry TCP client command CmdBookmark // CmdBookmark for the get bookmark TCP client command )
func (Command) IsACommand ¶ added in v0.1.4
IsACommand checks if a command is a valid command
type CommandError ¶
type CommandError uint32
CommandError type for the command responses
const ( CmdErrOK CommandError = iota // CmdErrOK for no error CmdErrAlreadyStarted // CmdErrAlreadyStarted for client already started error CmdErrAlreadyStopped // CmdErrAlreadyStopped for client already stopped error CmdErrBadFromEntry // CmdErrBadFromEntry for invalid starting entry number CmdErrBadFromBookmark // CmdErrBadFromBookmark for invalid starting bookmark CmdErrInvalidCommand CommandError = 9 // CmdErrInvalidCommand for invalid/unknown command error )
type Config ¶
type Config struct { // Port to listen on Port uint16 `mapstructure:"Port"` // Filename of the binary data file Filename string `mapstructure:"Filename"` // WriteTimeout for write opeations on client connections WriteTimeout time.Duration // InactivityTimeout is the timeout to kill an inactive client connection InactivityTimeout time.Duration // InactivityCheckInterval is the time interval to check for // client connections that have reached the inactivity timeout to kill them InactivityCheckInterval time.Duration // Log Log log.Config `mapstructure:"Log"` }
Config type for datastreamer server
type FileEntry ¶
type FileEntry struct { Length uint32 // Total length of the entry (17 bytes + length(data)) Type EntryType // 0xb0:Bookmark, 1:Event1, 2:Event2,... Number uint64 // Entry number (sequential starting with 0) Data []byte // contains filtered or unexported fields }
FileEntry type for a data file entry
func DecodeBinaryToFileEntry ¶
DecodeBinaryToFileEntry decodes from binary bytes slice to file entry type
type HeaderEntry ¶
type HeaderEntry struct { Version uint8 // Stream file version SystemID uint64 // System identifier (e.g. ChainID) TotalLength uint64 // Total bytes used in the file TotalEntries uint64 // Total number of data entries (packet type PtData) // contains filtered or unexported fields }
HeaderEntry type for a header entry
type ProcessEntryFunc ¶ added in v0.1.0
type ProcessEntryFunc func(*FileEntry, *StreamClient, *StreamServer) error
ProcessEntryFunc type of the callback function to process the received entry
type ResultEntry ¶
type ResultEntry struct {
// contains filtered or unexported fields
}
ResultEntry type for a result entry
func DecodeBinaryToResultEntry ¶
func DecodeBinaryToResultEntry(b []byte) (ResultEntry, error)
DecodeBinaryToResultEntry decodes from binary bytes slice to a result entry type
type StreamBookmark ¶ added in v0.0.9
type StreamBookmark struct {
// contains filtered or unexported fields
}
StreamBookmark type to manage index of bookmarks
func NewBookmark ¶ added in v0.1.9
func NewBookmark(fn string) (*StreamBookmark, error)
NewBookmark creates bookmark struct and opens or creates the bookmark database
func (*StreamBookmark) AddBookmark ¶ added in v0.0.9
func (b *StreamBookmark) AddBookmark(bookmark []byte, entryNum uint64) error
AddBookmark inserts or updates a bookmark
func (*StreamBookmark) GetBookmark ¶ added in v0.0.9
func (b *StreamBookmark) GetBookmark(bookmark []byte) (uint64, error)
GetBookmark gets a bookmark value
func (*StreamBookmark) PrintDump ¶ added in v0.0.9
func (b *StreamBookmark) PrintDump() error
PrintDump prints all bookmarks stored in the database
type StreamClient ¶
type StreamClient struct { ID string // Client id // contains filtered or unexported fields }
StreamClient type to manage a data stream client
func NewClient ¶
func NewClient(server string, streamType StreamType) (*StreamClient, error)
NewClient creates a new data stream client
func NewClientWithLogsConfig ¶ added in v0.2.1
func NewClientWithLogsConfig(server string, streamType StreamType, logsConfig log.Config) (*StreamClient, error)
NewClientWithLogsConfig creates a new data stream client with logs configuration
func (*StreamClient) ExecCommandGetBookmark ¶ added in v0.2.0
func (c *StreamClient) ExecCommandGetBookmark(fromBookmark []byte) (FileEntry, error)
ExecCommandGetBookmark executes client TCP command to get a bookmark
func (*StreamClient) ExecCommandGetEntry ¶ added in v0.2.0
func (c *StreamClient) ExecCommandGetEntry(fromEntry uint64) (FileEntry, error)
ExecCommandGetEntry executes client TCP command to get an entry
func (*StreamClient) ExecCommandGetHeader ¶ added in v0.2.0
func (c *StreamClient) ExecCommandGetHeader() (HeaderEntry, error)
ExecCommandGetHeader executes client TCP command to get the header
func (*StreamClient) ExecCommandStart ¶ added in v0.2.0
func (c *StreamClient) ExecCommandStart(fromEntry uint64) error
ExecCommandStart executes client TCP command to start streaming from entry
func (*StreamClient) ExecCommandStartBookmark ¶ added in v0.2.0
func (c *StreamClient) ExecCommandStartBookmark(fromBookmark []byte) error
ExecCommandStartBookmark executes client TCP command to start streaming from bookmark
func (*StreamClient) ExecCommandStop ¶ added in v0.2.0
func (c *StreamClient) ExecCommandStop() error
ExecCommandStop executes client TCP command to stop streaming
func (*StreamClient) GetFromStream ¶ added in v0.2.0
func (c *StreamClient) GetFromStream() uint64
GetFromStream returns streaming start entry number from the latest start command executed
func (*StreamClient) GetTotalEntries ¶ added in v0.2.0
func (c *StreamClient) GetTotalEntries() uint64
GetTotalEntries returns total entries number from the latest header command executed
func (*StreamClient) IsStarted ¶ added in v0.2.6
func (c *StreamClient) IsStarted() bool
IsStarted returns if the client is started
func (*StreamClient) ResetProcessEntryFunc ¶ added in v0.2.6
func (c *StreamClient) ResetProcessEntryFunc()
ResetProcessEntryFunc resets the callback function to the default one
func (*StreamClient) SetProcessEntryFunc ¶ added in v0.1.0
func (c *StreamClient) SetProcessEntryFunc(f ProcessEntryFunc)
SetProcessEntryFunc sets the callback function to process entry
func (*StreamClient) Start ¶
func (c *StreamClient) Start() error
Start connects to the data stream server and starts getting data from the server
type StreamFile ¶
type StreamFile struct {
// contains filtered or unexported fields
}
StreamFile type to manage a binary stream file
func NewStreamFile ¶ added in v0.1.9
func NewStreamFile(fn string, version uint8, systemID uint64, st StreamType) (*StreamFile, error)
NewStreamFile creates stream file struct and opens or creates the stream binary data file
func (*StreamFile) AddFileEntry ¶
func (f *StreamFile) AddFileEntry(e FileEntry) error
AddFileEntry writes new data entry to the data stream file
type StreamRelay ¶ added in v0.1.0
type StreamRelay struct {
// contains filtered or unexported fields
}
StreamRelay type to manage a data stream relay
func NewRelay ¶ added in v0.1.0
func NewRelay(server string, port uint16, version uint8, systemID uint64, streamType StreamType, fileName string, writeTimeout time.Duration, inactivityTimeout time.Duration, inactivityCheckInterval time.Duration, cfg *log.Config) (*StreamRelay, error)
NewRelay creates a new data stream relay
func (*StreamRelay) Start ¶ added in v0.1.0
func (r *StreamRelay) Start() error
Start connects and syncs with master server then opens access to relay clients
type StreamServer ¶
type StreamServer struct {
// contains filtered or unexported fields
}
StreamServer type to manage a data stream server
func NewServer ¶ added in v0.1.0
func NewServer(port uint16, version uint8, systemID uint64, streamType StreamType, fileName string, writeTimeout time.Duration, inactivityTimeout time.Duration, inactivityCheckInterval time.Duration, cfg *log.Config) (*StreamServer, error)
NewServer creates a new data stream server
func (*StreamServer) AddStreamBookmark ¶ added in v0.0.9
func (s *StreamServer) AddStreamBookmark(bookmark []byte) (uint64, error)
AddStreamBookmark adds a new bookmark in the current atomic operation
func (*StreamServer) AddStreamEntry ¶
func (s *StreamServer) AddStreamEntry(etype EntryType, data []byte) (uint64, error)
AddStreamEntry adds a new entry in the current atomic operation
func (*StreamServer) BookmarkPrintDump ¶ added in v0.0.9
func (s *StreamServer) BookmarkPrintDump()
BookmarkPrintDump prints all bookmarks
func (*StreamServer) CommitAtomicOp ¶
func (s *StreamServer) CommitAtomicOp() error
CommitAtomicOp commits the current atomic operation and streams it to the clients
func (*StreamServer) GetBookmark ¶ added in v0.0.9
func (s *StreamServer) GetBookmark(bookmark []byte) (uint64, error)
GetBookmark returns the entry number pointed by the bookmark
func (*StreamServer) GetDataBetweenBookmarks ¶ added in v0.1.19
func (s *StreamServer) GetDataBetweenBookmarks(bookmarkFrom, bookmarkTo []byte) ([]byte, error)
GetDataBetweenBookmarks returns the data between two bookmarks
func (*StreamServer) GetEntry ¶ added in v0.0.7
func (s *StreamServer) GetEntry(entryNum uint64) (FileEntry, error)
GetEntry searches in the stream file and returns the data for the requested entry
func (*StreamServer) GetFirstEventAfterBookmark ¶ added in v0.0.9
func (s *StreamServer) GetFirstEventAfterBookmark(bookmark []byte) (FileEntry, error)
GetFirstEventAfterBookmark searches in the stream file by bookmark and returns the first event entry data
func (*StreamServer) GetHeader ¶ added in v0.0.7
func (s *StreamServer) GetHeader() HeaderEntry
GetHeader returns the current committed header
func (*StreamServer) RollbackAtomicOp ¶
func (s *StreamServer) RollbackAtomicOp() error
RollbackAtomicOp cancels the current atomic operation and rollbacks the changes
func (*StreamServer) Start ¶
func (s *StreamServer) Start() error
Start opens access to TCP clients and starts broadcasting
func (*StreamServer) StartAtomicOp ¶
func (s *StreamServer) StartAtomicOp() error
StartAtomicOp starts a new atomic operation
func (*StreamServer) TruncateFile ¶ added in v0.1.4
func (s *StreamServer) TruncateFile(entryNum uint64) error
TruncateFile truncates stream data file from an entry number onwards
func (*StreamServer) UpdateEntryData ¶ added in v0.1.4
func (s *StreamServer) UpdateEntryData(entryNum uint64, etype EntryType, data []byte) error
UpdateEntryData updates the internal data of an entry