sbt

package
v0.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2023 License: MIT Imports: 16 Imported by: 0

README

SBT

Serial Binary Table (SBT) is a configurable binary format for storing tabular data. It is designed to be fast and efficient to read and write, and it uses much less storage than CSV or JSON.

Features

  • Fast and efficient to read and write
  • Concrete types and static binary format
  • Easy to handle
  • Easy to iterate over contents
  • Split file and auto compression support for huge collections (e.g. orderbook data)
  • WIP Indexing for fast lookups

Limitations

  • Not thread-safe
  • No support for dynamic types, columns have fixed size
  • No support for advanced querying like SQL
  • Little endian only (for now)

Usage

You must define the data type of the table before you can use it, implementing the Row interface:

package main

// TestRow is the example row type used throughout this README.
// It implements sbt.Row through the Factory, Columns, Encode and Decode methods below.
type TestRow struct {
	// Symbol is written by Encode as an 8-byte padded string.
	Symbol string
	// Price is written by Encode as a uint32.
	Price  uint32
}

// Factory returns a newly allocated, zero-valued TestRow as an sbt.Row.
func (h *TestRow) Factory() sbt.Row {
	return &TestRow{}
}

// Columns describes the row layout: a fixed-size string column followed by
// a uint32 column.
func (h *TestRow) Columns() sbt.RowSpec {
	// 8 bytes for the fixed size string
	nameCol := sbt.ColumnTypeString.New("name", 8)
	valueCol := sbt.ColumnTypeUInt32.New("value")

	return sbt.NewRowSpec(nameCol, valueCol)
}

// Encode writes the row through ctx: Symbol as an 8-byte padded string,
// then Price as a uint32. It always returns nil.
func (h *TestRow) Encode(ctx *sbt.Encoder) error {
	// Same width as the string column declared in Columns.
	const symbolWidth = 8

	ctx.EncodeStringPadded(h.Symbol, symbolWidth)
	ctx.EncodeUInt32(h.Price)
	return nil
}

// Decode fills the row from ctx: an 8-byte padded string into Symbol,
// then a uint32 into Price. It always returns nil.
func (h *TestRow) Decode(ctx *sbt.Decoder) error {
	// Same width that Encode uses when writing Symbol.
	const symbolWidth = 8

	h.Symbol = ctx.DecodeStringPadded(symbolWidth)
	h.Price = ctx.DecodeUInt32()
	return nil
}
Single file

