Documentation ¶
Index ¶
- Variables
- func HandleDebug(w http.ResponseWriter, r *http.Request)
- func PrintLeakedStoppers(t testing.TB)
- type Closer
- type CloserFn
- type Option
- type SpanOption
- type Stopper
- func (s *Stopper) AddCloser(c Closer)
- func (s *Stopper) IsStopped() <-chan struct{}
- func (s *Stopper) NumTasks() int
- func (s *Stopper) Quiesce(ctx context.Context)
- func (s *Stopper) Recover(ctx context.Context)
- func (s *Stopper) RunAsyncTask(ctx context.Context, taskName string, f func(context.Context)) error
- func (s *Stopper) RunAsyncTaskEx(ctx context.Context, opt TaskOpts, f func(context.Context)) error
- func (s *Stopper) RunTask(ctx context.Context, taskName string, f func(context.Context)) error
- func (s *Stopper) RunTaskWithErr(ctx context.Context, taskName string, f func(context.Context) error) error
- func (s *Stopper) SetTracer(tr *tracing.Tracer)
- func (s *Stopper) ShouldQuiesce() <-chan struct{}
- func (s *Stopper) Stop(ctx context.Context)
- func (s *Stopper) Tracer() *tracing.Tracer
- func (s *Stopper) WithCancelOnQuiesce(ctx context.Context) (context.Context, func())
- type TaskOpts
Constants ¶
This section is empty.
Variables ¶
var ErrThrottled = errors.New("throttled on async limiting semaphore")
ErrThrottled is returned from RunAsyncTaskEx in the event that there is no more capacity for async tasks, as limited by the semaphore.
ErrUnavailable indicates that the server is quiescing and is unable to process new work.
Functions ¶
func HandleDebug ¶
func HandleDebug(w http.ResponseWriter, r *http.Request)
HandleDebug responds with the list of stopper tasks actively running.
func PrintLeakedStoppers ¶
PrintLeakedStoppers prints (using `t`) the creation site of each Stopper for which `.Stop()` has not yet been called.
Types ¶
type Closer ¶
type Closer interface {
Close()
}
Closer is an interface for objects to attach to the stopper to be closed once the stopper completes.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
An Option can be passed to NewStopper.
func OnPanic ¶
func OnPanic(handler func(interface{})) Option
OnPanic is an option which lets the Stopper recover from all panics using the provided panic handler.
When Stop() is invoked during stack unwinding, OnPanic is also invoked, but Stop() may not have carried out its duties.
func WithTracer ¶
WithTracer is an option for NewStopper() supplying the Tracer to use for creating spans for tasks. Note that for tasks asking for a child span, the parent's tracer is used instead of this one.
type SpanOption ¶
type SpanOption int
SpanOption specifies the type of tracing span that a task will run in.
const ( // FollowsFromSpan makes the task run in a span that's not included in the // caller's recording (if any). For external tracers, the task's span will // still reference the caller's span through a FollowsFrom relationship. If // the caller doesn't have a span, then the task will execute in a root span. // // Use this when the caller will not wait for the task to finish, but a // relationship between the caller and the task might still be useful to // visualize in a trace collector. FollowsFromSpan SpanOption = iota // ChildSpan makes the task run in a span that's a child of the caller's span // (if any). The child is included in the parent's recording. For external // tracers, the child references the parent through a ChildOf relationship. // If the caller doesn't have a span, then the task will execute in a root // span. // // ChildSpan has consequences on memory usage: the memory lifetime of // the task's span becomes tied to the lifetime of the parent. Generally // ChildSpan should be used when the parent usually waits for the task to // complete, and the parent is not a long-running process. ChildSpan // SterileRootSpan makes the task run in a root span that doesn't get any // children. Anybody trying to create a child of the task's span will get a // root span. This is suitable for long-running tasks: connecting children to // these tasks would lead to infinitely-long traces, and connecting the // long-running task to its parent is also problematic because of the // different lifetimes. SterileRootSpan )
type Stopper ¶
type Stopper struct {
// contains filtered or unexported fields
}
A Stopper provides control over the lifecycle of goroutines started through it via its RunTask, RunAsyncTask, and other similar methods.
When Stop is invoked, the Stopper ¶
- it invokes Quiesce, which causes the Stopper to refuse new work (that is, its Run* family of methods starts returning ErrUnavailable), closes the channel returned by ShouldQuiesce, and blocks until until no more tasks are tracked, then
- it runs all of the methods supplied to AddCloser, then
- closes the IsStopped channel.
When ErrUnavailable is returned from a task, the caller needs to handle it appropriately by terminating any work that it had hoped to defer to the task (which is guaranteed to never have been invoked). A simple example of this can be seen in the below snippet:
var wg sync.WaitGroup wg.Add(1) if err := s.RunAsyncTask("foo", func(ctx context.Context) { defer wg.Done() }); err != nil { // Task never ran. wg.Done() }
To ensure that tasks that do get started are sensitive to Quiesce, they need to observe the ShouldQuiesce channel similar to how they are expected to observe context cancellation:
func x() { select { case <-s.ShouldQuiesce: return case <-ctx.Done(): return case <-someChan: // Do work. } }
TODO(tbg): many improvements here are possible:
- propagate quiescing via context cancellation
- better API around refused tasks
- all the other things mentioned in: https://sqlfmt/cockroach/issues/58164
func NewStopper ¶
NewStopper returns an instance of Stopper.
func (*Stopper) AddCloser ¶
AddCloser adds an object to close after the stopper has been stopped.
WARNING: memory resources acquired by this method will stay around for the lifetime of the Stopper. Use with care to avoid leaking memory.
A closer that is added after Stop has already been called will be called immediately.
func (*Stopper) IsStopped ¶
func (s *Stopper) IsStopped() <-chan struct{}
IsStopped returns a channel which will be closed after Stop() has been invoked to full completion, meaning all workers have completed and all closers have been closed.
func (*Stopper) Quiesce ¶
Quiesce moves the stopper to state quiescing and waits until all tasks complete. This is used from Stop() and unittests.
func (*Stopper) Recover ¶
Recover is used internally by Stopper to provide a hook for recovery of panics on goroutines started by the Stopper. It can also be invoked explicitly (via "defer s.Recover()") on goroutines that are created outside of Stopper.
func (*Stopper) RunAsyncTask ¶
func (s *Stopper) RunAsyncTask( ctx context.Context, taskName string, f func(context.Context), ) error
RunAsyncTask is like RunTask, except the callback is run in a goroutine. The method doesn't block for the callback to finish execution.
See also RunAsyncTaskEx for a version with more options.
func (*Stopper) RunAsyncTaskEx ¶
RunAsyncTaskEx is like RunTask, except the callback f is run in a goroutine. The call doesn't block for the callback to finish execution.
func (*Stopper) RunTask ¶
RunTask adds one to the count of tasks left to quiesce in the system. Any worker which is a "first mover" when starting tasks must call this method before starting work on a new task. First movers include goroutines launched to do periodic work and the kv/db.go gateway which accepts external client requests.
taskName is used as the "operation" field of the span opened for this task and is visible in traces. It's also part of reports printed by stoppers waiting to stop. The convention is <package name>.<struct name>: <succinct description of the task's action>
Returns an error to indicate that the system is currently quiescing and function f was not called.
func (*Stopper) RunTaskWithErr ¶
func (s *Stopper) RunTaskWithErr( ctx context.Context, taskName string, f func(context.Context) error, ) error
RunTaskWithErr is like RunTask(), but takes in a callback that can return an error. The error is returned to the caller.
func (*Stopper) SetTracer ¶
SetTracer sets the tracer to be used for task spans. This cannot be called concurrently with starting tasks.
Note that for tasks asking for a child span, the parent's tracer is used instead of this one.
When possible, prefer supplying the tracer to the ctor through WithTracer.
func (*Stopper) ShouldQuiesce ¶
func (s *Stopper) ShouldQuiesce() <-chan struct{}
ShouldQuiesce returns a channel which will be closed when Stop() has been invoked and outstanding tasks should begin to quiesce.
func (*Stopper) Stop ¶
Stop signals all live workers to stop and then waits for each to confirm it has stopped.
Stop is idempotent; concurrent calls will block on each other.
func (*Stopper) WithCancelOnQuiesce ¶
WithCancelOnQuiesce returns a child context which is canceled when the returned cancel function is called or when the Stopper begins to quiesce, whichever happens first.
Canceling this context releases resources associated with it, so code should call cancel as soon as the operations running in this Context complete.
type TaskOpts ¶
type TaskOpts struct { // TaskName is a human-readable name for the operation. Used as the name of // the tracing span. TaskName string // SpanOpt controls the kind of span that the task will run in. SpanOpt SpanOption // If set, Sem is used as a semaphore limiting the concurrency (each task has // weight 1). // // It is the caller's responsibility to ensure that Sem is closed when the // stopper is quiesced. For quotapools which live for the lifetime of the // stopper, it is generally best to register the sem with the stopper using // AddCloser. Sem *quotapool.IntPool // If Sem is not nil, WaitForSem specifies whether the call blocks or not when // the semaphore is full. If true, the call blocks until the semaphore is // available in order to push back on callers that may be trying to create // many tasks. If false, returns immediately with an error if the semaphore is // not available. WaitForSem bool }
TaskOpts groups the task execution options for RunAsyncTaskEx.