Documentation ¶
Overview ¶
Copyright 2021 Molecula Corp. All rights reserved.
Copyright 2022 Molecula Corp. (DBA FeatureBase). SPDX-License-Identifier: Apache-2.0

Package batch provides tooling to prepare batches of records for ingest.
Index ¶
- Constants
- Variables
- func Int64ToTimestamp(unit TimeUnit, epoch time.Time, val int64) (time.Time, error)
- func TimestampToInt64(unit TimeUnit, epoch time.Time, ts time.Time) (int64, error)
- type Batch
- type BatchOption
- func OptCacheMaxAge(age uint64) BatchOption
- func OptImporter(i featurebase.Importer) BatchOption
- func OptKeyTranslateBatchSize(v int) BatchOption
- func OptLogger(l logger.Logger) BatchOption
- func OptMaxStaleness(t time.Duration) BatchOption
- func OptSplitBatchMode(on bool) BatchOption
- func OptUseShardTransactionalEndpoint(use bool) BatchOption
- type QuantizedTime
- func (qt *QuantizedTime) Reset()
- func (qt *QuantizedTime) Set(t time.Time)
- func (qt *QuantizedTime) SetDay(day string)
- func (qt *QuantizedTime) SetHour(hour string)
- func (qt *QuantizedTime) SetMonth(month string)
- func (qt *QuantizedTime) SetYear(year string)
- func (qt *QuantizedTime) Time() (time.Time, error)
- type RecordBatch
- type Row
- type TimeUnit
Constants ¶
const (
	TimeUnitSeconds      = TimeUnit(featurebase.TimeUnitSeconds)
	TimeUnitMilliseconds = TimeUnit(featurebase.TimeUnitMilliseconds)
	TimeUnitMicroseconds = TimeUnit(featurebase.TimeUnitMicroseconds)
	TimeUnitUSeconds     = TimeUnit(featurebase.TimeUnitUSeconds)
	TimeUnitNanoseconds  = TimeUnit(featurebase.TimeUnitNanoseconds)
)
const (
DefaultKeyTranslateBatchSize = 100000
)
Batch defaults.
Variables ¶
var (
	MinTimestampNano = time.Unix(-1<<32, 0).UTC()       // 1833-11-24T17:31:44Z
	MaxTimestampNano = time.Unix(1<<32, 0).UTC()        // 2106-02-07T06:28:16Z
	MinTimestamp     = time.Unix(-62135596799, 0).UTC() // 0001-01-01T00:00:01Z
	MaxTimestamp     = time.Unix(253402300799, 0).UTC() // 9999-12-31T23:59:59Z

	ErrTimestampOutOfRange = errors.New("", "value provided for timestamp field is out of range")
)
var ErrBatchAlreadyFull = errors.New("batch was already full, record was rejected")
ErrBatchAlreadyFull is a real error (unlike the marker errors below): Batch.Add did not complete because the batch was already full, and the record was rejected.
var ErrBatchNowFull = errors.New("batch is now full - you cannot add any more records (though the one you just added was accepted)")
ErrBatchNowFull — similar to io.EOF — is a marker error to notify the user of a batch that it is time to call Import.
var ErrBatchNowStale = errors.New("batch is stale and needs to be imported (however, record was accepted)")
ErrBatchNowStale indicates that the oldest record in the batch is older than the maxStaleness value of the batch. Like ErrBatchNowFull, the error does not mean the record was rejected.
var (
ErrPreconditionFailed = errors.New("Precondition failed")
)
Predefined batch-related errors.
Functions ¶
func Int64ToTimestamp ¶
func Int64ToTimestamp(unit TimeUnit, epoch time.Time, val int64) (time.Time, error)
Int64ToTimestamp converts the provided int64 to a timestamp based on the time unit and epoch.
func TimestampToInt64 ¶
func TimestampToInt64(unit TimeUnit, epoch time.Time, ts time.Time) (int64, error)
TimestampToInt64 converts the provided timestamp to an int64 based on the time unit and epoch.
Types ¶
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch implements RecordBatch.
It supports Values of type string, uint64, int64, float64, bool, or nil. The following table describes which Pilosa field type each type of value must map to. Fields are set up when calling "NewBatch".

	| type    | pilosa field type | options   |
	|---------+-------------------+-----------|
	| string  | set               | keys=true |
	| uint64  | set               | any       |
	| int64   | int               | any       |
	| float64 | decimal           | scale     |
	| bool    | bool              | any       |
	| nil     | any               |           |

