unison

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 29, 2021 License: Apache-2.0 Imports: 6 Imported by: 81

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrGroupClosed = errors.New("group closed")

ErrGroupClosed indicates that the WaitGroup is currently closed, and no more routines can be started.

Functions

This section is empty.

Types

type Canceler added in v0.0.2

type Canceler interface {
	Done() <-chan struct{}
	Err() error
}

Canceler interface, that can be used to pass along some shutdown signal to child goroutines.

type Cell added in v0.1.0

type Cell struct {
	// contains filtered or unexported fields
}

Cell stores some state of type interface{}. Intermittent updates are lost, in case the Cell is updated faster than the consumer tries to read for state updates. Updates are immediate, there will be no backpressure applied to producers.

In case the Cell is used to transmit updates without backpressure, the absolute state must be computed by the producer beforehand.

A typical use-case for cell is to generate asynchronous configuration updates (no deltas).

The zero value of Cell is valid, but a value of type Cell can not be copied.

Example (Acking)

ExampleCellACK tracks the number of ACKed events without backpressure in the generating thread, even if the consumer is blocked. The consumer computes

type exampleACKer struct {
	state      *Cell
	ackedWrite uint
	ackedRead  uint
}

// exampleACK acks a single event by updating the 'absolute' state.
// The function return immediately, even if the "reader" process is
// blocking for a long time.
exampleACK := func(acker *exampleACKer) {
	acker.ackedWrite++
	acker.state.Set(acker.ackedWrite)
}

// exampleWaitACKed waits for state changes and returns the number of
// events that have been acked since the last read. It returns the accumulated state.
exampleWaitACKed := func(acker *exampleACKer, ctx context.Context) (uint, error) {
	st, err := acker.state.Wait(ctx)
	if err != nil {
		return 0, err
	}

	v := st.(uint)
	acker.ackedRead, v = v, v-acker.ackedRead
	return v, nil
}

const max = 100
acker := &exampleACKer{state: NewCell(0)}

// start go-routine that ACKs single events
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() { // ACKer thread
	defer wg.Done()

	// We ACK event by event, but merge the overall state
	// by reporting the absolute value.
	for send := 0; send < max; send++ {
		exampleACK(acker)
	}
}()

// reader loop
var totalACKed uint
for totalACKed < max {
	acked, _ := exampleWaitACKed(acker, context.TODO())

	// Handle 'N" events being ACKed
	totalACKed += acked
}

fmt.Println("Total:", totalACKed)
Output:

Total: 100

func NewCell added in v0.1.0

func NewCell(st interface{}) *Cell

NewCell creates a new call instance with its initial state. Subsequent reads will return this state, if there have been no updates.

func (*Cell) Get added in v0.1.0

func (c *Cell) Get() interface{}

Get returns the current state.

func (*Cell) Set added in v0.1.0

func (c *Cell) Set(st interface{})

Set updates the state of the Cell and unblocks a waiting consumer. Set does not block.

func (*Cell) Wait added in v0.1.0

func (c *Cell) Wait(cancel Canceler) (interface{}, error)

Wait blocks until it an update since the last call to Get or Wait has been found. The cancel context can be used to interrupt the call to Wait early. The error value will be set to the value returned by cancel.Err() in case Wait was interrupted. Wait does not produce any errors that need to be handled by itself.

type Group added in v0.0.2

type Group interface {
	// Go method returns an error if the task can not be started. The error
	// returned by the task itself is not supposed to be returned, as the error is
	// assumed to be generated asynchronously.
	Go(fn func(context.Context) error) error
}

Group interface, that can be used to start tasks. The tasks started will spawn go-routines, and will get a shutdown signal by the provided Canceler.

func ClosedGroup added in v0.0.2

func ClosedGroup(reportedError error) Group

ClosedGroup creates a Group that always fails to start a go-routine. Go will return reportedError on each attempt to create a go routine. If reportedError is nil, ErrGroupClosed will be used.

type MultiErrGroup added in v0.0.2

type MultiErrGroup struct {
	// contains filtered or unexported fields
}

MultiErrGroup is a collection of goroutines working on subtasks concurrently. The group waits until all subtasks have finished and collects all errors encountered.

The zero value of MultiErrGroup is a valid group.

func (*MultiErrGroup) Go added in v0.0.2

func (g *MultiErrGroup) Go(fn func() error)

Go starts a new go-routine, collecting errors encounted into the MultiErrGroup.

func (*MultiErrGroup) Wait added in v0.0.2

func (g *MultiErrGroup) Wait() []error

Wait waits until all go-routines have been stopped and returns all errors encountered.

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

Mutex provides a mutex based on go channels. The lock operations support timeout or cancellation by a context. Moreover one can try to lock the mutex from within a select statement when using Await.

The zero value of Mutex will not be able to Lock the mutex ever. The Lock method will never return. Calling Unlock will panic.

func MakeMutex

func MakeMutex() Mutex

MakeMutex creates a mutex.

func (Mutex) Await

func (c Mutex) Await() <-chan struct{}

Await returns a channel that will be triggered if the lock attempt did succeed. One can use the channel with select-case. The mutex is assumed to be locked if the branch waiting on the mutex has been triggered.

func (Mutex) Lock

func (c Mutex) Lock()

Lock blocks until the mutex has been acquired. The zero value of Mutex will block forever.

func (Mutex) LockContext

func (c Mutex) LockContext(context doneContext) error

LockContext tries to lock the mutex. The Log operation can be cancelled by the context. LockContext returns nil on success, otherwise the error value returned by context.Err, which MUST NOT return nil after cancellation.

func (Mutex) LockTimeout

func (c Mutex) LockTimeout(duration time.Duration) bool

LockTimeout will try to lock the mutex. A failed lock attempt returns false, once the amount of configured duration has been passed.

If duration is 0, then the call behaves like TryLock. If duration is <0, then the call behaves like Lock if the Mutex has been initialized, otherwise fails.

The zero value of Mutex will never succeed.

func (Mutex) TryLock

func (c Mutex) TryLock() bool

TryLock attempts to lock the mutex. If the mutex has been already locked false is returned.

func (Mutex) Unlock

func (c Mutex) Unlock()

Unlock unlocks the mutex.

The zero value of Mutex will panic.

type SafeWaitGroup added in v0.0.2

type SafeWaitGroup struct {
	// contains filtered or unexported fields
}

SafeWaitGroup provides a safe alternative to WaitGroup, that instead of panicing returns an error when Wait has been called.

func SafeWaitGroupWithCancel added in v0.1.0

func SafeWaitGroupWithCancel(parent Canceler) *SafeWaitGroup

SafeWaitGroupWithCancel creates a SafeWaitGroup that will be closed when the given canceler signals shutdown.

Associated resources are cleaned when the parent context is cancelled, or Stop is called.

func (*SafeWaitGroup) Add added in v0.0.2

func (s *SafeWaitGroup) Add(n int) error

Add adds the delta to the WaitGroup counter. If the counter becomes 0, all goroutines are blocked on Wait will continue.

Add returns an error if 'Wait' has already been called, indicating that no more go-routines should be started.

func (*SafeWaitGroup) Close added in v0.0.2

func (s *SafeWaitGroup) Close()

Close marks the wait group as closed. All calls to Add will fail with ErrGroupClosed after close has been called. Close does not wait until the WaitGroup counter has reached zero, but will return immediately. Use Wait to wait for the counter to become 0.

func (*SafeWaitGroup) Done added in v0.0.2

func (s *SafeWaitGroup) Done()

Done decrements the WaitGroup counter.

func (*SafeWaitGroup) Wait added in v0.0.2

func (s *SafeWaitGroup) Wait()

Wait closes the WaitGroup and blocks until the WaitGroup counter is zero. Add will return errors the moment 'Wait' has been called.

type TaskGroup added in v0.0.2

type TaskGroup struct {
	// OnQuit  configures the behavior when a sub-task returned. If not set
	// all other tasks will continue to run. If the function return true, a
	// shutdown signal is passed, and Go will fail on attempts to start new
	// tasks.
	// Next to the action, does the OnQuit error also return the error value
	// to be recorded. The context.Cancel error will never be recorded.
	//
	// Common OnError handlers are given by ContinueOnErrors, StopOnError,
	// StopOnErrorOrCancel.
	// By default StopOnError will be used.
	OnQuit TaskGroupQuitHandler

	// MaxErrors configures the maximum amount of errors the TaskGroup will record.
	// Older errors will be replaced once the limit is exceeded.
	// If MaxErrors is set to a value < 0, all errors will be recorded.
	MaxErrors int
	// contains filtered or unexported fields
}

TaskGroup implements the Group interface. Once the group is shutting down, no more goroutines can be created via Go. The Stop method of TaskGroup will block until all sub-tasks have returned. Errors from sub-tasks are collected. The Stop method collects all errors and returns a single error summarizing all errors encountered.

By default sub-tasks continue running if any task did encounter an error. This behavior can be modified by setting StopOnError.

The zero value of TaskGroup is fully functional. StopOnError must not be set after the first go-routine has been spawned.

func TaskGroupWithCancel added in v0.1.0

func TaskGroupWithCancel(canceler Canceler) *TaskGroup

TaskGroupWithCancel creates a TaskGroup that gets stopped when the parent context signals shutdown or the Stop method is called.

Although the managed go-routines are signalled to stop when the parent context is done, one still might want to call Stop in order to wait for the managed go-routines to stop.

Associated resources are cleaned when the parent context is cancelled, or Stop is called.

func (*TaskGroup) Context added in v0.2.0

func (t *TaskGroup) Context() context.Context

Context returns the task groups internal context. The internal context will be cancelled if the groups parent context gets cancelled, or Stop has been called.

func (*TaskGroup) Go added in v0.0.2

func (t *TaskGroup) Go(fn func(context.Context) error) error

Go starts a new go-routine and passes a Canceler to signal group shutdown. Errors returned by the function are collected and finally returned on Stop. If the group was stopped before calling Go, then Go will return the ErrGroupClosed error.

func (*TaskGroup) Stop added in v0.0.2

func (t *TaskGroup) Stop() error

Stop sends a shutdown signal to all tasks, and waits for them to finish. It returns an error that contains all errors encountered.

func (*TaskGroup) Wait added in v0.2.0

func (t *TaskGroup) Wait() error

Wait blocks until all owned child routines have been stopped.

type TaskGroupQuitHandler added in v0.2.0

type TaskGroupQuitHandler func(error) (TaskGroupStopAction, error)

type TaskGroupStopAction added in v0.2.0

type TaskGroupStopAction uint

TaskGroupStopAction signals the action to take when a go-routine owned by a TaskGroup did quit.

const (
	// TaskGroupStopActionContinue notifies the TaskGroup that other managed go-routines
	// should not be signalled to shutdown.
	TaskGroupStopActionContinue TaskGroupStopAction = iota

	// TaskGroupStopActionShutdown notifies the TaskGroup that shutdown should be signaled
	// to all maanaged go-routines.
	TaskGroupStopActionShutdown

	// TaskGroupStopActionRestart signals the TaskGroup that the managed go-routine that has
	// just been returned should be restarted.
	TaskGroupStopActionRestart
)

func ContinueOnErrors added in v0.2.0

func ContinueOnErrors(err error) (TaskGroupStopAction, error)

ContinueOnErrors provides a TaskGroup.OnQuit handler, that will ignore any errors. Other go-routines owned by the TaskGroup will continue to run.

func RestartOnError added in v0.2.0

func RestartOnError(err error) (TaskGroupStopAction, error)

RestartOnError provides a TaskGroup.OnQuit handler, that will restart a go-routine if the routine failed with an error.

func StopAll added in v0.2.0

func StopAll(err error) (TaskGroupStopAction, error)

StopAll provides a Taskgroup.OnError handler, that will signal the TaskGroup to shutdown once an owned go-routine returns. The TaskGroup is supposed to stop even on successful return.

func StopOnError added in v0.2.0

func StopOnError(err error) (TaskGroupStopAction, error)

StopOnError provides a TaskGroup.OnError handler, that will signal the Taskgroup to stop all owned go-routines. The context.Canceled error value will be ignored.

func StopOnErrorOrCancel added in v0.2.0

func StopOnErrorOrCancel(err error) (TaskGroupStopAction, error)

StopOnErrorOrCancel provides a TaskGroup.OnError handler, that will signal the Taskgroup to stop all owned go-routines.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL