stop

package
v0.0.1
Published: Mar 18, 2022 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrThrottled = errors.New("throttled on async limiting semaphore")

ErrThrottled is returned from RunAsyncTaskEx in the event that there is no more capacity for async tasks, as limited by the semaphore.

var ErrUnavailable = &roachpb.NodeUnavailableError{}

ErrUnavailable indicates that the server is quiescing and is unable to process new work.

Functions

func HandleDebug

func HandleDebug(w http.ResponseWriter, r *http.Request)

HandleDebug responds with the list of stopper tasks actively running.

func PrintLeakedStoppers

func PrintLeakedStoppers(t testing.TB)

PrintLeakedStoppers prints (using `t`) the creation site of each Stopper for which `.Stop()` has not yet been called.

Types

type Closer

type Closer interface {
	Close()
}

Closer is an interface for objects to attach to the stopper to be closed once the stopper completes.

type CloserFn

type CloserFn func()

CloserFn is a type that allows any function to be a Closer.

func (CloserFn) Close

func (f CloserFn) Close()

Close implements the Closer interface.
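
The adapter pattern behind CloserFn can be sketched with the standard library alone. The declarations below mirror the ones documented here; the body of Close (calling the function itself) is an assumption, since the source does not show it, and closeAll is a hypothetical helper used only for illustration.

```go
package main

import "fmt"

// Closer and CloserFn mirror the declarations documented above.
type Closer interface {
	Close()
}

type CloserFn func()

// Close implements Closer by invoking the function itself.
// (Assumed body; the documentation only states that it implements Closer.)
func (f CloserFn) Close() { f() }

// closeAll is a hypothetical helper that closes each Closer in order.
func closeAll(closers ...Closer) {
	for _, c := range closers {
		c.Close()
	}
}

func main() {
	closed := 0
	closeAll(
		CloserFn(func() { closed++ }),
		CloserFn(func() { closed++ }),
	)
	fmt.Println("closers run:", closed)
}
```

With the real package, a plain function would presumably be registered the same way, by converting it to CloserFn before passing it to AddCloser.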

type Option

type Option interface {
	// contains filtered or unexported methods
}

An Option can be passed to NewStopper.

func OnPanic

func OnPanic(handler func(interface{})) Option

OnPanic is an option which lets the Stopper recover from all panics using the provided panic handler.

When Stop() is invoked during stack unwinding, OnPanic is also invoked, but Stop() may not have carried out its duties.

func WithTracer

func WithTracer(t *tracing.Tracer) Option

WithTracer is an option for NewStopper() supplying the Tracer to use for creating spans for tasks. Note that for tasks asking for a child span, the parent's tracer is used instead of this one.

type SpanOption

type SpanOption int

SpanOption specifies the type of tracing span that a task will run in.

const (
	// FollowsFromSpan makes the task run in a span that's not included in the
	// caller's recording (if any). For external tracers, the task's span will
	// still reference the caller's span through a FollowsFrom relationship. If
	// the caller doesn't have a span, then the task will execute in a root span.
	//
	// Use this when the caller will not wait for the task to finish, but a
	// relationship between the caller and the task might still be useful to
	// visualize in a trace collector.
	FollowsFromSpan SpanOption = iota

	// ChildSpan makes the task run in a span that's a child of the caller's span
	// (if any). The child is included in the parent's recording. For external
	// tracers, the child references the parent through a ChildOf relationship.
	// If the caller doesn't have a span, then the task will execute in a root
	// span.
	//
	// ChildSpan has consequences on memory usage: the memory lifetime of
	// the task's span becomes tied to the lifetime of the parent. Generally
	// ChildSpan should be used when the parent usually waits for the task to
	// complete, and the parent is not a long-running process.
	ChildSpan

	// SterileRootSpan makes the task run in a root span that doesn't get any
	// children. Anybody trying to create a child of the task's span will get a
	// root span. This is suitable for long-running tasks: connecting children to
	// these tasks would lead to infinitely-long traces, and connecting the
	// long-running task to its parent is also problematic because of the
	// different lifetimes.
	SterileRootSpan
)
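
Because FollowsFromSpan is declared with iota as the first constant, it is the zero value of SpanOption, so an options struct that leaves SpanOpt unset selects it. The sketch below redeclares the constants locally to show this; describe and taskOpts are hypothetical names introduced for the example and are not part of the package.

```go
package main

import "fmt"

// SpanOption and its constants are redeclared here to mirror the
// documentation; this is not the package's own code.
type SpanOption int

const (
	FollowsFromSpan SpanOption = iota
	ChildSpan
	SterileRootSpan
)

// describe is a hypothetical helper that names a SpanOption.
func describe(o SpanOption) string {
	switch o {
	case FollowsFromSpan:
		return "FollowsFromSpan"
	case ChildSpan:
		return "ChildSpan"
	case SterileRootSpan:
		return "SterileRootSpan"
	default:
		return fmt.Sprintf("SpanOption(%d)", int(o))
	}
}

// taskOpts is a hypothetical, cut-down stand-in for TaskOpts.
type taskOpts struct {
	TaskName string
	SpanOpt  SpanOption
}

func main() {
	opts := taskOpts{TaskName: "example"} // SpanOpt left at its zero value
	fmt.Println(describe(opts.SpanOpt))
}
```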

type Stopper

type Stopper struct {
	// contains filtered or unexported fields
}

A Stopper provides control over the lifecycle of goroutines started through it via its RunTask, RunAsyncTask, and other similar methods.

When Stop is invoked, the Stopper

  • invokes Quiesce, which causes the Stopper to refuse new work (that is, its Run* family of methods starts returning ErrUnavailable), closes the channel returned by ShouldQuiesce, and blocks until no more tasks are tracked, then
  • runs all of the methods supplied to AddCloser, then
  • closes the IsStopped channel.
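
The three-step shutdown order described above can be illustrated with a small standard-library sketch. miniStopper and all of its methods are hypothetical names; this is a simplified model of the documented behavior, not the package's implementation (it has no tracing, no panic handling, and its stop method is not idempotent).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errUnavailable stands in for ErrUnavailable.
var errUnavailable = errors.New("stopper is quiescing")

// miniStopper is a hypothetical, simplified model of a Stopper.
type miniStopper struct {
	mu        sync.Mutex
	quiescing bool
	tasks     sync.WaitGroup
	quiesceCh chan struct{} // closed when quiescing begins
	stoppedCh chan struct{} // closed when stop has fully completed
	closers   []func()
}

func newMiniStopper() *miniStopper {
	return &miniStopper{
		quiesceCh: make(chan struct{}),
		stoppedCh: make(chan struct{}),
	}
}

// runAsync starts f in a goroutine unless the stopper is quiescing.
func (s *miniStopper) runAsync(f func()) error {
	s.mu.Lock()
	if s.quiescing {
		s.mu.Unlock()
		return errUnavailable // f is never invoked
	}
	s.tasks.Add(1)
	s.mu.Unlock()
	go func() {
		defer s.tasks.Done()
		f()
	}()
	return nil
}

func (s *miniStopper) addCloser(f func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closers = append(s.closers, f)
}

// stop must be called at most once in this sketch.
func (s *miniStopper) stop() {
	// Step 1: refuse new work, signal running tasks, wait for them.
	s.mu.Lock()
	s.quiescing = true
	close(s.quiesceCh)
	s.mu.Unlock()
	s.tasks.Wait()

	// Step 2: run the registered closers.
	s.mu.Lock()
	closers := s.closers
	s.mu.Unlock()
	for _, c := range closers {
		c()
	}

	// Step 3: announce that shutdown is complete.
	close(s.stoppedCh)
}

func main() {
	s := newMiniStopper()
	_ = s.runAsync(func() {
		<-s.quiesceCh // wait for the quiesce signal
		fmt.Println("task: saw quiesce, exiting")
	})
	s.addCloser(func() { fmt.Println("closer: ran after tasks drained") })
	s.stop()
	<-s.stoppedCh
	fmt.Println("second attempt:", s.runAsync(func() {}))
}
```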

When ErrUnavailable is returned from a task, the caller needs to handle it appropriately by terminating any work that it had hoped to defer to the task (which is guaranteed to never have been invoked). A simple example of this can be seen in the below snippet:

var wg sync.WaitGroup
wg.Add(1)
if err := s.RunAsyncTask(ctx, "foo", func(ctx context.Context) {
  defer wg.Done()
}); err != nil {
  // Task never ran.
  wg.Done()
}

To ensure that tasks that do get started are sensitive to Quiesce, they need to observe the ShouldQuiesce channel similar to how they are expected to observe context cancellation:

func x() {
  select {
  case <-s.ShouldQuiesce():
    return
  case <-ctx.Done():
    return
  case <-someChan:
    // Do work.
  }
}

TODO(tbg): many improvements here are possible:

func NewStopper

func NewStopper(options ...Option) *Stopper

NewStopper returns an instance of Stopper.

func (*Stopper) AddCloser

func (s *Stopper) AddCloser(c Closer)

AddCloser adds an object to close after the stopper has been stopped.

WARNING: memory resources acquired by this method will stay around for the lifetime of the Stopper. Use with care to avoid leaking memory.

A closer that is added after Stop has already been called will be called immediately.
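
The "called immediately after Stop" rule can be modeled as follows. closerList is a hypothetical type using only the standard library; it illustrates the documented behavior and is not how the package necessarily implements it.

```go
package main

import (
	"fmt"
	"sync"
)

// closerList is a hypothetical model of the closer bookkeeping.
type closerList struct {
	mu      sync.Mutex
	stopped bool
	closers []func()
}

// add registers f, or runs it right away if stop has already happened.
func (l *closerList) add(f func()) {
	l.mu.Lock()
	if l.stopped {
		l.mu.Unlock()
		f() // late registration: close immediately
		return
	}
	l.closers = append(l.closers, f)
	l.mu.Unlock()
}

// stop marks the list as stopped and runs every registered closer once.
func (l *closerList) stop() {
	l.mu.Lock()
	l.stopped = true
	closers := l.closers
	l.closers = nil
	l.mu.Unlock()
	for _, f := range closers {
		f()
	}
}

func main() {
	var l closerList
	l.add(func() { fmt.Println("early closer runs at stop") })
	l.stop()
	l.add(func() { fmt.Println("late closer runs immediately") })
}
```

Note that registered closers are held until stop is called, which is the memory-retention concern the warning above describes.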

func (*Stopper) IsStopped

func (s *Stopper) IsStopped() <-chan struct{}

IsStopped returns a channel which will be closed after Stop() has been invoked to full completion, meaning all workers have completed and all closers have been closed.

func (*Stopper) NumTasks

func (s *Stopper) NumTasks() int

NumTasks returns the number of active tasks.

func (*Stopper) Quiesce

func (s *Stopper) Quiesce(ctx context.Context)

Quiesce moves the stopper to the quiescing state and waits until all tasks complete. This is used from Stop() and unit tests.

func (*Stopper) Recover

func (s *Stopper) Recover(ctx context.Context)

Recover is used internally by Stopper to provide a hook for recovery of panics on goroutines started by the Stopper. It can also be invoked explicitly (via "defer s.Recover(ctx)") on goroutines that are created outside of Stopper.
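
The deferred-recovery hook can be sketched in plain Go. recoverWith and runGuarded are hypothetical names; the sketch assumes that a configured handler receives the panic value and that the panic is re-raised when no handler is set, which the documentation does not spell out.

```go
package main

import "fmt"

// recoverWith is a hypothetical analogue of Recover. It must be invoked
// directly by a defer statement so that recover() can see the panic.
func recoverWith(onPanic func(interface{})) {
	if r := recover(); r != nil {
		if onPanic != nil {
			onPanic(r) // hand the panic value to the handler
			return
		}
		panic(r) // assumption: without a handler, the panic propagates
	}
}

// runGuarded runs f with the recovery hook installed.
func runGuarded(f func(), onPanic func(interface{})) {
	defer recoverWith(onPanic)
	f()
}

func main() {
	runGuarded(
		func() { panic("boom") },
		func(r interface{}) { fmt.Println("recovered:", r) },
	)
	fmt.Println("still running")
}
```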

func (*Stopper) RunAsyncTask

func (s *Stopper) RunAsyncTask(
	ctx context.Context, taskName string, f func(context.Context),
) error

RunAsyncTask is like RunTask, except the callback is run in a goroutine. The method doesn't block for the callback to finish execution.

See also RunAsyncTaskEx for a version with more options.

func (*Stopper) RunAsyncTaskEx

func (s *Stopper) RunAsyncTaskEx(ctx context.Context, opt TaskOpts, f func(context.Context)) error

RunAsyncTaskEx is like RunTask, except the callback f is run in a goroutine. The call doesn't block for the callback to finish execution.

func (*Stopper) RunTask

func (s *Stopper) RunTask(ctx context.Context, taskName string, f func(context.Context)) error

RunTask adds one to the count of tasks left to quiesce in the system. Any worker which is a "first mover" when starting tasks must call this method before starting work on a new task. First movers include goroutines launched to do periodic work and the kv/db.go gateway which accepts external client requests.

taskName is used as the "operation" field of the span opened for this task and is visible in traces. It's also part of reports printed by stoppers waiting to stop. The convention is <package name>.<struct name>: <succinct description of the task's action>

Returns an error to indicate that the system is currently quiescing and function f was not called.

func (*Stopper) RunTaskWithErr

func (s *Stopper) RunTaskWithErr(
	ctx context.Context, taskName string, f func(context.Context) error,
) error

RunTaskWithErr is like RunTask(), but takes in a callback that can return an error. The error is returned to the caller.

func (*Stopper) SetTracer

func (s *Stopper) SetTracer(tr *tracing.Tracer)

SetTracer sets the tracer to be used for task spans. This cannot be called concurrently with starting tasks.

Note that for tasks asking for a child span, the parent's tracer is used instead of this one.

When possible, prefer supplying the tracer to the constructor through WithTracer.

func (*Stopper) ShouldQuiesce

func (s *Stopper) ShouldQuiesce() <-chan struct{}

ShouldQuiesce returns a channel which will be closed when Stop() has been invoked and outstanding tasks should begin to quiesce.

func (*Stopper) Stop

func (s *Stopper) Stop(ctx context.Context)

Stop signals all live workers to stop and then waits for each to confirm it has stopped.

Stop is idempotent; concurrent calls will block on each other.

func (*Stopper) Tracer

func (s *Stopper) Tracer() *tracing.Tracer

Tracer returns the Tracer that the Stopper will use for tasks.

func (*Stopper) WithCancelOnQuiesce

func (s *Stopper) WithCancelOnQuiesce(ctx context.Context) (context.Context, func())

WithCancelOnQuiesce returns a child context which is canceled when the returned cancel function is called or when the Stopper begins to quiesce, whichever happens first.

Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.

type TaskOpts

type TaskOpts struct {
	// TaskName is a human-readable name for the operation. Used as the name of
	// the tracing span.
	TaskName string

	// SpanOpt controls the kind of span that the task will run in.
	SpanOpt SpanOption

	// If set, Sem is used as a semaphore limiting the concurrency (each task has
	// weight 1).
	//
	// It is the caller's responsibility to ensure that Sem is closed when the
	// stopper is quiesced. For quotapools which live for the lifetime of the
	// stopper, it is generally best to register the sem with the stopper using
	// AddCloser.
	Sem *quotapool.IntPool
	// If Sem is not nil, WaitForSem specifies whether the call blocks or not when
	// the semaphore is full. If true, the call blocks until the semaphore is
	// available in order to push back on callers that may be trying to create
	// many tasks. If false, returns immediately with an error if the semaphore is
	// not available.
	WaitForSem bool
}

TaskOpts groups the task execution options for RunAsyncTaskEx.
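
The difference between WaitForSem true and false can be shown with a buffered channel acting as the semaphore. This sketch deliberately uses a channel instead of quotapool.IntPool so that it stays self-contained; limiter, newLimiter, and runAsync are hypothetical names, and errThrottled stands in for ErrThrottled.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errThrottled stands in for ErrThrottled.
var errThrottled = errors.New("throttled on async limiting semaphore")

// limiter is a hypothetical concurrency limiter; each task has weight 1.
type limiter struct {
	sem chan struct{} // one slot per concurrently running task
	wg  sync.WaitGroup
}

func newLimiter(capacity int) *limiter {
	return &limiter{sem: make(chan struct{}, capacity)}
}

// runAsync starts f in a goroutine once a slot is acquired. If wait is
// false and no slot is free, it returns errThrottled without running f.
func (l *limiter) runAsync(wait bool, f func()) error {
	if wait {
		l.sem <- struct{}{} // block until a slot frees up
	} else {
		select {
		case l.sem <- struct{}{}:
		default:
			return errThrottled
		}
	}
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer func() { <-l.sem }() // release the slot when f returns
		f()
	}()
	return nil
}

func main() {
	l := newLimiter(1)
	release := make(chan struct{})

	fmt.Println("first:", l.runAsync(false, func() { <-release }))
	fmt.Println("second (non-blocking):", l.runAsync(false, func() {}))

	close(release) // let the first task finish
	fmt.Println("third (blocking):", l.runAsync(true, func() {}))
	l.wg.Wait()
}
```

Blocking acquisition pushes back on callers that create tasks too quickly, while the non-blocking mode lets the caller decide what to do when the limit is reached.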