To load an SBT file (will be created if it doesn't exist):

// Load opens test.sbt, creating the file if it doesn't exist.
b, err := sbt.Load[*TestRow, TestRow]("test.sbt")

if err != nil {
    panic(err)
}

// Report the file size and the number of rows in the container.
log.Printf("file size: %v bytes | num rows: %v", b.Size(), b.NumRows())

// Close the container file, surfacing any error returned by Close.
if err = b.Close(); err != nil {
    panic(err)
}

You can use sbt.Open, sbt.OpenRead, sbt.Create or sbt.Load to open a file, depending on your needs.

To iterate over the contents of the file:

// Create a new iterator over the rows of the container.
it := b.Iter()
// it.Close can also be called inside the loop to stop the iteration early.
defer it.Close()

for item := range it.Next() {
    // item is a containers.Tuple[int64, *TestRow]
    // use item.Value() to access the row
}

// After the loop, check whether the iterator recorded an error.
if err := it.Error(); err != nil {
    panic(err)
}
Multiple files (split file)

You can use the MultiContainer to store multiple SBT files in a single directory, all with the same format. This is useful for storing huge collections of data, e.g. orderbook data in multiple continuous files which can be automatically compressed by the MultiContainer on predefined intervals.

mc, err := NewMultiContainer[*TestRow, TestRow](".", "testprefix",
    WithLog(log.Default), // allow logging to stdout
    WithCompressionScheduler(10), // enable file split and compression every 10 seconds
)
if err != nil {
    panic(err)
}

// Close the MultiContainer when the enclosing function returns,
// panicking if Close reports an error.
defer func() {
    if err := mc.Close(); err != nil {
        panic(err)
    }
}()

Documentation

Index

Constants

View Source
const (
	Bucket10   int64 = 10
	Bucket100  int64 = 100
	Bucket1k   int64 = 1000
	Bucket10k  int64 = 10000
	Bucket100k int64 = 100000
	Bucket1m   int64 = 1000000
	Bucket10m  int64 = 10000000
)
View Source
const MagicNumber uint16 = 0x5B70

Variables

View Source
var ErrClosed = errors.New("use of closed BulkIO")

Functions

func ByteToStringPadded

func ByteToStringPadded(b []byte) string

ByteToStringPadded converts a null padded byte slice to a string.

func StringToBytePadded

func StringToBytePadded(s string, length int) []byte

StringToBytePadded converts a string to a byte slice with a fixed length. If the string is longer than the length, it will be truncated. If the string is shorter than the length, it will be padded with NULL.

Types

type BulkAppendContext

type BulkAppendContext[P generics.Ptr[RT], RT any] struct {
	// contains filtered or unexported fields
}

func NewBulkAppendContext

func NewBulkAppendContext[P generics.Ptr[RT], RT any](bucketSize int64) *BulkAppendContext[P, RT]

func (*BulkAppendContext[P, RT]) Append

func (w *BulkAppendContext[P, RT]) Append(c *Container[P, RT], row P) error

Append will append a row to the bucket.

func (*BulkAppendContext[P, RT]) Close

func (w *BulkAppendContext[P, RT]) Close(c *Container[P, RT]) error

Close will ensure all remaining rows are added.

type BulkIO

type BulkIO[P generics.Ptr[RT], RT any] interface {
	Close(c *Container[P, RT]) error
}

type Column

type Column struct {
	Name string     `json:"name"`
	Type ColumnType `json:"type"`
	Size uint8      `json:"size"`
}

func NewColumn

func NewColumn(name string, typ ColumnType, size ...uint8) (c Column)

NewColumn creates a new column.

If size is not specified, it will be calculated based on the type.

type ColumnPrinter

type ColumnPrinter[P generics.Ptr[RowType], RowType any] func(row P) []any

type ColumnType

type ColumnType string
const (
	ColumnTypeString  ColumnType = "str"
	ColumnTypeBinary  ColumnType = "bin"
	ColumnTypeBool    ColumnType = "bool"
	ColumnTypeInt8    ColumnType = "i8"
	ColumnTypeInt16   ColumnType = "i16"
	ColumnTypeInt32   ColumnType = "i32"
	ColumnTypeInt64   ColumnType = "i64"
	ColumnTypeUInt8   ColumnType = "u8"
	ColumnTypeUInt16  ColumnType = "u16"
	ColumnTypeUInt32  ColumnType = "u32"
	ColumnTypeUInt64  ColumnType = "u64"
	ColumnTypeFloat32 ColumnType = "f32"
	ColumnTypeFloat64 ColumnType = "f64"
)

func (ColumnType) New

func (c ColumnType) New(name string, size ...uint8) Column

type Container

type Container[P generics.Ptr[RowType], RowType any] struct {
	// contains filtered or unexported fields
}

Container is a serial binary table data type.

The format contains a header and rows of contents.

Header defines the data types and column formats.

Contents are rows of specified data types with predefined columns and sizes.

It's useful for storing typed streams of data fast and efficiently.

It's not thread-safe.

func Create

func Create[P generics.Ptr[RowType], RowType any](
	filename string,
) (b *Container[P, RowType], err error)

Create creates a Container file.

func Load

func Load[P generics.Ptr[RowType], RowType any](
	filename string,
) (b *Container[P, RowType], err error)

Load opens or creates a Container file.

func Open

func Open[P generics.Ptr[RowType], RowType any](
	filename string,
) (b *Container[P, RowType], err error)

Open opens a Container file.

func OpenRead

func OpenRead[P generics.Ptr[RowType], RowType any](
	filename string,
) (b *Container[P, RowType], err error)

OpenRead opens a Container file for reading.

func (*Container[P, RowType]) Append

func (c *Container[P, RowType]) Append(row P) (err error)

Append appends a row to the Container file.

func (*Container[P, RowType]) AsIterable

func (c *Container[P, RowType]) AsIterable() generics.Iterable[containers.Tuple[int64, P]]

func (*Container[P, RowType]) BulkAppend

func (c *Container[P, RowType]) BulkAppend(rows []P) (err error)

BulkAppend appends a bulk of rows to the Container file.

func (*Container[P, RowType]) BulkRead

func (c *Container[P, RowType]) BulkRead(pos int64, rows []P) (n int64, err error)

BulkRead reads a bulk of rows at a specified position.

rows is a slice of rows to read into. The length of the slice is the number of rows to read.

To read all rows, pass pos 0 and a slice whose length is NumRows. If memory is a concern, use Iter instead.

It returns the number of rows read and an error, if any.

func (*Container[P, RowType]) BulkSet

func (c *Container[P, RowType]) BulkSet(index int64, rows []P) (err error)

BulkSet sets a bulk of rows at the given index.

func (*Container[P, RowType]) Close

func (c *Container[P, RowType]) Close() (err error)

Close closes the Container file.

func (*Container[P, RowType]) Filename

func (c *Container[P, RowType]) Filename() string

Filename returns the filename of the Container file.

func (*Container[P, RowType]) Header

func (c *Container[P, RowType]) Header() RowSpec

Header returns the header of the Container file.

func (*Container[P, RowType]) Iter

func (c *Container[P, RowType]) Iter() *generics.Iterator[containers.Tuple[int64, P]]

func (*Container[P, RowType]) IterBucketSize

func (c *Container[P, RowType]) IterBucketSize(bucketSize int64) *generics.Iterator[containers.Tuple[int64, P]]

func (*Container[P, RowType]) IterHandler

func (c *Container[P, RowType]) IterHandler(iter *generics.Iterator[containers.Tuple[int64, P]])

func (*Container[P, RowType]) NumRows

func (c *Container[P, RowType]) NumRows() int64

NumRows returns the number of rows.

func (*Container[P, RowType]) Print

func (c *Container[P, RowType]) Print(
	out io.Writer,
	start, count int64,
	pf ColumnPrinter[P, RowType],
) error

Print prints the rows in the Container file to the specified writer.

func (*Container[P, RowType]) ReadAt

func (c *Container[P, RowType]) ReadAt(pos int64, row P) (err error)

ReadAt reads a row at a specified position.

func (*Container[P, RowType]) SeekContent

func (c *Container[P, RowType]) SeekContent() (err error)

SeekContent seeks to the content section of the Container file.

func (*Container[P, RowType]) SeekEnd

func (c *Container[P, RowType]) SeekEnd() (err error)

SeekEnd seeks to the end of the Container file.

func (*Container[P, RowType]) Set

func (c *Container[P, RowType]) Set(row P, index int64) (err error)

Set sets a row at the given index.

func (*Container[P, RowType]) Size

func (c *Container[P, RowType]) Size() int64

Size returns the file size in bytes.

func (*Container[P, RowType]) Version

func (c *Container[P, RowType]) Version() uint8

Version returns the version of the Container file.

type Decoder

type Decoder struct {
	RowSerializerBase
}

Decoder is passed to Row.Decode as the decoding context and helper.

func NewDecoder

func NewDecoder(buffer []byte) *Decoder

NewDecoder

func (*Decoder) DecodeBool

func (d *Decoder) DecodeBool() bool

DecodeBool

func (*Decoder) DecodeBytes

func (d *Decoder) DecodeBytes(size int) []byte

DecodeBytes

func (*Decoder) DecodeFloat32

func (d *Decoder) DecodeFloat32() float32

DecodeFloat32

func (*Decoder) DecodeFloat64

func (d *Decoder) DecodeFloat64() float64

DecodeFloat64

func (*Decoder) DecodeInt16

func (d *Decoder) DecodeInt16() int16

DecodeInt16

func (*Decoder) DecodeInt32

func (d *Decoder) DecodeInt32() int32

DecodeInt32

func (*Decoder) DecodeInt64

func (d *Decoder) DecodeInt64() int64

DecodeInt64

func (*Decoder) DecodeInt8

func (d *Decoder) DecodeInt8() int8

DecodeInt8

func (*Decoder) DecodeStringPadded

func (d *Decoder) DecodeStringPadded(size int) string

DecodeStringPadded

func (*Decoder) DecodeTime

func (d *Decoder) DecodeTime() time.Time

DecodeTime

func (*Decoder) DecodeUInt16

func (d *Decoder) DecodeUInt16() uint16

DecodeUInt16

func (*Decoder) DecodeUInt32

func (d *Decoder) DecodeUInt32() uint32

DecodeUInt32

func (*Decoder) DecodeUInt64

func (d *Decoder) DecodeUInt64() uint64

DecodeUInt64

func (*Decoder) DecodeUInt8

func (d *Decoder) DecodeUInt8() uint8

DecodeUInt8

type Encoder

type Encoder struct {
	RowSerializerBase
}

Encoder is passed to Row.Encode as the encoding context and helper.

func NewEncoder

func NewEncoder(buffer []byte) *Encoder

NewEncoder

func (*Encoder) EncodeBool

func (e *Encoder) EncodeBool(v bool)

EncodeBool

func (*Encoder) EncodeBytesPadded

func (e *Encoder) EncodeBytesPadded(b []byte, size int)

EncodeBytesPadded

func (*Encoder) EncodeFloat32

func (e *Encoder) EncodeFloat32(v float32)

EncodeFloat32

func (*Encoder) EncodeFloat64

func (e *Encoder) EncodeFloat64(v float64)

EncodeFloat64

func (*Encoder) EncodeInt16

func (e *Encoder) EncodeInt16(v int16)

EncodeInt16

func (*Encoder) EncodeInt32

func (e *Encoder) EncodeInt32(v int32)

EncodeInt32

func (*Encoder) EncodeInt64

func (e *Encoder) EncodeInt64(v int64)

EncodeInt64

func (*Encoder) EncodeInt8

func (e *Encoder) EncodeInt8(v int8)

EncodeInt8

func (*Encoder) EncodeStringPadded

func (e *Encoder) EncodeStringPadded(s string, size int)

EncodeStringPadded

func (*Encoder) EncodeTime

func (e *Encoder) EncodeTime(t time.Time)

EncodeTime

func (*Encoder) EncodeUInt16

func (e *Encoder) EncodeUInt16(v uint16)

EncodeUInt16

func (*Encoder) EncodeUInt32

func (e *Encoder) EncodeUInt32(v uint32)

EncodeUInt32

func (*Encoder) EncodeUInt64

func (e *Encoder) EncodeUInt64(v uint64)

EncodeUInt64

func (*Encoder) EncodeUInt8

func (e *Encoder) EncodeUInt8(v uint8)

EncodeUInt8

type Row

type Row interface {
	Factory() Row
	Encode(ctx *Encoder) error
	Decode(ctx *Decoder) error
	Columns() RowSpec
}

type RowSerializerBase

type RowSerializerBase struct {
	// contains filtered or unexported fields
}

func (*RowSerializerBase) Bytes

func (s *RowSerializerBase) Bytes() []byte

Bytes returns the byte slice of the serializer.

func (*RowSerializerBase) Reset

func (s *RowSerializerBase) Reset(buffer []byte)

Reset resets the serializer to use the given buffer.

type RowSpec

type RowSpec []Column

func NewRowSpec

func NewRowSpec(columns ...Column) (r RowSpec)

func (RowSpec) RowSize

func (s RowSpec) RowSize() (size uint8)

RowSize returns the size of a row.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL