etl

package
v1.0.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2022 License: BSD-3-Clause Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const KafkaOffsetEarliest = -2

KafkaOffsetEarliest is used to begin consuming at the earliest (oldest) offset.

Variables

View Source
// ErrBadPoolKey is a sentinel error whose message states that the pool key
// must be 'kafka.offset' in ascending order.
// NOTE(review): presumably returned when opening a pool whose key or sort
// order does not match — the returning call sites are not visible here; confirm.
var ErrBadPoolKey = errors.New("pool key must be 'kafka.offset' in ascending order")

Functions

func Build

func Build(transform *Transform) ([]string, error)

func Field

func Field(val *zed.Value, field string) (*zed.Value, error)

func FieldAsInt

func FieldAsInt(val *zed.Value, field string) (int64, error)

func FieldAsString

func FieldAsString(val *zed.Value, field string) (string, error)

func NewArrayFromReader

func NewArrayFromReader(zr zio.Reader) (*zbuf.Array, error)

Types

type Pipeline

// Pipeline is an opaque type: its fields are not shown in this listing.
// A Pipeline is obtained from NewPipeline, which takes a context, a *Transform
// and a lakeapi.Interface, and is executed with Run, which returns an int and
// an error.
// NOTE(review): the meaning of Run's int result is not visible here — confirm
// against the implementation before relying on it.
type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(ctx context.Context, transform *Transform, service lakeapi.Interface) (*Pipeline, error)

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context) (int, error)

type Pool

// Pool is an opaque type: its fields are not shown in this listing.
// A Pool is obtained from OpenPool by pool name and exposes LoadBatch,
// NextProducerOffsets and Query.
// NOTE(review): presumably a handle to a named pool reached through the
// lakeapi.Interface passed to OpenPool — confirm against the implementation.
type Pool struct {
	// contains filtered or unexported fields
}

func OpenPool

func OpenPool(ctx context.Context, poolName string, server lakeapi.Interface) (*Pool, error)

func (*Pool) LoadBatch

func (p *Pool) LoadBatch(ctx context.Context, zctx *zed.Context, batch *zbuf.Array) (ksuid.KSUID, error)

func (*Pool) NextProducerOffsets

func (p *Pool) NextProducerOffsets(ctx context.Context) (map[string]int64, error)

func (*Pool) Query

func (p *Pool) Query(ctx context.Context, src string) (*zbuf.Array, error)

type Route

// Route pairs a topic name with a pool name. Both fields are decoded from
// YAML using the keys given in the struct tags.
// NOTE(review): presumably Topic is a Kafka topic and Pool is a lake pool
// name — confirm against the code that consumes Route.
type Route struct {
	// Topic is read from the YAML key "topic".
	Topic string `yaml:"topic"`
	// Pool is read from the YAML key "pool".
	Pool  string `yaml:"pool"`
}

type Routes

// Routes is an opaque type: its fields are not shown in this listing.
// Its methods answer lookups by name: InputsOf(output) returns a []string,
// LookupPool(topic) returns a string, and Outputs() returns a []string.
// NOTE(review): no constructor for Routes is visible in this listing —
// confirm how values of this type are created.
type Routes struct {
	// contains filtered or unexported fields
}

func (*Routes) InputsOf

func (r *Routes) InputsOf(output string) []string

func (*Routes) LookupPool

func (r *Routes) LookupPool(topic string) string

func (*Routes) Outputs

func (r *Routes) Outputs() []string

type Rule

// Rule is one entry of a Transform's ETLs list. Every field is a string
// decoded from YAML using the key in its struct tag.
// NOTE(review): which fields apply to which Type value is not visible in this
// listing; the per-field notes below are assumptions to be confirmed.
type Rule struct {
	// Type is read from the YAML key "type" (presumably selects the kind of rule).
	Type  string `yaml:"type"`
	// In is read from the YAML key "in" (presumably the single input of the rule).
	In    string `yaml:"in"`
	// Left is read from the YAML key "left" (presumably the left input of a join).
	Left  string `yaml:"left"`
	// Right is read from the YAML key "right" (presumably the right input of a join).
	Right string `yaml:"right"`
	// Join is read from the YAML key "join-on"; note the key differs from the field name.
	Join  string `yaml:"join-on"`
	// Out is read from the YAML key "out" (presumably the output of the rule).
	Out   string `yaml:"out"`
	// Where is read from the YAML key "where" (presumably a filter expression).
	Where string `yaml:"where"`
	// Zed is read from the YAML key "zed" (presumably Zed query text applied by the rule).
	Zed   string `yaml:"zed"`
}

type Transform

// Transform is the YAML-decoded configuration consumed by Build and
// NewPipeline: a list of input routes, a single output route, and a list of
// rules.
// NOTE(review): presumably populated from a file by Load — the loader's body
// is not visible here; confirm.
type Transform struct {
	// Inputs is read from the YAML key "inputs".
	Inputs []Route `yaml:"inputs"`
	// Output is read from the YAML key "output".
	Output Route   `yaml:"output"`
	// ETLs is read from the YAML key "transforms"; note the key differs from the field name.
	ETLs   []Rule  `yaml:"transforms"`
}

func Load

func Load(path string) (*Transform, error)

func (*Transform) Load

func (t *Transform) Load(path string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL