batch

package
v3.35.0
Published: Mar 30, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

README

batch

The batch package provides a standard tool set for batching records in a way that is most performant for ingesting those records into FeatureBase. The main implementation is Batch, created with the NewBatch() function. NewBatch() takes an Importer, which provides all of the methods required to interact with FeatureBase, including string/ID translation and importing shards of data.

IDK uses the batch package internally. Another example is in the sql3 package: when an "INSERT INTO" statement is executed, the SQL engine uses a Batch to do key translation and to build import batches prior to the final import.
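A minimal sketch of the core flow (the import paths, and the Importer/table/field setup, are assumptions here; consult the FeatureBase source for the concrete constructors):

package ingest

import (
	featurebase "github.com/featurebasedb/featurebase/v3" // assumed import path
	"github.com/featurebasedb/featurebase/v3/batch"       // assumed import path
	"github.com/featurebasedb/featurebase/v3/dax"         // assumed import path
)

// ingest drains rows into FeatureBase, importing each time the batch fills.
func ingest(importer featurebase.Importer, tbl *dax.Table, fields []*featurebase.FieldInfo, rows []batch.Row) error {
	b, err := batch.NewBatch(importer, 1000, tbl, fields)
	if err != nil {
		return err
	}
	for _, row := range rows {
		err := b.Add(row)
		if err == batch.ErrBatchNowFull {
			// The row was accepted, but the batch must be imported
			// before the next Add.
			if err := b.Import(); err != nil {
				return err
			}
			continue
		}
		if err != nil {
			return err
		}
	}
	// Import whatever remains in the final, partial batch.
	if b.Len() > 0 {
		return b.Import()
	}
	return nil
}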

Integration tests

To run the tests, you will need to install the following dependencies:

  1. Docker
  2. Docker Compose

In addition to these dependencies, you will need to be added to the molecula GitLab account.

First, start the test environment. This is a Docker Compose environment that includes FeatureBase.

make startup

To build and run the integration tests, run:

make test-run-local

Then to shut down the test environment, run:

make shutdown

The previous sequence of commands is equivalent to running the following:

make startup
sleep 30 # wait for services to come up
make test-run
make shutdown

To run an individual test, you can run the command directly using docker-compose. Note that you must run docker-compose build batch-test for Docker to pick up the latest code. Modify the following as needed:

make startup
docker-compose build batch-test
docker-compose run batch-test /usr/local/go/bin/go test -count=1 -mod=vendor -run=TestCmdMainOne .

Documentation

Overview

Copyright 2021 Molecula Corp. All rights reserved. Package batch provides tooling to prepare batches of records for ingest.

Copyright 2022 Molecula Corp. (DBA FeatureBase). SPDX-License-Identifier: Apache-2.0

Index

Constants

const (
	TimeUnitSeconds      = TimeUnit(featurebase.TimeUnitSeconds)
	TimeUnitMilliseconds = TimeUnit(featurebase.TimeUnitMilliseconds)
	TimeUnitMicroseconds = TimeUnit(featurebase.TimeUnitMicroseconds)
	TimeUnitUSeconds     = TimeUnit(featurebase.TimeUnitUSeconds)
	TimeUnitNanoseconds  = TimeUnit(featurebase.TimeUnitNanoseconds)
)

const (
	DefaultKeyTranslateBatchSize = 100000
)

Batch defaults.

Variables

var (
	MinTimestampNano = time.Unix(-1<<32, 0).UTC()       // 1833-11-24T17:31:44Z
	MaxTimestampNano = time.Unix(1<<32, 0).UTC()        // 2106-02-07T06:28:16Z
	MinTimestamp     = time.Unix(-62135596799, 0).UTC() // 0001-01-01T00:00:01Z
	MaxTimestamp     = time.Unix(253402300799, 0).UTC() // 9999-12-31T23:59:59Z

	ErrTimestampOutOfRange = errors.New("", "value provided for timestamp field is out of range")
)

var ErrBatchAlreadyFull = errors.New("batch was already full, record was rejected")

ErrBatchAlreadyFull is a real error (unlike the marker errors below) indicating that Batch.Add did not complete because the batch was already full and the record was rejected.

var ErrBatchNowFull = errors.New("batch is now full - you cannot add any more records (though the one you just added was accepted)")

ErrBatchNowFull, similar to io.EOF, is a marker error notifying the user of a batch that it is time to call Import; the record just added was still accepted.

var ErrBatchNowStale = errors.New("batch is stale and needs to be imported (however, record was accepted)")

ErrBatchNowStale indicates that the oldest record in the batch is older than the maxStaleness value of the batch. Like ErrBatchNowFull, the error does not mean the record was rejected.
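Together, the three sentinel errors split Add's outcomes into accepted, accepted-but-import-now, and rejected. A sketch of handling all of them (direct equality shown; wrapped errors would need unwrapping):

// addOne adds a single row, importing the batch whenever Add signals
// that it is full or stale. A sketch; b is built elsewhere.
func addOne(b *batch.Batch, row batch.Row) error {
	switch err := b.Add(row); err {
	case nil:
		return nil // accepted; keep adding
	case batch.ErrBatchNowFull, batch.ErrBatchNowStale:
		// Accepted, but the batch must be imported before the next Add.
		return b.Import()
	case batch.ErrBatchAlreadyFull:
		// Rejected: an earlier ErrBatchNowFull was ignored.
		// Import, then retry the rejected record.
		if err := b.Import(); err != nil {
			return err
		}
		return b.Add(row)
	default:
		return err
	}
}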

var (
	ErrPreconditionFailed = errors.New("Precondition failed")
)

Predefined batch-related errors.

Functions

func Int64ToTimestamp

func Int64ToTimestamp(unit TimeUnit, epoch time.Time, val int64) (time.Time, error)

Int64ToTimestamp converts the provided int64 to a timestamp based on the time unit and epoch.

func TimestampToInt64

func TimestampToInt64(unit TimeUnit, epoch time.Time, ts time.Time) (int64, error)

TimestampToInt64 converts the provided timestamp to an int64 as the number of units past the epoch.
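For instance, with second granularity and the Unix epoch, the two functions round-trip (a sketch; the import path is an assumption):

package main

import (
	"fmt"
	"time"

	"github.com/featurebasedb/featurebase/v3/batch" // assumed import path
)

func main() {
	epoch := time.Unix(0, 0).UTC()

	// 1680134400 seconds past the Unix epoch is 2023-03-30T00:00:00Z.
	ts, err := batch.Int64ToTimestamp(batch.TimeUnitSeconds, epoch, 1680134400)
	if err != nil {
		panic(err)
	}

	// Converting back recovers the original value.
	val, err := batch.TimestampToInt64(batch.TimeUnitSeconds, epoch, ts)
	if err != nil {
		panic(err)
	}
	fmt.Println(ts.Format(time.RFC3339), val) // 2023-03-30T00:00:00Z 1680134400
}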

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch implements RecordBatch.

It supports Values of type string, uint64, int64, float64, or nil. The following table describes what Pilosa field each type of value must map to. Fields are set up when calling "NewBatch".

| type    | pilosa field type | options   |
|---------|-------------------|-----------|
| string  | set               | keys=true |
| uint64  | set               | any       |
| int64   | int               | any       |
| float64 | decimal           | scale     |
| bool    | bool              | any       |
| nil     | any               |           |

nil values are ignored.

func NewBatch

func NewBatch(importer featurebase.Importer, size int, tbl *dax.Table, fields []*featurebase.FieldInfo, opts ...BatchOption) (*Batch, error)

NewBatch initializes a new Batch object which will use the given Importer, table, and set of fields, and will accept "size" records before returning ErrBatchNowFull. The positions of the fields in 'fields' correspond to the positions of values in the Row's Values passed to Batch.Add().

func (*Batch) Add

func (b *Batch) Add(rec Row) error

Add adds a record to the batch. Performance will be best if record IDs are shard-sorted, that is, if all records which belong to the same Pilosa shard are added adjacent to each other. If the records are also in order within each shard, that will likely help as well. Add clears rec.Clears when it returns normally (either a nil error or ErrBatchNowFull).
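Because a record's shard is a monotonic function of its uint64 ID, sorting rows by raw ID groups them by shard and orders them within each shard at once. A minimal sketch, assuming uint64 record IDs:

import "sort"

// sortForAdd orders rows so that records in the same shard are adjacent
// and in-order within each shard; sorting by raw ID achieves both.
func sortForAdd(rows []batch.Row) {
	sort.Slice(rows, func(i, j int) bool {
		return rows[i].ID.(uint64) < rows[j].ID.(uint64)
	})
}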

func (*Batch) Flush

func (b *Batch) Flush() error

Flush is only applicable in split batch mode, where it actually imports the stored data to Pilosa. Otherwise it simply returns nil.

func (*Batch) Import

func (b *Batch) Import() error

Import does translation, creates the fragment files, and then, if we're not using split batch mode, imports everything to Pilosa. It then resets internal data structures for the next batch. If we are using split batch mode, it saves the fragment data to the batch, resets all other internal structures, and continues. Split batch mode DOES NOT CURRENTLY SUPPORT MUTEX OR INT FIELDS!
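In split batch mode, each Import only accumulates fragment data, and a final Flush performs the actual import. A sketch (batch construction elided; note the mutex/int field limitation above):

// splitIngest assumes b was created with batch.OptSplitBatchMode(true).
func splitIngest(b *batch.Batch, rows []batch.Row) error {
	for _, row := range rows {
		switch err := b.Add(row); err {
		case nil:
		case batch.ErrBatchNowFull:
			if err := b.Import(); err != nil { // stores fragment data, resets
				return err
			}
		default:
			return err
		}
	}
	if err := b.Import(); err != nil { // final partial batch
		return err
	}
	return b.Flush() // actually import the accumulated fragments
}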

func (*Batch) Len

func (b *Batch) Len() int
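
Len reports the number of records which have been added to the batch since the last call to Import (or since it was created).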

type BatchOption

type BatchOption func(b *Batch) error

BatchOption is a functional option for Batch objects.

func OptCacheMaxAge

func OptCacheMaxAge(age uint64) BatchOption

func OptImporter

func OptImporter(i featurebase.Importer) BatchOption

func OptKeyTranslateBatchSize

func OptKeyTranslateBatchSize(v int) BatchOption

func OptLogger

func OptLogger(l logger.Logger) BatchOption

func OptMaxStaleness

func OptMaxStaleness(t time.Duration) BatchOption

func OptSplitBatchMode

func OptSplitBatchMode(on bool) BatchOption

func OptUseShardTransactionalEndpoint

func OptUseShardTransactionalEndpoint(use bool) BatchOption

OptUseShardTransactionalEndpoint tells the batch to import using the newer shard-transactional endpoint.
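The options above are functional setters applied at construction time. A sketch combining several of them (the values are illustrative only, and logger.NopLogger is an assumption about the logger package):

b, err := batch.NewBatch(importer, 100000, tbl, fields,
	batch.OptLogger(logger.NopLogger),     // assumed no-op logger value
	batch.OptMaxStaleness(30*time.Second), // Add reports ErrBatchNowStale past this age
	batch.OptKeyTranslateBatchSize(batch.DefaultKeyTranslateBatchSize),
	batch.OptUseShardTransactionalEndpoint(true),
)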

type Batcher added in v3.34.0

type Batcher interface {
	NewBatch(cfg Config, tbl *dax.Table, fields []*dax.Field) (RecordBatch, error)
}

Batcher is an interface implemented by anything which can allocate new batches.

type Config added in v3.34.0

type Config struct {
	Size         int
	MaxStaleness time.Duration
}

Config is the configuration options passed to NewBatch for any implementation of the Batcher interface.

type QuantizedTime

type QuantizedTime struct {
	// contains filtered or unexported fields
}

QuantizedTime represents a moment in time down to some granularity (year, month, day, or hour).

func (*QuantizedTime) Reset

func (qt *QuantizedTime) Reset()

Reset sets the time to the zero value which generates no time views.

func (*QuantizedTime) Set

func (qt *QuantizedTime) Set(t time.Time)

Set sets the QuantizedTime to the given timestamp (down to hour granularity).

func (*QuantizedTime) SetDay

func (qt *QuantizedTime) SetDay(day string)

SetDay sets the QuantizedTime's day, but leaves year, month, and hour untouched.

func (*QuantizedTime) SetHour

func (qt *QuantizedTime) SetHour(hour string)

SetHour sets the QuantizedTime's hour, but leaves year, month, and day untouched.

func (*QuantizedTime) SetMonth

func (qt *QuantizedTime) SetMonth(month string)

SetMonth sets the QuantizedTime's month, but leaves year, day, and hour untouched.

func (*QuantizedTime) SetYear

func (qt *QuantizedTime) SetYear(year string)

SetYear sets the QuantizedTime's year, but leaves month, day, and hour untouched.

func (*QuantizedTime) Time

func (qt *QuantizedTime) Time() (time.Time, error)
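
A sketch of both ways to populate a QuantizedTime (the component string formats, e.g. "2023" and "03", are assumptions):

var qt batch.QuantizedTime
qt.Set(time.Date(2023, time.March, 30, 12, 0, 0, 0, time.UTC)) // down to the hour

// Or component-wise; unset components keep their zero values.
qt.SetYear("2023")
qt.SetMonth("03")
qt.SetDay("30")

t, err := qt.Time() // materialize as a time.Time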

type RecordBatch

type RecordBatch interface {
	Add(Row) error

	// Import does translation, creates the fragment files, and then,
	// if we're not using split batch mode, imports everything to
	// Pilosa. It then resets internal data structures for the next
	// batch. If we are using split batch mode, it saves the fragment
	// data to the batch, resets all other internal structures, and
	// continues.
	// Split batch mode DOES NOT CURRENTLY SUPPORT MUTEX OR INT FIELDS!
	Import() error

	// Len reports the number of records which have been added to the
	// batch since the last call to Import (or since it was created).
	Len() int

	// Flush is only applicable in split batch mode where it actually
	// imports the stored data to Pilosa. Otherwise it simply returns
	// nil.
	Flush() error
}

RecordBatch is a Pilosa ingest interface designed to allow for maximum throughput on common workloads. Users should call Add() with a Row object until it returns ErrBatchNowFull, at which time they should call Import(), and then repeat.

Add will not modify or otherwise retain the Row once it returns, so it is recommended that callers reuse the same Row with repeated calls to Add, just modifying its values appropriately in between calls. This avoids allocating a new slice of Values for each inserted Row.

The supported types of the values in Row.Values are implementation defined. Similarly, the supported types for Row.ID are implementation defined.
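A sketch of that reuse pattern, with a hypothetical upstream record type:

// srcRecord is a hypothetical source record, not part of this package.
type srcRecord struct {
	id    uint64
	color string // destined for a keyed set field
	score int64  // destined for an int field
}

func addAll(b batch.RecordBatch, recs []srcRecord) error {
	row := batch.Row{Values: make([]interface{}, 2)} // one slot per field
	for _, rec := range recs {
		row.ID = rec.id
		row.Values[0] = rec.color
		row.Values[1] = rec.score
		if err := b.Add(row); err == batch.ErrBatchNowFull {
			if err := b.Import(); err != nil {
				return err
			}
		} else if err != nil {
			return err
		}
	}
	return b.Import() // flush the final partial batch
}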

type Row

type Row struct {
	ID interface{}
	// Values map to the slice of fields in Batch.header
	Values []interface{}
	// Clears' int key is an index into Batch.header
	Clears map[int]interface{}
	// Time applies to all time fields
	Time QuantizedTime
}

Row represents a single record which can be added to a Batch.
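
A sketch of Clears in use (the value semantics shown, clearing one set-field value, are an assumption):

row := batch.Row{
	ID:     uint64(42),
	Values: []interface{}{"blue", nil},        // set field 0; nil leaves field 1 alone
	Clears: map[int]interface{}{1: uint64(7)}, // clear value 7 from the field at index 1
}
err := b.Add(row) // Add consumes row.Clears and clears the map on normal return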

type TimeUnit

type TimeUnit string

