events

package
v1.0.0
Warning

This package is not in the latest version of its module.

Published: Feb 21, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Constants

const MaxErrorMessageLength = 102400 //100KB

Variables

This section is empty.

Functions

func IDFromMessage

func IDFromMessage(message proto.Message) ([]byte, error)

IDFromMessage generates an ID that uniquely represents the admin event entity and its associated phase.

Types

type Config

type Config struct {
	Type     EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."`
	FilePath string             `json:"file-path" pflag:",For file types, specify where the file should be located."`
	Rate     int64              `json:"rate" pflag:",Max rate at which events can be recorded per second."`
	Capacity int                `json:"capacity" pflag:",The max bucket size for event recording tokens."`
}

func GetConfig

func GetConfig(ctx context.Context) *Config

GetConfig retrieves the current global event configuration.

func (Config) GetPFlagSet

func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet

GetPFlagSet returns strongly typed pflags for all fields in Config and its nested types. Flag names follow the format json-name.json-sub-name, and so on.

type EventRecorder

type EventRecorder interface {
	RecordNodeEvent(ctx context.Context, event *event.NodeExecutionEvent) error

	RecordTaskEvent(ctx context.Context, event *event.TaskExecutionEvent) error

	RecordWorkflowEvent(ctx context.Context, event *event.WorkflowExecutionEvent) error
}

EventRecorder records Workflow, Node, and Task events.

func NewEventRecorder

func NewEventRecorder(eventSink EventSink, scope promutils.Scope) EventRecorder

NewEventRecorder constructs a new EventRecorder.

type EventReportingType

type EventReportingType = string
const (
	EventSinkLog   EventReportingType = "log"
	EventSinkFile  EventReportingType = "file"
	EventSinkAdmin EventReportingType = "admin"
)

type EventSink

type EventSink interface {

	// Sink sends the event to this EventSink. The EventSink identifies the type of the
	// message and sinks it appropriately based on that type.
	Sink(ctx context.Context, message proto.Message) error

	// Close releases the EventSink's resources. Callers should close the EventSink when it is
	// no longer in use, as it may hold long-lived connections.
	Close() error
}

EventSink determines how and where events are emitted. The type of EventSink should be configurable by the operator. Kozmo also provides local implementations so that operators can emit events without depending on other services.

func ConstructEventSink

func ConstructEventSink(ctx context.Context, config *Config, scope promutils.Scope) (EventSink, error)

func NewAdminEventSink

func NewAdminEventSink(ctx context.Context, adminClient service.AdminServiceClient, config *Config, filter fastcheck.Filter) (EventSink, error)

NewAdminEventSink constructs a new EventSink that sends events to KozmoAdmin through gRPC.

func NewFileSink

func NewFileSink(path string) (EventSink, error)

TODO: this will cause multiple handles to the same file if we open multiple sinks. Maybe we should remove this.

func NewLogSink

func NewLogSink() (EventSink, error)

func NewStdoutSink

func NewStdoutSink() (EventSink, error)

type EventSinkType

type EventSinkType = string

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter is a thin wrapper around io.Writer.

func (*FileWriter) Flush

func (fw *FileWriter) Flush() error

func (*FileWriter) Write

func (fw *FileWriter) Write(ctx context.Context, content string) error

type LogWriter

type LogWriter struct{}

LogWriter writes to the log output.

func (*LogWriter) Flush

func (w *LogWriter) Flush() error

func (*LogWriter) Write

func (w *LogWriter) Write(ctx context.Context, content string) error

type NodeEventRecorder

type NodeEventRecorder interface {
	// RecordNodeEvent records execution events indicating the node has undergone a phase change and additional metadata.
	RecordNodeEvent(ctx context.Context, event *event.NodeExecutionEvent, eventConfig *config.EventConfig) error
}

NodeEventRecorder records Node events.

func NewNodeEventRecorder

func NewNodeEventRecorder(eventSink EventSink, scope promutils.Scope, store *storage.DataStore) NodeEventRecorder

type StdWriter

type StdWriter struct{}

StdWriter is the default writer, used when no sink type is provided.

func (*StdWriter) Flush

func (s *StdWriter) Flush() error

func (*StdWriter) Write

func (s *StdWriter) Write(ctx context.Context, content string) error

type TaskEventRecorder

type TaskEventRecorder interface {
	// RecordTaskEvent records task execution events indicating the task has undergone a phase change and additional metadata.
	RecordTaskEvent(ctx context.Context, event *event.TaskExecutionEvent, eventConfig *config.EventConfig) error
}

TaskEventRecorder records Task events.

func NewTaskEventRecorder

func NewTaskEventRecorder(eventSink EventSink, scope promutils.Scope, store *storage.DataStore) TaskEventRecorder

type WorkflowEventRecorder

type WorkflowEventRecorder interface {
	// RecordWorkflowEvent records workflow execution events indicating the workflow has undergone a phase change and additional metadata.
	RecordWorkflowEvent(ctx context.Context, event *event.WorkflowExecutionEvent, eventConfig *config.EventConfig) error
}

WorkflowEventRecorder records Workflow events.

func NewWorkflowEventRecorder

func NewWorkflowEventRecorder(eventSink EventSink, scope promutils.Scope, store *storage.DataStore) WorkflowEventRecorder
