Documentation ¶
Overview ¶
Package bigbuff implements many useful concurrency primitives and utilities.
Index ¶
- Constants
- func Call(caller Callable, options ...CallOption) error
- func CombineContext(ctx context.Context, others ...context.Context) context.Context
- func DefaultCleaner(size int, offsets []int) int
- func ExponentialRetry(ctx context.Context, rate time.Duration, value func() (interface{}, error)) func() (interface{}, error)
- func FatalError(err error) error
- func LinearAttempt(ctx context.Context, rate time.Duration, count int) <-chan time.Time
- func MinDuration(d time.Duration, fn func() (interface{}, error)) func() (interface{}, error)
- func MustCall(caller Callable, options ...CallOption)
- func Range(ctx context.Context, consumer Consumer, ...) (err error)
- func WaitCond(ctx context.Context, cond *sync.Cond, fn func() bool) error
- type Buffer
- func (b *Buffer) CleanerConfig() CleanerConfig
- func (b *Buffer) Close() (err error)
- func (b *Buffer) Diff(c Consumer) (int, bool)
- func (b *Buffer) Done() <-chan struct{}
- func (b *Buffer) NewConsumer() (Consumer, error)
- func (b *Buffer) Put(ctx context.Context, values ...interface{}) error
- func (b *Buffer) Range(ctx context.Context, c Consumer, fn func(index int, value interface{}) bool) error
- func (b *Buffer) SetCleanerConfig(config CleanerConfig) error
- func (b *Buffer) Size() int
- func (b *Buffer) Slice() []interface{}
- type CallOption
- type Callable
- type Channel
- type Cleaner
- type CleanerConfig
- type Consumer
- type Exclusive
- func (e *Exclusive) Call(key interface{}, value func() (interface{}, error)) (interface{}, error)
- func (e *Exclusive) CallAfter(key interface{}, value func() (interface{}, error), wait time.Duration) (interface{}, error)
- func (e *Exclusive) CallAfterAsync(key interface{}, value func() (interface{}, error), wait time.Duration) <-chan *ExclusiveOutcome
- func (e *Exclusive) CallAsync(key interface{}, value func() (interface{}, error)) <-chan *ExclusiveOutcome
- func (e *Exclusive) CallWithOptions(options ...ExclusiveOption) <-chan *ExclusiveOutcome
- func (e *Exclusive) Start(key interface{}, value func() (interface{}, error))
- func (e *Exclusive) StartAfter(key interface{}, value func() (interface{}, error), wait time.Duration)
- type ExclusiveOption
- func ExclusiveKey(value interface{}) ExclusiveOption
- func ExclusiveRateLimit(ctx context.Context, minDuration time.Duration) ExclusiveOption
- func ExclusiveStart(value bool) ExclusiveOption
- func ExclusiveValue(value func() (interface{}, error)) ExclusiveOption
- func ExclusiveWait(value time.Duration) ExclusiveOption
- func ExclusiveWork(value WorkFunc) ExclusiveOption
- func ExclusiveWrapper(value func(value WorkFunc) WorkFunc) ExclusiveOption
- type ExclusiveOutcome
- type FixedBufferCleanerNotification
- type Notifier
- func (n *Notifier) Publish(key interface{}, value interface{})
- func (n *Notifier) PublishContext(ctx context.Context, key interface{}, value interface{})
- func (n *Notifier) Subscribe(key interface{}, target interface{})
- func (n *Notifier) SubscribeCancel(ctx context.Context, key interface{}, target interface{}) context.CancelFunc
- func (n *Notifier) SubscribeContext(ctx context.Context, key interface{}, target interface{})
- func (n *Notifier) Unsubscribe(key interface{}, target interface{})
- type Producer
- type WorkFunc
- type Worker
- type Workers
Examples ¶
Constants ¶
const (
    // DefaultCleanerCooldown is how long the cleaner will wait between checks of the Buffer by default.
    DefaultCleanerCooldown = time.Millisecond * 10

    // DefaultChannelPollRate is how frequently each waiting Channel.Get should try to receive from the channel (from
    // the first failure/non-receive).
    DefaultChannelPollRate = time.Millisecond
)
Variables ¶
This section is empty.
Functions ¶
func Call ¶ added in v1.15.1
func Call(caller Callable, options ...CallOption) error
Call invokes a Callable with any provided options; the available options are the functions in this package prefixed with "Call". See also MustCall.
Example (Rpc) ¶
ExampleCall_rpc provides a (contrived) example of how Call may be used as part of an RPC implementation
var (
    methods = map[string]Callable{
        `add`: NewCallable(func(a, b int) int { return a + b }),
        `sum`: NewCallable(func(values ...int) (r int) {
            for _, v := range values {
                r += v
            }
            return
        }),
        `bounds`: NewCallable(func(values ...int) (min, max int, ok bool) {
            for _, value := range values {
                if !ok {
                    min, max, ok = value, value, true
                } else {
                    if value < min {
                        min = value
                    }
                    if value > max {
                        max = value
                    }
                }
            }
            return
        }),
    }
    call = func(name string, args ...interface{}) (results []interface{}) {
        MustCall(
            methods[name],
            CallArgs(args...),
            CallResultsSlice(&results),
        )
        return
    }
    p = func(name string, args ...interface{}) {
        results := func() (v interface{}) {
            defer func() {
                if v == nil {
                    v = recover()
                }
            }()
            v = call(name, args...)
            return
        }()
        fmt.Printf("%s %v -> %v\n", name, args, results)
    }
)

fmt.Println(`success:`)
p(`add`, 1, 2)
p(`bounds`, -123884, 4737, 9, 0, -99999992, 4, 6, 8324884383, -3)
p(`bounds`)
p(`bounds`, 2)
p(`sum`, -123884, 4737, 9, 0, -99999992, 4, 6, 8324884383, -3)

fmt.Println(`failure:`)
p(`add`, 1, 2, 3)
p(`add`, 1, 2.0)
p(`bounds`, 2.0)

Output:
success:
add [1 2] -> [3]
bounds [-123884 4737 9 0 -99999992 4 6 8324884383 -3] -> [-99999992 8324884383 true]
bounds [] -> [0 0 false]
bounds [2] -> [2 2 true]
sum [-123884 4737 9 0 -99999992 4 6 8324884383 -3] -> [8224765260]
failure:
add [1 2 3] -> bigbuff.CallArgs args error: invalid length: mandatory=2 variadic=false len=3
add [1 2] -> bigbuff.CallArgs args[1] error: float64 not assignable to int
bounds [2] -> bigbuff.CallArgs args[0] error: float64 not assignable to int
func CombineContext ¶
CombineContext returns a context based on ctx (the first param) that will cancel when ANY of the other provided context values cancel. CAUTION: this spawns one or more blocking goroutines; if you call this with contexts that don't cancel within the reasonable lifetime of your application, you will have a leak.
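For illustration, a minimal sketch of tying work to whichever of two contexts cancels first; the request/shutdown context names are illustrative only:

requestCtx, cancelRequest := context.WithCancel(context.Background())
defer cancelRequest()
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), time.Second)
defer cancelShutdown()

// combined cancels as soon as either requestCtx or shutdownCtx cancels
combined := CombineContext(requestCtx, shutdownCtx)

select {
case <-combined.Done():
    fmt.Println(`combined context canceled:`, combined.Err())
case <-time.After(time.Second * 2):
    fmt.Println(`timed out waiting for cancel`)
}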
func DefaultCleaner ¶
DefaultCleaner is the Buffer's default cleaner. If there is at least one "active" consumer it returns the lowest offset, defaulting to 0, effectively removing only the values from the buffer that all consumers have read. Note that the return value is limited to >= 0 and <= size, and active consumers are defined as those registered with offsets >= 0.
func ExponentialRetry ¶ added in v1.6.0
func ExponentialRetry(ctx context.Context, rate time.Duration, value func() (interface{}, error)) func() (interface{}, error)
ExponentialRetry implements a simple exponential back off and retry, via closure wrapper, as described on Wikipedia (https://en.wikipedia.org/wiki/Exponential_backoff), supporting context canceling (while waiting / before starting), a configurable base rate / slot time which will default to 300ms if rate is <= 0 (SUBJECT TO CHANGE), and the ability to mark errors as fatal via the FatalError wrapper function provided by this package.

Notes:

1. This function will panic if value is nil, but NOT if ctx is nil (the latter is not recommended, but the existing implementations already behave this way).
2. The exit case triggered via use of the FatalError wrapper will include any accompanying result, as well as the unpacked error value (which will always be non-nil, since FatalError panics otherwise).
3. Before each call to value the context error will be checked, and if non-nil it will be propagated as-is with a nil result.
4. This implementation uses the math/rand package.
func FatalError ¶ added in v1.6.0
FatalError wraps a given error to indicate to functions or methods that receive a closure that they should no longer continue to operate (applies to: ExponentialRetry). Note that the error type will be transparently and recursively unpacked, for any return values, from said methods or functions, so DO NOT attempt to chain such calls without explicit handling at the top level for each fatal-able operation. NOTE: calling this function with a nil err will trigger a panic.
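A hedged sketch of FatalError used together with ExponentialRetry, where transient errors are retried but an unrecoverable error stops further attempts; the failure conditions are placeholders:

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

attempts := 0
value, err := ExponentialRetry(ctx, time.Millisecond*100, func() (interface{}, error) {
    attempts++
    switch {
    case attempts < 3:
        // transient failure: will be retried with exponential backoff
        return nil, errors.New(`temporarily unavailable`)
    case attempts > 5:
        // unrecoverable failure: FatalError stops further retries
        return nil, FatalError(errors.New(`gave up`))
    default:
        return `ok`, nil
    }
})()
fmt.Println(value, err)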
func LinearAttempt ¶ added in v1.11.0
LinearAttempt returns a new channel that will be published to, at most, once every rate, for a maximum total of count messages, and will be closed after either reaching count or context cancel, whichever comes first. Note that it is buffered and will start with a single value, and it behaves like time.NewTicker in that it attempts to keep a constant rate, compensates for slow consumers, and sends the time at which the last attempt was scheduled (the offset from the current time being equivalent to the conflation rate). Either rate or count being <= 0, or ctx being nil, will trigger a panic. Note that the initial publish happens inline, and context errors are guarded, meaning that if the context returns an error when first checked then the returned channel will always be closed, with no values sent.
This implementation is designed to be iterated over, by using range, with resource freeing via context cancel. It is very useful when implementing something that will attempt to perform an action at a maximum rate, for a maximum amount of times, e.g. for linear retry logic.
Example (AtMostOneTickAfterCancel) ¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

i := 0
for range LinearAttempt(contextNeverDone{ctx}, time.Millisecond*50, 10) {
    i++
    fmt.Printf("iteration #%d\n", i)
    if i < 5 {
        continue
    }
    cancel()
    time.Sleep(time.Millisecond * 100)
    fmt.Println(`canceled...`)
}
fmt.Printf("%d iterations\n", i)

Output:
iteration #1
iteration #2
iteration #3
iteration #4
iteration #5
canceled...
5 iterations
Example (Full) ¶
defer func() func() {
    start := runtime.NumGoroutine()
    return func() {
        finish := runtime.NumGoroutine()
        if start < finish {
            panic(fmt.Sprint(`started with`, start, `goroutines but finished with`, finish))
        }
    }
}()()

start := time.Now()
for range LinearAttempt(context.Background(), time.Millisecond*200, 5) {
    fmt.Println(int64((time.Now().Sub(start) + (time.Millisecond * 100)) / (time.Millisecond * 200)))
}
time.Sleep(time.Millisecond * 300)

Output:
0
1
2
3
4
Example (SlowConsumer) ¶
defer func() func() {
    start := runtime.NumGoroutine()
    return func() {
        finish := runtime.NumGoroutine()
        if start < finish {
            panic(fmt.Sprint(`started with`, start, `goroutines but finished with`, finish))
        }
    }
}()()

start := time.Now()
r, c := func() (func(), func()) {
    c := LinearAttempt(context.Background(), time.Millisecond*200, 7)
    return func() {
            ts := <-c
            fmt.Println(int64((time.Now().Sub(start)+(time.Millisecond*100))/(time.Millisecond*200)), int64((ts.Sub(start)+(time.Millisecond*100))/(time.Millisecond*200)))
        }, func() {
            r := reflect.ValueOf(c)
            if v, ok := r.TryRecv(); ok || v.Type() != reflect.TypeOf(time.Time{}) {
                panic(c)
            }
        }
}()
defer c()
defer time.Sleep(time.Millisecond * 300)

time.Sleep(time.Millisecond * 325)
r()
r()
r()
r()
time.Sleep(time.Millisecond * 525)
r()
r()
r()

Output:
2 0
2 2
3 3
4 4
7 5
7 7
8 8
func MinDuration ¶ added in v1.10.0
MinDuration is a simple wrapper for the function signature used by Exclusive and Workers that adds a sleep for any remainder of a given duration, and is intended to make it trivial to build a debounced, rate-limited, partitioned worker implementation. NOTE: a panic will occur if the duration d is not greater than 0, or if the function fn is nil.
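A small sketch of MinDuration guaranteeing a minimum elapsed duration around a value function, e.g. as a crude rate limit; the wrapped work is a placeholder:

// wrapped will sleep for whatever remains of 200ms after the inner func returns
wrapped := MinDuration(time.Millisecond*200, func() (interface{}, error) {
    return `done`, nil
})

start := time.Now()
value, err := wrapped()
fmt.Println(value, err, time.Since(start) >= time.Millisecond*200)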
func MustCall ¶ added in v1.15.1
func MustCall(caller Callable, options ...CallOption)
MustCall is equivalent to Call but will panic on error
func Range ¶
func Range(ctx context.Context, consumer Consumer, fn func(index int, value interface{}) bool) (err error)
Range iterates over the consumer, encapsulating automatic commits and rollbacks, including rollbacks caused by panics. Note that the index will be the index within THIS range, starting at 0 and incrementing by one with each call to fn. NOTE: the ctx value will be passed into consumer.Get as-is.
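A minimal sketch of Range over a Buffer consumer; since Range itself does not stop at the end of the buffer, fn returns false to end the iteration (the values and stop condition are illustrative):

var b Buffer
defer b.Close()

_ = b.Put(context.Background(), 1, 2, 3)

c, _ := b.NewConsumer()
defer c.Close()

// iterate until fn returns false or the context is canceled
_ = Range(context.Background(), c, func(index int, value interface{}) bool {
    fmt.Println(index, value)
    return index < 1 // stop after the second value
})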
func WaitCond ¶
WaitCond performs a conditional wait against a *sync.Cond, waiting until fn returns true, with an inbuilt escape hatch for context cancel. Note that the relevant locker must be locked before this is called. It should also be noted that cond.L.Lock will be acquired before a context-triggered broadcast, in order to avoid a race condition (i.e. if the context is cancelled while fn is being evaluated).
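A sketch of WaitCond waiting for a flag guarded by a sync.Cond, following the locking contract described above (the locker is held before the call); the flag and timings are placeholders:

var (
    mu    sync.Mutex
    cond  = sync.NewCond(&mu)
    ready bool
)

go func() {
    time.Sleep(time.Millisecond * 50)
    mu.Lock()
    ready = true
    mu.Unlock()
    cond.Broadcast()
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

mu.Lock() // the relevant locker must be held before calling WaitCond
err := WaitCond(ctx, cond, func() bool { return ready })
mu.Unlock()
fmt.Println(ready, err)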
Types ¶
type Buffer ¶
type Buffer struct {
// contains filtered or unexported fields
}
Buffer is the core implementation, implementing the Producer interface and providing auxiliary methods for configuration, as well as NewConsumer, to instance a new consumer. Note that though it is safe to instance via new(bigbuff.Buffer), it must not be copied after first use. Its behavior regarding message retention may be configured via SetCleanerConfig; by default it will un-buffer only messages that have been read by all "active and valid" consumers, given at least one exists, otherwise it will retain messages indefinitely. NOTE: the buffer itself will not be cleared even after close, so the data can still be accessed.
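A minimal produce/consume sketch using Buffer, NewConsumer, Get, and Commit; the values are arbitrary:

var b Buffer
defer b.Close()

c, err := b.NewConsumer()
if err != nil {
    panic(err)
}
defer c.Close()

if err := b.Put(context.Background(), `a`, `b`, `c`); err != nil {
    panic(err)
}

for i := 0; i < 3; i++ {
    value, err := c.Get(context.Background())
    if err != nil {
        panic(err)
    }
    fmt.Println(value)
    // commit the offset so the default cleaner may release the value
    if err := c.Commit(); err != nil {
        panic(err)
    }
}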
func (*Buffer) CleanerConfig ¶
func (b *Buffer) CleanerConfig() CleanerConfig
CleanerConfig returns the current cleaner config (which has defaults)
func (*Buffer) Diff ¶ added in v1.1.0
Diff is provided to facilitate ranging over a buffer via a consumer, and returns the number of items remaining in the buffer for that consumer (including uncommitted items). Be aware that the value CAN be negative, in the event the consumer fell behind (the cleaner cleared item(s) from the buffer that the consumer hadn't read yet, which by default will never happen, as the default mode is an unbounded buffer). Note it will return (0, false) for any invalid consumers, or any not registered on the receiver buffer.
func (*Buffer) NewConsumer ¶
NewConsumer constructs a new consumer instance.
func (*Buffer) Range ¶ added in v1.1.0
func (b *Buffer) Range(ctx context.Context, c Consumer, fn func(index int, value interface{}) bool) error
Range provides a way to iterate from the start to the end of the buffer. Note that it will exit as soon as it reaches the end of the buffer (unlike ranging on a channel); it simply utilizes the package-level Range + Buffer.Diff.
func (*Buffer) SetCleanerConfig ¶
func (b *Buffer) SetCleanerConfig(config CleanerConfig) error
SetCleanerConfig updates the cleaner config, returning an error if the config was invalid.
type CallOption ¶ added in v1.15.1
type CallOption func(config *callConfig) error
CallOption models a configuration option for the Call function provided by this package, see also the Call-prefixed functions provided by this package
func CallArgs ¶ added in v1.15.1
func CallArgs(args ...interface{}) CallOption
CallArgs returns a CallOption that will pass the provided args to a Callable that is called via the Call function
Example (FuncResults) ¶
MustCall(
    NewCallable(fmt.Println),
    CallArgs(func() (int, string, bool) {
        return 3, `multiple return values -> varargs`, true
    }()),
)

Output:
3 multiple return values -> varargs true
func CallArgsRaw ¶ added in v1.15.1
func CallArgsRaw(args interface{}) CallOption
CallArgsRaw returns a CallOption that will pass args to Callable.Call without modification or validation
func CallResults ¶ added in v1.15.1
func CallResults(results ...interface{}) CallOption
CallResults returns a CallOption that will assign the call's results to the values pointed at by results
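A small sketch of Call with CallArgs and CallResults, assigning the call's results into pointed-at variables; the divmod function is a placeholder:

var (
    quotient  int
    remainder int
)
if err := Call(
    NewCallable(func(a, b int) (int, int) { return a / b, a % b }),
    CallArgs(7, 2),
    CallResults(&quotient, &remainder),
); err != nil {
    panic(err)
}
fmt.Println(quotient, remainder) // 3 1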
func CallResultsRaw ¶ added in v1.15.1
func CallResultsRaw(results interface{}) CallOption
CallResultsRaw returns a CallOption that will pass results to Callable.Call without modification or validation
func CallResultsSlice ¶ added in v1.15.1
func CallResultsSlice(target interface{}) CallOption
CallResultsSlice returns a CallOption that will append the call's results to a slice pointed at by target
type Callable ¶ added in v1.15.1
type Callable interface {
    // Type must return a valid type of kind func, corresponding to the type of the Callable
    Type() reflect.Type

    // Call accepts an args function and passes results into a results function, to be utilised in a manner
    // generally equivalent to `results(callable(args()))`, treating nil (interface values) as omitting any
    // args, or not handling any return value. Note that Call will return an error if args or results are not
    // compatible with the underlying types, indicated by Callable.Type. See also bigbuff.Call and CallOption.
    Call(args, results interface{}) error
}
Callable models a function, and is used by this package to provide a higher-level mechanism (than the reflect package) for calling arbitrary functions in a generic way. See also the NewCallable factory function.
func NewCallable ¶ added in v1.15.1
func NewCallable(fn interface{}) Callable
NewCallable initialises a new Callable from fn, which must be a non-nil function, but is otherwise unconstrained. Note that a panic will occur if fn is not a function, or is a nil function.
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel implements Consumer based on data from a channel, note that because it uses reflection and polling internally, it is actually safe to close the input channel without invalid zero value reads (though the Channel itself still needs to be closed, and any Get calls will still be blocked until then).
func NewChannel ¶
NewChannel constructs a new consumer that implements Consumer, but receives its data from a channel; it uses reflection to support any readable channel. Note that a poll rate of zero will use the default, and < 0 is an error.
func (*Channel) Buffer ¶
func (c *Channel) Buffer() []interface{}
Buffer returns any values that were drained from the source but not committed yet, in a new copy of the internal buffer, note that if you are trying to ensure no messages get lost in the void, block until Channel.Done before calling this.
type Cleaner ¶
Cleaner is a callback used to manage the size of a bigbuff.Buffer instance. It will be called when relevant to do so, with the size of the buffer and the consumer offsets (relative to the buffer), and should return the number of elements that should be attempted to be shifted from the buffer.
func FixedBufferCleaner ¶
func FixedBufferCleaner(
    max int,
    target int,
    callback func(notification FixedBufferCleanerNotification),
) Cleaner
FixedBufferCleaner builds a cleaner that gives a buffer a fixed threshold size, which will trigger forced reduction back to a fixed target size. Note that if callback is supplied it will be called with the details of the cleanup, in the event that cleanup is forced past the default. This has the effect of causing any consumers that were running behind the target size (in terms of their read position in the buffer) to fail on any further Get calls.
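A sketch of bounding a Buffer by applying FixedBufferCleaner via SetCleanerConfig; the max/target sizes and cooldown are arbitrary:

var b Buffer
defer b.Close()

if err := b.SetCleanerConfig(CleanerConfig{
    // force the buffer back down to 500 items whenever it exceeds 1000
    Cleaner: FixedBufferCleaner(1000, 500, func(notification FixedBufferCleanerNotification) {
        fmt.Println(`forced cleanup:`, notification.Size, `->`, notification.Target)
    }),
    Cooldown: time.Millisecond * 10,
}); err != nil {
    panic(err)
}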
type CleanerConfig ¶
type CleanerConfig struct {
    // Cleaner is used to determine if items are removed from the buffer
    Cleaner Cleaner

    // Cooldown is the minimum time between cleanup cycles
    Cooldown time.Duration
}
CleanerConfig is a configuration for a bigbuff.Buffer, that defines how the size is managed
type Consumer ¶
type Consumer interface {
    io.Closer

    // Done should return a channel that will be closed after internal resources have been freed, after a `Close`
    // call, which may not be explicit. This *may* mean that it blocks on any pending changes, and it *may* also
    // be possible that the consumer will be closed due to external reasons, e.g. connection closing.
    Done() <-chan struct{}

    // Get will get a message from the message buffer, at the current offset, blocking if none are available, or
    // an error if it fails.
    Get(ctx context.Context) (interface{}, error)

    // Commit will save any offset changes, and will return an error if it fails, or if the offset saved is the
    // latest.
    Commit() error

    // Rollback will undo any offset changes, and will return an error if it fails, or if the offset saved is the
    // latest.
    Rollback() error
}
Consumer models a consumer in a producer-consumer pattern, where the resource will be closed at most once.
type Exclusive ¶ added in v1.2.1
type Exclusive struct {
// contains filtered or unexported fields
}
Exclusive provides synchronous de-bouncing of operations that may also return a result or error, with consistent or controlled input supported via provided closures, matched on any comparable key. It guarantees that the actual call that returns a given value will be started AFTER the Call method, so keep that in mind when implementing something using it. You may also use the CallAfter method to delay execution after initialising the key, e.g. to give the first of many costly operations on a given key a grace period to be grouped with the remaining calls.
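A hedged sketch of Exclusive.Call debouncing concurrent callers that share a key; the "expensive" operation and the key are placeholders:

var (
    e  Exclusive
    wg sync.WaitGroup
)
for i := 0; i < 5; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        // callers that overlap on the same key are grouped into a single execution and share its result
        value, err := e.Call(`some-key`, func() (interface{}, error) {
            time.Sleep(time.Millisecond * 50)
            return `expensive result`, nil
        })
        fmt.Println(value, err)
    }()
}
wg.Wait()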
func (*Exclusive) Call ¶ added in v1.2.1
Call uses a given key to ensure that the operation that the value callback represents will not be performed concurrently, and in the event that one or more operations are attempted while a given operation is still being performed, these operations will be grouped such that they are debounced to a single call, sharing the output.
Note that this method will panic if the receiver is nil, or the value is nil, but a nil key is allowed.
func (*Exclusive) CallAfter ¶ added in v1.2.1
func (e *Exclusive) CallAfter(key interface{}, value func() (interface{}, error), wait time.Duration) (interface{}, error)
CallAfter performs exactly the same operation as the Exclusive.Call method, but with an added wait to allow operations sent through in close succession to be grouped together, note that if wait is <= 0 it will be ignored.
func (*Exclusive) CallAfterAsync ¶ added in v1.3.0
func (e *Exclusive) CallAfterAsync(key interface{}, value func() (interface{}, error), wait time.Duration) <-chan *ExclusiveOutcome
CallAfterAsync behaves exactly the same as CallAfter but guarantees order (the value func) for synchronous calls.
Note that the returned channel will always be closed after being sent the result, and therefore any additional reads will always receive nil.
func (*Exclusive) CallAsync ¶ added in v1.3.0
func (e *Exclusive) CallAsync(key interface{}, value func() (interface{}, error)) <-chan *ExclusiveOutcome
CallAsync behaves exactly the same as Call but guarantees order (the value func) for synchronous calls.
Note that the returned channel will always be closed after being sent the result, and therefore any additional reads will always receive nil.
func (*Exclusive) CallWithOptions ¶ added in v1.15.1
func (e *Exclusive) CallWithOptions(options ...ExclusiveOption) <-chan *ExclusiveOutcome
CallWithOptions consolidates the various different ways to use Exclusive into a single method, to improve maintainability w/o breaking API compatibility. Other methods such as Call and Start will continue to be supported.
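A sketch of CallWithOptions combining ExclusiveKey, ExclusiveWait, and ExclusiveValue, reading the result from the returned ExclusiveOutcome channel; the key, wait window, and value are placeholders:

var e Exclusive

outcome := <-e.CallWithOptions(
    ExclusiveKey(`some-key`),
    ExclusiveWait(time.Millisecond*20), // group callers arriving within the wait window
    ExclusiveValue(func() (interface{}, error) {
        return `expensive result`, nil
    }),
)
fmt.Println(outcome.Result, outcome.Error)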
type ExclusiveOption ¶ added in v1.15.1
type ExclusiveOption func(c *exclusiveConfig)
ExclusiveOption passes configuration into Exclusive.CallWithOptions, see also package functions prefixed with Exclusive, such as ExclusiveKey and ExclusiveWork.
func ExclusiveKey ¶ added in v1.15.1
func ExclusiveKey(value interface{}) ExclusiveOption
ExclusiveKey configures a comparable value for grouping calls, for debouncing, limiting, etc.
func ExclusiveRateLimit ¶ added in v1.15.1
func ExclusiveRateLimit(ctx context.Context, minDuration time.Duration) ExclusiveOption
ExclusiveRateLimit is typically a drop-in replacement for MinDuration that works properly with non-start calls, and returns an ExclusiveWrapper option. Note that the context is for cleaning up the resources required to apply the rate limit (the rate limit itself), and will also be used to guard the actual work, for safety reasons.
func ExclusiveStart ¶ added in v1.15.1
func ExclusiveStart(value bool) ExclusiveOption
ExclusiveStart configures "start" behavior for the call; if true, it will avoid the overhead required to propagate results, which will also cause the return value (outcome channel) to be nil. See also Exclusive.Start.
func ExclusiveValue ¶ added in v1.15.1
func ExclusiveValue(value func() (interface{}, error)) ExclusiveOption
ExclusiveValue implements a simpler style of ExclusiveWork (that was originally the only supported behavior).
func ExclusiveWait ¶ added in v1.15.1
func ExclusiveWait(value time.Duration) ExclusiveOption
ExclusiveWait configures the duration (since the start of the call) that should be waited, before actually calling the work function, see also Exclusive.CallAfter.
func ExclusiveWork ¶ added in v1.15.1
func ExclusiveWork(value WorkFunc) ExclusiveOption
ExclusiveWork configures the work function (what will actually get called).
func ExclusiveWrapper ¶ added in v1.15.1
func ExclusiveWrapper(value func(value WorkFunc) WorkFunc) ExclusiveOption
ExclusiveWrapper facilitates programmatic building of work, note that the ExclusiveWork or ExclusiveValue option must still be provided, but may be provided in any order (after or before this option). If there are multiple wrappers, they will be applied sequentially (left -> right is inner -> outer).
type ExclusiveOutcome ¶ added in v1.3.0
type ExclusiveOutcome struct {
    Result interface{}
    Error  error
}
ExclusiveOutcome is the return value from an async bigbuff.Exclusive call
type FixedBufferCleanerNotification ¶
type FixedBufferCleanerNotification struct {
    Max     int   // Max size before forced cleanup is triggered.
    Target  int   // Target size when force cleanup is triggered.
    Size    int   // Size when cleanup was triggered.
    Offsets []int // Offsets when cleanup was triggered.
    Trim    int   // Trim number returned.
}
FixedBufferCleanerNotification is the context provided to the optional callback provided to the FixedBufferCleaner function.
type Notifier ¶ added in v1.7.0
type Notifier struct {
// contains filtered or unexported fields
}
Notifier is a tool which may be used to facilitate event handling using a fan out pattern, modeling a pattern that is better described as publish-subscribe rather than produce-consume, and tries to be semantically equivalent to implementations using channels guarded via context cancels using select statements.
It sits between the Exclusive and Buffer implementations in terms of behavior, yet its use case is still distinct. Where Buffer shines at multiplexing or fanning out serializable streams of messages, and Exclusive is explicitly designed to be attached to existing expensive tasks which need to occur as a result of multiple triggers, Notifier targets reactive behavior based on asynchronous operations. It provides basic event handling without any buffering or queuing between the producer and subscriber present in the layer before the actual target channels.
Note that it uses reflect internally, to avoid clients needing to rely on generic interface values.
Example (ContextCancelSubscribe) ¶
var (
    nf          Notifier
    k           = 0
    c           = make(chan string)
    d           = make(chan struct{})
    ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()

nf.SubscribeContext(ctx, k, c)

fmt.Println(`starting blocking publish then waiting a bit...`)
go func() {
    defer close(d)
    fmt.Println(`publish start`)
    nf.Publish(k, `one`)
    fmt.Println(`publish finish`)
}()
time.Sleep(time.Millisecond * 100)

fmt.Println(`canceling context then blocking for publish exit...`)
cancel()
<-d

fmt.Println(`closing publish channel...`)
close(c)
time.Sleep(time.Millisecond * 50)

fmt.Println(`success!`)

Output:
starting blocking publish then waiting a bit...
publish start
canceling context then blocking for publish exit...
publish finish
closing publish channel...
success!
Example (PubSubKeys) ¶
var (
    k1 = `some-key`
    k2 = 100
    c1 = make(chan string)
    c2 = make(chan string)
    c3 = make(chan string)
    wg sync.WaitGroup
    nf Notifier
)

wg.Add(3)
go func() {
    defer wg.Done()
    for v := range c1 {
        fmt.Println("c1 recv:", v)
    }
}()
go func() {
    defer wg.Done()
    for v := range c2 {
        fmt.Println("c2 recv:", v)
    }
}()
go func() {
    defer wg.Done()
    for v := range c3 {
        time.Sleep(time.Millisecond * 100)
        fmt.Println("c3 recv:", v)
    }
}()

nf.Subscribe(k1, c1)
nf.Subscribe(k2, c2)
nf.Subscribe(k1, c3)
nf.Subscribe(k2, c3)

nf.Publish(k1, `one`)
time.Sleep(time.Millisecond * 200)
nf.Publish(k2, `two`)

close(c1)
close(c2)
close(c3)
wg.Wait()

Output:
c1 recv: one
c3 recv: one
c2 recv: two
c3 recv: two
func (*Notifier) Publish ¶ added in v1.7.0
func (n *Notifier) Publish(key interface{}, value interface{})
Publish is equivalent to PublishContext(nil, key, value).
func (*Notifier) PublishContext ¶ added in v1.7.0
PublishContext will send value to the targets of all active subscribers for a given key for which value is assignable, blocking until ctx is canceled (if non-nil), or each relevant subscriber has either been sent value or has canceled its context.
func (*Notifier) Subscribe ¶ added in v1.7.0
func (n *Notifier) Subscribe(key interface{}, target interface{})
Subscribe is equivalent to SubscribeContext(nil, key, target).
func (*Notifier) SubscribeCancel ¶ added in v1.8.0
func (n *Notifier) SubscribeCancel(ctx context.Context, key interface{}, target interface{}) context.CancelFunc
SubscribeCancel wraps SubscribeContext and Unsubscribe, as well as the initialisation of a sub-context, so that the result can be used in a one-liner defer statement. It is the most fool-proof way to implement a subscriber, at the cost of less direct management of resources (including some which are potentially unnecessary, as it uses a sub-context, and the returned cancel obeys the contract of context.CancelFunc and does not perform Unsubscribe inline).
Example ¶
defer func() func() {
    startGoroutines := runtime.NumGoroutine()
    return func() {
        time.Sleep(time.Millisecond * 200)
        endGoroutines := runtime.NumGoroutine()
        if endGoroutines <= startGoroutines {
            fmt.Println(`our resources were freed`)
        }
    }
}()()

var (
    nf          Notifier
    ping        = make(chan float64)
    pong        = make(chan float64)
    ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()
defer nf.SubscribeCancel(nil, `ping`, ping)()
defer nf.SubscribeCancel(nil, `pong`, pong)()

go func() {
    // worker which will receive all values and respond with that value x2
    // but first... sleep, to demonstrate it's not racey
    time.Sleep(time.Millisecond * 100)
    for {
        select {
        case <-ctx.Done():
            fmt.Println(`worker exiting`)
            return
        case value := <-ping:
            nf.PublishContext(ctx, `pong`, value*2)
        }
    }
}()

fmt.Println(`PING 5 x 2 = ...`)
nf.PublishContext(ctx, `ping`, 5.0)
fmt.Println(`PONG`, <-pong)

fmt.Println(`PING -23 x 2 = ...`)
nf.PublishContext(ctx, `ping`, -23.0)
fmt.Println(`PONG`, <-pong)

Output:
PING 5 x 2 = ...
PONG 10
PING -23 x 2 = ...
PONG -46
worker exiting
our resources were freed
func (*Notifier) SubscribeContext ¶ added in v1.7.0
SubscribeContext registers a given target channel as a subscriber for a given key; publishes to that key will block unless the target is received from appropriately, or until context cancel (if a non-nil context was provided). Be sure to unsubscribe exactly once, to free references to ctx and target. A panic will occur if target is not a channel to which the notifier can send, or if there already exists a subscription for the given key and target combination. The key may be any comparable value.
func (*Notifier) Unsubscribe ¶ added in v1.7.0
func (n *Notifier) Unsubscribe(key interface{}, target interface{})
Unsubscribe deregisters a given key and target from the notifier, an action that may be performed exactly once after each subscription (for the combination of key and target), preventing further messages from being published to the target, and allowing the associated resources to be freed. WARNING: the subscribe context should always be canceled before calling this, or it may deadlock (especially under load).
type Producer ¶
type Producer interface {
    io.Closer

    // Done should return a channel that will be closed after internal resources have been freed, after a `Close`
    // call, which may not be explicit. This *may* mean that it blocks on any pending changes, and it *may* also
    // be possible that the consumer will be closed due to external reasons, e.g. connection closing.
    Done() <-chan struct{}

    // Put will send the provided values in-order to the message buffer, or return an error.
    // It MUST NOT block in such a way that it will be possible to cause a deadlock locally.
    Put(ctx context.Context, values ...interface{}) error
}
Producer models a producer in a producer-consumer pattern, where the resource will be closed at most once.
type WorkFunc ¶ added in v1.15.1
type WorkFunc func(resolve func(result interface{}, err error))
WorkFunc is a work function, as used by Exclusive.
type Worker ¶ added in v1.15.1
type Worker struct {
// contains filtered or unexported fields
}
Worker implements a background worker pattern, providing synchronisation around running at most a single worker, for n callers (of its Do method).
Note that although sync.WaitGroup is used internally, sync.Once is still preferable for simpler cases.
func (*Worker) Do ¶ added in v1.15.1
func (x *Worker) Do(fn func(stop <-chan struct{})) (done func())
Do will call fn in a new goroutine, if the receiver is not already running, and will always return a done func which must be called, to indicate when the worker is no longer in use. Once no callers are using a worker, that worker will be stopped. Stopping involves closing the stop channel, causing further calls to Do to block, until the worker finishes. A panic will occur if either the receiver or fn are nil.
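A sketch of Worker.Do keeping a single background loop alive while at least one caller holds it; the loop body and interval are placeholders:

var w Worker

done := w.Do(func(stop <-chan struct{}) {
    // a single background loop shared by all concurrent callers of Do
    for {
        select {
        case <-stop:
            return
        case <-time.After(time.Millisecond * 10):
            // periodic work would go here
        }
    }
})
// indicate this caller no longer needs the worker; once no callers remain, the stop channel is closed
defer done()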
type Workers ¶ added in v1.4.0
type Workers struct {
// contains filtered or unexported fields
}
Workers represents a dynamically resizable pool of workers, for when you want to have up to x operations happening at any given point in time; it can work directly with the Exclusive implementation.
func (*Workers) Call ¶ added in v1.4.0
Call will call value synchronously, with up to count concurrency (with other concurrent calls). Note that it will panic if the receiver is nil, the count is <= 0, or the value is nil.