run

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ContextWithRun

func ContextWithRun(parent context.Context, run Run) context.Context

ContextWithRun returns a copy of the Context with the Run saved.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(client *openlineage.Client) *Client

func (*Client) Emit

func (c *Client) Emit(ctx context.Context, event openlineage.Emittable) error

func (*Client) ExistingRun

func (c *Client) ExistingRun(ctx context.Context, job string, runID uuid.UUID) (context.Context, Run)

ExistingRun recreates a Run for a given job and ID. The resulting Run is stored in ctx using ContextWithRun.

func (*Client) NewRun

func (c *Client) NewRun(ctx context.Context, job string) (context.Context, Run)

NewRun creates a Run. If ctx already contains a RunContext, it set as the parent. The resulting Run is stored in ctx using ContextWithRun.

func (*Client) StartRun

func (c *Client) StartRun(ctx context.Context, job string) (context.Context, Run)

StartRun calls NewRun and emits a START event. For details, see NewRun.

type Run

type Run interface {
	// Parent returns the parent of this run, if any.
	Parent() Run

	// RunID returns the ID for this Run.
	RunID() uuid.UUID

	// JobName returns the name for this Run's job.
	JobName() string

	// JobNamespace returns the namespace for this Run's job.
	JobNamespace() string

	// NewChild creates a new Run with the current Run set as its parent
	NewChild(ctx context.Context, jobName string) (context.Context, Run)

	// StartChild calls NewChild and emits a START event
	StartChild(ctx context.Context, jobName string) (context.Context, Run)

	// NewEvent creates a new Event of the provided EventType
	NewEvent(openlineage.EventType) *openlineage.RunEvent

	// Finish will emit a COMPLETE event if no error has occurred.
	// Otherwise, it will emit a FAIL event.
	Finish()

	// Returns true if RecordError was called for this Run.
	HasFailed() bool

	// RecordError emits an OTHER event with an ErrorMessage facet.
	// Once this is called, the run is considered to have failed.
	RecordError(error)

	// RecordRunFacets emits an OTHER event with the supplied RunFacets
	RecordRunFacets(...facets.RunFacet)

	// RecordJobFacets emits an OTHER event with the supplied JobFacets
	RecordJobFacets(...facets.JobFacet)

	// RecordInputs emits an OTHER event with the supplied InputElements
	RecordInputs(...openlineage.InputElement)

	// RecordOutputs emits an OTHER event with the supplied OutputElements
	RecordOutputs(...openlineage.OutputElement)
}

Run is an instrumentation utility that allows for more ergonomic usage of the SDK. It is loosely modeled after the OpenTelemetry Span/Trace APIs.

Example
package main

import (
	"context"
	"errors"
	"log/slog"

	ol "github.com/ThijsKoot/openlineage-go"
	"github.com/ThijsKoot/openlineage-go/pkg/run"
	"github.com/ThijsKoot/openlineage-go/pkg/transport"
)

func main() {
	ctx := context.Background()

	cfg := ol.ClientConfig{
		Transport: transport.Config{
			Type: transport.TransportTypeConsole,
			Console: &transport.ConsoleConfig{
				PrettyPrint: true,
			},
		},
	}
	olClient, err := ol.NewClient(cfg)
	if err != nil {
		slog.Error("ol.NewClient failed", "error", err)
	}

	runClient := run.NewClient(olClient)

	ctx, run := runClient.StartRun(ctx, "ingest")
	defer run.Finish()

	if err := ChildFunction(ctx); err != nil {
		run.RecordError(err)

		slog.Warn("child function failed", "error", err)
	}

}

func ChildFunction(ctx context.Context) error {
	parent := run.FromContext(ctx)
	_, childRun := parent.StartChild(ctx, "child")
	defer childRun.Finish()

	if err := DoWork(); err != nil {
		// Record the error in this run.
		// Finish() will emit a FAIL event.
		childRun.RecordError(err)

		return err
	}

	return nil
}

func DoWork() error {
	return errors.New("did not do work")
}
Output:

func Existing

func Existing(ctx context.Context, job string, runID uuid.UUID) (context.Context, Run)

Existing calls Client.ExistingRun using openlineage.DefaultClient.

func FromContext

func FromContext(ctx context.Context) Run

FromContext extracts the current Run from a context.

If no Run is currently set in ctx an implementation of a Run that performs no operations is returned.

func New

func New(ctx context.Context, job string) (context.Context, Run)

New calls [Client.New] using openlineage.DefaultClient.

func Start

func Start(ctx context.Context, job string) (context.Context, Run)

Start calls [Client.Start] using openlineage.DefaultClient.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL