etl

package
Version: v0.10.0 (not the latest version; see the warning below)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2024 | License: BSD-3-Clause | Imports: 19 | Imported by: 4

Documentation

Index

Constants

View Source
const KafkaOffsetEarliest = -2

KafkaOffsetEarliest is used to begin consuming at the earliest (oldest) offset.

Variables

View Source
// ErrBadPoolKey is the sentinel error stating that a pool's key must be
// 'kafka.offset' in ascending order.
// NOTE(review): the call sites that return it are not visible here —
// presumably pool-opening or pool-validation code; confirm in the source.
var ErrBadPoolKey = errors.New("pool key must be 'kafka.offset' in ascending order")

Functions

func Build

func Build(transform *Transform) ([]string, error)

func Field

func Field(val zed.Value, field string) (zed.Value, error)

func FieldAsInt

func FieldAsInt(val zed.Value, field string) (int64, error)

func FieldAsString

func FieldAsString(val zed.Value, field string) (string, error)

func NewArrayFromReader

func NewArrayFromReader(zr zio.Reader) (*zbuf.Array, error)

Types

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(ctx context.Context, transform *Transform, service lakeapi.Interface) (*Pipeline, error)

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context) (int, error)

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func OpenPool

func OpenPool(ctx context.Context, poolName string, server lakeapi.Interface) (*Pool, error)

func (*Pool) LoadBatch

func (p *Pool) LoadBatch(ctx context.Context, zctx *zed.Context, batch *zbuf.Array) (ksuid.KSUID, error)

func (*Pool) NextProducerOffsets

func (p *Pool) NextProducerOffsets(ctx context.Context) (map[string]int64, error)

func (*Pool) Query

func (p *Pool) Query(ctx context.Context, src string) (*zbuf.Array, error)

type Route

// Route pairs a topic name with a pool name. Both fields carry yaml struct
// tags, so a Route can be decoded from a YAML mapping with the keys "topic"
// and "pool".
// NOTE(review): presumably Topic names a Kafka topic and Pool names a lake
// pool — confirm against the package source.
type Route struct {
	// Topic corresponds to the YAML key "topic".
	Topic string `yaml:"topic"`
	// Pool corresponds to the YAML key "pool".
	Pool  string `yaml:"pool"`
}

type Routes

type Routes struct {
	// contains filtered or unexported fields
}

func (*Routes) InputsOf

func (r *Routes) InputsOf(output string) []string

func (*Routes) LookupPool

func (r *Routes) LookupPool(topic string) string

func (*Routes) Outputs

func (r *Routes) Outputs() []string

type Rule

// Rule is the element type of Transform.ETLs, i.e. one entry of the
// "transforms" list in the YAML form of a Transform. Every field is a plain
// string mapped to a YAML key by its struct tag.
// NOTE(review): which fields are required for which Type value is not
// visible here — presumably In is used by single-input rules and
// Left/Right/Join by join rules; confirm against the package source.
type Rule struct {
	// Type corresponds to the YAML key "type".
	Type  string `yaml:"type"`
	// In corresponds to the YAML key "in".
	In    string `yaml:"in"`
	// Left corresponds to the YAML key "left".
	Left  string `yaml:"left"`
	// Right corresponds to the YAML key "right".
	Right string `yaml:"right"`
	// Join corresponds to the YAML key "join-on" (note: the key differs from
	// the field name).
	Join  string `yaml:"join-on"`
	// Out corresponds to the YAML key "out".
	Out   string `yaml:"out"`
	// Where corresponds to the YAML key "where".
	Where string `yaml:"where"`
	// Zed corresponds to the YAML key "zed".
	// NOTE(review): presumably Zed query text — confirm.
	Zed   string `yaml:"zed"`
}

type Transform

// Transform groups a list of input routes, a single output route, and a list
// of rules. The struct tags map these to the YAML keys "inputs", "output",
// and "transforms" respectively.
// NOTE(review): presumably this is the top-level configuration document read
// by Load — confirm against the package source.
type Transform struct {
	// Inputs corresponds to the YAML key "inputs"; each element is a Route.
	Inputs []Route `yaml:"inputs"`
	// Output corresponds to the YAML key "output".
	Output Route   `yaml:"output"`
	// ETLs corresponds to the YAML key "transforms" (note: the key differs
	// from the field name); each element is a Rule.
	ETLs   []Rule  `yaml:"transforms"`
}

func Load

func Load(path string) (*Transform, error)

func (*Transform) Load

func (t *Transform) Load(path string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL