concurrency

package
Version: v0.21.1
Published: Dec 3, 2024 License: Apache-2.0 Imports: 5 Imported by: 32

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AllErrorAggregator

type AllErrorAggregator func(errors []error) error

AllErrorAggregator aggregates errors.

type AllErrorRecorder

type AllErrorRecorder struct {
	Errors []error
	// contains filtered or unexported fields
}

AllErrorRecorder records all the errors.

func (*AllErrorRecorder) AggrError

func (aer *AllErrorRecorder) AggrError(aggr AllErrorAggregator) error

AggrError runs the provided aggregator over all recorded errors and returns the aggregator's result.
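As an illustration of the aggregator shape, here is a minimal, self-contained sketch. The `aggregator` type only mirrors the signature of AllErrorAggregator, and `firstWithCount` and `demo` are hypothetical names that are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// aggregator mirrors the signature of concurrency.AllErrorAggregator.
// It is a local stand-in so that this sketch compiles on its own.
type aggregator func(errs []error) error

// firstWithCount is a hypothetical aggregator: it reports how many errors
// were seen and wraps the first one.
func firstWithCount(errs []error) error {
	if len(errs) == 0 {
		return nil
	}
	return fmt.Errorf("%d error(s), first: %w", len(errs), errs[0])
}

// demo applies the aggregator to two sample errors.
func demo() string {
	errs := []error{errors.New("timeout"), errors.New("refused")}
	var aggr aggregator = firstWithCount
	return aggr(errs).Error()
}

func main() {
	fmt.Println(demo()) // prints "2 error(s), first: timeout"
}
```

With the real package, a function of this shape could presumably be passed straight to AggrError, since its signature matches AllErrorAggregator.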

func (*AllErrorRecorder) Error

func (aer *AllErrorRecorder) Error() error

Error returns an aggregate of all errors by concatenation.

func (*AllErrorRecorder) ErrorStrings

func (aer *AllErrorRecorder) ErrorStrings() []string

ErrorStrings returns all errors as a slice of strings.

func (*AllErrorRecorder) GetErrors

func (aer *AllErrorRecorder) GetErrors() []error

GetErrors returns a reference to the internal errors slice.

Note that the slice is not copied, so this should only be used once recording is complete.

func (*AllErrorRecorder) HasErrors

func (aer *AllErrorRecorder) HasErrors() bool

HasErrors returns true if an error was ever recorded.

func (*AllErrorRecorder) RecordError

func (aer *AllErrorRecorder) RecordError(err error)

RecordError records a possible error. It does nothing if err is nil.
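The recorder pattern described above can be sketched as follows. This is a stand-in written for illustration, not the package's source: the `allRecorder` type, the mutex, the `"; "` separator used for concatenation, and the `collect` helper are all assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// allRecorder is a hypothetical stand-in for concurrency.AllErrorRecorder.
type allRecorder struct {
	mu   sync.Mutex // assumed: guards errs against concurrent RecordError calls
	errs []error
}

// RecordError stores err unless it is nil.
func (r *allRecorder) RecordError(err error) {
	if err == nil {
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.errs = append(r.errs, err)
}

// HasErrors reports whether any non-nil error was recorded.
func (r *allRecorder) HasErrors() bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.errs) > 0
}

// Error concatenates all recorded messages; the separator is an assumption.
func (r *allRecorder) Error() error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.errs) == 0 {
		return nil
	}
	msgs := make([]string, 0, len(r.errs))
	for _, e := range r.errs {
		msgs = append(msgs, e.Error())
	}
	return errors.New(strings.Join(msgs, "; "))
}

// collect runs n workers concurrently; odd-numbered workers fail.
func collect(n int) *allRecorder {
	rec := &allRecorder{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if id%2 == 1 {
				rec.RecordError(fmt.Errorf("worker %d failed", id))
				return
			}
			rec.RecordError(nil) // ignored by the recorder
		}(i)
	}
	wg.Wait()
	return rec
}

func main() {
	rec := collect(4)
	fmt.Println(rec.HasErrors(), len(rec.errs)) // prints "true 2"
}
```

The order of the recorded errors depends on goroutine scheduling, so code that reads them should not rely on it.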

type Error added in v0.13.0

type Error struct {
	Err         error
	MustWaitFor bool
}

Error is the value consumed by ErrorGroup.Wait. It contains the error that was received, along with whether that error originated from a tablet that we must wait for.

type ErrorGroup added in v0.10.0

type ErrorGroup struct {
	NumGoroutines        int
	NumRequiredSuccesses int
	NumAllowedErrors     int
	NumErrorsToWaitFor   int
}

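ErrorGroup provides a function for waiting for NumGoroutines goroutines to complete, and for cancelling the rest once enough results are in: at least NumErrorsToWaitFor results that we must wait for have arrived, and either at least NumRequiredSuccesses goroutines have succeeded or more than NumAllowedErrors have failed.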

It should be used as follows:

	errCh := make(chan concurrency.Error)
	errgroupCtx, errgroupCancel := context.WithCancel(ctx)

	for _, arg := range args {
		arg := arg

		go func() {
			err := doWork(errgroupCtx, arg)
			errCh <- concurrency.Error{
				Err: err,
				// mustWaitFor is a hypothetical helper: it returns true if
				// Wait must receive this result before cancelling the rest.
				MustWaitFor: mustWaitFor(arg),
			}
		}()
	}

	errgroup := concurrency.ErrorGroup{
		NumGoroutines: len(args),
		NumRequiredSuccesses: 5, // need at least 5 to respond with nil error before cancelling the rest
		NumAllowedErrors: 1, // if more than 1 responds with non-nil error, cancel the rest
		NumErrorsToWaitFor: 1, // wait for 1 result with MustWaitFor set before cancelling the rest
	}
	errRec := errgroup.Wait(errgroupCancel, errCh)

	if errRec.HasErrors() {
		// ...
	}

NumErrorsToWaitFor should be equal to the number of Error values received on the channel that have MustWaitFor set.

func (ErrorGroup) Wait added in v0.10.0

func (eg ErrorGroup) Wait(cancel context.CancelFunc, errors chan Error) *AllErrorRecorder

Wait waits for a group of goroutines that are sending errors to the given Error channel, and are cancellable by the given cancel function.

Wait will cancel any outstanding goroutines when both of the following conditions are met:

  • At least NumErrorsToWaitFor results with MustWaitFor set have been consumed on the error channel.

  • One of the following two holds:

    (1) More than NumAllowedErrors non-nil results have been consumed on the error channel.

    (2) At least NumRequiredSuccesses nil results have been consumed on the error channel.

After the cancellation condition is triggered, Wait will continue to consume results off the Error channel so as to not permanently block any of those cancelled goroutines.

When finished consuming results from all goroutines, cancelled or otherwise, Wait returns an AllErrorRecorder that contains all errors returned by any of those goroutines. It does not close the Error channel.
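The behaviour described above can be sketched with local stand-in types. This is an illustration built only from the documented condition, not the package's implementation: `result`, `group`, `wait`, and `demo` are hypothetical names, and the real Wait returns an *AllErrorRecorder rather than a slice.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// result and group are local stand-ins for concurrency.Error and
// concurrency.ErrorGroup, so that the sketch compiles on its own.
type result struct {
	Err         error
	MustWaitFor bool
}

type group struct {
	NumGoroutines, NumRequiredSuccesses, NumAllowedErrors, NumErrorsToWaitFor int
}

// wait consumes exactly NumGoroutines results. It calls cancel once the
// documented condition holds, then keeps draining the channel so that no
// sender stays blocked. It never closes the channel.
func wait(g group, cancel context.CancelFunc, results <-chan result) []error {
	var errs []error
	var successes, failures, waitedFor int
	for i := 0; i < g.NumGoroutines; i++ {
		r := <-results
		if r.MustWaitFor {
			waitedFor++
		}
		if r.Err == nil {
			successes++
		} else {
			failures++
			errs = append(errs, r.Err)
		}
		if waitedFor >= g.NumErrorsToWaitFor &&
			(failures > g.NumAllowedErrors || successes >= g.NumRequiredSuccesses) {
			cancel() // calling a CancelFunc more than once is a no-op
		}
	}
	return errs
}

// demo feeds four pre-computed results through wait and reports whether the
// context was cancelled and how many errors were collected.
func demo() (cancelled bool, errCount int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan result, 4)
	ch <- result{}                                // success
	ch <- result{MustWaitFor: true}               // success we must wait for
	ch <- result{Err: errors.New("late failure")} // arrives after cancellation
	ch <- result{}

	g := group{NumGoroutines: 4, NumRequiredSuccesses: 2, NumAllowedErrors: 1, NumErrorsToWaitFor: 1}
	errs := wait(g, cancel, ch)
	return ctx.Err() != nil, len(errs)
}

func main() {
	fmt.Println(demo()) // prints "true 1"
}
```

In this run the condition is first met at the second result (two successes, one of them flagged MustWaitFor), yet the failure that arrives afterwards is still collected.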

type ErrorRecorder

type ErrorRecorder interface {
	RecordError(error)
	HasErrors() bool
	Error() error
}

ErrorRecorder offers a way to record errors during complex asynchronous operations. Different implementations offer different services.

type FirstErrorRecorder

type FirstErrorRecorder struct {
	// contains filtered or unexported fields
}

FirstErrorRecorder records the first error and logs the others. Error() will return the first recorded error or nil.

func (*FirstErrorRecorder) Error

func (fer *FirstErrorRecorder) Error() error

Error returns the first error we saw, or nil.

func (*FirstErrorRecorder) HasErrors

func (fer *FirstErrorRecorder) HasErrors() bool

HasErrors returns true if an error was ever recorded.

func (*FirstErrorRecorder) RecordError

func (fer *FirstErrorRecorder) RecordError(err error)

RecordError records a possible error:

  • does nothing if err is nil

  • only records the first error reported

  • logs any later errors
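A first-error recorder along these lines might look like the sketch below. It is a stand-in for illustration only: `errorRecorder` copies the method set of ErrorRecorder, while `firstRecorder`, `runAll`, `demo`, the mutex, and the log message format are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"sync"
)

// errorRecorder copies the method set of concurrency.ErrorRecorder.
type errorRecorder interface {
	RecordError(error)
	HasErrors() bool
	Error() error
}

// firstRecorder is a hypothetical stand-in for concurrency.FirstErrorRecorder.
type firstRecorder struct {
	mu    sync.Mutex // assumed: guards first against concurrent callers
	first error
}

// RecordError keeps the first non-nil error and only logs later ones.
func (r *firstRecorder) RecordError(err error) {
	if err == nil {
		return
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.first == nil {
		r.first = err
		return
	}
	log.Printf("additional error: %v", err) // message format is an assumption
}

// HasErrors reports whether any non-nil error was recorded.
func (r *firstRecorder) HasErrors() bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.first != nil
}

// Error returns the first recorded error, or nil.
func (r *firstRecorder) Error() error {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.first
}

// runAll records the outcome of every step on any recorder implementation.
func runAll(rec errorRecorder, steps ...func() error) {
	for _, step := range steps {
		rec.RecordError(step())
	}
}

// demo runs three steps, two of which fail, and returns the kept message.
func demo() string {
	rec := &firstRecorder{}
	runAll(rec,
		func() error { return nil },
		func() error { return errors.New("disk full") },
		func() error { return errors.New("network down") },
	)
	return rec.Error().Error()
}

func main() {
	fmt.Println(demo()) // prints "disk full"; the second failure is only logged
}
```

Because `runAll` depends only on the interface, the same caller could be handed a recorder that keeps every error instead.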
