orchestrator

package
v2.33.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2024 License: MIT Imports: 12 Imported by: 0

README

This is the orchestrator itself

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrUnknownFromNode         = errors.New("unknown 'from' node")
	ErrUnknownToNode           = errors.New("unknown 'to' node")
	ErrFromNodeNotRunningShard = errors.New("'from' node not running shard")
	ErrNodeBusy                = errors.New("node is busy")
)
View Source
var (
	ErrShardAlreadyRunning = errors.New("shard already running")
	ErrUnknownNode         = errors.New("unknown node")
)
View Source
var (
	ErrNoNodeLauncher = errors.New("orchestrator.NodeLauncher is nil")
)

Functions

This section is empty.

Types

type IDGenerator

type IDGenerator interface {
	GenerateID() (string, error)
}

type NodeConn

type NodeConn struct {
	Orchestrator *Orchestrator
	Conn         *dshardorchestrator.Conn
	// contains filtered or unexported fields
}

NodeConn represents a connection from a master server to a slave

func (*NodeConn) GetFullStatus

func (nc *NodeConn) GetFullStatus() *NodeStatus

GetFullStatus returns the current status of the node

func (*NodeConn) Shutdown

func (nc *NodeConn) Shutdown()

Shutdown tells the node to shut down compleely

func (*NodeConn) StartShards

func (nc *NodeConn) StartShards(shards ...int)

StartShards tells the node to start the following shards

func (*NodeConn) StopShard

func (nc *NodeConn) StopShard(shard int)

StopShard tells the node to stop the provided shard

type NodeIDProvider

type NodeIDProvider interface {
	GenerateID() string
}

type NodeLauncher

type NodeLauncher interface {
	// Launches a new node, returning the id and a error if something went wrong
	LaunchNewNode() (nodeID string, err error)

	// Retrieves the version of nodes we would launch if we were to call LaunchNewNode()
	// for example, the vesion of the binary deployed on the server.
	LaunchVersion() (version string, err error)
}

NodeLauncher is responsible for the logic of spinning up new processes and also receiving the version of the node that would be launched

func NewNodeLauncher

func NewNodeLauncher(cmdName string, args []string, idGen IDGenerator) NodeLauncher

type NodeStatus

type NodeStatus struct {
	ID                 string
	Version            string
	SessionEstablished bool
	Shards             []int
	Connected          bool
	DisconnectedAt     time.Time
	Blacklisted        bool

	MigratingFrom  string
	MigratingTo    string
	MigratingShard int
}

type Orchestrator

type Orchestrator struct {
	ShardCountProvider RecommendTotalShardCountProvider
	NodeLauncher       NodeLauncher
	Logger             dshardorchestrator.Logger
	VersionUpdater     VersionUpdater

	// this is for running in a multi-host mode
	// this allows you to have 1 shard orchestrator per host, then only have that orchestator care about the specified shards
	// this also requires that the total shard count is fixed.
	// FixedTotalShardCount will be ignored if <1
	// ResponsibleForShards will be ignored if len < 1
	FixedTotalShardCount int
	ResponsibleForShards []int

	// if set, the orchestrator will make sure that all the shards are always running
	EnsureAllShardsRunning bool

	// For large bot sharding the bucket size should be 16
	// the orchestrator will only put shards in the same (bucket/bucketspernode) on the same node
	ShardBucketSize int
	// The number of buckets per node, this * shardBucketSize should equal to the actual bots bucket size, but this allows more gradual control of the startup process
	BucketsPerNode int

	// the max amount of downtime for a node before we consider it dead and it will start a new node for those shards
	// if set to below zero then it will not perform the restart at all
	MaxNodeDowntimeBeforeRestart time.Duration

	// the maximum amount of shards per node, note that this is solely for the automated tasks the orchestrator provides
	// and you can still go over it if you manually start shards on a node
	MaxShardsPerNode int

	// in case we are intiailizing max shards from nodes, we wait 10 seconds when we start before we decide we need to fetch a fresh shard count
	SkipSafeStartupDelayMaxShards bool
	// contains filtered or unexported fields
}

func NewStandardOrchestrator

func NewStandardOrchestrator(session *discordgo.Session) *Orchestrator

func (*Orchestrator) BlacklistNode

func (o *Orchestrator) BlacklistNode(node string)

BlacklistNode blacklists a node from beign designated new shards

func (*Orchestrator) FindNodeByID

func (o *Orchestrator) FindNodeByID(id string) *NodeConn

func (*Orchestrator) GetFullNodesStatus

