tracer

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2024 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AnsiMoveCursorUp        = "\x1b[%dA"
	AnsiMoveCursorStartLine = "\x1b[0G"
	AnsiEraseToEnd          = "\x1b[0J"
)
View Source
const (
	MinFoldingDepth = 1
)

Variables

View Source
var (
	StatusRunning              string = color.BlueString("▷")
	StatusCompleted            string = color.GreenString("✓")
	StatusTerminated           string = color.RedString("x")
	StatusCanceled             string = color.YellowString("x")
	StatusFailed               string = color.RedString("!")
	StatusContinueAsNew        string = color.GreenString("»")
	StatusTimedOut             string = color.RedString("⏱")
	StatusUnspecifiedScheduled string = "•"
	StatusCancelRequested      string = color.YellowString("▷")
	StatusTimerWaiting         string = color.BlueString("⧖")
	StatusTimerFired           string = color.GreenString("⧖")
	StatusTimerCanceled        string = color.YellowString("⧖")
)
View Source
var StatusIconsLegend = []StatusIcon{
	{
		Name: "Unspecified or Scheduled", Icon: StatusUnspecifiedScheduled,
	},
	{
		Name: "Running", Icon: StatusRunning,
	},
	{
		Name: "Completed", Icon: StatusCompleted,
	},
	{
		Name: "Continue As New", Icon: StatusContinueAsNew,
	},
	{
		Name: "Failed", Icon: StatusFailed,
	},
	{
		Name: "Timed Out", Icon: StatusTimedOut,
	},
	{
		Name: "Cancel Requested", Icon: StatusCancelRequested,
	},
	{
		Name: "Canceled", Icon: StatusCanceled,
	},
	{
		Name: "Terminated", Icon: StatusTerminated,
	},
}

Functions

func CountBytesPrintWidth

func CountBytesPrintWidth(b []byte) int

CountBytesPrintWidth counts the number of printed characters a byte array will take.

func ExecutionStatus

func ExecutionStatus(exec ExecutionState) string

ExecutionStatus returns the icon (with color) for a given ExecutionState's status.

func FmtDuration

func FmtDuration(duration time.Duration) string

FmtDuration produces a string for a given duration, rounding to the most reasonable timeframe.

func FmtTimeSince

func FmtTimeSince(start time.Time, duration time.Duration) string

FmtTimeSince returns a string representing the difference in time between start and close (or start and now).

func GetExitCode

func GetExitCode(exec *WorkflowExecutionState) int

GetExitCode returns the exit code for a given workflow execution status.

func LineHeight

func LineHeight(line []byte, maxWidth int) int

LineHeight returns the number of lines a string is going to take

func MoveCursorUp

func MoveCursorUp(lines int) string

func NewTailBoxBoundBuffer

func NewTailBoxBoundBuffer(buf *bytes.Buffer, maxLines int, maxWidth int) (*bytes.Buffer, int)

NewTailBoxBoundBuffer trims a buffer to fit into the box defined by maxLines and maxWidth, and returns the trimmed buffer together with the number of lines that printing it will take. For no limit on lines, use maxLines = 0. NOTE: This is a best guess. It takes ANSI codes into account, but some other characters might throw the count off.

func PrintAndExit

func PrintAndExit(writer *TermWriter, tmpl *ExecutionTemplate, update *WorkflowExecutionUpdate) (int, error)

func ProgressString

func ProgressString(currentEvents int64, totalEvents int64) string

func ReverseLinesBuffer

func ReverseLinesBuffer(buf *bytes.Buffer) [][]byte

func ShouldFoldStatus

func ShouldFoldStatus(foldStatus []enums.WorkflowExecutionStatus, noFold bool) func(*WorkflowExecutionState, int) bool

ShouldFoldStatus returns a predicate that will return true when the workflow status can be folded for a given depth. NOTE: Depth starts at 0 (i.e. the root workflow will be at depth 0).

func StripBytesAnsi

func StripBytesAnsi(b []byte) []byte

StripBytesAnsi removes all ansi codes from a byte array.

func WithInterrupts

func WithInterrupts(signals ...os.Signal) func(*WorkflowTracer)

WithInterrupts sets the signals that will interrupt the tracer

func WithOptions

func WithOptions(opts WorkflowTracerOptions) func(*WorkflowTracer)

WithOptions sets the view options for the tracer

func WithOutput