nil values are ignored.
func NewBatch ¶
func NewBatch(importer featurebase.Importer, size int, tbl *dax.Table, fields []*featurebase.FieldInfo, opts ...BatchOption) (*Batch, error)
NewBatch initializes a new Batch object which will use the given Importer, index, set of fields, and will take "size" records before returning ErrBatchNowFull. The positions of the Fields in 'fields' correspond to the positions of values in the Row's Values passed to Batch.Add().
func (*Batch) Add ¶
func (b *Batch) Add(rec Row) error
Add adds a record to the batch. Performance will be best if record IDs are shard-sorted: that is, all records which belong to the same Pilosa shard are added adjacent to each other. If the records are also in order within a shard, that will likely help as well. Add clears rec.Clears when it returns normally (either a nil error or ErrBatchNowFull).
func (*Batch) Flush ¶
func (b *Batch) Flush() error
Flush is only applicable in split batch mode, where it actually imports the stored data to Pilosa. Otherwise it simply returns nil.
func (*Batch) Import ¶
func (b *Batch) Import() error
Import does translation, creates the fragment files, and then, if we're not using split batch mode, imports everything to Pilosa. It then resets internal data structures for the next batch. If we are using split batch mode, it saves the fragment data to the batch, resets all other internal structures, and continues. Split batch mode DOES NOT CURRENTLY SUPPORT MUTEX OR INT FIELDS!
type BatchOption ¶
BatchOption is a functional option for Batch objects.
func OptCacheMaxAge ¶
func OptCacheMaxAge(age uint64) BatchOption
func OptImporter ¶
func OptImporter(i featurebase.Importer) BatchOption
func OptKeyTranslateBatchSize ¶
func OptKeyTranslateBatchSize(v int) BatchOption
func OptLogger ¶
func OptLogger(l logger.Logger) BatchOption
func OptMaxStaleness ¶
func OptMaxStaleness(t time.Duration) BatchOption
func OptSplitBatchMode ¶
func OptSplitBatchMode(on bool) BatchOption
func OptUseShardTransactionalEndpoint ¶
func OptUseShardTransactionalEndpoint(use bool) BatchOption
OptUseShardTransactionalEndpoint tells the batch to import using the newer shard-transactional endpoint.
type QuantizedTime ¶
type QuantizedTime struct {
// contains filtered or unexported fields
}
QuantizedTime represents a moment in time down to some granularity (year, month, day, or hour).
func (*QuantizedTime) Reset ¶
func (qt *QuantizedTime) Reset()
Reset sets the time to the zero value which generates no time views.
func (*QuantizedTime) Set ¶
func (qt *QuantizedTime) Set(t time.Time)
Set sets the Quantized time to the given timestamp (down to hour granularity).
func (*QuantizedTime) SetDay ¶
func (qt *QuantizedTime) SetDay(day string)
SetDay sets the QuantizedTime's day, but leaves year, month, and hour untouched.
func (*QuantizedTime) SetHour ¶
func (qt *QuantizedTime) SetHour(hour string)
SetHour sets the QuantizedTime's hour, but leaves year, month, and day untouched.
func (*QuantizedTime) SetMonth ¶
func (qt *QuantizedTime) SetMonth(month string)
SetMonth sets the QuantizedTime's month, but leaves year, day, and hour untouched.
func (*QuantizedTime) SetYear ¶
func (qt *QuantizedTime) SetYear(year string)
SetYear sets the QuantizedTime's year, but leaves month, day, and hour untouched.
type RecordBatch ¶
type RecordBatch interface {
	Add(Row) error

	// Import does translation, creates the fragment files, and then,
	// if we're not using split batch mode, imports everything to
	// Pilosa. It then resets internal data structures for the next
	// batch. If we are using split batch mode, it saves the fragment
	// data to the batch, resets all other internal structures, and
	// continues.
	// Split batch mode DOES NOT CURRENTLY SUPPORT MUTEX OR INT FIELDS!
	Import() error

	// Len reports the number of records which have been added to the
	// batch since the last call to Import (or since it was created).
	Len() int

	// Flush is only applicable in split batch mode where it actually
	// imports the stored data to Pilosa. Otherwise it simply returns
	// nil.
	Flush() error
}
RecordBatch is a Pilosa ingest interface designed to allow for maximum throughput on common workloads. Users should call Add() with a Row object until it returns ErrBatchNowFull, at which time they should call Import(), and then repeat.
Add will not modify or otherwise retain the Row once it returns, so it is recommended that callers reuse the same Row with repeated calls to Add, just modifying its values appropriately in between calls. This avoids allocating a new slice of Values for each inserted Row.
The supported types of the values in Row.Values are implementation defined. Similarly, the supported types for Row.ID are implementation defined.
type Row ¶
type Row struct {
	ID interface{}

	// Values map to the slice of fields in Batch.header
	Values []interface{}

	// Clears' int key is an index into Batch.header
	Clears map[int]interface{}

	// Time applies to all time fields
	Time QuantizedTime
}
Row represents a single record which can be added to a Batch.