engine

package
v0.0.0-...-b9dc28f
Warning

This package is not in the latest version of its module.

Published: Nov 7, 2024 | License: GPL-3.0 | Imports: 32 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func OpenDB

func OpenDB(dburi string) (*sql.DB, error)

OpenDB opens a SQLite database and runs the database migrations.

func SqliteServerInterceptor

func SqliteServerInterceptor() grpc.UnaryServerInterceptor

SqliteServerInterceptor is a GRPC interceptor that translates low-level sqlite locking errors to meaningful GRPC error codes.

Types

type Dialer

type Dialer func(context.Context, string) (*grpc.ClientConn, error)

Dialer dials a network address. Override to set specific GRPC options. The default implementation just calls grpc.DialContext.

type Engine

type Engine struct {
	pb.UnimplementedInternalServer
	pb.UnimplementedWorkerServer
	pb.UnimplementedQueueServer
	// contains filtered or unexported fields
}

Engine implements a queue engine for a specific shard.

func New

func New(db *sql.DB, shard string, topology Topology, dialer Dialer) *Engine

New creates a new Engine with the specified parameters.

The caller provides an open SQL database handle, plus a set of network-related parameters that tell the engine how to find the other peer nodes in the service.

func (*Engine) Close

func (s *Engine) Close()

Close the engine and all associated resources.

func (*Engine) Done

Done reports a task termination from a worker. In order to free the worker from having to maintain any persistent state (consider the case where handling the task state transition involves making remote RPCs to an unresponsive shard), we do not process the transition right away but instead add it to a queue.

func (*Engine) GetLogs

func (s *Engine) GetLogs(req *pb.GetLogsRequest, stream pb.Queue_GetLogsServer) error

GetLogs streams logs for a task (logs can grow large).

func (*Engine) Keepalive

func (s *Engine) Keepalive(ctx context.Context, req *pb.KeepaliveRequest) (*emptypb.Empty, error)

Keepalive renews the lease for a running task on a worker.

func (*Engine) Log

func (s *Engine) Log(ctx context.Context, req *pb.LogRequest) (*emptypb.Empty, error)

Log stores a log fragment in the db.

func (*Engine) Poll

func (s *Engine) Poll(ctx context.Context, req *pb.PollRequest) (*pb.Task, error)

Poll returns a task to be processed by a worker, if any are available. An empty result is indicated not by an error (which would confuse the GRPC statistics) but by an empty message with no fields set.

When a task is ready to be run, the results of all its upstream dependencies will already be present on this shard.
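A worker loop therefore has to distinguish "no task" from "task" by inspecting the reply rather than the error. The sketch below illustrates that convention with a hypothetical Task struct standing in for pb.Task (whose actual fields are not shown here); poll and isEmpty are likewise illustrative names.

```go
package main

import "fmt"

// Task is a hypothetical stand-in for pb.Task.
type Task struct {
	ID string
}

// isEmpty reports whether a poll reply carries no task, i.e. it is nil or
// has no fields set.
func isEmpty(t *Task) bool {
	return t == nil || *t == (Task{})
}

// poll pops the next ready task, or returns an empty Task and a nil error
// when nothing is available.
func poll(ready *[]Task) (*Task, error) {
	if len(*ready) == 0 {
		return &Task{}, nil
	}
	next := (*ready)[0]
	*ready = (*ready)[1:]
	return &next, nil
}

func main() {
	ready := []Task{{ID: "task-1"}}
	for i := 0; i < 2; i++ {
		task, err := poll(&ready)
		if err != nil {
			fmt.Println("poll failed:", err)
			return
		}
		if isEmpty(task) {
			fmt.Println("nothing to do")
			continue
		}
		fmt.Println("running", task.ID)
	}
}
```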

func (*Engine) Query

func (s *Engine) Query(ctx context.Context, req *pb.QueryRequest) (*pb.TaskInfo, error)

Query the database for information about a specific task on this shard.

func (*Engine) ReportSuccess

func (s *Engine) ReportSuccess(ctx context.Context, req *pb.ReportSuccessRequest) (*pb.ReportSuccessResponse, error)

ReportSuccess is received when an upstream task has succeeded. We will store its result locally, so that it is available to the downstream task on this same shard. If the downstream task has no other dependencies, it will be marked as schedulable.
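The bookkeeping this implies can be illustrated with a small in-memory model. The downstream type and its fields are hypothetical and only mirror the behaviour described above; the engine itself works against its SQL database.

```go
package main

import "fmt"

// downstream is a hypothetical in-memory model of a task waiting on the
// results of its upstream dependencies.
type downstream struct {
	waiting     map[string]bool   // upstream task IDs that have not reported yet
	results     map[string][]byte // stored results, keyed by upstream task ID
	schedulable bool
}

// newDownstream creates a task that waits on the given upstream task IDs.
func newDownstream(upstream ...string) *downstream {
	d := &downstream{
		waiting: make(map[string]bool),
		results: make(map[string][]byte),
	}
	for _, id := range upstream {
		d.waiting[id] = true
	}
	return d
}

// reportSuccess stores the upstream result locally and marks the task as
// schedulable once no dependency is outstanding. It returns the task's
// schedulable flag. Unknown or duplicate reports are ignored.
func (d *downstream) reportSuccess(upstreamID string, result []byte) bool {
	if !d.waiting[upstreamID] {
		return d.schedulable
	}
	delete(d.waiting, upstreamID)
	d.results[upstreamID] = result
	if len(d.waiting) == 0 {
		d.schedulable = true
	}
	return d.schedulable
}

func main() {
	d := newDownstream("fetch", "parse")
	fmt.Println("schedulable after fetch:", d.reportSuccess("fetch", []byte("raw")))
	fmt.Println("schedulable after parse:", d.reportSuccess("parse", []byte("tree")))
}
```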

func (*Engine) ServeHTTP

func (s *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*Engine) SetCancel

func (s *Engine) SetCancel(ctx context.Context, req *pb.SetCancelRequest) (*pb.SetCancelResponse, error)

SetCancel is received when a downstream task has failed. This method is called recursively from the failed task's shard, so in order to avoid an extra GetTaskSpec RPC we directly return the list of upstream tasks to the caller.

func (*Engine) SetFailed

func (s *Engine) SetFailed(ctx context.Context, req *pb.SetFailedRequest) (*emptypb.Empty, error)

SetFailed is received when an upstream task has failed. Set the task state to FAILED, and cancel all of its upstream dependencies.

The recursive tree navigation is performed locally on this shard, to avoid building a complex network of nested RPCs.

func (*Engine) Submit

func (s *Engine) Submit(ctx context.Context, req *pb.SubmitRequest) (*pb.SubmitResponse, error)

Submit a task tree to the global queue service. The list of tasks must represent a single connected tree. This function acts as a proxy, and will call other shards to submit individual tasks.

In order to express task relationships, IDs in the submitted task tree are just placeholders, where only the *shard* part is significant: this function generates a random unique ID for each task and substitutes it for the placeholder.

Since we don't have global transactions, the order of individual task creation is important: this function proceeds from the root to the leaves, so as to make sure that downstream tasks always exist (otherwise we'd risk errors when propagating results, should a task be run before we complete the creation of the rest of the tree).

func (*Engine) SubmitTask

func (s *Engine) SubmitTask(ctx context.Context, spec *pb.TaskSpec) (*emptypb.Empty, error)

SubmitTask creates an individual task on this shard.

type Topology

type Topology interface {
	GetShardAddr(string) (string, error)
}

Topology provides the network addresses corresponding to specific shards of the service.

func NewShardedTopology

func NewShardedTopology(pattern string) Topology

NewShardedTopology returns a Topology that dynamically builds network addresses based on a pattern. The pattern, in host:port format, must contain a literal '%s' token, which is replaced by the shard ID to obtain the full network address.
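As an illustration of the pattern substitution, here is a minimal standalone type that satisfies the Topology interface. It is a sketch, not this package's implementation: shardedTopology is a hypothetical name, the host name and port in the example are made up, and whether the real code substitutes the token the same way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Topology mirrors the interface documented above.
type Topology interface {
	GetShardAddr(string) (string, error)
}

// shardedTopology is a hypothetical pattern-based Topology.
type shardedTopology struct {
	pattern string
}

// GetShardAddr replaces the first "%s" token in the pattern with the shard ID.
func (t shardedTopology) GetShardAddr(shard string) (string, error) {
	if !strings.Contains(t.pattern, "%s") {
		return "", errors.New("pattern does not contain a shard token")
	}
	return strings.Replace(t.pattern, "%s", shard, 1), nil
}

func main() {
	var topo Topology = shardedTopology{pattern: "queue-%s.example.com:3733"}
	addr, err := topo.GetShardAddr("7")
	fmt.Println(addr, err)
}
```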

func NewStaticTopology

func NewStaticTopology(peers map[string]string) Topology

NewStaticTopology returns a Topology with a statically-defined assignment of shards to addresses.
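A static assignment amounts to a map lookup. The sketch below shows a standalone type with that behaviour; staticTopology is a hypothetical name, the addresses are invented, and the error returned for an unknown shard is an assumption about reasonable behaviour rather than something documented here.

```go
package main

import "fmt"

// Topology mirrors the interface documented above.
type Topology interface {
	GetShardAddr(string) (string, error)
}

// staticTopology is a hypothetical map-backed Topology: shard ID -> address.
type staticTopology map[string]string

// GetShardAddr looks up the address for the shard and fails if it is unknown.
func (t staticTopology) GetShardAddr(shard string) (string, error) {
	addr, ok := t[shard]
	if !ok {
		return "", fmt.Errorf("unknown shard %q", shard)
	}
	return addr, nil
}

func main() {
	var topo Topology = staticTopology{
		"a": "10.0.0.1:3733",
		"b": "10.0.0.2:3733",
	}
	addr, err := topo.GetShardAddr("b")
	fmt.Println(addr, err)

	_, err = topo.GetShardAddr("c")
	fmt.Println(err)
}
```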
