Documentation ¶
Index ¶
- func ContextWithRun(parent context.Context, run Run) context.Context
- type Client
- func (c *Client) Emit(ctx context.Context, event openlineage.Emittable) error
- func (c *Client) ExistingRun(ctx context.Context, job string, runID uuid.UUID) (context.Context, Run)
- func (c *Client) NewRun(ctx context.Context, job string) (context.Context, Run)
- func (c *Client) StartRun(ctx context.Context, job string) (context.Context, Run)
- type Run
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) ExistingRun ¶
func (c *Client) ExistingRun(ctx context.Context, job string, runID uuid.UUID) (context.Context, Run)
ExistingRun recreates a Run for a given job and ID. The resulting Run is stored in ctx using ContextWithRun.
func (*Client) NewRun ¶
func (c *Client) NewRun(ctx context.Context, job string) (context.Context, Run)
NewRun creates a Run. If ctx already contains a Run, it is set as the parent. The resulting Run is stored in ctx using ContextWithRun.
type Run ¶
type Run interface {
	// Parent returns the parent of this run, if any.
	Parent() Run

	// RunID returns the ID for this Run.
	RunID() uuid.UUID

	// JobName returns the name for this Run's job.
	JobName() string

	// JobNamespace returns the namespace for this Run's job.
	JobNamespace() string

	// NewChild creates a new Run with the current Run set as its parent
	NewChild(ctx context.Context, jobName string) (context.Context, Run)

	// StartChild calls NewChild and emits a START event
	StartChild(ctx context.Context, jobName string) (context.Context, Run)

	// NewEvent creates a new Event of the provided EventType
	NewEvent(openlineage.EventType) *openlineage.RunEvent

	// Finish will emit a COMPLETE event if no error has occurred.
	// Otherwise, it will emit a FAIL event.
	Finish()

	// Returns true if RecordError was called for this Run.
	HasFailed() bool

	// RecordError emits an OTHER event with an ErrorMessage facet.
	// Once this is called, the run is considered to have failed.
	RecordError(error)

	// RecordRunFacets emits an OTHER event with the supplied RunFacets
	RecordRunFacets(...facets.RunFacet)

	// RecordJobFacets emits an OTHER event with the supplied JobFacets
	RecordJobFacets(...facets.JobFacet)

	// RecordInputs emits an OTHER event with the supplied InputElements
	RecordInputs(...openlineage.InputElement)

	// RecordOutputs emits an OTHER event with the supplied OutputElements
	RecordOutputs(...openlineage.OutputElement)
}
Run is an instrumentation utility that allows for more ergonomic usage of the SDK. It is loosely modeled after the OpenTelemetry Span/Trace APIs.
Example ¶
package main

import (
	"context"
	"errors"
	"log/slog"

	ol "github.com/ThijsKoot/openlineage-go"
	"github.com/ThijsKoot/openlineage-go/pkg/run"
	"github.com/ThijsKoot/openlineage-go/pkg/transport"
)

func main() {
	ctx := context.Background()

	cfg := ol.ClientConfig{
		Transport: transport.Config{
			Type: transport.TransportTypeConsole,
			Console: &transport.ConsoleConfig{
				PrettyPrint: true,
			},
		},
	}

	olClient, err := ol.NewClient(cfg)
	if err != nil {
		slog.Error("ol.NewClient failed", "error", err)
	}

	runClient := run.NewClient(olClient)

	ctx, run := runClient.StartRun(ctx, "ingest")
	defer run.Finish()

	if err := ChildFunction(ctx); err != nil {
		run.RecordError(err)
		slog.Warn("child function failed", "error", err)
	}
}

func ChildFunction(ctx context.Context) error {
	parent := run.FromContext(ctx)

	_, childRun := parent.StartChild(ctx, "child")
	defer childRun.Finish()

	if err := DoWork(); err != nil {
		// Record the error in this run.
		// Finish() will emit a FAIL event.
		childRun.RecordError(err)

		return err
	}

	return nil
}

func DoWork() error {
	return errors.New("did not do work")
}
func Existing ¶
Existing calls Client.ExistingRun using openlineage.DefaultClient.
func FromContext ¶
FromContext extracts the current Run from a context.
If no Run is set in ctx, a no-op implementation of Run is returned, so the result can be used without a nil check.
func New ¶
New calls Client.NewRun using openlineage.DefaultClient.