func WithOutput(w io.Writer) func(*WorkflowTracer)

Types

type ActivityExecutionState

type ActivityExecutionState struct {
	// ActivityId is the Activity's id, which will usually be the EventId of the Event it was scheduled with.
	ActivityId string
	// Status is the Execution's Status based on the last event that was processed.
	Status ActivityExecutionStatus
	// Type is the name/type of Activity.
	Type *common.ActivityType
	// Attempt contains the current Activity Execution's attempt.
	// Since Activities' events aren't reported until the Activity is closed, this will always be the last attempt.
	Attempt int32
	// Failure contains the last failure that the Execution has reported (if any).
	Failure *failure.Failure
	// RetryState contains the reason provided for whether the Task should or shouldn't be retried.
	RetryState enums.RetryState

	// StartTime is the time the Execution was started (based on the start Event).
	StartTime *timestamppb.Timestamp
	// CloseTime is the time the Execution was closed (based on the closing Event). Will be nil if the Execution hasn't been closed yet.
	CloseTime *timestamppb.Timestamp
}

ActivityExecutionState is a snapshot of the state of an Activity's Execution. It implements the ExecutionState interface so it can be referenced as a WorkflowExecutionState's child state.

func NewActivityExecutionState

func NewActivityExecutionState() *ActivityExecutionState

func (*ActivityExecutionState) GetAttempt

func (state *ActivityExecutionState) GetAttempt() int32

func (*ActivityExecutionState) GetDuration

func (state *ActivityExecutionState) GetDuration() time.Duration

func (*ActivityExecutionState) GetFailure

func (state *ActivityExecutionState) GetFailure() *failure.Failure

func (*ActivityExecutionState) GetName

func (state *ActivityExecutionState) GetName() string

func (*ActivityExecutionState) GetRetryState

func (state *ActivityExecutionState) GetRetryState() enums.RetryState

func (*ActivityExecutionState) GetStartTime

func (state *ActivityExecutionState) GetStartTime() time.Time

func (*ActivityExecutionState) Update

func (state *ActivityExecutionState) Update(event *history.HistoryEvent)

Update updates the ActivityExecutionState with a HistoryEvent.

type ActivityExecutionStatus

type ActivityExecutionStatus int32

ActivityExecutionStatus is the Status of an ActivityExecution, analogous to enums.WorkflowExecutionStatus.

var (
	ACTIVITY_EXECUTION_STATUS_UNSPECIFIED      ActivityExecutionStatus = 0
	ACTIVITY_EXECUTION_STATUS_SCHEDULED        ActivityExecutionStatus = 1
	ACTIVITY_EXECUTION_STATUS_RUNNING          ActivityExecutionStatus = 2
	ACTIVITY_EXECUTION_STATUS_COMPLETED        ActivityExecutionStatus = 3
	ACTIVITY_EXECUTION_STATUS_FAILED           ActivityExecutionStatus = 4
	ACTIVITY_EXECUTION_STATUS_TIMED_OUT        ActivityExecutionStatus = 5
	ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED ActivityExecutionStatus = 6
	ACTIVITY_EXECUTION_STATUS_CANCELED         ActivityExecutionStatus = 7
)

type ExecutionState

type ExecutionState interface {
	// Update updates an ExecutionState with a new HistoryEvent.
	Update(*history.HistoryEvent)
	// GetName returns the state's name (usually for displaying to the user).
	GetName() string
	// GetAttempt returns the attempts to execute the current ExecutionState.
	GetAttempt() int32
	// GetFailure returns the execution's failure (if any).
	GetFailure() *failure.Failure
	// GetRetryState returns the execution's RetryState.
	GetRetryState() enums.RetryState

	GetDuration() time.Duration
	GetStartTime() time.Time
}

ExecutionState provides a common interface to any execution (Workflows, Activities and Timers in this case) updated through HistoryEvents.

type ExecutionTemplate

type ExecutionTemplate struct {
	// contains filtered or unexported fields
}

ExecutionTemplate contains the necessary templates and utilities to render WorkflowExecutionState and its child states.

func NewExecutionTemplate

func NewExecutionTemplate(foldStatus []enums.WorkflowExecutionStatus, noFold bool) (*ExecutionTemplate, error)

NewExecutionTemplate initializes the templates with the necessary functions.

func (*ExecutionTemplate) Execute

func (t *ExecutionTemplate) Execute(writer io.Writer, state ExecutionState, depth int) error

