Documentation ¶
Index ¶
- Constants
- Variables
- func CountBytesPrintWidth(b []byte) int
- func ExecutionStatus(exec ExecutionState) string
- func FmtDuration(duration time.Duration) string
- func FmtTimeSince(start time.Time, duration time.Duration) string
- func GetExitCode(exec *WorkflowExecutionState) int
- func LineHeight(line []byte, maxWidth int) int
- func MoveCursorUp(lines int) string
- func NewTailBoxBoundBuffer(buf *bytes.Buffer, maxLines int, maxWidth int) (*bytes.Buffer, int)
- func PrintAndExit(writer *TermWriter, tmpl *ExecutionTemplate, update *WorkflowExecutionUpdate) (int, error)
- func ProgressString(currentEvents int64, totalEvents int64) string
- func ReverseLinesBuffer(buf *bytes.Buffer) [][]byte
- func ShouldFoldStatus(foldStatus []enums.WorkflowExecutionStatus, noFold bool) func(*WorkflowExecutionState, int) bool
- func StripBytesAnsi(b []byte) []byte
- func WithInterrupts(signals ...os.Signal) func(*WorkflowTracer)
- func WithOptions(opts WorkflowTracerOptions) func(*WorkflowTracer)
- func WithOutput(w io.Writer) func(*WorkflowTracer)
- type ActivityExecutionState
- func (state *ActivityExecutionState) GetAttempt() int32
- func (state *ActivityExecutionState) GetDuration() time.Duration
- func (state *ActivityExecutionState) GetFailure() *failure.Failure
- func (state *ActivityExecutionState) GetName() string
- func (state *ActivityExecutionState) GetRetryState() enums.RetryState
- func (state *ActivityExecutionState) GetStartTime() time.Time
- func (state *ActivityExecutionState) Update(event *history.HistoryEvent)
- type ActivityExecutionStatus
- type ExecutionState
- type ExecutionTemplate
- type StateTemplate
- type StatusIcon
- type TermWriter
- func (w *TermWriter) Flush(trim bool) error
- func (w *TermWriter) GetSize() (int, int)
- func (w *TermWriter) WithSize(width, height int) *TermWriter
- func (w *TermWriter) WithTerminalSize() *TermWriter
- func (w *TermWriter) Write(buf []byte) (n int, err error)
- func (w *TermWriter) WriteLine(s string) (n int, err error)
- type TimerExecutionState
- func (t *TimerExecutionState) GetAttempt() int32
- func (t *TimerExecutionState) GetDuration() time.Duration
- func (t *TimerExecutionState) GetFailure() *failure.Failure
- func (t *TimerExecutionState) GetName() string
- func (t *TimerExecutionState) GetRetryState() enums.RetryState
- func (t *TimerExecutionState) GetStartTime() time.Time
- func (t *TimerExecutionState) Update(event *history.HistoryEvent)
- type TimerExecutionStatus
- type WorkflowExecutionState
- func (state *WorkflowExecutionState) GetAttempt() int32
- func (state *WorkflowExecutionState) GetChildWorkflowByEventId(initiatedEventId int64) (*WorkflowExecutionState, bool)
- func (state *WorkflowExecutionState) GetDuration() time.Duration
- func (state *WorkflowExecutionState) GetFailure() *failure.Failure
- func (state *WorkflowExecutionState) GetName() string
- func (state *WorkflowExecutionState) GetNumberOfEvents() (int64, int64)
- func (state *WorkflowExecutionState) GetRetryState() enums.RetryState
- func (state *WorkflowExecutionState) GetStartTime() time.Time
- func (state *WorkflowExecutionState) IsClosed() (bool, error)
- func (state *WorkflowExecutionState) Update(event *history.HistoryEvent)
- type WorkflowExecutionUpdate
- type WorkflowExecutionUpdateIterator
- type WorkflowExecutionUpdateIteratorImpl
- type WorkflowStateJob
- type WorkflowTracer
- type WorkflowTracerOptions
Constants ¶
const ( AnsiMoveCursorUp = "\x1b[%dA" AnsiMoveCursorStartLine = "\x1b[0G" AnsiEraseToEnd = "\x1b[0J" )
const (
MinFoldingDepth = 1
)
Variables ¶
var ( StatusRunning string = color.BlueString("▷") StatusCompleted string = color.GreenString("✓") StatusTerminated string = color.RedString("x") StatusCanceled string = color.YellowString("x") StatusFailed string = color.RedString("!") StatusContinueAsNew string = color.GreenString("»") StatusTimedOut string = color.RedString("⏱") StatusUnspecifiedScheduled string = "•" StatusCancelRequested string = color.YellowString("▷") StatusTimerWaiting string = color.BlueString("⧖") StatusTimerFired string = color.GreenString("⧖") StatusTimerCanceled string = color.YellowString("⧖") )
var StatusIconsLegend = []StatusIcon{ { Name: "Unspecified or Scheduled", Icon: StatusUnspecifiedScheduled, }, { Name: "Running", Icon: StatusRunning, }, { Name: "Completed", Icon: StatusCompleted, }, { Name: "Continue As New", Icon: StatusContinueAsNew, }, { Name: "Failed", Icon: StatusFailed, }, { Name: "Timed Out", Icon: StatusTimedOut, }, { Name: "Cancel Requested", Icon: StatusCancelRequested, }, { Name: "Canceled", Icon: StatusCanceled, }, { Name: "Terminated", Icon: StatusTerminated, }, }
Functions ¶
func CountBytesPrintWidth ¶
CountBytesPrintWidth counts the number of printed characters a byte array will take.
func ExecutionStatus ¶
func ExecutionStatus(exec ExecutionState) string
ExecutionStatus returns the icon (with color) for a given ExecutionState's status.
func FmtDuration ¶
FmtDuration produces a string for a given duration, rounding to the most reasonable timeframe.
func FmtTimeSince ¶
FmtTimeSince returns a string representing the difference in time between start and close (or start and now).
func GetExitCode ¶
func GetExitCode(exec *WorkflowExecutionState) int
GetExitCode returns the exit code for a given workflow execution status.
func LineHeight ¶
LineHeight returns the number of lines a string is going to take
func MoveCursorUp ¶
func NewTailBoxBoundBuffer ¶
NewTailBoxBoundBuffer trims a buffer to fit into the box defined by maxLines and maxWidth. It returns the trimmed buffer and the number of lines that printing the buffer will take. For no limit on lines, use maxLines = 0. NOTE: This is a best guess. It'll take ANSI codes into account, but some other characters might throw this off.
func PrintAndExit ¶
func PrintAndExit(writer *TermWriter, tmpl *ExecutionTemplate, update *WorkflowExecutionUpdate) (int, error)
func ProgressString ¶
func ReverseLinesBuffer ¶
func ShouldFoldStatus ¶
func ShouldFoldStatus(foldStatus []enums.WorkflowExecutionStatus, noFold bool) func(*WorkflowExecutionState, int) bool
ShouldFoldStatus returns a predicate that will return true when the workflow status can be folded for a given depth. NOTE: Depth starts at 0 (i.e. the root workflow will be at depth 0).
func StripBytesAnsi ¶
StripBytesAnsi removes all ansi codes from a byte array.
func WithInterrupts ¶
func WithInterrupts(signals ...os.Signal) func(*WorkflowTracer)
WithInterrupts sets the signals that will interrupt the tracer
func WithOptions ¶
func WithOptions(opts WorkflowTracerOptions) func(*WorkflowTracer)
WithOptions sets the view options for the tracer
func WithOutput ¶
func WithOutput(w io.Writer) func(*WorkflowTracer)
Types ¶
type ActivityExecutionState ¶
type ActivityExecutionState struct { // ActivityId is the Activity's id, which will usually be the EventId of the Event it was scheduled with. ActivityId string // Status is the Execution's Status based on the last event that was processed. Status ActivityExecutionStatus // Type is the name/type of Activity. Type *common.ActivityType // Attempt contains the current Activity Execution's attempt. // Since Activities' events aren't reported until the Activity is closed, this will always be the last attempt. Attempt int32 // Failure contains the last failure that the Execution has reported (if any). Failure *failure.Failure // RetryState contains the reason provided for whether the Task should or shouldn't be retried. RetryState enums.RetryState // StartTime is the time the Execution was started (based on the start Event). StartTime *timestamppb.Timestamp // CloseTime is the time the Execution was closed (based on the closing Event). Will be nil if the Execution hasn't been closed yet. CloseTime *timestamppb.Timestamp }
ActivityExecutionState is a snapshot of the state of an Activity's Execution. It implements the ExecutionState interface so it can be referenced as a WorkflowExecutionState's child state.
func NewActivityExecutionState ¶
func NewActivityExecutionState() *ActivityExecutionState
func (*ActivityExecutionState) GetAttempt ¶
func (state *ActivityExecutionState) GetAttempt() int32
func (*ActivityExecutionState) GetDuration ¶
func (state *ActivityExecutionState) GetDuration() time.Duration
func (*ActivityExecutionState) GetFailure ¶
func (state *ActivityExecutionState) GetFailure() *failure.Failure
func (*ActivityExecutionState) GetName ¶
func (state *ActivityExecutionState) GetName() string
func (*ActivityExecutionState) GetRetryState ¶
func (state *ActivityExecutionState) GetRetryState() enums.RetryState
func (*ActivityExecutionState) GetStartTime ¶
func (state *ActivityExecutionState) GetStartTime() time.Time
func (*ActivityExecutionState) Update ¶
func (state *ActivityExecutionState) Update(event *history.HistoryEvent)
Update updates the ActivityExecutionState with a HistoryEvent.
type ActivityExecutionStatus ¶
type ActivityExecutionStatus int32
ActivityExecutionStatus is the Status of an ActivityExecution, analogous to enums.WorkflowExecutionStatus.
var ( ACTIVITY_EXECUTION_STATUS_UNSPECIFIED ActivityExecutionStatus = 0 ACTIVITY_EXECUTION_STATUS_SCHEDULED ActivityExecutionStatus = 1 ACTIVITY_EXECUTION_STATUS_RUNNING ActivityExecutionStatus = 2 ACTIVITY_EXECUTION_STATUS_COMPLETED ActivityExecutionStatus = 3 ACTIVITY_EXECUTION_STATUS_FAILED ActivityExecutionStatus = 4 ACTIVITY_EXECUTION_STATUS_TIMED_OUT ActivityExecutionStatus = 5 ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED ActivityExecutionStatus = 6 ACTIVITY_EXECUTION_STATUS_CANCELED ActivityExecutionStatus = 7 )
type ExecutionState ¶
type ExecutionState interface { // Update updates an ExecutionState with a new HistoryEvent. Update(*history.HistoryEvent) // GetName returns the state's name (usually for displaying to the user). GetName() string // GetAttempt returns the attempts to execute the current ExecutionState. GetAttempt() int32 // GetFailure returns the execution's failure (if any). GetFailure() *failure.Failure // GetRetryState returns the execution's RetryState. GetRetryState() enums.RetryState GetDuration() time.Duration GetStartTime() time.Time }
ExecutionState provides a common interface to any execution (Workflows, Activities and Timers in this case) updated through HistoryEvents.
type ExecutionTemplate ¶
type ExecutionTemplate struct {
// contains filtered or unexported fields
}
ExecutionTemplate contains the necessary templates and utilities to render WorkflowExecutionState and its child states.
func NewExecutionTemplate ¶
func NewExecutionTemplate(foldStatus []enums.WorkflowExecutionStatus, noFold bool) (*ExecutionTemplate, error)
NewExecutionTemplate initializes the templates with the necessary functions.
func (*ExecutionTemplate) Execute ¶
func (t *ExecutionTemplate) Execute(writer io.Writer, state ExecutionState, depth int) error
Execute executes the templates for a given Execution state and writes it into the ExecutionTemplate's writer.
type StateTemplate ¶
type StateTemplate struct { State ExecutionState Depth int }
type StatusIcon ¶
StatusIcon has names for each status (useful for help messages).
type TermWriter ¶
type TermWriter struct {
// contains filtered or unexported fields
}
func NewTermWriter ¶
func NewTermWriter(out io.Writer) *TermWriter
NewTermWriter returns a new TermWriter that writes to out. TermWriter is a stateful writer designed to print into a terminal window by limiting the number of lines printed to what fits and clearing them on new outputs.
func (*TermWriter) Flush ¶
func (w *TermWriter) Flush(trim bool) error
Flush writes to the out and resets the buffer. It should be called after the last call to Write to ensure that any data buffered in the TermWriter is written to output. Any incomplete escape sequence at the end is considered complete for formatting purposes. An error is returned if the contents of the buffer cannot be written to the underlying output stream.
func (*TermWriter) GetSize ¶
func (w *TermWriter) GetSize() (int, int)
func (*TermWriter) WithSize ¶
func (w *TermWriter) WithSize(width, height int) *TermWriter
WithSize sets the size of TermWriter to the desired width and height.
func (*TermWriter) WithTerminalSize ¶
func (w *TermWriter) WithTerminalSize() *TermWriter
WithTerminalSize sets the size of TermWriter to that of the terminal.
func (*TermWriter) Write ¶
func (w *TermWriter) Write(buf []byte) (n int, err error)
Write saves the contents of buf to the writer's buffer. The only errors returned are ones encountered while writing to the underlying buffer. TODO: Consider merging it into Flush since we might always want to write and flush. Alternatively, we can pass the writer to the Sprint functions and write (but we might run into issues if the normal writing is interrupted and the interrupt writing starts).
type TimerExecutionState ¶
type TimerExecutionState struct { TimerId string // Name is the name of the Timer (if any has been given to it) Name string // StartToFireTimeout is the amount of time to elapse before the timer fires. StartToFireTimeout *durationpb.Duration // Status is the Execution's Status based on the last event that was processed. Status TimerExecutionStatus // StartTime is the time the Execution was started (based on the start Event). StartTime *timestamppb.Timestamp // CloseTime is the time the Execution was closed (based on the closing Event). Will be nil if the Execution hasn't been closed yet. CloseTime *timestamppb.Timestamp }
TimerExecutionState contains information about a Timer as an execution. It implements the ExecutionState interface so it can be referenced as a WorkflowExecutionState's child state.
func (*TimerExecutionState) GetAttempt ¶
func (t *TimerExecutionState) GetAttempt() int32
func (*TimerExecutionState) GetDuration ¶
func (t *TimerExecutionState) GetDuration() time.Duration
func (*TimerExecutionState) GetFailure ¶
func (t *TimerExecutionState) GetFailure() *failure.Failure
func (*TimerExecutionState) GetName ¶
func (t *TimerExecutionState) GetName() string
func (*TimerExecutionState) GetRetryState ¶
func (t *TimerExecutionState) GetRetryState() enums.RetryState
GetRetryState will always return RETRY_STATE_UNSPECIFIED since Timers don't retry.
func (*TimerExecutionState) GetStartTime ¶
func (t *TimerExecutionState) GetStartTime() time.Time
func (*TimerExecutionState) Update ¶
func (t *TimerExecutionState) Update(event *history.HistoryEvent)
Update updates the TimerExecutionState with a HistoryEvent.
type TimerExecutionStatus ¶
type TimerExecutionStatus int32
TimerExecutionStatus is the Status of a TimerExecution, analogous to enums.WorkflowExecutionStatus.
var ( TIMER_STATUS_WAITING TimerExecutionStatus = 0 TIMER_STATUS_FIRED TimerExecutionStatus = 1 TIMER_STATUS_CANCELED TimerExecutionStatus = 2 )
type WorkflowExecutionState ¶
type WorkflowExecutionState struct { // Execution is the workflow's execution (WorkflowId and RunId). Execution *common.WorkflowExecution // Type is the name/type of Workflow. Type *common.WorkflowType // StartTime is the time the Execution was started (based on the first Execution's Event). //StartTime *time.Time StartTime *timestamppb.Timestamp // CloseTime is the time the Execution was closed (based on the first Execution's Event). Will be nil if the Execution hasn't been closed yet. //CloseTime *time.Time CloseTime *timestamppb.Timestamp // Status is the Execution's Status based on the last event that was processed. Status enums.WorkflowExecutionStatus // IsArchived will be true if the workflow has been archived. IsArchived bool // LastEventId is the EventId of the last processed HistoryEvent. LastEventId int64 // HistoryLength is the number of HistoryEvents available in the server. It will be zero for archived workflows and non-zero positive for any other workflow executions. HistoryLength int64 // ChildStates contains all the ExecutionStates contained by this WorkflowExecutionState in order of execution. ChildStates []ExecutionState // Non-successful closed states // Failure contains the last failure that the Execution has reported (if any). Failure *failure.Failure // Termination contains the last available termination information that the Workflow Execution has reported (if any). Termination *history.WorkflowExecutionTerminatedEventAttributes // CancelRequest contains the last request that has been made to cancel the Workflow Execution (if any). CancelRequest *history.WorkflowExecutionCancelRequestedEventAttributes // RetryState contains the reason provided for whether the Task should or shouldn't be retried. RetryState enums.RetryState // Timeout and retry policies // WorkflowExecutionTimeout contains the Workflow Execution's timeout if it has been set. WorkflowExecutionTimeout *durationpb.Duration // Attempt contains the current Workflow Execution's attempt. 
Attempt int32 // MaximumAttempts contains the maximum number of times the Workflow Execution is allowed to retry before failing. MaximumAttempts int32 // ParentWorkflowExecution identifies the parent Workflow and the execution run. ParentWorkflowExecution *common.WorkflowExecution // contains filtered or unexported fields }
WorkflowExecutionState is a snapshot of the state of a WorkflowExecution. It is updated through HistoryEvents.
func NewWorkflowExecutionState ¶
func NewWorkflowExecutionState(wfId, runId string) *WorkflowExecutionState
func (*WorkflowExecutionState) GetAttempt ¶
func (state *WorkflowExecutionState) GetAttempt() int32
func (*WorkflowExecutionState) GetChildWorkflowByEventId ¶
func (state *WorkflowExecutionState) GetChildWorkflowByEventId(initiatedEventId int64) (*WorkflowExecutionState, bool)
GetChildWorkflowByEventId returns a child workflow for a given initiated event id
func (*WorkflowExecutionState) GetDuration ¶
func (state *WorkflowExecutionState) GetDuration() time.Duration
func (*WorkflowExecutionState) GetFailure ¶
func (state *WorkflowExecutionState) GetFailure() *failure.Failure
func (*WorkflowExecutionState) GetName ¶
func (state *WorkflowExecutionState) GetName() string
func (*WorkflowExecutionState) GetNumberOfEvents ¶
func (state *WorkflowExecutionState) GetNumberOfEvents() (int64, int64)
GetNumberOfEvents returns a count of the number of events processed and the total for a workflow execution. This method iteratively sums the LastEventId (the sequential id of the last event processed) and the HistoryLength for all child workflows
func (*WorkflowExecutionState) GetRetryState ¶
func (state *WorkflowExecutionState) GetRetryState() enums.RetryState
func (*WorkflowExecutionState) GetStartTime ¶
func (state *WorkflowExecutionState) GetStartTime() time.Time
func (*WorkflowExecutionState) IsClosed ¶
func (state *WorkflowExecutionState) IsClosed() (bool, error)
IsClosed returns true when the Workflow Execution is closed. A Closed status means that the Workflow Execution cannot make further progress.
func (*WorkflowExecutionState) Update ¶
func (state *WorkflowExecutionState) Update(event *history.HistoryEvent)
Update updates the WorkflowExecutionState and its child states with a HistoryEvent.
type WorkflowExecutionUpdate ¶
type WorkflowExecutionUpdate struct {
State *WorkflowExecutionState
}
func (*WorkflowExecutionUpdate) GetState ¶
func (update *WorkflowExecutionUpdate) GetState() *WorkflowExecutionState
type WorkflowExecutionUpdateIterator ¶
type WorkflowExecutionUpdateIterator interface { HasNext() bool Next() (*WorkflowExecutionUpdate, error) }
WorkflowExecutionUpdateIterator is the interface that provides iterative updates, analogous to the HistoryEventIterator interface.
func GetWorkflowExecutionUpdates ¶
func GetWorkflowExecutionUpdates(ctx context.Context, client sdkclient.Client, wfId, runId string, fetchAll bool, foldStatus []enums.WorkflowExecutionStatus, depth int, concurrency int) (WorkflowExecutionUpdateIterator, error)
GetWorkflowExecutionUpdates gets workflow execution updates for a particular workflow - workflow ID of the workflow - runID can be default (empty string) - depth of child workflows to request updates for (-1 for unlimited depth) - concurrency of requests (non-zero positive integer) Returns an iterator (see client.GetWorkflowHistory) that provides updated WorkflowExecutionState snapshots. Example: To print a workflow's state whenever there are updates
iter, err := GetWorkflowExecutionUpdates(ctx, client, wfId, runId, fetchAll, foldStatus, -1, 5) if err != nil { return err } for iter.HasNext() { update, err := iter.Next() if err != nil { return err } PrintWorkflowState(update.State) }
type WorkflowExecutionUpdateIteratorImpl ¶
type WorkflowExecutionUpdateIteratorImpl struct {
// contains filtered or unexported fields
}
WorkflowExecutionUpdateIteratorImpl implements the iterator interface. Keeps information about the last processed update and receives new updates through the updateChan channel.
func (*WorkflowExecutionUpdateIteratorImpl) HasNext ¶
func (iter *WorkflowExecutionUpdateIteratorImpl) HasNext() bool
HasNext checks if there are any more updates in the updateChan channel. Returns false if the channel has been closed.
func (*WorkflowExecutionUpdateIteratorImpl) Next ¶
func (iter *WorkflowExecutionUpdateIteratorImpl) Next() (*WorkflowExecutionUpdate, error)
Next returns the last processed execution update. HasNext has to be called first (following the HasNext/Next pattern).
type WorkflowStateJob ¶
type WorkflowStateJob struct {
// contains filtered or unexported fields
}
WorkflowStateJob implements a WorkerJob to retrieve updates for a WorkflowExecutionState and its child workflows.
func NewWorkflowStateJob ¶
func NewWorkflowStateJob(ctx context.Context, client sdkclient.Client, state *WorkflowExecutionState, fetchAll bool, foldStatus []enums.WorkflowExecutionStatus, depth int, updateChan chan struct{}) (*WorkflowStateJob, error)
NewWorkflowStateJob returns a new WorkflowStateJob. It requires an updateChan to signal when there are updates.
func (*WorkflowStateJob) GetChildJob ¶
func (job *WorkflowStateJob) GetChildJob(event *history.HistoryEvent) (*WorkflowStateJob, error)
GetChildJob gets a new child job and appends it to the list of childJobs. These jobs don't start until the parent is caught up.
func (*WorkflowStateJob) Run ¶
func (job *WorkflowStateJob) Run(group *pond.TaskGroupWithContext) func() error
Run starts the WorkflowStateJob, which retrieves the workflow's events and spawns new jobs for the child workflows once it's up-to-date. New jobs are submitted to the pool when the job is up-to-date to reduce the amount of unnecessary history fetches (e.g. when the child workflow is already completed).
func (*WorkflowStateJob) ShouldStart ¶
func (job *WorkflowStateJob) ShouldStart() bool
ShouldStart will return true if the state is in a status that requires requesting its event history. This will help reduce the amount of event histories requested when they're not needed.
type WorkflowTracer ¶
type WorkflowTracer struct {
// contains filtered or unexported fields
}
func NewWorkflowTracer ¶
func NewWorkflowTracer(client client.Client, options ...func(tracer *WorkflowTracer)) (*WorkflowTracer, error)
func (*WorkflowTracer) GetExecutionUpdates ¶
func (t *WorkflowTracer) GetExecutionUpdates(ctx context.Context, wid, rid string) error
GetExecutionUpdates gets workflow execution updates for a particular workflow
func (*WorkflowTracer) PrintUpdates ¶
func (t *WorkflowTracer) PrintUpdates(tmpl *ExecutionTemplate, updatePeriod time.Duration) (int, error)