Documentation
¶
Overview ¶
Package dagsterpipes provides tools and utilities for communication and interaction within Dagster pipelines. This package defines core components for handling contexts, messages, and protocol operations to enable seamless integration with Dagster's asset and pipeline infrastructure.
Index ¶
- Constants
- type AssetCheck
- type AssetCheckSeverity
- type AssetMaterialization
- type Closed
- type Context
- func (c *Context[T]) AssetKeys() []string
- func (c *Context[T]) Close() error
- func (c *Context[T]) Extras() T
- func (c *Context[T]) IsClosed() bool
- func (c *Context[T]) LogDebug(message string) error
- func (c *Context[T]) LogError(message string) error
- func (c *Context[T]) LogInfo(message string) error
- func (c *Context[T]) LogWarn(message string) error
- func (c *Context[T]) ReportAssetCheck(check *AssetCheck) error
- func (c *Context[T]) ReportAssetMaterialization(materialization *AssetMaterialization) error
- func (c *Context[T]) ReportCustomMessage(msg *CustomMessage) error
- func (c *Context[T]) ReportException(err error) error
- func (c *Context[T]) RunID() string
- type ContextData
- type ContextLoader
- type ContextParams
- type CustomMessage
- type DefaultContextLoader
- type DefaultMessageWriter
- type EnvVarParamsLoader
- type Exception
- type FileMessageWriterChannel
- type Log
- type Message
- type MessageChannel
- type MessageWriter
- type MessagesParams
- type MetadataValue
- type Method
- type Opened
- type Options
- type ParamsLoader
- type RunFunc
- type Session
Constants ¶
const (
MetadataTypeInfer = "__infer__"
)
Constants for metadata type inference.
const (
ProtocolVersion = "0.1"
)
ProtocolVersion defines the current version of the Dagster Pipes protocol. This version is used to ensure compatibility between the client implementation and the Dagster system.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AssetCheck ¶
type AssetCheck struct { AssetKey string // The key of the asset being checked. CheckName string // The name of the check being performed. Passed bool // Whether the check passed or failed. Serverity AssetCheckSeverity // The severity of the check result. Metadata map[string]any // Metadata associated with the asset check. }
AssetCheck represents an asset check event.
func (AssetCheck) MarshalJSON ¶
func (a AssetCheck) MarshalJSON() ([]byte, error)
MarshalJSON serializes AssetCheck into JSON, normalizing metadata.
type AssetCheckSeverity ¶
type AssetCheckSeverity string
AssetCheckSeverity represents the severity levels for asset checks.
const ( // AssetCheckSeverityWarn indicates a warning severity level. AssetCheckSeverityWarn AssetCheckSeverity = "WARN" // AssetCheckSeverityError indicates an error severity level. AssetCheckSeverityError AssetCheckSeverity = "ERROR" )
type AssetMaterialization ¶
type AssetMaterialization struct { AssetKey string // The unique key of the asset being materialized. DataVersion string // The version of the asset data. Metadata map[string]any // Metadata associated with the asset materialization. }
AssetMaterialization represents an asset materialization event.
func (AssetMaterialization) MarshalJSON ¶
func (a AssetMaterialization) MarshalJSON() ([]byte, error)
MarshalJSON serializes AssetMaterialization into JSON, normalizing metadata.
type Closed ¶
type Closed struct {
Exception *Exception `json:"exception,omitempty"` // An optional exception if the context closed with an error.
}
Closed represents the parameters for the "closed" method.
type Context ¶
type Context[T any] struct { // contains filtered or unexported fields }
Context represents a Dagster Pipes execution context.
func NewContext ¶ added in v0.0.2
NewContext initializes a new Context using the provided configuration functions. Validates the Dagster Pipes process and sets up context, messages, and communication channels.
func (*Context[T]) AssetKeys ¶ added in v0.0.4
AssetKeys retrieves the list of asset keys associated with the context.
func (*Context[T]) Close ¶
Close closes the context and sends a "closed" message. Ensures the context cannot be used after it is closed.
func (*Context[T]) Extras ¶ added in v0.0.4
func (c *Context[T]) Extras() T
Extras retrieves additional data associated with the context.
func (*Context[T]) LogDebug ¶ added in v0.0.6
LogDebug logs a debug-level message using the context's logger. Also sends the log message to the message channel.
func (*Context[T]) LogError ¶ added in v0.0.6
LogError logs an error-level message using the context's logger. Also sends the log message to the message channel.
func (*Context[T]) LogInfo ¶ added in v0.0.6
LogInfo logs an informational message using the context's logger. Also sends the log message to the message channel.
func (*Context[T]) LogWarn ¶ added in v0.0.6
LogWarn logs a warning-level message using the context's logger. Also sends the log message to the message channel.
func (*Context[T]) ReportAssetCheck ¶
func (c *Context[T]) ReportAssetCheck(check *AssetCheck) error
ReportAssetCheck sends a report for an asset check event.
func (*Context[T]) ReportAssetMaterialization ¶
func (c *Context[T]) ReportAssetMaterialization(materialization *AssetMaterialization) error
ReportAssetMaterialization reports an asset materialization event. Ensures duplicate materializations for the same asset key are prevented.
func (*Context[T]) ReportCustomMessage ¶
func (c *Context[T]) ReportCustomMessage(msg *CustomMessage) error
ReportCustomMessage sends a custom message through the context.
func (*Context[T]) ReportException ¶
ReportException records an exception in the context for later reporting.
type ContextData ¶
type ContextData[T any] struct { AssetKeys []string `json:"asset_keys"` // List of asset keys related to the current context. RunID string `json:"run_id"` // Unique identifier for the current Dagster run. Extras T `json:"extras"` // Additional context-specific metadata. }
ContextData represents the runtime context for a Dagster Pipes process, including information about asset keys, the run ID, and any additional metadata.
func (*ContextData[T]) HasAssetKeys ¶
func (d *ContextData[T]) HasAssetKeys() bool
HasAssetKeys checks if any asset keys are defined in the context.
func (*ContextData[T]) IsMultiAsset ¶
func (d *ContextData[T]) IsMultiAsset() bool
IsMultiAsset checks if multiple asset keys are defined in the context.
type ContextLoader ¶
type ContextLoader[T any] interface { LoadContext(params *ContextParams[T]) (*ContextData[T], error) // Load context data from the provided parameters. }
ContextLoader defines an interface for loading a Dagster Pipes context.
type ContextParams ¶
type ContextParams[T any] struct { Data *ContextData[T] `json:"data"` // Context data provided inline. Path string `json:"path"` // File path to load the context data from. Extras map[string]any `json:"extras"` // Additional parameters. }
ContextParams represents the parameters used to load a Dagster Pipes context.
type CustomMessage ¶
type CustomMessage struct {
Payload any `json:"payload"` // The payload of the custom message.
}
CustomMessage represents a custom message to be sent.
type DefaultContextLoader ¶
type DefaultContextLoader[T any] struct{}
DefaultContextLoader is a default implementation of ContextLoader.
func (*DefaultContextLoader[T]) LoadContext ¶
func (l *DefaultContextLoader[T]) LoadContext(params *ContextParams[T]) (*ContextData[T], error)
LoadContext loads context data from a file or inline data in the parameters.
type DefaultMessageWriter ¶
type DefaultMessageWriter struct{}
DefaultMessageWriter is the default implementation of the MessageWriter interface. It supports file-based message channels and serves as a fallback for unrecognized types.
func (*DefaultMessageWriter) Open ¶
func (mw *DefaultMessageWriter) Open(params *MessagesParams) (MessageChannel, error)
Open initializes a file-based MessageChannel if a valid path is provided in the parameters. Returns the created MessageChannel or an error if the path is missing or unsupported.
func (*DefaultMessageWriter) OpenedExtras ¶ added in v0.0.3
func (mw *DefaultMessageWriter) OpenedExtras() map[string]any
OpenedExtras provides additional metadata for the opened message channel. In the default implementation, this method returns an empty map.
type EnvVarParamsLoader ¶
type EnvVarParamsLoader[T any] struct{}
EnvVarParamsLoader implements the ParamsLoader interface using environment variables.
func (*EnvVarParamsLoader[T]) IsDagsterPipesProcess ¶
func (l *EnvVarParamsLoader[T]) IsDagsterPipesProcess() bool
IsDagsterPipesProcess checks if the `DAGSTER_PIPES_CONTEXT` environment variable is set.
func (*EnvVarParamsLoader[T]) LoadContextParams ¶
func (l *EnvVarParamsLoader[T]) LoadContextParams() (*ContextParams[T], error)
LoadContextParams loads context parameters from the environment variable `DAGSTER_PIPES_CONTEXT`.
func (*EnvVarParamsLoader[T]) LoadMessagesParams ¶
func (l *EnvVarParamsLoader[T]) LoadMessagesParams() (*MessagesParams, error)
LoadMessagesParams loads messaging parameters from the environment variable `DAGSTER_PIPES_MESSAGES`.
type Exception ¶
type Exception struct { Name string `json:"name"` Message string `json:"message"` Cause *Exception `json:"cause"` Stack []string `json:"stack"` Context []string `json:"context"` }
Exception represents a structured error with detailed context.
func NewException ¶
NewException creates a new Exception from a given error. If includeStackTrace is true, it captures the stack trace.
type FileMessageWriterChannel ¶
type FileMessageWriterChannel struct {
// contains filtered or unexported fields
}
FileMessageWriterChannel implements the MessageChannel interface. It writes messages to a specified file.
func NewFileMessageWriterChannel ¶
func NewFileMessageWriterChannel(path string) (*FileMessageWriterChannel, error)
NewFileMessageWriterChannel creates a new FileMessageWriterChannel. The provided path specifies the file location for message writing.
func (*FileMessageWriterChannel) Close ¶ added in v0.0.8
func (f *FileMessageWriterChannel) Close() error
Close closes the underlying file handle. Should be called to release resources.
func (*FileMessageWriterChannel) WriteMessage ¶
func (f *FileMessageWriterChannel) WriteMessage(message Message) error
WriteMessage writes a Message to the file specified in the channel's Path. If the file does not exist, it creates it. Messages are appended to the file, with each message serialized as a JSON object followed by a newline.
type Log ¶
type Log struct { Message string `json:"message"` // The log message. Level string `json:"level"` // The log level (e.g., DEBUG, INFO). }
Log represents the parameters for the "log" method.
type Message ¶
type Message struct { DagsterPipesVersion string `json:"__dagster_pipes_version"` // The version of Dagster Pipes protocol. Method Method `json:"method"` // The communication method. Params any `json:"params"` // The parameters of the message. }
Message represents a communication message in Dagster Pipes.
type MessageChannel ¶
type MessageChannel interface { // WriteMessage writes a Message to the underlying channel. WriteMessage(message Message) error // Close closes the underlying channel and releases any associated resources. // After calling Close, the channel cannot be used for further writes. Close() error }
MessageChannel represents an interface for writing messages.
type MessageWriter ¶
type MessageWriter interface { // Open initializes a MessageChannel using the provided parameters. // Returns the created MessageChannel or an error if initialization fails. Open(params *MessagesParams) (MessageChannel, error) // OpenedExtras retrieves any additional metadata or information // associated with the opened message channel. OpenedExtras() map[string]any }
MessageWriter defines an interface for creating and managing message channels. It includes methods for opening a message channel and retrieving additional metadata about the opened channel.
type MessagesParams ¶
type MessagesParams struct { Stdio string `json:"stdio"` // Configuration for standard I/O messaging. Path string `json:"path"` // File path for message exchange. }
MessagesParams represents parameters for managing messages between Dagster Pipes processes.
type MetadataValue ¶
type MetadataValue struct { RawValue any `json:"raw_value"` // The raw value of the metadata. Type string `json:"type"` // The type of the metadata. }
MetadataValue represents a metadata entry with a type and raw value.
type Method ¶
type Method string
Method represents different types of communication methods.
const ( // MethodClosed indicates that the context is closed. MethodClosed Method = "closed" // MethodLog represents a log message. MethodLog Method = "log" // MethodOpened indicates that the context is opened. MethodOpened Method = "opened" // MethodReportAssetMaterialization reports an asset materialization event. MethodReportAssetMaterialization Method = "report_asset_materialization" // MethodReportAssetCheck reports an asset check event. MethodReportAssetCheck Method = "report_asset_check" // MethodReportCustomMessage sends a custom message. MethodReportCustomMessage Method = "report_custom_message" )
type Opened ¶
type Opened[T any] struct { Extras T `json:"extras"` // Additional metadata for the opened method. }
Opened represents the parameters for the "opened" method.
type Options ¶
type Options[T any] struct { ParamsLoader ParamsLoader[T] // Loader for context parameters. ContextLoader ContextLoader[T] // Loader for the execution context. MessageWriter MessageWriter // Writer for communication messages. Logger *slog.Logger // Logger instance for logging messages. }
Options defines configuration options for creating a new Context.
type ParamsLoader ¶
type ParamsLoader[T any] interface { LoadContextParams() (*ContextParams[T], error) // Load parameters for the Dagster context. LoadMessagesParams() (*MessagesParams, error) // Load parameters for messaging configuration. IsDagsterPipesProcess() bool // Check if the current process is a Dagster Pipes process. }
ParamsLoader defines an interface for loading context and messaging parameters.
type RunFunc ¶
RunFunc defines a function type that processes a Dagster Pipes context. It should return an error if any issues occur during execution.
type Session ¶
type Session[T any] struct { // contains filtered or unexported fields }
Session represents a managed session for interacting with a Dagster Pipes context. It ensures proper handling of the context lifecycle.
func New ¶
New initializes a new Session by creating and opening a Dagster Pipes context. It accepts a variadic list of option functions to configure the context. Returns a pointer to the created Session or an error if the context fails to open.
func (*Session[T]) Close ¶
Close finalizes the session by closing the associated context. Returns an error if the context fails to close.