Execute executes the templates for a given Execution state and writes it into the ExecutionTemplate's writer.

type StateTemplate

type StateTemplate struct {
	State ExecutionState
	Depth int
}

type StatusIcon

type StatusIcon struct {
	Name string
	Icon string
}

StatusIcon has names for each status (useful for help messages).

type TermWriter

type TermWriter struct {
	// contains filtered or unexported fields
}

func NewTermWriter

func NewTermWriter(out io.Writer) *TermWriter

NewTermWriter returns a new TermWriter set to output to the given writer. TermWriter is a stateful writer designed to print into a terminal window by limiting the number of lines printed to what fits and clearing them on new outputs.

func (*TermWriter) Flush

func (w *TermWriter) Flush(trim bool) error

Flush writes to the out and resets the buffer. It should be called after the last call to Write to ensure that any data buffered in the TermWriter is written to output. Any incomplete escape sequence at the end is considered complete for formatting purposes. An error is returned if the contents of the buffer cannot be written to the underlying output stream.

func (*TermWriter) GetSize

func (w *TermWriter) GetSize() (int, int)

func (*TermWriter) WithSize

func (w *TermWriter) WithSize(width, height int) *TermWriter

WithSize sets the size of TermWriter to the desired width and height.

func (*TermWriter) WithTerminalSize

func (w *TermWriter) WithTerminalSize() *TermWriter

WithTerminalSize sets the size of TermWriter to that of the terminal.

func (*TermWriter) Write

func (w *TermWriter) Write(buf []byte) (n int, err error)

Write saves the contents of buf to the writer's buffer. The only errors returned are ones encountered while writing to the underlying buffer. TODO: Consider merging it into Flush since we might always want to write and flush. Alternatively, we can pass the writer to the Sprint functions and write (but we might run into issues if the normal writing is interrupted and the interrupt writing starts).

func (*TermWriter) WriteLine

func (w *TermWriter) WriteLine(s string) (n int, err error)

WriteLine writes a string into TermWriter.

type TimerExecutionState

type TimerExecutionState struct {
	TimerId string
	// Name is the name of the Timer (if any has been given to it)
	Name string
	// StartToFireTimeout is the amount of time to elapse before the timer fires.
	StartToFireTimeout *durationpb.Duration
	// Status is the Execution's Status based on the last event that was processed.
	Status TimerExecutionStatus
	// StartTime is the time the Execution was started (based on the start Event).
	StartTime *timestamppb.Timestamp
	// CloseTime is the time the Execution was closed (based on the closing Event). Will be nil if the Execution hasn't been closed yet.
	CloseTime *timestamppb.Timestamp
}

TimerExecutionState contains information about a Timer as an execution. It implements the ExecutionState interface so it can be referenced as a WorkflowExecutionState's child state.

func (*TimerExecutionState) GetAttempt

func (t *TimerExecutionState) GetAttempt() int32

func (*TimerExecutionState) GetDuration

func (t *TimerExecutionState) GetDuration() time.Duration

func (*TimerExecutionState) GetFailure

func (t *TimerExecutionState) GetFailure() *failure.Failure

func (*TimerExecutionState) GetName

func (t *TimerExecutionState) GetName() string

func (*TimerExecutionState) GetRetryState

func (t *TimerExecutionState) GetRetryState() enums.RetryState

GetRetryState will always return RETRY_STATE_UNSPECIFIED since Timers don't retry.

func (*TimerExecutionState) GetStartTime

func (t *TimerExecutionState) GetStartTime() time.Time

func (*TimerExecutionState) Update

func (t *TimerExecutionState) Update(event *history.HistoryEvent)

Update updates the TimerExecutionState with a HistoryEvent.

type TimerExecutionStatus

type TimerExecutionStatus int32

TimerExecutionStatus is the Status of a TimerExecution, analogous to enums.WorkflowExecutionStatus.

var (
	TIMER_STATUS_WAITING  TimerExecutionStatus = 0
	TIMER_STATUS_FIRED    TimerExecutionStatus = 1
	TIMER_STATUS_CANCELED TimerExecutionStatus = 2
)

type WorkflowExecutionState