func (o *Orchestrator) GetFullNodesStatus() []*NodeStatus

GetFullNodesStatus returns the full status of all nodes

func (*Orchestrator) Log

func (o *Orchestrator) Log(level dshardorchestrator.LogLevel, err error, msg string)

Log will log to the designated logger or he standard logger

func (*Orchestrator) MigrateAllNodesToNewNodes

func (o *Orchestrator) MigrateAllNodesToNewNodes(returnOnError bool) error

MigrateAllNodesToNewNodes performs a full migration of all nodes if returnOnError is set then it will return when one of the nodes fail migrating, otherwise it will just log and continue onto the next

func (*Orchestrator) MigrateFullNode

func (o *Orchestrator) MigrateFullNode(fromNode string, toNodeID string, shutdownOldNode bool) error

MigrateFullNode migrates all the shards on the origin node to the destination node optionally also shutting the origin node down at the end

func (*Orchestrator) NewNodeConn

func (o *Orchestrator) NewNodeConn(netConn net.Conn) *NodeConn

NewNodeConn creates a new NodeConn (connection from master to slave) from a net.Conn

func (*Orchestrator) ShutdownNode

func (o *Orchestrator) ShutdownNode(nodeID string) error

ShutdownNode shuts down the specified node

func (*Orchestrator) Start

func (o *Orchestrator) Start(listenAddr string) error

Start will start the orchestrator, and start to listen for clients on the specified address IMPORTANT: opening this up to the outer internet is bad because there's no authentication.

func (*Orchestrator) StartNewNode

func (o *Orchestrator) StartNewNode() (string, error)

StartNewNode will launch a new node, it will not wait for it to connect

func (*Orchestrator) StartShardMigration

func (o *Orchestrator) StartShardMigration(toNodeID string, shardID int) error

StartShardMigration attempts to start a shard migration, moving shardID from a origin node to a destination node

func (*Orchestrator) StartShards

func (o *Orchestrator) StartShards(nodeID string, shards ...int) error

StartShard will start the specified shard on the specified node it will return ErrShardAlreadyRunning if the shard is running on another node already

func (*Orchestrator) Stop

func (o *Orchestrator) Stop()

Stop will stop the orchestrator and the monitor

func (*Orchestrator) StopShard

func (o *Orchestrator) StopShard(shard int) error

StopShard will stop the specified shard on whatever node it's running on, or do nothing if it's not running

func (*Orchestrator) WaitForShardMigration

func (o *Orchestrator) WaitForShardMigration(fromNode *NodeConn, toNode *NodeConn, shardID int)

WaitForShardMigration blocks until a shard migration is complete

type RecommendTotalShardCountProvider

type RecommendTotalShardCountProvider interface {
	GetTotalShardCount() (int, error)
}

RecommendTotalShardCountProvider will only be called when a fresh new shard count is needed this is only the case if: no nodes were running with any shards 10 seconds after the orchestrator starts up

if new nodes without shards connect during this period, they will be put on hold while waiting for nodes with running shards to re-identify with the orchestrator, thereby keeping the total shard count across restarts of the orchestrator in a fairly realiable manner

but using this interface you can implement completely fine grained control (say storing the shard count in persistent store and updating it manually)

type StdNodeLauncher

type StdNodeLauncher struct {
	IDGenerator   IDGenerator
	LaunchCmdName string
	LaunchArgs    []string

	VersionCmdName string
	VersionArgs    []string
	// contains filtered or unexported fields
}

func (*StdNodeLauncher) GenerateID

func (nl *StdNodeLauncher) GenerateID() string

func (*StdNodeLauncher) LaunchNewNode

func (nl *StdNodeLauncher) LaunchNewNode() (string, error)

LaunchNewNode implements NodeLauncher.LaunchNewNode

func (*StdNodeLauncher) LaunchVersion

func (nl *StdNodeLauncher) LaunchVersion() (string, error)

LaunchVersion implements NodeLauncher.LaunchVersion

func (*StdNodeLauncher) PrintOutput

func (nl *StdNodeLauncher) PrintOutput(reader io.Reader)

type StdShardCountProvider

type StdShardCountProvider struct {
	DiscordSession *discordgo.Session
}

StdShardCountProvider is a standard implementation of RecommendedShardCountProvider

func (*StdShardCountProvider) GetTotalShardCount

func (sc *StdShardCountProvider) GetTotalShardCount() (int, error)

type VersionUpdater

type VersionUpdater interface {
	PullNewVersion() (newVersion string, err error)
}

VersionUpdater is reponsible for updating the deployment, for example pulling a new version from a CI server

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL