Documentation ¶
Overview ¶
Package governor controls the concurrency of a network wide process
Using this one can, for example, create CRON jobs that can trigger 100s or 1000s concurrently but where most will wait for a set limit to complete. In effect limiting the overall concurrency of these execution.
To do this a Stream is created that has a maximum message limit and that will reject new entries when full.
Workers will try to place themselves in the Stream, they do their work if they succeed and remove themselves from the Stream once they are done.
As a fail safe the stack will evict entries after a set time based on Stream max age.
A manager is included to create, observe and edit these streams and the choria CLI has a new command build on this library: choria governor
Index ¶
Constants ¶
const DefaultInterval = 250 * time.Millisecond
DefaultInterval default sleep between tries, set with WithInterval()
Variables ¶
This section is empty.
Functions ¶
func StreamName ¶
Types ¶
type Finisher ¶
type Finisher func() error
Finisher signals that work is completed releasing the slot on the stack
type Governor ¶
type Governor interface { // Start attempts to get a spot in the Governor, gives up on context, call Finisher to signal end of work Start(ctx context.Context, name string) (fin Finisher, seq uint64, err error) // Connection is the NATS connection used to communicate Connection() *nats.Conn }
Governor controls concurrency of distributed processes using a named governor stream
type Logger ¶
type Logger interface { Debugf(format string, a ...any) Infof(format string, a ...any) Warnf(format string, a ...any) Errorf(format string, a ...any) }
Logger is a custom logger
type Manager ¶
type Manager interface { // Limit is the configured maximum entries in the Governor Limit() int64 // MaxAge is the time after which entries will be evicted MaxAge() time.Duration // Name is the Governor name Name() string // Replicas is how many data replicas are kept of the data Replicas() int // SetLimit configures the maximum entries in the Governor and takes immediate effect SetLimit(uint64) error // SetMaxAge configures the maximum age of entries, takes immediate effect SetMaxAge(time.Duration) error // SetSubject configures the underlying NATS subject the Governor listens on for entry campaigns SetSubject(subj string) error // Stream is the underlying JetStream stream Stream() *jsm.Stream // Subject is the subject the Governor listens on for entry campaigns Subject() string // Reset resets the governor removing all current entries from it Reset() error // Active is the number of active entries in the Governor Active() (uint64, error) // Evict removes an entry from the Governor given its unique id, returns the name that was on that entry Evict(entry uint64) (name string, err error) // LastActive returns the the since entry was added to the Governor, can be zero time when no entries were added LastActive() (time.Time, error) // Connection is the NATS connection used to communicate Connection() *nats.Conn }
Manager controls concurrent executions of work distributed throughout a nats network by using a stream as a capped stack where workers reserve a slot and later release the slot
type Option ¶
type Option func(mgr *jsGMgr)
func WithBackoff ¶
WithBackoff sets a backoff policy for gradually reducing try interval
func WithInterval ¶
WithInterval sets the interval between tries
func WithLogger ¶
WithLogger configures the logger to use, no logging when none is given
func WithSubject ¶
WithSubject configures a specific subject for the governor to act on
func WithoutLeavingOnCompletion ¶
func WithoutLeavingOnCompletion() Option
WithoutLeavingOnCompletion prevents removal from the governor after execution