Documentation ¶
Index ¶
- Constants
- Variables
- type Cluster
- type Config
- type Dispatcher
- func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*api.HeartbeatResponse, error)
- func (d *Dispatcher) NodeCount() int
- func (d *Dispatcher) Run(ctx context.Context) error
- func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error
- func (d *Dispatcher) Stop() error
- func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServer) error
- func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error)
Constants ¶
const ( // DefaultHeartBeatPeriod is used for setting default value in cluster config // and in case if cluster config is missing. DefaultHeartBeatPeriod = 5 * time.Second )
Variables ¶
var ( // ErrNodeAlreadyRegistered returned if node with same ID was already // registered with this dispatcher. ErrNodeAlreadyRegistered = errors.New("node already registered") // ErrNodeNotRegistered returned if node with such ID wasn't registered // with this dispatcher. ErrNodeNotRegistered = errors.New("node not registered") // ErrSessionInvalid returned when the session in use is no longer valid. // The node should re-register and start a new session. ErrSessionInvalid = errors.New("session invalid") // ErrNodeNotFound returned when the Node doesn't exists in raft. ErrNodeNotFound = errors.New("node not found") )
Functions ¶
This section is empty.
Types ¶
type Cluster ¶
type Cluster interface { GetMemberlist() map[uint64]*api.RaftMember MemoryStore() *store.MemoryStore }
Cluster is interface which represent raft cluster. mananger/state/raft.Node is implenents it. This interface needed only for easier unit-testing.
type Config ¶
type Config struct { // Addr configures the address the dispatcher reports to agents. Addr string HeartbeatPeriod time.Duration HeartbeatEpsilon time.Duration GracePeriodMultiplier int }
Config is configuration for Dispatcher. For default you should use DefautConfig.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns default config for Dispatcher.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher is responsible for dispatching tasks and tracking agent health.
func New ¶
func New(cluster Cluster, c *Config) *Dispatcher
New returns Dispatcher with cluster interface(usually raft.Node). NOTE: each handler which does something with raft must add to Dispatcher.wg
func (*Dispatcher) Heartbeat ¶
func (d *Dispatcher) Heartbeat(ctx context.Context, r *api.HeartbeatRequest) (*api.HeartbeatResponse, error)
Heartbeat is heartbeat method for nodes. It returns new TTL in response. Node should send new heartbeat earlier than now + TTL, otherwise it will be deregistered from dispatcher and its status will be updated to NodeStatus_DOWN
func (*Dispatcher) NodeCount ¶
func (d *Dispatcher) NodeCount() int
NodeCount returns number of nodes which connected to this dispatcher.
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run(ctx context.Context) error
Run runs dispatcher tasks which should be run on leader dispatcher. Dispatcher can be stopped with cancelling ctx or calling Stop().
func (*Dispatcher) Session ¶
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error
Session is stream which controls agent connection. Each message contains list of backup Managers with weights. Also there is special boolean field Disconnect which if true indicates that node should reconnect to another Manager immediately.
func (*Dispatcher) Stop ¶
func (d *Dispatcher) Stop() error
Stop stops dispatcher and closes all grpc streams.
func (*Dispatcher) Tasks ¶
func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServer) error
Tasks is a stream of tasks state for node. Each message contains full list of tasks which should be run on node, if task is not present in that list, it should be terminated.
func (*Dispatcher) UpdateTaskStatus ¶
func (d *Dispatcher) UpdateTaskStatus(ctx context.Context, r *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error)
UpdateTaskStatus updates status of task. Node should send such updates on every status change of its tasks.