lake

package
v1.0.0
Published: Mar 26, 2022 License: BSD-3-Clause Imports: 36 Imported by: 1

Documentation

Index

Constants

const (
	DataTag     = "data"
	IndexTag    = "index"
	BranchesTag = "branches"
	CommitsTag  = "commits"
)
const (
	Version         = 1
	PoolsTag        = "pools"
	IndexRulesTag   = "index_rules"
	LakeMagicFile   = "lake.zng"
	LakeMagicString = "ZED LAKE"
)

Variables

var (
	ErrCommitFailed      = fmt.Errorf("exceeded max update attempts (%d) to branch tip: commit failed", maxCommitRetries)
	ErrInvalidCommitMeta = errors.New("cannot parse ZSON string")
)

Functions

func CreateBranch

func CreateBranch(ctx context.Context, poolConfig *pools.Config, engine storage.Engine, root *storage.URI, name string, parent ksuid.KSUID) (*branches.Config, error)

func CreatePool

func CreatePool(ctx context.Context, config *pools.Config, engine storage.Engine, root *storage.URI) error

func DataPath

func DataPath(poolPath *storage.URI) *storage.URI

func IndexPath

func IndexPath(poolPath *storage.URI) *storage.URI

func RemovePool

func RemovePool(ctx context.Context, config *pools.Config, engine storage.Engine, root *storage.URI) error

func ScanIndexes

func ScanIndexes(ctx context.Context, snap commits.View, span extent.Span, o order.Which, ch chan<- *index.Object) error

func ScanPartitions

func ScanPartitions(ctx context.Context, snap commits.View, span extent.Span, o order.Which, ch chan<- Partition) error

ScanPartitions partitions all data objects in snap that overlap span into non-overlapping partitions, sorts them by pool key in the given order, and sends them to ch.
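
A hedged usage sketch: snapshot a pool at a commit and drain the partitions from the channel. The helper below is hypothetical, the import paths (github.com/brimdata/zed/lake, lake/extent, order, segmentio/ksuid) are assumptions about the module layout, and it assumes ScanPartitions only sends on ch, leaving the channel for the caller to close.

// collectPartitions is a hypothetical helper that snapshots a pool at the
// given commit and gathers the partitions overlapping span.
func collectPartitions(ctx context.Context, p *lake.Pool, commit ksuid.KSUID, span extent.Span, o order.Which) ([]lake.Partition, error) {
	snap, err := p.Snapshot(ctx, commit)
	if err != nil {
		return nil, err
	}
	ch := make(chan lake.Partition)
	errCh := make(chan error, 1)
	go func() {
		// Assumption: ScanPartitions does not close ch itself.
		errCh <- lake.ScanPartitions(ctx, snap, span, o, ch)
		close(ch)
	}()
	var parts []lake.Partition
	for part := range ch {
		parts = append(parts, part)
	}
	return parts, <-errCh
}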

func ScanSpan

func ScanSpan(ctx context.Context, snap commits.View, span extent.Span, o order.Which, ch chan<- data.Object) error

func ScanSpanInOrder

func ScanSpanInOrder(ctx context.Context, snap commits.View, span extent.Span, o order.Which, ch chan<- data.Object) error

Types

type Branch

type Branch struct {
	branches.Config
	// contains filtered or unexported fields
}

func OpenBranch

func OpenBranch(ctx context.Context, config *branches.Config, engine storage.Engine, poolPath *storage.URI, pool *Pool) (*Branch, error)

func (*Branch) ApplyIndexRules

func (b *Branch) ApplyIndexRules(ctx context.Context, rules []index.Rule, ids []ksuid.KSUID) (ksuid.KSUID, error)

func (*Branch) Delete

func (b *Branch) Delete(ctx context.Context, ids []ksuid.KSUID, author, message string) (ksuid.KSUID, error)

func (*Branch) Load

func (b *Branch) Load(ctx context.Context, zctx *zed.Context, r zio.Reader, author, message, meta string) (ksuid.KSUID, error)

func (*Branch) LookupTags

func (b *Branch) LookupTags(ctx context.Context, tags []ksuid.KSUID) ([]ksuid.KSUID, error)

func (*Branch) Pool

func (b *Branch) Pool() *Pool

func (*Branch) Revert

func (b *Branch) Revert(ctx context.Context, commit ksuid.KSUID, author, message string) (ksuid.KSUID, error)

func (*Branch) Stats

func (b *Branch) Stats(ctx context.Context, snap commits.View) (info BranchStats, err error)

func (*Branch) UpdateIndex

func (b *Branch) UpdateIndex(ctx context.Context, rules []index.Rule) (ksuid.KSUID, error)

type BranchMeta

type BranchMeta struct {
	Pool   pools.Config    `zed:"pool"`
	Branch branches.Config `zed:"branch"`
}

type BranchStats

type BranchStats struct {
	Size int64 `zed:"size"`
	// XXX (nibs) - This shouldn't be a span because keys don't have to be time.
	Span *nano.Span `zed:"span"`
}

type BranchTip

type BranchTip struct {
	Name   string
	Commit ksuid.KSUID
}

type ImportStats

type ImportStats struct {
	ObjectsWritten     int64
	RecordBytesWritten int64
	RecordsWritten     int64
}

func (*ImportStats) Accumulate

func (s *ImportStats) Accumulate(b ImportStats)

func (*ImportStats) Copy

func (s *ImportStats) Copy() ImportStats

type LakeMagic

type LakeMagic struct {
	Magic   string `zed:"magic"`
	Version int    `zed:"version"`
}

type Partition

type Partition struct {
	extent.Span

	Objects []*data.ObjectScan
	// contains filtered or unexported fields
}

A Partition is a logical view of the records within a time span, stored in one or more data objects. It pairs the list of objects that should be scanned with a span that limits the scan to only the range involved.

func PartitionObjects

func PartitionObjects(objects []*data.Object, o order.Which) []Partition

PartitionObjects takes a sorted set of data objects with possibly overlapping key ranges and returns an ordered list of Partitions such that no two Partitions overlap. This is the classic computational geometry problem of merging overlapping intervals, e.g., https://www.geeksforgeeks.org/merging-intervals/

XXX this algorithm doesn't quite do what we want because it continues to merge *anything* that overlaps. It's easy to fix though. Issue #2538
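
To make the referenced interval-merge step concrete, here is a minimal, generic sketch of merging overlapping intervals (illustrative only; it is not the package's implementation and works over plain integers rather than key ranges):

// mergeIntervals merges overlapping [start, end] intervals, mirroring the
// interval-merge idea PartitionObjects applies to object key ranges.
// Requires "sort" from the standard library.
type interval struct{ start, end int }

func mergeIntervals(in []interval) []interval {
	if len(in) == 0 {
		return nil
	}
	sort.Slice(in, func(i, j int) bool { return in[i].start < in[j].start })
	out := []interval{in[0]}
	for _, iv := range in[1:] {
		last := &out[len(out)-1]
		if iv.start <= last.end {
			// Overlap: extend the current merged interval.
			if iv.end > last.end {
				last.end = iv.end
			}
			continue
		}
		out = append(out, iv)
	}
	return out
}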

func (Partition) FormatRange

func (p Partition) FormatRange() string

func (Partition) FormatRangeOf

func (p Partition) FormatRangeOf(index int) string

func (Partition) IsZero

func (p Partition) IsZero() bool

type Pool

type Pool struct {
	pools.Config

	Path      *storage.URI
	DataPath  *storage.URI
	IndexPath *storage.URI
	// contains filtered or unexported fields
}

func OpenPool

func OpenPool(ctx context.Context, config *pools.Config, engine storage.Engine, root *storage.URI) (*Pool, error)

func (*Pool) ListBranches

func (p *Pool) ListBranches(ctx context.Context) ([]branches.Config, error)

func (*Pool) LookupBranchByName

func (p *Pool) LookupBranchByName(ctx context.Context, name string) (*branches.Config, error)

func (*Pool) Main

func (p *Pool) Main(ctx context.Context) (BranchMeta, error)

func (*Pool) ObjectExists

func (p *Pool) ObjectExists(ctx context.Context, id ksuid.KSUID) (bool, error)

XXX this is inefficient but is only meant for interactive queries...?

func (*Pool) OpenBranchByName

func (p *Pool) OpenBranchByName(ctx context.Context, name string) (*Branch, error)

func (*Pool) Snapshot

func (p *Pool) Snapshot(ctx context.Context, commit ksuid.KSUID) (commits.View, error)

func (*Pool) Stats

func (p *Pool) Stats(ctx context.Context, snap commits.View) (info PoolStats, err error)

type PoolStats

type PoolStats struct {
	Size int64 `zed:"size"`
	// XXX (nibs) - This shouldn't be a span because keys don't have to be time.
	Span *nano.Span `zed:"span"`
}

XXX For backward compatibility, keep this for now and return BranchStats for pool/main.

type Root

type Root struct {
	// contains filtered or unexported fields
}

The Root of the lake represents the path prefix and configuration state for all of the data pools in the lake.
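
A hedged bootstrap sketch: create (or open) a lake root on local storage and create a pool in it. The helper is hypothetical; storage.NewLocalEngine and storage.ParseURI are assumed to come from github.com/brimdata/zed/pkg/storage, and the zero seekStride and thresh arguments are assumed to select the package defaults.

// openLakeWithPool creates the lake at path if it does not already exist
// and then creates a pool with the given layout. Illustrative only.
func openLakeWithPool(ctx context.Context, path, poolName string, layout order.Layout) (*lake.Root, *lake.Pool, error) {
	engine := storage.NewLocalEngine()
	uri, err := storage.ParseURI(path)
	if err != nil {
		return nil, nil, err
	}
	root, err := lake.CreateOrOpen(ctx, engine, uri)
	if err != nil {
		return nil, nil, err
	}
	// Assumption: seekStride and thresh of zero fall back to defaults.
	pool, err := root.CreatePool(ctx, poolName, layout, 0, 0)
	if err != nil {
		return nil, nil, err
	}
	return root, pool, nil
}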

func Create

func Create(ctx context.Context, engine storage.Engine, path *storage.URI) (*Root, error)

func CreateOrOpen

func CreateOrOpen(ctx context.Context, engine storage.Engine, path *storage.URI) (*Root, error)

func Open

func Open(ctx context.Context, engine storage.Engine, path *storage.URI) (*Root, error)

func (*Root) AddIndexRules

func (r *Root) AddIndexRules(ctx context.Context, rules []index.Rule) error

func (*Root) AllIndexRules

func (r *Root) AllIndexRules(ctx context.Context) ([]index.Rule, error)

func (*Root) CommitObject

func (r *Root) CommitObject(ctx context.Context, poolID ksuid.KSUID, branchName string) (ksuid.KSUID, error)

func (*Root) CreateBranch

func (r *Root) CreateBranch(ctx context.Context, poolID ksuid.KSUID, name string, parent ksuid.KSUID) (*branches.Config, error)

func (*Root) CreatePool

func (r *Root) CreatePool(ctx context.Context, name string, layout order.Layout, seekStride int, thresh int64) (*Pool, error)

func (*Root) DeleteIndexRules

func (r *Root) DeleteIndexRules(ctx context.Context, ids []ksuid.KSUID) ([]index.Rule, error)

func (*Root) Layout

func (r *Root) Layout(ctx context.Context, src dag.Source) order.Layout

func (*Root) ListPools

func (r *Root) ListPools(ctx context.Context) ([]pools.Config, error)

func (*Root) LookupIndexRules

func (r *Root) LookupIndexRules(ctx context.Context, names ...string) ([]index.Rule, error)

func (*Root) MergeBranch

func (r *Root) MergeBranch(ctx context.Context, poolID ksuid.KSUID, childBranch, parentBranch, author, message string) (ksuid.KSUID, error)

MergeBranch merges the indicated branch into its parent and returns the commit tag of the new commit on the parent branch.
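
For example (a hedged sketch built only from the signatures documented here; the helper name and the "main" parent are illustrative):

// mergeIntoMain resolves a pool by name and merges the named child branch
// into its "main" branch, returning the new commit's tag.
func mergeIntoMain(ctx context.Context, r *lake.Root, poolName, child, author, message string) (ksuid.KSUID, error) {
	poolID, err := r.PoolID(ctx, poolName)
	if err != nil {
		return ksuid.Nil, err
	}
	return r.MergeBranch(ctx, poolID, child, "main", author, message)
}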

func (*Root) NewScheduler

func (r *Root) NewScheduler(ctx context.Context, zctx *zed.Context, src dag.Source, span extent.Span, filter zbuf.Filter, idx *dag.Filter) (op.Scheduler, error)

func (*Root) Open

func (*Root) OpenPool

func (r *Root) OpenPool(ctx context.Context, id ksuid.KSUID) (*Pool, error)

func (*Root) PoolID

func (r *Root) PoolID(ctx context.Context, poolName string) (ksuid.KSUID, error)

func (*Root) RemoveBranch

func (r *Root) RemoveBranch(ctx context.Context, poolID ksuid.KSUID, name string) error

func (*Root) RemovePool

func (r *Root) RemovePool(ctx context.Context, id ksuid.KSUID) error

RemovePool deletes a pool from the configuration journal and deletes all data associated with the pool.
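
A small hedged sketch combining this with PoolID (the helper is hypothetical):

// dropPool resolves a pool by name and removes it together with its data.
func dropPool(ctx context.Context, r *lake.Root, name string) error {
	id, err := r.PoolID(ctx, name)
	if err != nil {
		return err
	}
	return r.RemovePool(ctx, id)
}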

func (*Root) RenamePool

func (r *Root) RenamePool(ctx context.Context, id ksuid.KSUID, newName string) error

func (*Root) Revert

func (r *Root) Revert(ctx context.Context, poolID ksuid.KSUID, branchName string, commitID ksuid.KSUID, author, message string) (ksuid.KSUID, error)

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewSortedScheduler

func NewSortedScheduler(ctx context.Context, zctx *zed.Context, pool *Pool, snap commits.View, span extent.Span, filter zbuf.Filter, index *index.Filter) *Scheduler

func (*Scheduler) AddProgress added in v1.0.0

func (s *Scheduler) AddProgress(progress zbuf.Progress)

func (*Scheduler) Progress added in v1.0.0

func (s *Scheduler) Progress() zbuf.Progress

func (*Scheduler) PullScanTask

func (s *Scheduler) PullScanTask() (zbuf.Puller, error)

func (*Scheduler) PullScanWork

func (s *Scheduler) PullScanWork() (Partition, error)

PullScanWork returns the next span in the schedule. This is useful for a worker proc that pulls spans from the scheduler, sends them to a remote worker, and streams the results into the runtime DAG.
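
A hedged sketch of such a worker loop, assuming (per Partition.IsZero above) that an exhausted scheduler returns a zero Partition with a nil error:

// drainScheduler pulls partitions until the scheduler is exhausted and
// hands each one to process. The exhaustion condition is an assumption.
func drainScheduler(s *lake.Scheduler, process func(lake.Partition) error) error {
	for {
		part, err := s.PullScanWork()
		if err != nil {
			return err
		}
		if part.IsZero() {
			return nil
		}
		if err := process(part); err != nil {
			return err
		}
	}
}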

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is a zio.Writer that buffers records in memory up to the pool's data object threshold, sorts each resulting buffer, and writes it as an immutable object to the storage system. The presumption is that each buffer's worth of data fits into memory.

func NewWriter

func NewWriter(ctx context.Context, zctx *zed.Context, pool *Pool) (*Writer, error)

NewWriter creates a zio.Writer-compliant writer for writing data to a data pool, presuming the input is not guaranteed to be sorted. XXX We should make another writer that takes sorted input and is a bit more efficient. This other writer could have different commit triggers to do useful things like partitioning, given the context is a rollup.
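
A hedged sketch of the typical write path, using only the signatures documented here and assuming the zio.Reader convention of returning a nil value at end of stream:

// loadIntoPool copies values from r into pool via a Writer and returns the
// resulting data objects and import stats. Illustrative only; committing
// the resulting objects to a branch appears to be a separate step
// (cf. Branch.Load).
func loadIntoPool(ctx context.Context, zctx *zed.Context, pool *lake.Pool, r zio.Reader) ([]data.Object, lake.ImportStats, error) {
	w, err := lake.NewWriter(ctx, zctx, pool)
	if err != nil {
		return nil, lake.ImportStats{}, err
	}
	for {
		val, err := r.Read()
		if err != nil {
			w.Close()
			return nil, lake.ImportStats{}, err
		}
		if val == nil {
			break
		}
		if err := w.Write(val); err != nil {
			w.Close()
			return nil, lake.ImportStats{}, err
		}
	}
	if err := w.Close(); err != nil {
		return nil, lake.ImportStats{}, err
	}
	return w.Objects(), w.Stats(), nil
}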

func (*Writer) Close

func (w *Writer) Close() error

func (*Writer) Objects

func (w *Writer) Objects() []data.Object

func (*Writer) Stats

func (w *Writer) Stats() ImportStats

func (*Writer) Write

func (w *Writer) Write(rec *zed.Value) error
