Documentation ¶
Overview ¶
Metafora is a library for building distributed work systems. It's masterless and extensible via core Balancer and Coordinator interfaces.
If you use the builtin FairBalancer and EtcdCoordinator, all you have to do is implement a Handler and HandlerFunc, and then run the Consumer.
Index ¶
- Variables
- func SetLogger(l LogOutputter)
- type Balancer
- type BalancerContext
- type Client
- type ClusterState
- type Command
- type Consumer
- type Coordinator
- type CoordinatorContext
- type FairBalancer
- type Handler
- type HandlerFunc
- type LogOutputter
- type ResourceBalancer
- type ResourceReporter
- type RunningTask
- type SleepBalancer
- type Task
Constants ¶
This section is empty.
Variables ¶
var BalanceEvery = 15 * time.Minute //TODO make balance wait configurable
var Debug func(v ...interface{}) = gou.Debug
var Debugf func(format string, v ...interface{}) = gou.Debugf
var DumbBalancer = dumbBalancer{}
DumbBalancer is the simplest possible balancer implementation: it accepts all tasks. Since it has no state, a single global instance exists.
var Error func(v ...interface{}) = gou.Error
var Errorf func(format string, v ...interface{}) = gou.Errorf
var Info func(v ...interface{}) = gou.Info
var Infof func(format string, v ...interface{}) = gou.Infof
var LogLevel int = gou.LogLevel
var NoDelay = time.Time{}
NoDelay is simply the zero value for time and meant to be a more meaningful value for CanClaim methods to return instead of initializing a new empty time struct.
var Warn func(v ...interface{}) = gou.Warn
var Warnf func(format string, v ...interface{}) = gou.Warnf
Functions ¶
Types ¶
type Balancer ¶
type Balancer interface {
    // Init is called once and only once before any other Balancer methods are
    // called. The context argument is meant to expose functionality that might
    // be useful for CanClaim and Balance implementations.
    Init(BalancerContext)

    // CanClaim should return true if the consumer should accept a task.
    //
    // When denying a claim by returning false, CanClaim should return the time
    // at which to reconsider the task for claiming.
    CanClaim(task Task) (ignoreUntil time.Time, claim bool)

    // Balance should return the list of Task IDs that should be released. The
    // criteria used to determine which tasks should be released is left up to
    // the implementation.
    Balance() (release []string)
}
Balancer is the core task balancing interface. Without a master, Metafora clusters are cooperatively balanced -- meaning each node needs to know how to balance itself.
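As an illustration of the interface, here is a minimal sketch of a custom balancer that caps the number of tasks a node will run. The types `Task`, `RunningTask`, and `BalancerContext` below are local stand-ins so the example compiles on its own; the `ID()` method on `Task` is an assumption (the definition of `Task` is not shown on this page), and `cappedBalancer`, `fakeTask`, `fakeRunning`, and `fakeContext` are hypothetical names, not part of Metafora.

```go
package main

import (
	"fmt"
	"time"
)

// Local stand-ins mirroring the documented interfaces.
// Task's ID method is an assumption made for this sketch.
type Task interface{ ID() string }

type RunningTask interface {
	Task() Task
	Started() time.Time
}

type BalancerContext interface{ Tasks() []RunningTask }

// noDelay plays the role of the documented NoDelay value (the zero time).
var noDelay = time.Time{}

// cappedBalancer is a hypothetical Balancer: it denies claims once maxTasks
// tasks are running and releases anything beyond that cap during Balance.
type cappedBalancer struct {
	ctx      BalancerContext
	maxTasks int
	retry    time.Duration // how long to ignore a task after denying it
}

func (b *cappedBalancer) Init(ctx BalancerContext) { b.ctx = ctx }

func (b *cappedBalancer) CanClaim(Task) (time.Time, bool) {
	if len(b.ctx.Tasks()) >= b.maxTasks {
		// Denied: tell the caller when to reconsider this task.
		return time.Now().Add(b.retry), false
	}
	return noDelay, true
}

func (b *cappedBalancer) Balance() []string {
	tasks := b.ctx.Tasks()
	if len(tasks) <= b.maxTasks {
		return nil
	}
	release := make([]string, 0, len(tasks)-b.maxTasks)
	for _, rt := range tasks[b.maxTasks:] {
		release = append(release, rt.Task().ID())
	}
	return release
}

// Fakes used only to exercise the sketch.
type fakeTask string

func (t fakeTask) ID() string { return string(t) }

type fakeRunning struct{ task Task }

func (r fakeRunning) Task() Task         { return r.task }
func (r fakeRunning) Started() time.Time { return time.Time{} }

type fakeContext struct{ ids []string }

func (c fakeContext) Tasks() []RunningTask {
	out := make([]RunningTask, 0, len(c.ids))
	for _, id := range c.ids {
		out = append(out, fakeRunning{task: fakeTask(id)})
	}
	return out
}

func main() {
	b := &cappedBalancer{maxTasks: 2, retry: 5 * time.Second}
	b.Init(fakeContext{ids: []string{"a", "b", "c"}})

	_, ok := b.CanClaim(fakeTask("d"))
	fmt.Println("claim:", ok)            // claim: false
	fmt.Println("release:", b.Balance()) // release: [c]
}
```

Because every node runs its own balancer, a rule like this only balances the cluster if all nodes apply a compatible policy.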
func NewDefaultFairBalancer ¶
func NewDefaultFairBalancer(nodeid string, cs ClusterState) Balancer
NewDefaultFairBalancer creates a new FairBalancer but requires a ClusterState implementation to gain more information about the cluster than BalancerContext provides.
func NewDefaultFairBalancerWithThreshold ¶
func NewDefaultFairBalancerWithThreshold(nodeid string, cs ClusterState, threshold float64) Balancer
NewDefaultFairBalancerWithThreshold allows callers to override FairBalancer's default 120% task load release threshold.
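To make the threshold concrete, the sketch below works through the arithmetic of a "release above N% of the cluster average" rule. It is only an illustration of the documented behaviour, not FairBalancer's actual code: the function name `excessTasks` is hypothetical, the input map has the shape returned by `ClusterState.NodeTaskCount`, and the sketch assumes the threshold is expressed as a ratio such as 1.2 for 120%.

```go
package main

import "fmt"

// excessTasks returns how many tasks nodeID would need to shed to get back
// to threshold * (cluster average). Hypothetical helper for illustration.
func excessTasks(nodeID string, counts map[string]int, threshold float64) int {
	if len(counts) == 0 {
		return 0
	}
	total := 0
	for _, n := range counts {
		total += n
	}
	avg := float64(total) / float64(len(counts))
	limit := int(avg * threshold) // largest task count tolerated on one node
	if mine := counts[nodeID]; mine > limit {
		return mine - limit
	}
	return 0
}

func main() {
	// 30 tasks over 3 nodes: the average is 10, so the 120% limit is 12.
	counts := map[string]int{"node-a": 16, "node-b": 8, "node-c": 6}
	fmt.Println(excessTasks("node-a", counts, 1.2)) // 4
	fmt.Println(excessTasks("node-b", counts, 1.2)) // 0
}
```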
type BalancerContext ¶
type BalancerContext interface {
    // Tasks returns a sorted list of task IDs owned by this Consumer. The
    // Consumer stops task manipulations during claiming and balancing, so the
    // list will be accurate unless a task naturally completes.
    Tasks() []RunningTask
}
BalancerContext is a limited interface exposed to Balancers from the Consumer for access to limited Consumer state.
type Client ¶
type Client interface {
    // SubmitTask submits a task to the system, the task id must be unique.
    SubmitTask(Task) error

    // Delete a task
    DeleteTask(taskId string) error

    // SubmitCommand submits a command to a particular node.
    SubmitCommand(node string, command Command) error

    // Nodes retrieves the current set of registered nodes.
    Nodes() ([]string, error)
}
type ClusterState ¶
type ClusterState interface {
    // Provide the current number of jobs
    NodeTaskCount() (map[string]int, error)
}
ClusterState provides information about the cluster to be used by FairBalancer.
type Command ¶
type Command interface {
    // Name returns the name of the command.
    Name() string

    // Parameters returns the parameters, if any, the command will be executed
    // with.
    Parameters() map[string]interface{}

    // Marshal turns a command into its wire representation.
    Marshal() ([]byte, error)
}
Commands are a way clients can communicate directly with nodes for cluster maintenance.
Use the Command functions to generate implementations of this interface. Metafora's consumer will discard unknown commands.
func CommandBalance ¶
func CommandBalance() Command
CommandBalance forces the node's balancer.Balance method to be called even if frozen.
func CommandFreeze ¶
func CommandFreeze() Command
CommandFreeze stops all task watching and balancing.
func CommandStopTask ¶
CommandStopTask forces a node to stop a task even if frozen.
func CommandUnfreeze ¶
func CommandUnfreeze() Command
CommandUnfreeze resumes task watching and balancing.
func UnmarshalCommand ¶
UnmarshalCommand parses a command from its wire representation.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is the core Metafora task runner.
func NewConsumer ¶
func NewConsumer(coord Coordinator, h HandlerFunc, b Balancer) (*Consumer, error)
NewConsumer returns a new consumer and calls Init on the Balancer and Coordinator.
func (*Consumer) Frozen ¶
Frozen returns true if Metafora is no longer watching for new tasks or rebalancing.
Metafora will remain frozen until receiving an Unfreeze command or it is restarted (frozen state is not persisted).
func (*Consumer) Run ¶
func (c *Consumer) Run()
Run is the core run loop of Metafora. It is responsible for calling into the Coordinator to claim work and Balancer to rebalance work.
Run blocks until Shutdown is called or an internal error occurs.
func (*Consumer) Shutdown ¶
func (c *Consumer) Shutdown()
Shutdown stops the main Run loop, calls Stop on all handlers, and calls Close on the Coordinator. Running tasks will be released for other nodes to claim.
func (*Consumer) Tasks ¶
func (c *Consumer) Tasks() []RunningTask
Tasks returns a lexicographically sorted list of running Task IDs.
type Coordinator ¶
type Coordinator interface {
    // Init is called once by the consumer to provide a Logger to Coordinator
    // implementations. NewConsumer will return Init's return value.
    Init(CoordinatorContext) error

    // Watch the broker for claimable tasks. Watch blocks until Close is called
    // or it encounters an error. Tasks are sent to consumer via the tasks chan.
    Watch(tasks chan<- Task) (err error)

    // Claim is called by the Consumer when a Balancer has determined that a task
    // ID can be claimed. Claim returns false if another consumer has already
    // claimed the ID.
    Claim(Task) bool

    // Release a task for other consumers to claim. May be called after Close.
    Release(Task)

    // Done is called by Metafora when a task has been completed and should never
    // be scheduled to run again (in other words: deleted from the broker).
    //
    // May be called after Close.
    Done(Task)

    // Command blocks until a command for this node is received from the broker
    // by the coordinator. Command must return (nil, nil) when Close is called.
    Command() (Command, error)

    // Close the coordinator. Stop waiting for tasks and commands. Remove node from broker.
    //
    // Do not release tasks. The consumer will handle task releasing.
    Close()

    // Name of the coordinator for use in logs and other tooling.
    Name() string
}
Coordinator is the core interface Metafora uses to discover and claim tasks, as well as receive commands.
type CoordinatorContext ¶
type CoordinatorContext interface {
    // Lost is called by the Coordinator when a claimed task is lost to another
    // node. The Consumer will stop the task locally.
    //
    // Since this implies there is a window of time where the task is executing
    // more than once, this is a sign of an unhealthy cluster.
    Lost(Task)
}
CoordinatorContext is the context passed to coordinators by the core consumer.
type FairBalancer ¶
type FairBalancer struct {
// contains filtered or unexported fields
}
An implementation of Balancer which attempts to randomly release tasks in the case when the count of those currently running on this node is greater than some percentage of the cluster average (default 120%).
This balancer will claim all tasks which were not released on the last call to Balance.
func (*FairBalancer) Balance ¶
func (e *FairBalancer) Balance() []string
Balance releases tasks if this node is running more than 120% (the default threshold) of the average node's task count in the cluster.
func (*FairBalancer) CanClaim ¶
func (e *FairBalancer) CanClaim(task Task) (time.Time, bool)
CanClaim rejects tasks for a period of time if the last balance released tasks. Otherwise all tasks are accepted.
func (*FairBalancer) Init ¶
func (e *FairBalancer) Init(s BalancerContext)
type Handler ¶
type Handler interface {
    // Run handles a task and blocks until completion or Stop is called.
    //
    // If Run returns true, Metafora will mark the task as Done via the
    // Coordinator. The task will not be rescheduled.
    //
    // If Run returns false, Metafora will Release the task via the Coordinator.
    // The task will be scheduled to run again.
    //
    // Panics are treated the same as returning true.
    Run() (done bool)

    // Stop signals to the handler to shut down gracefully. Stop implementations
    // should not block until Run exits.
    //
    // Stop may be called more than once but calls are serialized. Implementations
    // may perform different operations on subsequent calls to Stop to implement
    // graceful vs. forced shutdown conditions.
    //
    // Run probably wants to return false when Stop is called, but this is left
    // up to the implementation as races between Run finishing and Stop being
    // called can happen.
    Stop()
}
Handler is the core task handling interface. The Consumer will create a new Handler for each claimed task, call Run once and only once, and call Stop when the task should persist its progress and exit.
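The contract above (Run blocks, Stop is non-blocking and may be called repeatedly) can be sketched as follows. The `Handler` interface is re-declared locally so the example is self-contained, and `stepHandler` and `newStepHandler` are hypothetical names. Closing a channel inside `sync.Once` is one possible way to keep repeated Stop calls safe; it is a design choice of this sketch, not something the library prescribes.

```go
package main

import (
	"fmt"
	"sync"
)

// Handler mirrors the documented interface.
type Handler interface {
	Run() (done bool)
	Stop()
}

// stepHandler is a hypothetical handler that performs a fixed number of work
// steps and checks for a stop request between steps.
type stepHandler struct {
	steps    int
	work     func(step int)
	stop     chan struct{}
	stopOnce sync.Once
}

// Compile-time check that stepHandler satisfies Handler.
var _ Handler = (*stepHandler)(nil)

func newStepHandler(steps int, work func(int)) *stepHandler {
	return &stepHandler{steps: steps, work: work, stop: make(chan struct{})}
}

// Run returns true when all steps finished (task is done) and false when it
// was stopped early (task should be released and rescheduled).
func (h *stepHandler) Run() bool {
	for i := 0; i < h.steps; i++ {
		select {
		case <-h.stop:
			return false
		default:
		}
		h.work(i)
	}
	return true
}

// Stop never blocks and tolerates being called more than once.
func (h *stepHandler) Stop() {
	h.stopOnce.Do(func() { close(h.stop) })
}

func main() {
	finished := newStepHandler(3, func(i int) { fmt.Println("step", i) })
	fmt.Println("done:", finished.Run()) // done: true

	stopped := newStepHandler(3, func(int) {})
	stopped.Stop()
	stopped.Stop() // a second call is harmless
	fmt.Println("done:", stopped.Run()) // done: false
}
```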
type HandlerFunc ¶
HandlerFunc is called by the Consumer to create a new Handler for each task.
HandlerFunc is meant to be the New function for handlers. Since Run and Stop are called concurrently, any state used by both should be initialized in the HandlerFunc. Since HandlerFunc is uninterruptible, only the minimum amount of work necessary to initialize a handler should be done.
func SimpleHandler ¶
func SimpleHandler(f func(t Task, stop <-chan bool) bool) HandlerFunc
SimpleHandler creates a HandlerFunc for a simple function that accepts a stop channel. The channel will be closed when Stop is called.
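One way such an adapter could be built is sketched below; this is not Metafora's source. It assumes `HandlerFunc` has the shape `func(Task) Handler` and that `Task` exposes an `ID()` method (neither definition appears on this page), and the names `simpleHandlerSketch`, `funcHandler`, and `namedTask` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Local stand-ins; the shapes of Task and HandlerFunc are assumptions.
type Task interface{ ID() string }

type Handler interface {
	Run() (done bool)
	Stop()
}

type HandlerFunc func(Task) Handler

// funcHandler wraps a plain function so it satisfies Handler.
type funcHandler struct {
	task Task
	f    func(t Task, stop <-chan bool) bool
	stop chan bool
	once sync.Once
}

func (h *funcHandler) Run() bool { return h.f(h.task, h.stop) }

// Stop closes the stop channel exactly once, so repeated calls are safe.
func (h *funcHandler) Stop() { h.once.Do(func() { close(h.stop) }) }

// simpleHandlerSketch returns a HandlerFunc that builds a fresh funcHandler,
// with its own stop channel, for every task.
func simpleHandlerSketch(f func(t Task, stop <-chan bool) bool) HandlerFunc {
	return func(t Task) Handler {
		return &funcHandler{task: t, f: f, stop: make(chan bool)}
	}
}

type namedTask string

func (t namedTask) ID() string { return string(t) }

func main() {
	newHandler := simpleHandlerSketch(func(t Task, stop <-chan bool) bool {
		<-stop       // pretend to work until asked to stop
		return false // not finished: the task should be released
	})

	h := newHandler(namedTask("task-1"))
	result := make(chan bool)
	go func() { result <- h.Run() }()

	h.Stop()
	fmt.Println("done:", <-result) // done: false
}
```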
type LogOutputter ¶
type ResourceBalancer ¶
type ResourceBalancer struct {
// contains filtered or unexported fields
}
ResourceBalancer is a balancer implementation which uses two thresholds to limit claiming and rebalance work based upon a resource reported by a ResourceReporter. When the claim threshold is exceeded, no new work will be claimed. When the release threshold is exceeded, work will be released until usage falls below that threshold. The claim threshold must be less than the release threshold (otherwise claims would continue just to have the work rebalanced).
Even below the claim limit, claims are delayed by the percent of resources used (in milliseconds) to give less loaded nodes a claim advantage.
The balancer releases the oldest tasks first (skipping those that are already stopping) to try to prevent rebalancing the same tasks repeatedly within a cluster.
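The two-threshold rule and the claim delay described above can be sketched as a small decision function. This is an illustration under stated assumptions rather than ResourceBalancer's implementation: `decide` and `decision` are hypothetical names, usage is computed as an integer percentage, and "exceeded" is read as strictly greater than the limit.

```go
package main

import (
	"fmt"
	"time"
)

// decision summarises what a node would do at a given resource usage.
type decision struct {
	Claim   bool          // may new work be claimed?
	Delay   time.Duration // how long to wait before claiming
	Release bool          // should running work be released?
}

// decide applies the claim and release limits (percentages, 1 to 100) to a
// used/total reading. It assumes used*100 does not overflow uint64.
func decide(used, total uint64, claimLimit, releaseLimit int) decision {
	if total == 0 {
		return decision{} // no reading: neither claim nor release
	}
	pct := int(used * 100 / total)
	d := decision{
		Claim:   pct <= claimLimit,
		Release: pct > releaseLimit,
	}
	if d.Claim {
		// Busier nodes wait longer, giving idle nodes a head start.
		d.Delay = time.Duration(pct) * time.Millisecond
	}
	return d
}

func main() {
	for _, used := range []uint64{50, 80, 95} {
		fmt.Printf("%d%% used: %+v\n", used, decide(used, 100, 70, 90))
	}
}
```

With a claim limit of 70 and a release limit of 90, a node at 50% usage claims after a 50ms delay, a node at 80% neither claims nor releases, and a node at 95% releases work.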
func NewResourceBalancer ¶
func NewResourceBalancer(src ResourceReporter, claimLimit, releaseLimit int) (*ResourceBalancer, error)
NewResourceBalancer creates a new ResourceBalancer or returns an error if the limits are invalid.
Limits should be a percentage expressed as an integer between 1 and 100 inclusive.
func (*ResourceBalancer) Balance ¶
func (b *ResourceBalancer) Balance() []string
func (*ResourceBalancer) CanClaim ¶
func (b *ResourceBalancer) CanClaim(string) bool
func (*ResourceBalancer) Init ¶
func (b *ResourceBalancer) Init(ctx BalancerContext)
type ResourceReporter ¶
type ResourceReporter interface {
    // Used returns the amount of a resource used and the total amount of that
    // resource.
    Used() (used uint64, total uint64)

    // String returns the unit resources are reported in.
    String() string
}
ResourceReporter is required by the ResourceBalancer to read the resource being used for balancing.
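A reporter can track any resource that has a used amount and a total. The sketch below reports busy worker slots out of a fixed capacity; `slotReporter` and its `Acquire` and `Done` methods are hypothetical names chosen for this example, and the interface is re-declared locally so the code compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// ResourceReporter mirrors the documented interface.
type ResourceReporter interface {
	Used() (used uint64, total uint64)
	String() string
}

// slotReporter is a hypothetical reporter counting busy worker slots.
type slotReporter struct {
	mu       sync.Mutex
	busy     uint64
	capacity uint64
}

// Compile-time check that slotReporter satisfies ResourceReporter.
var _ ResourceReporter = (*slotReporter)(nil)

// Acquire marks one slot busy and reports whether a slot was available.
func (r *slotReporter) Acquire() bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.busy >= r.capacity {
		return false
	}
	r.busy++
	return true
}

// Done frees one slot, if any is busy.
func (r *slotReporter) Done() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.busy > 0 {
		r.busy--
	}
}

func (r *slotReporter) Used() (uint64, uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.busy, r.capacity
}

// String names the unit being reported.
func (r *slotReporter) String() string { return "slots" }

func main() {
	r := &slotReporter{capacity: 4}
	for i := 0; i < 3; i++ {
		r.Acquire()
	}
	used, total := r.Used()
	fmt.Printf("%d/%d %s\n", used, total, r.String()) // 3/4 slots
}
```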
type RunningTask ¶
type RunningTask interface {
    Task() Task

    // Started is the time the task was started by this consumer.
    Started() time.Time

    // Stopped is the first time Stop() was called on this task or zero if it has
    // yet to be called. Tasks may take an indeterminate amount of time to
    // shut down after Stop() is called.
    Stopped() time.Time

    // Handler implementation called for this task.
    Handler() Handler
}
RunningTask represents tasks running within a consumer.
type SleepBalancer ¶
type SleepBalancer struct {
// contains filtered or unexported fields
}
SleepBalancer is a simplistic Balancer implementation which sleeps 30ms per claimed task in its CanClaim() method. This means the node with the fewest claimed tasks in a cluster should sleep the shortest length of time and win the claim race.
It never releases tasks during Balance() calls.
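The claim race this relies on comes down to simple arithmetic: each node waits 30ms per task it already holds, so the least loaded node answers first. The helper `claimDelay` below is a hypothetical name used only to show the numbers; it does not reproduce SleepBalancer's code.

```go
package main

import (
	"fmt"
	"time"
)

// perTask is the documented sleep per claimed task.
const perTask = 30 * time.Millisecond

// claimDelay returns how long a node holding `claimed` tasks would sleep
// before attempting a claim.
func claimDelay(claimed int) time.Duration {
	return time.Duration(claimed) * perTask
}

func main() {
	for _, n := range []int{0, 2, 5} {
		fmt.Printf("%d claimed tasks -> sleeps %v\n", n, claimDelay(n))
	}
}
```

A node with no tasks attempts the claim immediately, while a node holding five tasks waits 150ms, which usually leaves the claim to a less loaded peer.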
func (*SleepBalancer) Balance ¶
func (*SleepBalancer) Balance() []string
Balance never returns any tasks for the sleepy balancer.
func (*SleepBalancer) CanClaim ¶
func (b *SleepBalancer) CanClaim(string) bool
CanClaim sleeps 30ms per claimed task.
func (*SleepBalancer) Init ¶
func (b *SleepBalancer) Init(ctx BalancerContext)
Init is called by the Consumer.
Directories ¶
Path | Synopsis |
---|---|
cmd | |
 | Package metcdv3 contains implementations of all Metafora interfaces using etcd as the broker/backing store. |
testutil | Package testutil is a collection of utilities for use by Metafora's etcd tests. |
 | Statemachine is a featureful statemachine implementation for Metafora handlers to use. |