openlineage

Version: v0.0.3
This package is not in the latest version of its module.
Published: Oct 30, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

README

OpenLineage SDK for Go

This library contains an SDK for the OpenLineage project. Large parts are generated from the OpenLineage specification.

The root of the library contains the core OpenLineage types; facets are located in pkg/facets.

Installing

go get github.com/ThijsKoot/openlineage-go

Usage

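// generate a time-ordered (version 7) UUID that identifies the run
runID := uuid.Must(uuid.NewV7())

// describe the processing engine as a run facet
facet := facets.NewProcessingEngine("v1.2.3").
    WithName("my-go-engine")

// build a START event for "my-job" and emit it through DefaultClient
openlineage.NewRunEvent(openlineage.EventTypeStart, runID, "my-job").
    WithRunFacets(facet).
    Emit()

Configuring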

All configuration is captured in openlineage.ClientConfig, which can be read from the following sources:

  • YAML configuration file
  • Environment variables
  • Variables set in code

// configuration that pretty-prints events to the console
cfg := openlineage.ClientConfig{
	Transport: transport.Config{
		Type: transport.TransportTypeConsole,
		Console: &transport.ConsoleConfig{
			PrettyPrint: true,
		},
	},
}

// create a new client using this configuration
client, err := openlineage.NewClient(cfg)

File

The following shows how to read a configuration file, followed by an example of its format.

config, err := openlineage.ConfigFromFile("my/openlineage/config.yaml")

# example file
namespace: default # default
disabled: false # default

transport:
  type: console
  console:
    prettyPrint: true

  http:
    url: https://foo
    endpoint: api/v1/lineage # default
    apiKey: ""

Environment

Use openlineage.ConfigFromEnv to read configuration values from the environment. If OPENLINEAGE_CONFIG is set, the file it points to is read first. Values from the environment are then applied on top and take precedence over values from the file.

cfg, err := openlineage.ConfigFromEnv()

The table below contains an overview of all environment variables.

Variable                  Default         Description
OPENLINEAGE_CONFIG        (none)          Path to YAML file containing configuration
OPENLINEAGE_TRANSPORT     (none)          Transport to use. Can be: http, console
OPENLINEAGE_PRETTY_PRINT  (none)          Pretty-print JSON events if using console transport
OPENLINEAGE_NAMESPACE     default         Namespace used for emitting events
OPENLINEAGE_ENDPOINT      api/v1/lineage  Endpoint on OPENLINEAGE_URL accepting events
OPENLINEAGE_API_KEY       (none)          API key for HTTP transport, if required
OPENLINEAGE_URL           (none)          URL for HTTP transport
OPENLINEAGE_DISABLED      false           Disable OpenLineage

Transport

The SDK supports pluggable transports via the transport.Transport interface.

type Transport interface {
	Emit(ctx context.Context, event any) error
}

The built-in transports are HTTP and Console. HTTP sends events as POST requests to an endpoint, optionally secured with bearer authentication. Console prints JSON-formatted events to stdout.

Run API

The run package contains a tracing-like API modeled loosely after OpenTelemetry's. It is kept separate from the core functionality because its design is opinionated. Its purpose is to make emitting events from code more ergonomic and less verbose. It also allows Runs to be passed implicitly via context.Context, which avoids propagating a run context by hand.

Documentation

Constants

This section is empty.

Variables

var DefaultClient, _ = NewClient(ClientConfig{
	Transport: transport.Config{
		Type: transport.TransportTypeConsole,
		Console: &transport.ConsoleConfig{
			PrettyPrint: true,
		},
	},
})

var DefaultNamespace = "default"

Functions

This section is empty.

Types

type BaseEvent

type BaseEvent struct {
	// The time the event occurred at
	EventTime string
	// URI identifying the producer of this metadata. For example, this could be a git URL with a given tag or SHA
	Producer string
	// The JSON Pointer (https://tools.ietf.org/html/rfc6901) URL to the corresponding version of the schema definition for this RunEvent
	SchemaURL string
}

type Client

type Client struct {
	Namespace string
	// contains filtered or unexported fields
}
Example
cfg := openlineage.ClientConfig{
	Transport: transport.Config{
		Type: transport.TransportTypeConsole,
		Console: &transport.ConsoleConfig{
			PrettyPrint: true,
		},
	},
}

client, err := openlineage.NewClient(cfg)
if err != nil {
	slog.Error("openlineage.NewClient failed", "error", err)
	return // do not continue without a usable client
}

ctx := context.Background()
runID := uuid.Must(uuid.NewV7())
event := openlineage.NewRunEvent(openlineage.EventTypeStart, runID, "foo-job")

if err := client.Emit(ctx, event); err != nil {
	slog.Error("emitting event failed", "error", err)
}

func NewClient

func NewClient(cfg ClientConfig) (*Client, error)

func (*Client) Emit

func (olc *Client) Emit(ctx context.Context, event Emittable) error

type ClientConfig

type ClientConfig struct {
	Transport transport.Config `yaml:"transport"`

	// Namespace for events. Defaults to "default"
	Namespace string `yaml:"namespace" env:"OPENLINEAGE_NAMESPACE, overwrite, default=default"`

	// When true, OpenLineage will not emit events (default: false)
	Disabled bool `yaml:"disabled" env:"OPENLINEAGE_DISABLED, overwrite"`
}

func ConfigFromEnv

func ConfigFromEnv() (ClientConfig, error)

ConfigFromEnv attempts to parse ClientConfig from the environment. If OPENLINEAGE_CONFIG is specified, the file it points to is read first. Environment variables take precedence over values from the configuration file.

func ConfigFromFile

func ConfigFromFile(location string) (ClientConfig, error)

ConfigFromFile reads a configuration file in YAML format from the specified location.

type Dataset

type Dataset struct {
	Facets    *facets.DatasetFacets `json:"facets,omitempty"` // The facets for this dataset
	Name      string                `json:"name"`             // The unique name for that dataset within that namespace
	Namespace string                `json:"namespace"`        // The namespace containing that dataset
}

A Dataset sent within static metadata events

func NewDataset

func NewDataset(name string, namespace string, datasetFacets ...facets.DatasetFacet) Dataset

type DatasetEvent

type DatasetEvent struct {
	Dataset Dataset

	BaseEvent
}

DatasetEvent represents an OpenLineage DatasetEvent.

func NewDatasetEvent

func NewDatasetEvent(
	name string,
	namespace string,
	facets ...facets.DatasetFacet,
) DatasetEvent

func (*DatasetEvent) AsEmittable

func (e *DatasetEvent) AsEmittable() Event

func (*DatasetEvent) Emit

func (e *DatasetEvent) Emit()

Emit calls Client.Emit on DefaultClient.

type Emittable

type Emittable interface {
	AsEmittable() Event
}

type Event

type Event struct {
	EventTime string          `json:"eventTime"`           // The time the event occurred at
	Producer  string          `json:"producer"`            // URI identifying the producer of this metadata. For example, this could be a git URL with a given tag or SHA
	SchemaURL string          `json:"schemaURL"`           // The JSON Pointer (https://tools.ietf.org/html/rfc6901) URL to the corresponding version of the schema definition for this RunEvent
	EventType *EventType      `json:"eventType,omitempty"` // The current transition of the run state. It is required to issue 1 START event and 1 of [COMPLETE, ABORT, FAIL] event per run. Additional events with OTHER eventType can be added to the same run, for example to send additional metadata after the run is complete
	Inputs    []InputElement  `json:"inputs,omitempty"`    // The set of **input** datasets.
	Job       *Job            `json:"job,omitempty"`
	Outputs   []OutputElement `json:"outputs,omitempty"` // The set of **output** datasets.
	Run       *Run            `json:"run,omitempty"`
	Dataset   *Dataset        `json:"dataset,omitempty"`
}
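The struct tags above determine the JSON that is sent: fields tagged omitempty drop out of the output when they are nil or empty. The sketch below shows this with a trimmed, hypothetical copy of the struct (the lowercase event type and the helpers encode and hasField are not part of the SDK). It presumably reflects why EventType is a pointer: DatasetEvent and JobEvent have no event type, so the field can be left nil for them.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// EventType is redeclared locally so the sketch compiles without the SDK.
type EventType string

// event is a trimmed, hypothetical copy of the Event struct above.
type event struct {
	EventTime string     `json:"eventTime"`
	Producer  string     `json:"producer"`
	SchemaURL string     `json:"schemaURL"`
	EventType *EventType `json:"eventType,omitempty"`
}

// encode marshals an event to its JSON text.
func encode(e event) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// hasField reports whether the JSON text contains the given key.
func hasField(js, key string) bool {
	return strings.Contains(js, `"`+key+`":`)
}

func main() {
	start := EventType("START")

	// A run-style event sets the pointer, so eventType is serialized.
	withType, _ := encode(event{EventTime: "2024-10-30T00:00:00Z", EventType: &start})

	// A nil pointer combined with omitempty removes the key entirely.
	withoutType, _ := encode(event{EventTime: "2024-10-30T00:00:00Z"})

	fmt.Println(hasField(withType, "eventType"), hasField(withoutType, "eventType"))
}
```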

type EventType

type EventType string

EventType is the current transition of the run state. Each run is required to issue 1 START event and 1 of COMPLETE, ABORT, or FAIL. Additional events with the OTHER event type can be added to the same run, for example to send additional metadata after the run is complete.

const (
	EventTypeAbort    EventType = "ABORT"
	EventTypeComplete EventType = "COMPLETE"
	EventTypeFail     EventType = "FAIL"
	EventTypeOther    EventType = "OTHER"
	EventTypeRunning  EventType = "RUNNING"
	EventTypeStart    EventType = "START"
)
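The lifecycle rule described above can be checked mechanically. The sketch below counts START and terminal events in a sequence; validateRun is a hypothetical helper, not part of the SDK, and the type and constants are redeclared locally so the example is self-contained. It deliberately checks only the counts, since the description does not state ordering rules.

```go
package main

import "fmt"

// EventType and its constants mirror the declarations above.
type EventType string

const (
	EventTypeAbort    EventType = "ABORT"
	EventTypeComplete EventType = "COMPLETE"
	EventTypeFail     EventType = "FAIL"
	EventTypeOther    EventType = "OTHER"
	EventTypeRunning  EventType = "RUNNING"
	EventTypeStart    EventType = "START"
)

// validateRun is a hypothetical helper that checks a run's events for
// exactly one START and exactly one of COMPLETE, ABORT, or FAIL.
// All other event types are ignored.
func validateRun(events []EventType) error {
	var starts, terminals int
	for _, e := range events {
		switch e {
		case EventTypeStart:
			starts++
		case EventTypeComplete, EventTypeAbort, EventTypeFail:
			terminals++
		}
	}
	if starts != 1 {
		return fmt.Errorf("want exactly 1 START event, got %d", starts)
	}
	if terminals != 1 {
		return fmt.Errorf("want exactly 1 terminal event, got %d", terminals)
	}
	return nil
}

func main() {
	ok := []EventType{EventTypeStart, EventTypeRunning, EventTypeComplete, EventTypeOther}
	incomplete := []EventType{EventTypeStart, EventTypeRunning}

	fmt.Println(validateRun(ok))
	fmt.Println(validateRun(incomplete))
}
```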

type InputElement

type InputElement struct {
	Facets      *facets.DatasetFacets      `json:"facets,omitempty"`      // The facets for this dataset
	Name        string                     `json:"name"`                  // The unique name for that dataset within that namespace
	Namespace   string                     `json:"namespace"`             // The namespace containing that dataset
	InputFacets *facets.InputDatasetFacets `json:"inputFacets,omitempty"` // The input facets for this dataset.
}

An input dataset

func NewInputElement

func NewInputElement(name string, namespace string) InputElement

func (InputElement) WithFacets

func (ie InputElement) WithFacets(facets ...facets.DatasetFacet) InputElement

func (InputElement) WithInputFacets

func (ie InputElement) WithInputFacets(facets ...facets.InputDatasetFacet) InputElement

type Job

type Job struct {
	Facets    *facets.JobFacets `json:"facets,omitempty"` // The job facets.
	Name      string            `json:"name"`             // The unique name for that job within that namespace
	Namespace string            `json:"namespace"`        // The namespace containing that job
}

func NewJob

func NewJob(name string, jobFacets ...facets.JobFacet) Job

NewJob calls NewNamespacedJob with DefaultNamespace.

func NewNamespacedJob

func NewNamespacedJob(name string, namespace string, jobFacets ...facets.JobFacet) Job

NewNamespacedJob creates a new Job.

type JobEvent

type JobEvent struct {
	Job Job

	// The set of **input** datasets.
	Inputs []InputElement
	// The set of **output** datasets.
	Outputs []OutputElement

	BaseEvent
}

JobEvent represents an OpenLineage JobEvent.

func NewJobEvent

func NewJobEvent(name string) *JobEvent

NewJobEvent calls NewNamespacedJobEvent with DefaultNamespace.

func NewNamespacedJobEvent

func NewNamespacedJobEvent(name, namespace string) *JobEvent

func (*JobEvent) AsEmittable

func (e *JobEvent) AsEmittable() Event

func (*JobEvent) Emit

func (e *JobEvent) Emit()

Emit calls Client.Emit on DefaultClient.

func (*JobEvent) WithFacets

func (j *JobEvent) WithFacets(facets ...facets.JobFacet) *JobEvent

WithFacets sets the supplied instances of facets.JobFacet for this event.

func (*JobEvent) WithInputs

func (j *JobEvent) WithInputs(inputs ...InputElement) *JobEvent

WithInputs appends the supplied instances of InputElement to this event's inputs.

func (*JobEvent) WithOutputs

func (j *JobEvent) WithOutputs(inputs ...OutputElement) *JobEvent

WithOutputs appends the supplied instances of OutputElement to this event's outputs.
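The With... methods above return the receiver, so calls can be chained, and WithInputs and WithOutputs append rather than replace. The sketch below reproduces that pattern with simplified, hypothetical stand-in types (dataset, jobEvent, newJobEvent); it illustrates the chaining and append semantics only and is not the SDK's code.

```go
package main

import "fmt"

// dataset is a hypothetical, simplified stand-in for InputElement and
// OutputElement.
type dataset struct {
	Name      string
	Namespace string
}

// jobEvent is a hypothetical, simplified stand-in for JobEvent.
type jobEvent struct {
	JobName string
	Inputs  []dataset
	Outputs []dataset
}

// newJobEvent creates an event with no inputs or outputs.
func newJobEvent(name string) *jobEvent {
	return &jobEvent{JobName: name}
}

// WithInputs appends to the existing inputs and returns the receiver,
// which is what makes chaining possible.
func (j *jobEvent) WithInputs(in ...dataset) *jobEvent {
	j.Inputs = append(j.Inputs, in...)
	return j
}

// WithOutputs appends to the existing outputs and returns the receiver.
func (j *jobEvent) WithOutputs(out ...dataset) *jobEvent {
	j.Outputs = append(j.Outputs, out...)
	return j
}

func main() {
	ev := newJobEvent("nightly-load").
		WithInputs(dataset{Name: "orders", Namespace: "warehouse"}).
		WithInputs(dataset{Name: "customers", Namespace: "warehouse"}).
		WithOutputs(dataset{Name: "report", Namespace: "warehouse"})

	// Both WithInputs calls contributed, because each call appends.
	fmt.Println(len(ev.Inputs), len(ev.Outputs))
}
```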

type OutputElement

type OutputElement struct {
	Facets       *facets.DatasetFacets       `json:"facets,omitempty"`       // The facets for this dataset
	Name         string                      `json:"name"`                   // The unique name for that dataset within that namespace
	Namespace    string                      `json:"namespace"`              // The namespace containing that dataset
	OutputFacets *facets.OutputDatasetFacets `json:"outputFacets,omitempty"` // The output facets for this dataset
}

An output dataset

func NewOutputElement

func NewOutputElement(name string, namespace string) OutputElement

func (OutputElement) WithFacets

func (oe OutputElement) WithFacets(facets ...facets.DatasetFacet) OutputElement

func (OutputElement) WithOutputFacets

func (oe OutputElement) WithOutputFacets(facets ...facets.OutputDatasetFacet) OutputElement

type Run

type Run struct {
	Facets *facets.RunFacets `json:"facets,omitempty"` // The run facets.
	RunID  string            `json:"runId"`            // The globally unique ID of the run associated with the job.
}

type RunEvent

type RunEvent struct {
	Run Run
	Job Job

	EventType EventType

	// The set of **input** datasets.
	Inputs []InputElement

	// The set of **output** datasets.
	Outputs []OutputElement

	BaseEvent
}

func NewNamespacedRunEvent

func NewNamespacedRunEvent(
	eventType EventType,
	runID uuid.UUID,
	jobName string,
	jobNamespace string,
) *RunEvent

NewNamespacedRunEvent creates a new RunEvent with EventTime set to time.Now.

func NewRunEvent

func NewRunEvent(eventType EventType, runID uuid.UUID, jobName string) *RunEvent

NewRunEvent calls NewNamespacedRunEvent with DefaultNamespace.

func (*RunEvent) AsEmittable

func (e *RunEvent) AsEmittable() Event

AsEmittable transforms this RunEvent into its emittable representation.

func (*RunEvent) Emit

func (e *RunEvent) Emit()

Emit calls Client.Emit on DefaultClient.

func (*RunEvent) WithInputs

func (r *RunEvent) WithInputs(inputs ...InputElement) *RunEvent

WithInputs appends the supplied instances of InputElement to this event's inputs.

func (*RunEvent) WithJobFacets

func (r *RunEvent) WithJobFacets(jobFacets ...facets.JobFacet) *RunEvent

WithJobFacets sets the supplied instances of facets.JobFacet for this RunEvent.

func (*RunEvent) WithOutputs

func (r *RunEvent) WithOutputs(outputs ...OutputElement) *RunEvent

WithOutputs appends the supplied instances of OutputElement to this event's outputs.

func (*RunEvent) WithParent

func (r *RunEvent) WithParent(parentID uuid.UUID, jobName, namespace string) *RunEvent

WithParent configures facets.Parent for this RunEvent.

func (*RunEvent) WithRunFacets

func (r *RunEvent) WithRunFacets(runFacets ...facets.RunFacet) *RunEvent

WithRunFacets sets the supplied facets.RunFacet for this RunEvent.

Directories

Path Synopsis
internal
pkg
run
