Documentation
¶
Index ¶
- Variables
- type BaseEvent
- type Client
- type ClientConfig
- type Dataset
- type DatasetEvent
- type Emittable
- type Event
- type EventType
- type InputElement
- type Job
- type JobEvent
- type OutputElement
- type Run
- type RunEvent
- func (e *RunEvent) AsEmittable() Event
- func (e *RunEvent) Emit()
- func (r *RunEvent) WithInputs(inputs ...InputElement) *RunEvent
- func (r *RunEvent) WithJobFacets(jobFacets ...facets.JobFacet) *RunEvent
- func (r *RunEvent) WithOutputs(outputs ...OutputElement) *RunEvent
- func (r *RunEvent) WithParent(parentID uuid.UUID, jobName, namespace string) *RunEvent
- func (r *RunEvent) WithRunFacets(runFacets ...facets.RunFacet) *RunEvent
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var DefaultClient, _ = NewClient(ClientConfig{
	Transport: transport.Config{
		Type: transport.TransportTypeConsole,
		Console: &transport.ConsoleConfig{
			PrettyPrint: true,
		},
	},
})
var DefaultNamespace = "default"
Functions ¶
This section is empty.
Types ¶
type BaseEvent ¶
type BaseEvent struct {
	// the time the event occurred at
	EventTime string
	// URI identifying the producer of this metadata. For example this could be a git url with a given tag or sha
	Producer string
	// The JSON Pointer (https://tools.ietf.org/html/rfc6901) URL to the corresponding version of the schema definition for this RunEvent
	SchemaURL string
}
type Client ¶
type Client struct {
	Namespace string
	// contains filtered or unexported fields
}
Example ¶
cfg := openlineage.ClientConfig{
	Transport: transport.Config{
		Type: transport.TransportTypeConsole,
		Console: &transport.ConsoleConfig{
			PrettyPrint: true,
		},
	},
}
client, err := openlineage.NewClient(cfg)
if err != nil {
	slog.Error("ol.NewClient failed", "error", err)
}
ctx := context.Background()
runID := uuid.Must(uuid.NewV7())
event := openlineage.NewRunEvent(openlineage.EventTypeStart, runID, "foo-job")
if err := client.Emit(ctx, event); err != nil {
	slog.Error("emitting event failed", "error", err)
}
Output:
func NewClient ¶
func NewClient(cfg ClientConfig) (*Client, error)
type ClientConfig ¶
type ClientConfig struct {
	Transport transport.Config `yaml:"transport"`
	// Namespace for events. Defaults to "default"
	Namespace string `yaml:"namespace" env:"OPENLINEAGE_NAMESPACE, overwrite, default=default"`
	// When true, OpenLineage will not emit events (default: false)
	Disabled bool `yaml:"disabled" env:"OPENLINEAGE_DISABLED, overwrite"`
}
func ConfigFromEnv ¶
func ConfigFromEnv() (ClientConfig, error)
ConfigFromEnv attempts to parse ClientConfig from the environment. If OPENLINEAGE_CONFIG_FILE is specified, it will be read first. Environment variables take precedence over values from the configuration file.
func ConfigFromFile ¶
func ConfigFromFile(location string) (ClientConfig, error)
ConfigFromFile reads a configuration file in YAML-format from the specified location.
type Dataset ¶
type Dataset struct {
	Facets *facets.DatasetFacets `json:"facets,omitempty"` // The facets for this dataset
	Name string `json:"name"` // The unique name for that dataset within that namespace
	Namespace string `json:"namespace"` // The namespace containing that dataset
}
Dataset is a dataset sent within static metadata events.
func NewDataset ¶
func NewDataset(name string, namespace string, datasetFacets ...facets.DatasetFacet) Dataset
type DatasetEvent ¶
DatasetEvent represents an OpenLineage DatasetEvent.
func NewDatasetEvent ¶
func NewDatasetEvent( name string, namespace string, facets ...facets.DatasetFacet, ) DatasetEvent
func (*DatasetEvent) AsEmittable ¶
func (e *DatasetEvent) AsEmittable() Event
type Event ¶
type Event struct {
	EventTime string `json:"eventTime"` // the time the event occurred at
	Producer string `json:"producer"` // URI identifying the producer of this metadata. For example this could be a git url with a given tag or sha
	SchemaURL string `json:"schemaURL"` // The JSON Pointer (https://tools.ietf.org/html/rfc6901) URL to the corresponding version of the schema definition for this RunEvent
	EventType *EventType `json:"eventType,omitempty"` // the current transition of the run state. It is required to issue 1 START event and 1 of [ COMPLETE, ABORT, FAIL ] event per run. Additional events with OTHER eventType can be added to the same run. For example to send additional metadata after the run is complete
	Inputs []InputElement `json:"inputs,omitempty"` // The set of **input** datasets.
	Job *Job `json:"job,omitempty"`
	Outputs []OutputElement `json:"outputs,omitempty"` // The set of **output** datasets.
	Run *Run `json:"run,omitempty"`
	Dataset *Dataset `json:"dataset,omitempty"`
}
type EventType ¶
type EventType string
EventType is the current transition of the run state. It is required to issue 1 START event and 1 of [ COMPLETE, ABORT, FAIL ] event per run. Additional events with OTHER eventType can be added to the same run, for example to send additional metadata after the run is complete.
type InputElement ¶
type InputElement struct {
	Facets *facets.DatasetFacets `json:"facets,omitempty"` // The facets for this dataset
	Name string `json:"name"` // The unique name for that dataset within that namespace
	Namespace string `json:"namespace"` // The namespace containing that dataset
	InputFacets *facets.InputDatasetFacets `json:"inputFacets,omitempty"` // The input facets for this dataset.
}
InputElement describes an input dataset.
func NewInputElement ¶
func NewInputElement(name string, namespace string) InputElement
func (InputElement) WithFacets ¶
func (ie InputElement) WithFacets(facets ...facets.DatasetFacet) InputElement
func (InputElement) WithInputFacets ¶
func (ie InputElement) WithInputFacets(facets ...facets.InputDatasetFacet) InputElement
type Job ¶
type Job struct {
	Facets *facets.JobFacets `json:"facets,omitempty"` // The job facets.
	Name string `json:"name"` // The unique name for that job within that namespace
	Namespace string `json:"namespace"` // The namespace containing that job
}
func NewJob ¶
NewJob calls NewNamespacedJob with DefaultNamespace.
type JobEvent ¶
type JobEvent struct {
	Job Job
	// The set of **input** datasets.
	Inputs []InputElement
	// The set of **output** datasets.
	Outputs []OutputElement
	BaseEvent
}
JobEvent represents an OpenLineage JobEvent.
func NewJobEvent ¶
NewJobEvent calls NewNamespacedJobEvent with DefaultNamespace.
func NewNamespacedJobEvent ¶
func (*JobEvent) AsEmittable ¶
func (*JobEvent) WithFacets ¶
WithFacets sets the supplied instances of facets.JobFacet for this event.
func (*JobEvent) WithInputs ¶
func (j *JobEvent) WithInputs(inputs ...InputElement) *JobEvent
WithInputs appends the supplied instances of InputElement to this event's inputs.
func (*JobEvent) WithOutputs ¶
func (j *JobEvent) WithOutputs(outputs ...OutputElement) *JobEvent
WithOutputs appends the supplied instances of OutputElement to this event's outputs.
type OutputElement ¶
type OutputElement struct {
	Facets *facets.DatasetFacets `json:"facets,omitempty"` // The facets for this dataset
	Name string `json:"name"` // The unique name for that dataset within that namespace
	Namespace string `json:"namespace"` // The namespace containing that dataset
	OutputFacets *facets.OutputDatasetFacets `json:"outputFacets,omitempty"` // The output facets for this dataset
}
OutputElement describes an output dataset.
func NewOutputElement ¶
func NewOutputElement(name string, namespace string) OutputElement
func (OutputElement) WithFacets ¶
func (oe OutputElement) WithFacets(facets ...facets.DatasetFacet) OutputElement
func (OutputElement) WithOutputFacets ¶
func (oe OutputElement) WithOutputFacets(facets ...facets.OutputDatasetFacet) OutputElement
type RunEvent ¶
type RunEvent struct {
	Run Run
	Job Job
	EventType EventType
	// The set of **input** datasets.
	Inputs []InputElement
	// The set of **output** datasets.
	Outputs []OutputElement
	BaseEvent
}
func NewNamespacedRunEvent ¶
func NewNamespacedRunEvent( eventType EventType, runID uuid.UUID, jobName string, jobNamespace string, ) *RunEvent
NewNamespacedRunEvent creates a new RunEvent with EventTime set to time.Now.
func NewRunEvent ¶
NewRunEvent calls NewNamespacedRunEvent with DefaultNamespace.
func (*RunEvent) AsEmittable ¶
AsEmittable transforms this RunEvent into its emittable representation.
func (*RunEvent) WithInputs ¶
func (r *RunEvent) WithInputs(inputs ...InputElement) *RunEvent
WithInputs appends the supplied instances of InputElement to this event's inputs.
func (*RunEvent) WithJobFacets ¶
WithJobFacets sets the supplied instances of facets.JobFacet for this RunEvent.
func (*RunEvent) WithOutputs ¶
func (r *RunEvent) WithOutputs(outputs ...OutputElement) *RunEvent
WithOutputs appends the supplied instances of OutputElement to this event's outputs.
func (*RunEvent) WithParent ¶
WithParent configures facets.Parent for this RunEvent.
func (*RunEvent) WithRunFacets ¶
WithRunFacets sets the supplied facets.RunFacet for this RunEvent.