arcticdb

package module

README



This project is still in its infancy; consider it not production-ready. It probably has various consistency and correctness problems, and all APIs are subject to change!

ArcticDB is an embeddable columnar database written in Go. It features semi-structured schemas (which could also be described as typed wide columns), uses Apache Parquet for storage, and uses Apache Arrow at query time. Building on top of Apache Arrow, ArcticDB provides a query builder and various optimizers (reminiscent of DataFrame-like APIs).

ArcticDB is optimized for use cases where the majority of interactions are writes, and where queries, when they happen, read a lot of data at once (our use case at Polar Signals can be broadly described as Observability, specifically for Parca). It could also be described as a wide-column columnar database.

Read the announcement blog post to learn about what made us create it: https://www.polarsignals.com/blog/posts/2022/05/04/introducing-arcticdb/

Why you should use ArcticDB

Columnar data stores have become incredibly popular for analytics data. Structuring data in columns instead of rows leverages the architecture of modern hardware, allowing for efficient processing of data. A columnar data store might be right for you if you have workloads where you write a lot of data and need to perform analytics on that data.

ArcticDB is similar to many other in-memory columnar databases such as DuckDB or InfluxDB IOx.

ArcticDB may be a better fit for you if:

  • You are developing a Go program
  • You want to embed a columnar database in your program instead of running a separate server
  • You have immutable datasets that don't require updating or deleting
  • Your data contains dynamic columns, where a column may expand during runtime

ArcticDB is likely not suitable for your needs if:

  • You aren't developing in Go
  • You require a standalone database server
  • You need to modify or delete your data
  • You query by rows instead of columns

Getting Started

You can explore the examples directory for sample code using ArcticDB. Below is a snippet from the simple database example. It creates a database with a dynamic column schema, inserts some data, and queries it back out.

// Create a new column store
columnstore := arcticdb.New(
    prometheus.NewRegistry(),
    8192,
    10*1024*1024, // 10MiB
)

// Open up a database in the column store
database, _ := columnstore.DB("simple_db")

// Define our simple schema of labels and values
schema := simpleSchema()

// Create a table named simple in our database
table, _ := database.Table(
    "simple_table",
    arcticdb.NewTableConfig(schema),
    log.NewNopLogger(),
)

// Create values to insert into the database. These first rows have dynamic label names of 'firstname' and 'surname'
buf, _ := schema.NewBuffer(map[string][]string{
    "names": {"firstname", "surname"},
})

// firstname:Frederic surname:Brancz 100
buf.WriteRow([]parquet.Value{
    parquet.ValueOf("Frederic").Level(0, 1, 0),
    parquet.ValueOf("Brancz").Level(0, 1, 1),
    parquet.ValueOf(100).Level(0, 0, 2),
})

// firstname:Thor surname:Hansen 10
buf.WriteRow([]parquet.Value{
    parquet.ValueOf("Thor").Level(0, 1, 0),
    parquet.ValueOf("Hansen").Level(0, 1, 1),
    parquet.ValueOf(10).Level(0, 0, 2),
})
table.InsertBuffer(context.Background(), buf)

// Now we can insert rows that have middle names into our dynamic column
buf, _ = schema.NewBuffer(map[string][]string{
    "names": {"firstname", "middlename", "surname"},
})
// firstname:Matthias middlename:Oliver surname:Loibl 1
buf.WriteRow([]parquet.Value{
    parquet.ValueOf("Matthias").Level(0, 1, 0),
    parquet.ValueOf("Oliver").Level(0, 1, 1),
    parquet.ValueOf("Loibl").Level(0, 1, 2),
    parquet.ValueOf(1).Level(0, 0, 3),
})
table.InsertBuffer(context.Background(), buf)

// Create a new query engine to retrieve data and print the results
engine := query.NewEngine(memory.DefaultAllocator, database.TableProvider())
engine.ScanTable("simple_table").
    Filter(
        logicalplan.Col("names.firstname").Eq(logicalplan.Literal("Frederic")),
    ).Execute(context.Background(), func(r arrow.Record) error {
    fmt.Println(r)
    return nil
})

Design choices

ArcticDB was built specifically for Observability workloads. This resulted in several characteristics whose combination makes it unique.


Columnar layout

Observability data is most useful when it is highly dimensional and those dimensions can be searched and aggregated efficiently. Unlike many relational databases (MySQL, PostgreSQL, CockroachDB, TiDB, etc.) that store all data belonging to a single row together, a columnar layout stores all data of the same column in a table in one contiguous chunk, making it very efficient to scan and, more importantly, ensuring that only the data truly necessary for a query is loaded in the first place. ArcticDB uses Apache Parquet for storage, and Apache Arrow at query time. Apache Parquet is used for storage to make use of its efficient encodings to save on memory and disk space. Apache Arrow is used at query time as a foundation to vectorize the query execution.

Dynamic Columns

While columnar databases already exist, most require a static schema. Observability workloads differ in that their schemas are not static, meaning not all columns are pre-defined. Wide-column databases also already exist, but they are typically not strictly typed, and most of them are row-based rather than columnar.

Take a Prometheus time-series for example. Prometheus time-series are uniquely identified by the combination of their label-sets:

http_requests_total{path="/api/v1/users", code="200"} 12

This model does not map well onto a static schema, as label names cannot be known upfront. The most suitable data type some columnar databases offer is a map, but maps have the same problem as row-based databases: all values of a map in a row are stored together, so they cannot exploit the advantages of a columnar layout. An ArcticDB schema can instead define a column to be dynamic, causing a new column to be created on the fly whenever a new label name is seen.

An ArcticDB schema for Prometheus could look like this:

package arcticprometheus

import (
	"github.com/polarsignals/arcticdb/dynparquet"
	"github.com/segmentio/parquet-go"
)

func Schema() *dynparquet.Schema {
	return dynparquet.NewSchema(
		"prometheus",
		[]dynparquet.ColumnDefinition{{
			Name:          "labels",
			StorageLayout: parquet.Encoded(parquet.Optional(parquet.String()), &parquet.RLEDictionary),
			Dynamic:       true,
		}, {
			Name:          "timestamp",
			StorageLayout: parquet.Int(64),
			Dynamic:       false,
		}, {
			Name:          "value",
			StorageLayout: parquet.Leaf(parquet.DoubleType),
			Dynamic:       false,
		}},
		[]dynparquet.SortingColumn{
			dynparquet.NullsFirst(dynparquet.Ascending("labels")),
			dynparquet.Ascending("timestamp"),
		},
	)
}

Note: We are aware that Prometheus uses double-delta encoding for timestamps and XOR encoding for values. This schema is purely an example to highlight the dynamic columns feature.

With this schema, all rows are expected to have a timestamp and a value, but can vary in their columns prefixed with "labels.". In this schema, all dynamically created columns are still dictionary- and run-length-encoded and must be of type string.
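
To make the dynamic column behavior concrete, here is a minimal sketch, assuming the Schema() function defined above and the buffer API shown in the Getting Started example. The label names and values are illustrative only, and the column indices in the Level calls follow the pattern of the Getting Started example rather than being verified against the implementation.

// Sketch: write a row whose dynamic columns for this batch are
// labels.instance and labels.job.
buf, _ := Schema().NewBuffer(map[string][]string{
	"labels": {"instance", "job"}, // concrete dynamic columns for this batch
})
buf.WriteRow([]parquet.Value{
	parquet.ValueOf("localhost:9090").Level(0, 1, 0),     // labels.instance
	parquet.ValueOf("prometheus").Level(0, 1, 1),         // labels.job
	parquet.ValueOf(int64(1650000000000)).Level(0, 0, 2), // timestamp
	parquet.ValueOf(12.0).Level(0, 0, 3),                 // value
})

// A later buffer may introduce a label never seen before; the column
// labels.container is simply created on the fly.
buf, _ = Schema().NewBuffer(map[string][]string{
	"labels": {"container", "instance", "job"},
})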

Immutable & Sorted

There are only writes and reads. All data is immutable and sorted. Having all data sorted allows ArcticDB to avoid maintaining an index per column, and still serve queries with low latency.

To maintain global sorting, ArcticDB requires all inserts to be sorted if they contain multiple rows. Combined with immutability, global sorting of all data can be maintained at a reasonable cost. To optimize throughput, it is preferable to perform inserts in as large batches as possible. ArcticDB maintains inserted data in batches of a configurable number of rows (8192 by default), called Granules. To jump directly to the data needed for a query, ArcticDB maintains a sparse index of Granules. The sparse index is small enough to reside fully in memory; it is currently implemented as a b-tree of Granules.

Sparse index of Granules

At insert time, ArcticDB splits the inserted rows into the appropriate Granules according to their lower and upper bounds, to maintain global sorting. Once a Granule exceeds the configured number of rows, it is split into N new Granules.

Split of Granule
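
The granule row count, the sparse index degree, and the split size are all configurable when constructing the column store. A minimal sketch using the constructor and options listed in the API reference below; the values are arbitrary, and the interpretations in the comments follow the descriptions above rather than the implementation.

// Sketch: tune the knobs discussed above.
columnstore := arcticdb.New(
	prometheus.NewRegistry(),
	8192,          // rows per Granule before a split is triggered (the default mentioned above)
	512*1024*1024, // active memory size in bytes
)
columnstore = columnstore.WithIndexDegree(2) // degree of the b-tree backing the sparse index
columnstore = columnstore.WithSplitSize(2)   // number of new Granules a full Granule is split into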

Under the hood, Granules are a list of sorted Parts. Only if a query requires it are all Parts merged into a single sorted stream using a k-way merge with a min-heap. An example of an operation that requires the whole Granule to be read as a single sorted stream is the aforementioned Granule split.

A Granule is organized in Parts
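
The merge itself is the textbook algorithm. Purely for illustration (this is not ArcticDB's internal code), a k-way merge of sorted int slices with a min-heap from container/heap looks roughly like this:

package kwaymerge

import "container/heap"

// item is the head of one sorted input plus the index of the input it came from.
type item struct{ val, src int }

type minHeap []item

func (h minHeap) Len() int            { return len(h) }
func (h minHeap) Less(i, j int) bool  { return h[i].val < h[j].val }
func (h minHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *minHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// merge produces a single sorted stream from k sorted inputs.
func merge(parts [][]int) []int {
	h := &minHeap{}
	offsets := make([]int, len(parts))
	for i, p := range parts {
		if len(p) > 0 {
			heap.Push(h, item{val: p[0], src: i})
			offsets[i] = 1
		}
	}
	var out []int
	for h.Len() > 0 {
		next := heap.Pop(h).(item)
		out = append(out, next.val)
		if offsets[next.src] < len(parts[next.src]) {
			heap.Push(h, item{val: parts[next.src][offsets[next.src]], src: next.src})
			offsets[next.src]++
		}
	}
	return out
}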

Snapshot isolation

ArcticDB has snapshot isolation; however, it comes with a few caveats that should be well understood. It does not have read-after-write consistency, as the intended use case is one where the entities reading data are not the same as those writing it. To see new data, a user re-runs their query. Trading off read-after-write consistency allows for mechanisms that increase throughput significantly: ArcticDB releases write transactions in batches. It essentially only ensures write atomicity and that writes are not torn when reading. Since data is immutable, these characteristics together result in snapshot isolation.

More concretely, ArcticDB maintains a watermark indicating that all transactions with IDs equal to or lower than the watermark are safe to be read. Only write transactions obtain a new transaction ID, while reads use the transaction ID of the watermark to identify data that is safe to read. The watermark is only increased when strictly monotonic, consecutive transactions have finished. This means that a write transaction with a low ID can block write transactions with higher IDs from becoming readable. To ensure progress is made, write transactions have a timeout.

This mechanism is inspired by a mix of Google Spanner, Google Percolator and Highly Available Transactions.

Transactions are released in batches indicated by the watermark
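
To illustrate the watermark rule described above, here is a simplified model (not ArcticDB's actual implementation): the watermark only advances over a transaction once every transaction with a lower ID has also finished.

// Simplified model of the watermark rule: txCompleted records which write
// transactions have finished; the watermark advances only across a run of
// strictly consecutive completed transactions.
func advanceWatermark(watermark uint64, txCompleted map[uint64]bool) uint64 {
	for txCompleted[watermark+1] {
		watermark++
	}
	return watermark
}

// Example: with watermark 3 and transactions 4, 6 and 7 finished but 5 still
// in flight, the watermark moves to 4 and stops; readers only see data up to
// transaction 4 until 5 completes (or times out).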

Roadmap

  • Persistence: ArcticDB is currently fully in-memory.

Acknowledgments

ArcticDB stands on the shoulders of giants. Shout out to Segment for creating the incredible parquet-go library, as well as to InfluxData for starting Go support for Apache Arrow and to the various contributors who have worked on it since.

Documentation

Index

Constants

This section is empty.

Variables

var ErrNoSchema = fmt.Errorf("no schema")
var ErrUnexpectedNumberOfFields = errors.New("unexpected number of fields")
var ErrUnsupportedBinaryOperation = errors.New("unsupported binary operation")

Functions

func BinaryScalarOperation

func BinaryScalarOperation(left parquet.ColumnChunk, right parquet.Value, operator logicalplan.Operator) (bool, error)

Types

type AlwaysTrueFilter

type AlwaysTrueFilter struct{}

func (*AlwaysTrueFilter) Eval

type AndExpr

type AndExpr struct {
	Left  TrueNegativeFilter
	Right TrueNegativeFilter
}

func (*AndExpr) Eval

func (a *AndExpr) Eval(rg dynparquet.DynamicRowGroup) (bool, error)

type BinaryScalarExpr

type BinaryScalarExpr struct {
	Left  *ColumnRef
	Op    logicalplan.Operator
	Right parquet.Value
}

func (BinaryScalarExpr) Eval

type BucketPrefixDecorator

type BucketPrefixDecorator struct {
	objstore.Bucket
	// contains filtered or unexported fields
}

func (*BucketPrefixDecorator) Attributes

func (*BucketPrefixDecorator) Delete

func (b *BucketPrefixDecorator) Delete(ctx context.Context, name string) error

func (*BucketPrefixDecorator) Exists

func (b *BucketPrefixDecorator) Exists(ctx context.Context, name string) (bool, error)

func (*BucketPrefixDecorator) Get

func (*BucketPrefixDecorator) GetRange

func (b *BucketPrefixDecorator) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error)

func (*BucketPrefixDecorator) Iter

func (b *BucketPrefixDecorator) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error

func (*BucketPrefixDecorator) Name

func (b *BucketPrefixDecorator) Name() string

func (*BucketPrefixDecorator) Upload

func (b *BucketPrefixDecorator) Upload(ctx context.Context, name string, r io.Reader) error

type ColumnRef

type ColumnRef struct {
	ColumnName string
}

func (*ColumnRef) Column

func (c *ColumnRef) Column(rg dynparquet.DynamicRowGroup) (parquet.ColumnChunk, bool, error)

type ColumnStore

type ColumnStore struct {
	// contains filtered or unexported fields
}

func New

func New(
	reg prometheus.Registerer,
	granuleSize int,
	activeMemorySize int64,
) *ColumnStore

func (*ColumnStore) Close

func (s *ColumnStore) Close() error

func (*ColumnStore) DB

func (s *ColumnStore) DB(name string) (*DB, error)

func (*ColumnStore) WithIndexDegree

func (s *ColumnStore) WithIndexDegree(indexDegree int) *ColumnStore

func (*ColumnStore) WithSplitSize

func (s *ColumnStore) WithSplitSize(splitSize int) *ColumnStore

func (*ColumnStore) WithStorageBucket

func (s *ColumnStore) WithStorageBucket(bucket objstore.Bucket) *ColumnStore

func (*ColumnStore) WithStoragePath

func (s *ColumnStore) WithStoragePath(storagePath string) *ColumnStore

type DB

type DB struct {
	// contains filtered or unexported fields
}

func (*DB) Close

func (db *DB) Close() error

func (*DB) StorePath

func (db *DB) StorePath() string

func (*DB) Table

func (db *DB) Table(name string, config *TableConfig, logger log.Logger) (*Table, error)

func (*DB) TableProvider

func (db *DB) TableProvider() *DBTableProvider

func (*DB) Wait

func (db *DB) Wait(tx uint64)

Wait is a blocking function that returns once the high watermark has equaled or exceeded the transaction id. Wait makes no differentiation between completed and aborted transactions.
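
A typical pattern, sketched here under the assumption that the caller wants read-after-write behavior for its own insert, is to wait on the transaction ID returned by an insert before querying. The table, database, ctx and buf names are reused from the Getting Started example.

// Sketch: wait for our own write to become readable before querying.
// Error handling omitted for brevity.
tx, _ := table.InsertBuffer(ctx, buf) // returns the write's transaction ID
database.Wait(tx)                     // blocks until the watermark reaches tx
// Queries executed now will observe the inserted rows.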

type DBTableProvider

type DBTableProvider struct {
	// contains filtered or unexported fields
}

func NewDBTableProvider

func NewDBTableProvider(db *DB) *DBTableProvider

func (*DBTableProvider) GetTable

func (p *DBTableProvider) GetTable(name string) logicalplan.TableReader

type ErrCreateSchemaWriter

type ErrCreateSchemaWriter struct {
	// contains filtered or unexported fields
}

func (ErrCreateSchemaWriter) Error

func (e ErrCreateSchemaWriter) Error() string

type ErrReadRow

type ErrReadRow struct {
	// contains filtered or unexported fields
}

func (ErrReadRow) Error

func (e ErrReadRow) Error() string

type ErrWriteRow

type ErrWriteRow struct {
	// contains filtered or unexported fields
}

func (ErrWriteRow) Error

func (e ErrWriteRow) Error() string

type Granule

type Granule struct {
	// contains filtered or unexported fields
}

func NewGranule

func NewGranule(granulesCreated prometheus.Counter, tableConfig *TableConfig, firstPart *Part) (*Granule, error)

func (*Granule) AddPart

func (g *Granule) AddPart(p *Part) (uint64, error)

AddPart returns the new cardinality of the Granule.

func (*Granule) Least

func (g *Granule) Least() *dynparquet.DynamicRow

Least returns the least row in a Granule.

func (*Granule) Less

func (g *Granule) Less(than btree.Item) bool

Less implements the btree.Item interface.

func (*Granule) PartBuffersForTx

func (g *Granule) PartBuffersForTx(watermark uint64, iterator func(*dynparquet.SerializedBuffer) bool)

PartBuffersForTx returns the PartBuffers for the given transaction constraints.

type GranuleMetadata

type GranuleMetadata struct {
	// contains filtered or unexported fields
}

GranuleMetadata is the metadata for a granule.

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is a Part that is a part of a linked-list.

type Part

type Part struct {
	Buf *dynparquet.SerializedBuffer
	// contains filtered or unexported fields
}

func NewPart

func NewPart(tx uint64, buf *dynparquet.SerializedBuffer) *Part

func (*Part) Least

func (p *Part) Least() (*dynparquet.DynamicRow, error)

Least returns the least row in the part.

type PartList

type PartList struct {
	// contains filtered or unexported fields
}

func NewPartList

func NewPartList(next unsafe.Pointer, total uint64, s SentinelType) *PartList

NewPartList creates a new PartList using atomic constructs.

func (*PartList) Iterate

func (l *PartList) Iterate(iterate func(*Part) bool)

Iterate accesses every node in the list.

func (*PartList) Prepend

func (l *PartList) Prepend(part *Part) *Node

Prepend a node onto the front of the list.

func (*PartList) Sentinel

func (l *PartList) Sentinel(s SentinelType) *PartList

Sentinel adds a new sentinel node to the list, and returns the sub list starting from that sentinel.

type PreExprVisitorFunc

type PreExprVisitorFunc func(expr logicalplan.Expr) bool

func (PreExprVisitorFunc) PostVisit

func (f PreExprVisitorFunc) PostVisit(expr logicalplan.Expr) bool

func (PreExprVisitorFunc) PreVisit

func (f PreExprVisitorFunc) PreVisit(expr logicalplan.Expr) bool

type SentinelType

type SentinelType uint8
const (
	None SentinelType = iota
	Compacting
	Compacted
)

type Table

type Table struct {
	// contains filtered or unexported fields
}

func (*Table) ActiveBlock

func (t *Table) ActiveBlock() *TableBlock

func (*Table) ArrowSchema

func (t *Table) ArrowSchema(
	ctx context.Context,
	tx uint64,
	pool memory.Allocator,
	projections []logicalplan.ColumnMatcher,
	filterExpr logicalplan.Expr,
	distinctColumns []logicalplan.ColumnMatcher,
) (*arrow.Schema, error)

func (*Table) Insert

func (t *Table) Insert(ctx context.Context, buf []byte) (uint64, error)

func (*Table) InsertBuffer

func (t *Table) InsertBuffer(ctx context.Context, buf *dynparquet.Buffer) (uint64, error)

func (*Table) IterateBucketBlocks

func (t *Table) IterateBucketBlocks(logger log.Logger, filter TrueNegativeFilter, iterator func(rg dynparquet.DynamicRowGroup) bool, lastBlockTimestamp uint64) error

func (*Table) Iterator

func (t *Table) Iterator(
	ctx context.Context,
	tx uint64,
	pool memory.Allocator,
	projections []logicalplan.ColumnMatcher,
	filterExpr logicalplan.Expr,
	distinctColumns []logicalplan.ColumnMatcher,
	iterator func(r arrow.Record) error,
) error

Iterator iterates in order over all granules in the table. It stops iterating when the iterator function returns false.
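
For example, a read at a consistent snapshot can be obtained by combining View (documented below) with Iterator. In this sketch, passing nil for the projections, filter and distinct columns is an assumption meaning "no restriction", not a verified part of the API contract; ctx and table are reused from earlier examples.

// Sketch: iterate over all granules at the transaction pinned by View.
// Error handling is omitted for brevity.
err := table.View(func(tx uint64) error {
	return table.Iterator(
		ctx, tx, memory.DefaultAllocator,
		nil, nil, nil,
		func(r arrow.Record) error {
			fmt.Println(r)
			return nil
		},
	)
})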

func (*Table) RotateBlock

func (t *Table) RotateBlock() error

func (*Table) Schema

func (t *Table) Schema() *dynparquet.Schema

func (*Table) SchemaIterator

func (t *Table) SchemaIterator(
	ctx context.Context,
	tx uint64,
	pool memory.Allocator,
	projections []logicalplan.ColumnMatcher,
	filterExpr logicalplan.Expr,
	distinctColumns []logicalplan.ColumnMatcher,
	iterator func(r arrow.Record) error,
) error

SchemaIterator iterates in order over all granules in the table and returns all the schemas seen across the table.

func (*Table) Sync

func (t *Table) Sync()

func (*Table) View

func (t *Table) View(fn func(tx uint64) error) error

type TableBlock

type TableBlock struct {
	// contains filtered or unexported fields
}

func (*TableBlock) Index

func (t *TableBlock) Index() *btree.BTree

Index provides atomic access to the table index.

func (*TableBlock) Insert

func (*TableBlock) Persist

func (t *TableBlock) Persist() error

Persist uploads the block to the underlying bucket.

func (*TableBlock) RowGroupIterator

func (t *TableBlock) RowGroupIterator(
	ctx context.Context,
	tx uint64,
	filterExpr logicalplan.Expr,
	filter TrueNegativeFilter,
	iterator func(rg dynparquet.DynamicRowGroup) bool,
) error

RowGroupIterator iterates in order over all granules in the table. It stops iterating when the iterator function returns false.

func (*TableBlock) Serialize

func (t *TableBlock) Serialize() ([]byte, error)

func (*TableBlock) Size

func (t *TableBlock) Size() int64

Size returns the cumulative size of all buffers in the table. This is roughly the size of the table in bytes.

func (*TableBlock) Sync

func (t *TableBlock) Sync()

Sync the table. This will return once all split operations have completed. Currently it does not prevent new inserts from happening, so this is only safe to rely on if you control all writers. In the future we may need to add a way to block new writes as well.

type TableConfig

type TableConfig struct {
	// contains filtered or unexported fields
}

func NewTableConfig

func NewTableConfig(
	schema *dynparquet.Schema,
) *TableConfig

type TrueNegativeFilter

type TrueNegativeFilter interface {
	Eval(dynparquet.DynamicRowGroup) (bool, error)
}

type TxNode

type TxNode struct {
	// contains filtered or unexported fields
}

type TxPool

type TxPool struct {
	// contains filtered or unexported fields
}

func NewTxPool

func NewTxPool(watermark *atomic.Uint64) *TxPool

NewTxPool returns a new TxPool and starts the pool cleaner routine.

func (*TxPool) Iterate

func (l *TxPool) Iterate(iterate func(tx uint64) bool)

Iterate accesses every node in the list.

func (*TxPool) Prepend

func (l *TxPool) Prepend(tx uint64) *TxNode

Prepend a node onto the front of the list.
