Documentation ¶
Index ¶
- func OpenDB(dburi string) (*sql.DB, error)
- func SqliteServerInterceptor() grpc.UnaryServerInterceptor
- type Dialer
- type Engine
- func (s *Engine) Close()
- func (s *Engine) Done(ctx context.Context, ts *pb.TaskStateTransition) (*emptypb.Empty, error)
- func (s *Engine) GetLogs(req *pb.GetLogsRequest, stream pb.Queue_GetLogsServer) error
- func (s *Engine) Keepalive(ctx context.Context, req *pb.KeepaliveRequest) (*emptypb.Empty, error)
- func (s *Engine) Log(ctx context.Context, req *pb.LogRequest) (*emptypb.Empty, error)
- func (s *Engine) Poll(ctx context.Context, req *pb.PollRequest) (*pb.Task, error)
- func (s *Engine) Query(ctx context.Context, req *pb.QueryRequest) (*pb.TaskInfo, error)
- func (s *Engine) ReportSuccess(ctx context.Context, req *pb.ReportSuccessRequest) (*pb.ReportSuccessResponse, error)
- func (s *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request)
- func (s *Engine) SetCancel(ctx context.Context, req *pb.SetCancelRequest) (*pb.SetCancelResponse, error)
- func (s *Engine) SetFailed(ctx context.Context, req *pb.SetFailedRequest) (*emptypb.Empty, error)
- func (s *Engine) Submit(ctx context.Context, req *pb.SubmitRequest) (*pb.SubmitResponse, error)
- func (s *Engine) SubmitTask(ctx context.Context, spec *pb.TaskSpec) (*emptypb.Empty, error)
- type Topology
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OpenDB ¶
OpenDB opens a SQLite database and runs the database migrations.
func SqliteServerInterceptor ¶
func SqliteServerInterceptor() grpc.UnaryServerInterceptor
SqliteServerInterceptor is a GRPC interceptor that translates low-level sqlite locking errors to meaningful GRPC error codes.
Types ¶
type Dialer ¶
Dialer dials a network address. Override to set specific GRPC options. The default implementation just calls grpc.DialContext.
type Engine ¶
type Engine struct { pb.UnimplementedInternalServer pb.UnimplementedWorkerServer pb.UnimplementedQueueServer // contains filtered or unexported fields }
Engine implements a queue engine for a specific shard.
func New ¶
New creates a new Engine with the specified parameters.
You're supposed to provide it with an open SQL database handler, and a set of network-related parameters to tell it how to find the other peer nodes in the service.
func (*Engine) Done ¶
Done reports a task termination from a worker. In order to free the worker from having to maintain any persistent state (consider the case where handling the task state transition involves making remote RPCs to an unresponsive shard), we do not process the transition right away but instead add it to a queue.
func (*Engine) GetLogs ¶
func (s *Engine) GetLogs(req *pb.GetLogsRequest, stream pb.Queue_GetLogsServer) error
GetLogs streams logs for a task (logs can grow large).
func (*Engine) Keepalive ¶
Keepalive renews the lease for a running task on a worker.
func (*Engine) Log ¶
Log stores a log fragment in the db.
func (*Engine) Poll ¶
Poll returns a task to be processed by a worker, if any are available. An empty result is indicated not by an error (which would confuse the GRPC statistics) but by an empty message with no fields set.
When a task is ready to be run, the results of all its upstream dependencies will already be present on this shard.
func (*Engine) Query ¶
Query the database for information about a specific task on this shard.
func (*Engine) ReportSuccess ¶
func (s *Engine) ReportSuccess(ctx context.Context, req *pb.ReportSuccessRequest) (*pb.ReportSuccessResponse, error)
ReportSuccess is received when an upstream task has succeeded. We will store its result locally, so that it is available to the downstream task on this same shard. If the downstream task has no other dependencies, it will be marked as schedulable.
func (*Engine) SetCancel ¶
func (s *Engine) SetCancel(ctx context.Context, req *pb.SetCancelRequest) (*pb.SetCancelResponse, error)
SetCancel is received when a downstream task has failed. This method is called recursively from the failed task's shard, so in order to avoid an extra GetTaskSpec RPC we directly return the list of upstream tasks to the caller.
func (*Engine) SetFailed ¶
SetFailed is received when an upstream task has failed. Set the task state to FAILED, and cancel all of its upstream dependencies.
The recursive tree navigation is performed locally on this shard, to avoid building a complex network of nested RPCs.
func (*Engine) Submit ¶
func (s *Engine) Submit(ctx context.Context, req *pb.SubmitRequest) (*pb.SubmitResponse, error)
Submit a task tree to the global queue service. The list of tasks must represent a single, connected, tree. This function acts as a proxy, and will call other shards to submit individual tasks.
In order to express task relationships, IDs in the submitted task tree are just placeholders, where only the *shard* part is significant: the unique ID will be randomly generated by this function and replaced.
Since we don't have global transactions, the order of individual task creation is important: this function will proceed from root to the leaves, so as to make sure that downstream tasks always exists (otherwise we'd risk errors when propagating results, should a task be run before we complete the creation of the rest of the tree).
type Topology ¶
Topology provides the network addresses corresponding to specific shards of the service.
func NewShardedTopology ¶
NewShardedTopology returns a Topology that dynamically builds network addresses based on a pattern. The pattern, in host:port format, must contain a literal '%s' token, that will be replaced by the shard ID to obtain the full network address.