dagsterpipes

package module
v0.0.10 (Latest)
Published: Dec 21, 2024 License: MIT Imports: 12 Imported by: 0

README

dagster-pipes-go

dagster-pipes-go is a Go library for integrating with Dagster Pipes, enabling seamless communication between external processes and Dagster workflows. It provides a set of utilities for managing context, logging, reporting asset materializations, and more.

Features

  • Context Management: Open and manage Pipes contexts.
  • Message Handling: Read the Pipes context and write messages back to Dagster in a structured format.
  • Asset Reporting: Report asset materializations and checks.
  • Custom Messaging: Send custom messages for advanced use cases.
  • Error Handling: Report exceptions gracefully.

Installation

To install the library, use go get:

go get github.com/hupe1980/dagster-pipes-go

How to use

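package main

import (
	"log"

	dagsterpipes "github.com/hupe1980/dagster-pipes-go"
)

func main() {
	session, err := dagsterpipes.New[map[string]any]()
	if err != nil {
		log.Fatalf("Error creating dagster pipes session: %v", err)
	}

	// Run reports any error returned by the callback to Dagster via ReportException.
	runErr := session.Run(func(ctx *dagsterpipes.Context[map[string]any]) error {
		// Report a materialization for the asset keyed "asset".
		return ctx.ReportAssetMaterialization(&dagsterpipes.AssetMaterialization{
			AssetKey:    "asset",
			DataVersion: "1.0",
			Metadata: map[string]any{
				"foo": "bar",
			},
		})
	})

	// Close explicitly instead of deferring: log.Fatalf exits without running
	// deferred calls, which would skip sending the "closed" message.
	if err := session.Close(); err != nil {
		log.Printf("Error closing dagster pipes session: %v", err)
	}
	if runErr != nil {
		log.Fatalf("Error running dagster pipes session: %v", runErr)
	}
}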

Contributing

Contributions are welcome! If you find bugs or want to suggest features, please open an issue or submit a pull request.

License

This project is licensed under the MIT License. See the LICENSE file for details.

Documentation

Overview

Package dagsterpipes implements the external-process side of the Dagster Pipes protocol. It defines the core components for loading the Pipes context, writing protocol messages, and reporting asset events, so that Go processes can integrate with Dagster's asset and pipeline infrastructure.

Constants

const (
	MetadataTypeInfer = "__infer__"
)

MetadataTypeInfer is the metadata type value that tells Dagster to infer a metadata entry's type from its raw value.

const (
	ProtocolVersion = "0.1"
)

ProtocolVersion defines the current version of the Dagster Pipes protocol. This version is used to ensure compatibility between the client implementation and the Dagster system.

Variables

This section is empty.

Functions

This section is empty.

Types

type AssetCheck

type AssetCheck struct {
	AssetKey  string             // The key of the asset being checked.
	CheckName string             // The name of the check being performed.
	Passed    bool               // Whether the check passed or failed.
	Serverity AssetCheckSeverity // The severity of the check result.
	Metadata  map[string]any     // Metadata associated with the asset check.
}

AssetCheck represents an asset check event.

func (AssetCheck) MarshalJSON

func (a AssetCheck) MarshalJSON() ([]byte, error)

MarshalJSON serializes AssetCheck into JSON, normalizing metadata.

type AssetCheckSeverity

type AssetCheckSeverity string

AssetCheckSeverity represents the severity levels for asset checks.

const (
	// AssetCheckSeverityWarn indicates a warning severity level.
	AssetCheckSeverityWarn AssetCheckSeverity = "WARN"

	// AssetCheckSeverityError indicates an error severity level.
	AssetCheckSeverityError AssetCheckSeverity = "ERROR"
)

type AssetMaterialization

type AssetMaterialization struct {
	AssetKey    string         // The unique key of the asset being materialized.
	DataVersion string         // The version of the asset data.
	Metadata    map[string]any // Metadata associated with the asset materialization.
}

AssetMaterialization represents an asset materialization event.

func (AssetMaterialization) MarshalJSON

func (a AssetMaterialization) MarshalJSON() ([]byte, error)

MarshalJSON serializes AssetMaterialization into JSON, normalizing metadata.

type Closed

type Closed struct {
	Exception *Exception `json:"exception,omitempty"` // An optional exception if the context closed with an error.
}

Closed represents the parameters for the "closed" method.

type Context

type Context[T any] struct {
	// contains filtered or unexported fields
}

Context represents a Dagster Pipes execution context.

func NewContext added in v0.0.2

func NewContext[T any](optFns ...func(o *Options[T])) (*Context[T], error)

NewContext initializes a new Context using the provided configuration functions. It verifies that the current process was launched as a Dagster Pipes process, then sets up the context data, the message channel, and related communication.

func (*Context[T]) AssetKeys added in v0.0.4

func (c *Context[T]) AssetKeys() []string

AssetKeys retrieves the list of asset keys associated with the context.

func (*Context[T]) Close

func (c *Context[T]) Close() error

Close closes the context and sends a "closed" message. Once closed, the context cannot be used again.

func (*Context[T]) Extras added in v0.0.4

func (c *Context[T]) Extras() T

Extras retrieves additional data associated with the context.

func (*Context[T]) IsClosed

func (c *Context[T]) IsClosed() bool

IsClosed checks if the context is closed.

func (*Context[T]) LogDebug added in v0.0.6

func (c *Context[T]) LogDebug(message string) error

LogDebug logs a debug-level message with the context's logger and also sends it to the message channel.

func (*Context[T]) LogError added in v0.0.6

func (c *Context[T]) LogError(message string) error

LogError logs an error-level message with the context's logger and also sends it to the message channel.

func (*Context[T]) LogInfo added in v0.0.6

func (c *Context[T]) LogInfo(message string) error

LogInfo logs an informational message with the context's logger and also sends it to the message channel.

func (*Context[T]) LogWarn added in v0.0.6

func (c *Context[T]) LogWarn(message string) error

LogWarn logs a warning-level message with the context's logger and also sends it to the message channel.

func (*Context[T]) ReportAssetCheck

func (c *Context[T]) ReportAssetCheck(check *AssetCheck) error

ReportAssetCheck sends a report for an asset check event.

func (*Context[T]) ReportAssetMaterialization

func (c *Context[T]) ReportAssetMaterialization(materialization *AssetMaterialization) error

ReportAssetMaterialization reports an asset materialization event. It prevents more than one materialization from being reported for the same asset key.

func (*Context[T]) ReportCustomMessage

func (c *Context[T]) ReportCustomMessage(msg *CustomMessage) error

ReportCustomMessage sends a custom message through the context.

func (*Context[T]) ReportException

func (c *Context[T]) ReportException(err error) error

ReportException records an exception in the context for later reporting.

func (*Context[T]) RunID

func (c *Context[T]) RunID() string

RunID retrieves the run identifier from the context data.

type ContextData

type ContextData[T any] struct {
	AssetKeys []string `json:"asset_keys"` // List of asset keys related to the current context.
	RunID     string   `json:"run_id"`     // Unique identifier for the current Dagster run.
	Extras    T        `json:"extras"`     // Additional context-specific metadata.
}

ContextData represents the runtime context for a Dagster Pipes process, including information about asset keys, the run ID, and any additional metadata.

func (*ContextData[T]) HasAssetKeys

func (d *ContextData[T]) HasAssetKeys() bool

HasAssetKeys checks if any asset keys are defined in the context.

func (*ContextData[T]) IsMultiAsset

func (d *ContextData[T]) IsMultiAsset() bool

IsMultiAsset checks if multiple asset keys are defined in the context.

type ContextLoader

type ContextLoader[T any] interface {
	LoadContext(params *ContextParams[T]) (*ContextData[T], error) // Load context data from the provided parameters.
}

ContextLoader defines an interface for loading a Dagster Pipes context.

type ContextParams

type ContextParams[T any] struct {
	Data   *ContextData[T] `json:"data"`   // Context data provided inline.
	Path   string          `json:"path"`   // File path to load the context data from.
	Extras map[string]any  `json:"extras"` // Additional parameters.
}

ContextParams represents the parameters used to load a Dagster Pipes context.

type CustomMessage

type CustomMessage struct {
	Payload any `json:"payload"` // The payload of the custom message.
}

CustomMessage represents a custom message to be sent.

type DefaultContextLoader

type DefaultContextLoader[T any] struct{}

DefaultContextLoader is a default implementation of ContextLoader.

func (*DefaultContextLoader[T]) LoadContext

func (l *DefaultContextLoader[T]) LoadContext(params *ContextParams[T]) (*ContextData[T], error)

LoadContext loads context data from a file or inline data in the parameters.

type DefaultMessageWriter

type DefaultMessageWriter struct{}

DefaultMessageWriter is the default implementation of the MessageWriter interface. It supports file-based message channels and serves as a fallback for unrecognized types.

func (*DefaultMessageWriter) Open

func (mw *DefaultMessageWriter) Open(params *MessagesParams) (MessageChannel, error)

Open initializes a file-based MessageChannel if a valid path is provided in the parameters. Returns the created MessageChannel or an error if the path is missing or unsupported.

func (*DefaultMessageWriter) OpenedExtras added in v0.0.3

func (mw *DefaultMessageWriter) OpenedExtras() map[string]any

OpenedExtras provides additional metadata for the opened message channel. In the default implementation, this method returns an empty map.

type EnvVarParamsLoader

type EnvVarParamsLoader[T any] struct{}

EnvVarParamsLoader implements the ParamsLoader interface using environment variables.

func (*EnvVarParamsLoader[T]) IsDagsterPipesProcess

func (l *EnvVarParamsLoader[T]) IsDagsterPipesProcess() bool

IsDagsterPipesProcess checks if the `DAGSTER_PIPES_CONTEXT` environment variable is set.

func (*EnvVarParamsLoader[T]) LoadContextParams

func (l *EnvVarParamsLoader[T]) LoadContextParams() (*ContextParams[T], error)

LoadContextParams loads context parameters from the environment variable `DAGSTER_PIPES_CONTEXT`.

func (*EnvVarParamsLoader[T]) LoadMessagesParams

func (l *EnvVarParamsLoader[T]) LoadMessagesParams() (*MessagesParams, error)

LoadMessagesParams loads messaging parameters from the environment variable `DAGSTER_PIPES_MESSAGES`.
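The Python dagster-pipes package passes DAGSTER_PIPES_CONTEXT and DAGSTER_PIPES_MESSAGES as JSON that is zlib-compressed and then base64-encoded. Assuming this Go loader follows the same convention (an assumption, since the listing does not say), decoding looks roughly like the sketch below. encodeEnvVar and decodeEnvVar are hypothetical helpers; in a real process the input would come from os.Getenv("DAGSTER_PIPES_MESSAGES").

```go
package main

import (
	"bytes"
	"compress/zlib"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
)

// MessagesParams is a local copy of the documented type.
type MessagesParams struct {
	Stdio string `json:"stdio"`
	Path  string `json:"path"`
}

// encodeEnvVar marshals v to JSON, zlib-compresses it, and base64-encodes it.
func encodeEnvVar(v any) (string, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	zw := zlib.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return "", err
	}
	if err := zw.Close(); err != nil { // flush the compressed stream
		return "", err
	}
	return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}

// decodeEnvVar reverses encodeEnvVar and unmarshals the JSON into out.
func decodeEnvVar(value string, out any) error {
	compressed, err := base64.StdEncoding.DecodeString(value)
	if err != nil {
		return err
	}
	zr, err := zlib.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return err
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, out)
}

func main() {
	encoded, err := encodeEnvVar(map[string]string{"path": "/tmp/dagster-messages"})
	if err != nil {
		panic(err)
	}
	var params MessagesParams
	if err := decodeEnvVar(encoded, &params); err != nil {
		panic(err)
	}
	fmt.Println(params.Path) // /tmp/dagster-messages
}
```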

type Exception

type Exception struct {
	Name    string     `json:"name"`
	Message string     `json:"message"`
	Cause   *Exception `json:"cause"`
	Stack   []string   `json:"stack"`
	Context []string   `json:"context"`
}

Exception represents a structured error with detailed context.

func NewException

func NewException(err error, includeStackTrace bool) *Exception

NewException creates a new Exception from a given error. If includeStackTrace is true, it captures the stack trace.

type FileMessageWriterChannel

type FileMessageWriterChannel struct {
	// contains filtered or unexported fields
}

FileMessageWriterChannel implements the MessageChannel interface. It writes messages to a specified file.

func NewFileMessageWriterChannel

func NewFileMessageWriterChannel(path string) (*FileMessageWriterChannel, error)

NewFileMessageWriterChannel creates a new FileMessageWriterChannel. The provided path specifies the file location for message writing.

func (*FileMessageWriterChannel) Close added in v0.0.8

func (f *FileMessageWriterChannel) Close() error

Close closes the underlying file handle. Call it when you are done writing so the resource is released.

func (*FileMessageWriterChannel) WriteMessage

func (f *FileMessageWriterChannel) WriteMessage(message Message) error

WriteMessage writes a Message to the file at the channel's path, creating the file if it does not exist. Messages are appended to the file, each serialized as a JSON object followed by a newline.

type Log

type Log struct {
	Message string `json:"message"` // The log message.
	Level   string `json:"level"`   // The log level (e.g., DEBUG, INFO).
}

Log represents the parameters for the "log" method.

type Message

type Message struct {
	DagsterPipesVersion string `json:"__dagster_pipes_version"` // The version of Dagster Pipes protocol.
	Method              Method `json:"method"`                  // The communication method.
	Params              any    `json:"params"`                  // The parameters of the message.
}

Message represents a communication message in Dagster Pipes.

type MessageChannel

type MessageChannel interface {
	// WriteMessage writes a Message to the underlying channel.
	WriteMessage(message Message) error
	// Close closes the underlying channel and releases any associated resources.
	// After calling Close, the channel cannot be used for further writes.
	Close() error
}

MessageChannel represents an interface for writing messages.

type MessageWriter

type MessageWriter interface {
	// Open initializes a MessageChannel using the provided parameters.
	// Returns the created MessageChannel or an error if initialization fails.
	Open(params *MessagesParams) (MessageChannel, error)

	// OpenedExtras retrieves any additional metadata or information
	// associated with the opened message channel.
	OpenedExtras() map[string]any
}

MessageWriter defines an interface for creating and managing message channels. It includes methods for opening a message channel and retrieving additional metadata about the opened channel.
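Since MessageWriter and MessageChannel are plain interfaces, a test double that buffers messages in memory is straightforward. The sketch below uses local copies of the interfaces so it compiles on its own; memoryWriter and memoryChannel are hypothetical names. A version meant for the real package would implement dagsterpipes.MessageWriter with the package's own types and be passed through the Options.MessageWriter field.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local copies of the documented types and interfaces.
type Message struct {
	DagsterPipesVersion string `json:"__dagster_pipes_version"`
	Method              string `json:"method"`
	Params              any    `json:"params"`
}

type MessagesParams struct {
	Stdio string `json:"stdio"`
	Path  string `json:"path"`
}

type MessageChannel interface {
	WriteMessage(message Message) error
	Close() error
}

type MessageWriter interface {
	Open(params *MessagesParams) (MessageChannel, error)
	OpenedExtras() map[string]any
}

// memoryChannel buffers messages in memory and rejects writes after Close.
type memoryChannel struct {
	mu     sync.Mutex
	msgs   []Message
	closed bool
}

func (c *memoryChannel) WriteMessage(m Message) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errors.New("channel closed")
	}
	c.msgs = append(c.msgs, m)
	return nil
}

func (c *memoryChannel) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
	return nil
}

// memoryWriter ignores params, opens a fresh in-memory channel, and keeps a
// reference to it so tests can inspect what was written.
type memoryWriter struct{ ch *memoryChannel }

func (w *memoryWriter) Open(*MessagesParams) (MessageChannel, error) {
	w.ch = &memoryChannel{}
	return w.ch, nil
}

func (w *memoryWriter) OpenedExtras() map[string]any { return map[string]any{} }

// Compile-time check that memoryWriter satisfies MessageWriter.
var _ MessageWriter = (*memoryWriter)(nil)

func main() {
	w := &memoryWriter{}
	ch, err := w.Open(&MessagesParams{})
	if err != nil {
		panic(err)
	}
	if err := ch.WriteMessage(Message{DagsterPipesVersion: "0.1", Method: "opened"}); err != nil {
		panic(err)
	}
	if err := ch.Close(); err != nil {
		panic(err)
	}
	afterClose := ch.WriteMessage(Message{})
	fmt.Println(len(w.ch.msgs), afterClose != nil) // 1 true
}
```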

type MessagesParams

type MessagesParams struct {
	Stdio string `json:"stdio"` // Configuration for standard I/O messaging.
	Path  string `json:"path"`  // File path for message exchange.
}

MessagesParams represents parameters for managing messages between Dagster Pipes processes.

type MetadataValue

type MetadataValue struct {
	RawValue any    `json:"raw_value"` // The raw value of the metadata.
	Type     string `json:"type"`      // The type of the metadata.
}

MetadataValue represents a metadata entry with a type and raw value.

type Method

type Method string

Method identifies the kind of a Dagster Pipes message; it is carried in the method field of a Message.

const (
	// MethodClosed indicates that the context is closed.
	MethodClosed Method = "closed"

	// MethodLog represents a log message.
	MethodLog Method = "log"

	// MethodOpened indicates that the context is opened.
	MethodOpened Method = "opened"

	// MethodReportAssetMaterialization reports an asset materialization event.
	MethodReportAssetMaterialization Method = "report_asset_materialization"

	// MethodReportAssetCheck reports an asset check event.
	MethodReportAssetCheck Method = "report_asset_check"

	// MethodReportCustomMessage sends a custom message.
	MethodReportCustomMessage Method = "report_custom_message"
)

type Opened

type Opened[T any] struct {
	Extras T `json:"extras"` // Additional metadata for the opened method.
}

Opened represents the parameters for the "opened" method.

type Options

type Options[T any] struct {
	ParamsLoader  ParamsLoader[T]  // Loader for context parameters.
	ContextLoader ContextLoader[T] // Loader for the execution context.
	MessageWriter MessageWriter    // Writer for communication messages.
	Logger        *slog.Logger     // Logger instance for logging messages.
}

Options defines configuration options for creating a new Context.

type ParamsLoader

type ParamsLoader[T any] interface {
	LoadContextParams() (*ContextParams[T], error) // Load parameters for the Dagster context.
	LoadMessagesParams() (*MessagesParams, error)  // Load parameters for messaging configuration.
	IsDagsterPipesProcess() bool                   // Check if the current process is a Dagster Pipes process.
}

ParamsLoader defines an interface for loading context and messaging parameters.

type RunFunc

type RunFunc[T any] func(context *Context[T]) error

RunFunc defines a function type that processes a Dagster Pipes context. It should return an error if any issues occur during execution.

type Session

type Session[T any] struct {
	// contains filtered or unexported fields
}

Session represents a managed session for interacting with a Dagster Pipes context. It ensures proper handling of the context lifecycle.

func New

func New[T any](optFns ...func(o *Options[T])) (*Session[T], error)

New initializes a new Session by creating and opening a Dagster Pipes context. It accepts a variadic list of option functions to configure the context. Returns a pointer to the created Session or an error if the context fails to open.

func (*Session[T]) Close

func (s *Session[T]) Close() error

Close finalizes the session by closing the associated context. Returns an error if the context fails to close.

func (*Session[T]) Context

func (s *Session[T]) Context() *Context[T]

Context retrieves the underlying context associated with the session.

func (*Session[T]) Run

func (s *Session[T]) Run(fn RunFunc[T]) error

Run executes the provided RunFunc with the session's context. If the RunFunc encounters an error, the error is reported using the context's ReportException method. If reporting the error also fails, that error is returned. Otherwise, nil is returned.

Directories

Path Synopsis
examples