type WorkflowExecutionState struct {
	// Execution is the workflow's execution (WorkflowId and RunId).
	Execution *common.WorkflowExecution
	// Type is the name/type of Workflow.
	Type *common.WorkflowType
	// StartTime is the time the Execution was started (based on the first Execution's Event).
	//StartTime *time.Time
	StartTime *timestamppb.Timestamp
	// CloseTime is the time the Execution was closed (based on the first Execution's Event). Will be nil if the Execution hasn't been closed yet.
	//CloseTime *time.Time
	CloseTime *timestamppb.Timestamp
	// Status is the Execution's Status based on the last event that was processed.
	Status enums.WorkflowExecutionStatus
	// IsArchived will be true if the workflow has been archived.
	IsArchived bool

	// LastEventId is the EventId of the last processed HistoryEvent.
	LastEventId int64
	// HistoryLength is the number of HistoryEvents available in the server. It will be zero for archived workflows and a non-zero positive number for any other workflow executions.
	HistoryLength int64

	// ChildStates contains all the ExecutionStates contained by this WorkflowExecutionState in order of execution.
	ChildStates []ExecutionState

	// Non-successful closed states
	// Failure contains the last failure that the Execution has reported (if any).
	Failure *failure.Failure
	// Termination contains the last available termination information that the Workflow Execution has reported (if any).
	Termination *history.WorkflowExecutionTerminatedEventAttributes
	// CancelRequest contains the last request that has been made to cancel the Workflow Execution (if any).
	CancelRequest *history.WorkflowExecutionCancelRequestedEventAttributes
	// RetryState contains the reason provided for whether the Task should or shouldn't be retried.
	RetryState enums.RetryState

	// Timeout and retry policies
	// WorkflowExecutionTimeout contains the Workflow Execution's timeout if it has been set.
	WorkflowExecutionTimeout *durationpb.Duration
	// Attempt contains the current Workflow Execution's attempt.
	Attempt int32
	// MaximumAttempts contains the maximum number of times the Workflow Execution is allowed to retry before failing.
	MaximumAttempts int32

	// ParentWorkflowExecution identifies the parent Workflow and the execution run.
	ParentWorkflowExecution *common.WorkflowExecution
	// contains filtered or unexported fields
}

WorkflowExecutionState is a snapshot of the state of a WorkflowExecution. It is updated through HistoryEvents.

func NewWorkflowExecutionState

func NewWorkflowExecutionState(wfId, runId string) *WorkflowExecutionState

func (*WorkflowExecutionState) GetAttempt

func (state *WorkflowExecutionState) GetAttempt() int32

func (*WorkflowExecutionState) GetChildWorkflowByEventId

func (state *WorkflowExecutionState) GetChildWorkflowByEventId(initiatedEventId int64) (*WorkflowExecutionState, bool)

GetChildWorkflowByEventId returns a child workflow for a given initiated event id

func (*WorkflowExecutionState) GetDuration

func (state *WorkflowExecutionState) GetDuration() time.Duration

func (*WorkflowExecutionState) GetFailure

func (state *WorkflowExecutionState) GetFailure() *failure.Failure

func (*WorkflowExecutionState) GetName

func (state *WorkflowExecutionState) GetName() string

func (*WorkflowExecutionState) GetNumberOfEvents

func (state *WorkflowExecutionState) GetNumberOfEvents() (int64, int64)

GetNumberOfEvents returns a count of the number of events processed and the total for a workflow execution. This method iteratively sums the LastEventId (the sequential id of the last event processed) and the HistoryLength for all child workflows

func (*WorkflowExecutionState) GetRetryState

func (state *WorkflowExecutionState) GetRetryState() enums.RetryState

func (*WorkflowExecutionState) GetStartTime

func (state *WorkflowExecutionState) GetStartTime() time.Time

func (*WorkflowExecutionState) IsClosed

func (state *WorkflowExecutionState) IsClosed() (bool, error)

IsClosed returns true when the Workflow Execution is closed. A Closed status means that the Workflow Execution cannot make further progress.

func (*WorkflowExecutionState) Update

func (state *WorkflowExecutionState) Update(event *history.HistoryEvent)

Update updates the WorkflowExecutionState and its child states with a HistoryEvent.

type WorkflowExecutionUpdate

type WorkflowExecutionUpdate struct {
	State *WorkflowExecutionState
}

func (*WorkflowExecutionUpdate) GetState

func (update *WorkflowExecutionUpdate) GetState() *WorkflowExecutionState

type WorkflowExecutionUpdateIterator

type WorkflowExecutionUpdateIterator interface {
	HasNext() bool
	Next() (*WorkflowExecutionUpdate, error)
}

