datastreamer

package
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2023 License: AGPL-3.0, AGPL-3.0-or-later Imports: 14 Imported by: 17

Documentation

Index

Constants

View Source
const (
	PtPadding = 0    // PtPadding is packet type for pad
	PtHeader  = 1    // PtHeader is packet type just for the header page
	PtData    = 2    // PtData is packet type for data entry
	PtResult  = 0xff // PtResult is packet type not stored/present in file (just for client command result)

	FixedSizeFileEntry   = 17 // FixedSizeFileEntry is the fixed size in bytes for a data file entry (1+4+4+8)
	FixedSizeResultEntry = 9  // FixedSizeResultEntry is the fixed size in bytes for a result entry (1+4+4)
)
View Source
const (

	// CmdStart for start TCP client command
	CmdStart Command = 1
	// CmdStop for stop TCP client command
	CmdStop Command = 2
	// CmdHeader for header TCP client command
	CmdHeader Command = 3

	// CmdErrOK for no error
	CmdErrOK CommandError = 0
	// CmdErrAlreadyStarted for client already started error
	CmdErrAlreadyStarted CommandError = 1
	// CmdErrAlreadyStopped for client already stopped error
	CmdErrAlreadyStopped CommandError = 2
	// CmdErrBadFromEntry for invalid starting entry number
	CmdErrBadFromEntry CommandError = 3
	// CmdErrInvalidCommand for invalid/unknown command error
	CmdErrInvalidCommand CommandError = 9
)

Variables

View Source
var (
	// StrClientStatus for client status description
	StrClientStatus = map[ClientStatus]string{
					// contains filtered or unexported fields
	}

	// StrCommand for TCP commands description
	StrCommand = map[Command]string{
		CmdStart:  "Start",
		CmdStop:   "Stop",
		CmdHeader: "Header",
	}

	// StrCommandErrors for TCP command errors description
	StrCommandErrors = map[CommandError]string{
		CmdErrOK:             "OK",
		CmdErrAlreadyStarted: "Already started",
		CmdErrAlreadyStopped: "Already stopped",
		CmdErrBadFromEntry:   "Bad from entry",
		CmdErrInvalidCommand: "Invalid command",
	}
)

Functions

func PrintResultEntry

func PrintResultEntry(e ResultEntry)

PrintResultEntry prints result entry type

Types

type AOStatus

type AOStatus uint64

AOStatus type for the atomic operation internal states

type ClientStatus

type ClientStatus uint64

ClientStatus type for the status of the client

type Command

type Command uint64

Command type for the TCP client commands

type CommandError

type CommandError uint32

CommandError type for the command responses

type Config

type Config struct {
	// Port to listen on
	Port uint16 `mapstructure:"Port"`
	// Filename of the binary data file
	Filename string `mapstructure:"Filename"`
	// Log
	Log log.Config `mapstructure:"Log"`
}

Config type for datastreamer server

type EntityDefinition

type EntityDefinition struct {
	Name       string
	StreamType StreamType
	Definition reflect.Type
}

EntityDefinition type to print entity events fields

type EntryType

type EntryType uint32

EntryType type for the entry event types

type FileEntry

type FileEntry struct {
	Length    uint32    // Length of the entry
	EntryType EntryType // e.g. 1:L2 block, 2:L2 tx,...
	EntryNum  uint64    // Entry number (sequential starting with 0)
	Data      []byte
	// contains filtered or unexported fields
}

FileEntry type for a data file entry

func DecodeBinaryToFileEntry

func DecodeBinaryToFileEntry(b []byte) (FileEntry, error)

DecodeBinaryToFileEntry decodes from binary bytes slice to file entry type

type HeaderEntry

type HeaderEntry struct {
	TotalLength  uint64 // Total bytes used in the file
	TotalEntries uint64 // Total number of data entries (entry type 2)
	// contains filtered or unexported fields
}

HeaderEntry type for a header entry

type ResultEntry

type ResultEntry struct {
	// contains filtered or unexported fields
}

ResultEntry type for a result entry

func DecodeBinaryToResultEntry

func DecodeBinaryToResultEntry(b []byte) (ResultEntry, error)

DecodeBinaryToResultEntry decodes from binary bytes slice to a result entry type

type StreamClient

type StreamClient struct {
	FromEntry uint64      // Set starting entry data (for Start command)
	Header    HeaderEntry // Header info received (from Header command)
	// contains filtered or unexported fields
}

StreamClient type to manage a data stream client

func NewClient

func NewClient(server string, streamType StreamType) (StreamClient, error)

NewClient creates a new data stream client

func (*StreamClient) ExecCommand

func (c *StreamClient) ExecCommand(cmd Command) error

ExecCommand executes a valid client TCP command

func (*StreamClient) SetEntriesDef added in v0.0.7

func (c *StreamClient) SetEntriesDef(entriesDef map[EntryType]EntityDefinition)

SetEntriesDef sets the event data fields definition

func (*StreamClient) Start

func (c *StreamClient) Start() error

Start connects to the data stream server and starts getting data from the server

type StreamFile

type StreamFile struct {
	// contains filtered or unexported fields
}

StreamFile type to manage a binary stream file

func PrepareStreamFile

func PrepareStreamFile(fn string, st StreamType) (StreamFile, error)

PrepareStreamFile creates stream file struct and opens or creates the stream binary data file

func (*StreamFile) AddFileEntry

func (f *StreamFile) AddFileEntry(e FileEntry) error

AddFileEntry writes new data entry to the data stream file

type StreamServer

type StreamServer struct {
	// contains filtered or unexported fields
}

StreamServer type to manage a data stream server

func New

func New(port uint16, streamType StreamType, fileName string, cfg *log.Config) (StreamServer, error)

New creates a new data stream server

func (*StreamServer) AddStreamEntry

func (s *StreamServer) AddStreamEntry(etype EntryType, data []byte) (uint64, error)

AddStreamEntry adds a new entry in the current atomic operation

func (*StreamServer) CommitAtomicOp

func (s *StreamServer) CommitAtomicOp() error

CommitAtomicOp commits the current atomic operation and streams it to the clients

func (*StreamServer) GetEntry added in v0.0.7

func (s *StreamServer) GetEntry(entryNum uint64) (FileEntry, error)

GetEntry searches in the stream file and returns the data for the requested entry

func (*StreamServer) GetHeader added in v0.0.7

func (s *StreamServer) GetHeader() HeaderEntry

GetHeader returns the current committed header

func (*StreamServer) RollbackAtomicOp

func (s *StreamServer) RollbackAtomicOp() error

RollbackAtomicOp cancels the current atomic operation and rollbacks the changes

func (*StreamServer) SetEntriesDef added in v0.0.7

func (s *StreamServer) SetEntriesDef(entriesDef map[EntryType]EntityDefinition)

SetEntriesDef sets the event data fields definition

func (*StreamServer) Start

func (s *StreamServer) Start() error

Start opens access to TCP clients and starts broadcasting

func (*StreamServer) StartAtomicOp

func (s *StreamServer) StartAtomicOp() error

StartAtomicOp starts a new atomic operation

type StreamType

type StreamType uint64

StreamType type for the stream types

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL