Documentation ¶
Index ¶
- Constants
- Variables
- func PrintResultEntry(e ResultEntry)
- type AOStatus
- type ClientStatus
- type Command
- type CommandError
- type Config
- type EntityDefinition
- type EntryType
- type FileEntry
- type HeaderEntry
- type ResultEntry
- type StreamClient
- type StreamFile
- type StreamServer
- func (s *StreamServer) AddStreamEntry(etype EntryType, data []byte) (uint64, error)
- func (s *StreamServer) CommitAtomicOp() error
- func (s *StreamServer) GetEntry(entryNum uint64) (FileEntry, error)
- func (s *StreamServer) GetHeader() HeaderEntry
- func (s *StreamServer) RollbackAtomicOp() error
- func (s *StreamServer) SetEntriesDef(entriesDef map[EntryType]EntityDefinition)
- func (s *StreamServer) Start() error
- func (s *StreamServer) StartAtomicOp() error
- type StreamType
Constants ¶
const (
	PtPadding = 0    // PtPadding is packet type for pad
	PtHeader  = 1    // PtHeader is packet type just for the header page
	PtData    = 2    // PtData is packet type for data entry
	PtResult  = 0xff // PtResult is packet type not stored/present in file (just for client command result)

	FixedSizeFileEntry   = 17 // FixedSizeFileEntry is the fixed size in bytes for a data file entry (1+4+4+8)
	FixedSizeResultEntry = 9  // FixedSizeResultEntry is the fixed size in bytes for a result entry (1+4+4)
)
const (
	// CmdStart for start TCP client command
	CmdStart Command = 1
	// CmdStop for stop TCP client command
	CmdStop Command = 2
	// CmdHeader for header TCP client command
	CmdHeader Command = 3

	// CmdErrOK for no error
	CmdErrOK CommandError = 0
	// CmdErrAlreadyStarted for client already started error
	CmdErrAlreadyStarted CommandError = 1
	// CmdErrAlreadyStopped for client already stopped error
	CmdErrAlreadyStopped CommandError = 2
	// CmdErrBadFromEntry for invalid starting entry number
	CmdErrBadFromEntry CommandError = 3
	// CmdErrInvalidCommand for invalid/unknown command error
	CmdErrInvalidCommand CommandError = 9
)
Variables ¶
var (
	// StrClientStatus for client status description
	StrClientStatus = map[ClientStatus]string{
		// contains filtered or unexported fields
	}

	// StrCommand for TCP commands description
	StrCommand = map[Command]string{
		CmdStart:  "Start",
		CmdStop:   "Stop",
		CmdHeader: "Header",
	}

	// StrCommandErrors for TCP command errors description
	StrCommandErrors = map[CommandError]string{
		CmdErrOK:             "OK",
		CmdErrAlreadyStarted: "Already started",
		CmdErrAlreadyStopped: "Already stopped",
		CmdErrBadFromEntry:   "Bad from entry",
		CmdErrInvalidCommand: "Invalid command",
	}
)
Functions ¶
func PrintResultEntry ¶
func PrintResultEntry(e ResultEntry)
PrintResultEntry prints result entry type
Types ¶
type Config ¶
type Config struct {
	// Port to listen on
	Port uint16 `mapstructure:"Port"`
	// Filename of the binary data file
	Filename string `mapstructure:"Filename"`
	// Log
	Log log.Config `mapstructure:"Log"`
}
Config type for datastreamer server
type EntityDefinition ¶
type EntityDefinition struct {
	Name       string
	StreamType StreamType
	Definition reflect.Type
}
EntityDefinition type used to print the fields of entity events
type FileEntry ¶
type FileEntry struct {
	Length    uint32    // Length of the entry
	EntryType EntryType // e.g. 1:L2 block, 2:L2 tx,...
	EntryNum  uint64    // Entry number (sequential starting with 0)
	Data      []byte
	// contains filtered or unexported fields
}
FileEntry type for a data file entry
func DecodeBinaryToFileEntry ¶
DecodeBinaryToFileEntry decodes from binary bytes slice to file entry type
type HeaderEntry ¶
type HeaderEntry struct {
	TotalLength  uint64 // Total bytes used in the file
	TotalEntries uint64 // Total number of data entries (entry type 2)
	// contains filtered or unexported fields
}
HeaderEntry type for a header entry
type ResultEntry ¶
type ResultEntry struct {
// contains filtered or unexported fields
}
ResultEntry type for a result entry
func DecodeBinaryToResultEntry ¶
func DecodeBinaryToResultEntry(b []byte) (ResultEntry, error)
DecodeBinaryToResultEntry decodes from binary bytes slice to a result entry type
type StreamClient ¶
type StreamClient struct {
	FromEntry uint64      // Set starting entry data (for Start command)
	Header    HeaderEntry // Header info received (from Header command)
	// contains filtered or unexported fields
}
StreamClient type to manage a data stream client
func NewClient ¶
func NewClient(server string, streamType StreamType) (StreamClient, error)
NewClient creates a new data stream client
func (*StreamClient) ExecCommand ¶
func (c *StreamClient) ExecCommand(cmd Command) error
ExecCommand executes a valid client TCP command
func (*StreamClient) SetEntriesDef ¶ added in v0.0.7
func (c *StreamClient) SetEntriesDef(entriesDef map[EntryType]EntityDefinition)
SetEntriesDef sets the event data fields definition
func (*StreamClient) Start ¶
func (c *StreamClient) Start() error
Start connects to the data stream server and starts getting data from the server
type StreamFile ¶
type StreamFile struct {
// contains filtered or unexported fields
}
StreamFile type to manage a binary stream file
func PrepareStreamFile ¶
func PrepareStreamFile(fn string, st StreamType) (StreamFile, error)
PrepareStreamFile creates stream file struct and opens or creates the stream binary data file
func (*StreamFile) AddFileEntry ¶
func (f *StreamFile) AddFileEntry(e FileEntry) error
AddFileEntry writes new data entry to the data stream file
type StreamServer ¶
type StreamServer struct {
// contains filtered or unexported fields
}
StreamServer type to manage a data stream server
func New ¶
func New(port uint16, streamType StreamType, fileName string, cfg *log.Config) (StreamServer, error)
New creates a new data stream server
func (*StreamServer) AddStreamEntry ¶
func (s *StreamServer) AddStreamEntry(etype EntryType, data []byte) (uint64, error)
AddStreamEntry adds a new entry in the current atomic operation
func (*StreamServer) CommitAtomicOp ¶
func (s *StreamServer) CommitAtomicOp() error
CommitAtomicOp commits the current atomic operation and streams it to the clients
func (*StreamServer) GetEntry ¶ added in v0.0.7
func (s *StreamServer) GetEntry(entryNum uint64) (FileEntry, error)
GetEntry searches in the stream file and returns the data for the requested entry
func (*StreamServer) GetHeader ¶ added in v0.0.7
func (s *StreamServer) GetHeader() HeaderEntry
GetHeader returns the current committed header
func (*StreamServer) RollbackAtomicOp ¶
func (s *StreamServer) RollbackAtomicOp() error
RollbackAtomicOp cancels the current atomic operation and rolls back the changes
func (*StreamServer) SetEntriesDef ¶ added in v0.0.7
func (s *StreamServer) SetEntriesDef(entriesDef map[EntryType]EntityDefinition)
SetEntriesDef sets the event data fields definition
func (*StreamServer) Start ¶
func (s *StreamServer) Start() error
Start opens access to TCP clients and starts broadcasting
func (*StreamServer) StartAtomicOp ¶
func (s *StreamServer) StartAtomicOp() error
StartAtomicOp starts a new atomic operation