WorkflowExecutionUpdateIterator is the interface that provides iterative updates, analogous to the HistoryEventIterator interface.

func GetWorkflowExecutionUpdates

func GetWorkflowExecutionUpdates(ctx context.Context, client sdkclient.Client, wfId, runId string, fetchAll bool, foldStatus []enums.WorkflowExecutionStatus, depth int, concurrency int) (WorkflowExecutionUpdateIterator, error)

GetWorkflowExecutionUpdates gets workflow execution updates for a particular workflow. Parameters:

  - wfId: the workflow ID of the workflow.
  - runId: the run ID; can be the default (empty string).
  - depth: the depth of child workflows to request updates for (-1 for unlimited depth).
  - concurrency: the concurrency of requests (a non-zero positive integer).

It returns an iterator (see client.GetWorkflowHistory) that provides updated WorkflowExecutionState snapshots. Example: to print a workflow's state whenever there are updates:

iter, err := GetWorkflowExecutionUpdates(ctx, client, wfId, runId, false, nil, -1, 5)
if err != nil {
	return err
}
for iter.HasNext() {
	update, err := iter.Next()
	if err != nil {
		return err
	}
	PrintWorkflowState(update.State)
}

type WorkflowExecutionUpdateIteratorImpl

type WorkflowExecutionUpdateIteratorImpl struct {
	// contains filtered or unexported fields
}

WorkflowExecutionUpdateIteratorImpl implements the iterator interface. Keeps information about the last processed update and receives new updates through the updateChan channel.

func (*WorkflowExecutionUpdateIteratorImpl) HasNext

func (iter *WorkflowExecutionUpdateIteratorImpl) HasNext() bool

HasNext checks if there are any more updates in the updateChan channel. Returns false if the channel has been closed.

func (*WorkflowExecutionUpdateIteratorImpl) Next

Next returns the last processed execution update. HasNext has to be called first (following the HasNext/Next pattern).

type WorkflowStateJob

type WorkflowStateJob struct {
	// contains filtered or unexported fields
}

WorkflowStateJob implements a WorkerJob to retrieve updates for a WorkflowExecutionState and its child workflows.

func NewWorkflowStateJob

func NewWorkflowStateJob(ctx context.Context, client sdkclient.Client, state *WorkflowExecutionState, fetchAll bool, foldStatus []enums.WorkflowExecutionStatus, depth int, updateChan chan struct{}) (*WorkflowStateJob, error)

NewWorkflowStateJob returns a new WorkflowStateJob. It requires an updateChan to signal when there are updates.

func (*WorkflowStateJob) GetChildJob

func (job *WorkflowStateJob) GetChildJob(event *history.HistoryEvent) (*WorkflowStateJob, error)

GetChildJob gets a new child job and appends it to the list of childJobs. These jobs don't start until the parent has caught up.

func (*WorkflowStateJob) Run

func (job *WorkflowStateJob) Run(group *pond.TaskGroupWithContext) func() error

Run starts the WorkflowStateJob, which retrieves the workflow's events and spawns new jobs for the child workflows once it's up-to-date. New jobs are submitted to the pool when the job is up-to-date to reduce the amount of unnecessary history fetches (e.g. when the child workflow is already completed).

func (*WorkflowStateJob) ShouldStart

func (job *WorkflowStateJob) ShouldStart() bool

ShouldStart will return true if the state is in a status that requires requesting its event history. This will help reduce the amount of event histories requested when they're not needed.

type WorkflowTracer

type WorkflowTracer struct {
	// contains filtered or unexported fields
}

func NewWorkflowTracer

func NewWorkflowTracer(client client.Client, options ...func(tracer *WorkflowTracer)) (*WorkflowTracer, error)

func (*WorkflowTracer) GetExecutionUpdates

func (t *WorkflowTracer) GetExecutionUpdates(ctx context.Context, wid, rid string) error

GetExecutionUpdates gets workflow execution updates for a particular workflow

func (*WorkflowTracer) PrintUpdates

func (t *WorkflowTracer) PrintUpdates(tmpl *ExecutionTemplate, updatePeriod time.Duration) (int, error)

type WorkflowTracerOptions

type WorkflowTracerOptions struct {
	NoFold       bool
	FoldStatuses []enums.WorkflowExecutionStatus
	Depth        int
	Concurrency  int

	UpdatePeriod time.Duration
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL