Documentation ¶
Index ¶
- Constants
- Variables
- func Cancel(ctx context.Context, pipelineRcName string, etcdClient *etcd.Client, ...) error
- func Conns(ctx context.Context, pipelineRcName string, etcdClient *etcd.Client, ...) ([]*grpc.ClientConn, error)
- func HashDatum(pipelineName string, pipelineSalt string, data []*Input) string
- func HashDatum15(pipelineInfo *pps.PipelineInfo, data []*Input) (string, error)
- func MatchDatum(filter []string, data []*pps.InputFile) bool
- func RegisterWorkerServer(s *grpc.Server, srv WorkerServer)
- func Status(ctx context.Context, pipelineRcName string, etcdClient *etcd.Client, ...) ([]*pps.WorkerStatus, error)
- type APIServer
- type CancelRequest
- func (*CancelRequest) Descriptor() ([]byte, []int)
- func (m *CancelRequest) GetDataFilters() []string
- func (m *CancelRequest) GetJobID() string
- func (m *CancelRequest) Marshal() (dAtA []byte, err error)
- func (m *CancelRequest) MarshalTo(dAtA []byte) (int, error)
- func (*CancelRequest) ProtoMessage()
- func (m *CancelRequest) Reset()
- func (m *CancelRequest) Size() (n int)
- func (m *CancelRequest) String() string
- func (m *CancelRequest) Unmarshal(dAtA []byte) error
- func (m *CancelRequest) XXX_DiscardUnknown()
- func (m *CancelRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *CancelRequest) XXX_Merge(src proto.Message)
- func (m *CancelRequest) XXX_Size() int
- func (m *CancelRequest) XXX_Unmarshal(b []byte) error
- type CancelResponse
- func (*CancelResponse) Descriptor() ([]byte, []int)
- func (m *CancelResponse) GetSuccess() bool
- func (m *CancelResponse) Marshal() (dAtA []byte, err error)
- func (m *CancelResponse) MarshalTo(dAtA []byte) (int, error)
- func (*CancelResponse) ProtoMessage()
- func (m *CancelResponse) Reset()
- func (m *CancelResponse) Size() (n int)
- func (m *CancelResponse) String() string
- func (m *CancelResponse) Unmarshal(dAtA []byte) error
- func (m *CancelResponse) XXX_DiscardUnknown()
- func (m *CancelResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *CancelResponse) XXX_Merge(src proto.Message)
- func (m *CancelResponse) XXX_Size() int
- func (m *CancelResponse) XXX_Unmarshal(b []byte) error
- type ChunkState
- func (*ChunkState) Descriptor() ([]byte, []int)
- func (m *ChunkState) GetDatumID() string
- func (m *ChunkState) GetState() State
- func (m *ChunkState) Marshal() (dAtA []byte, err error)
- func (m *ChunkState) MarshalTo(dAtA []byte) (int, error)
- func (*ChunkState) ProtoMessage()
- func (m *ChunkState) Reset()
- func (m *ChunkState) Size() (n int)
- func (m *ChunkState) String() string
- func (m *ChunkState) Unmarshal(dAtA []byte) error
- func (m *ChunkState) XXX_DiscardUnknown()
- func (m *ChunkState) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *ChunkState) XXX_Merge(src proto.Message)
- func (m *ChunkState) XXX_Size() int
- func (m *ChunkState) XXX_Unmarshal(b []byte) error
- type Client
- type DatumFactory
- type Input
- func (*Input) Descriptor() ([]byte, []int)
- func (m *Input) GetBranch() string
- func (m *Input) GetEmptyFiles() bool
- func (m *Input) GetFileInfo() *pfs.FileInfo
- func (m *Input) GetGitURL() string
- func (m *Input) GetLazy() bool
- func (m *Input) GetName() string
- func (m *Input) GetParentCommit() *pfs.Commit
- func (m *Input) Marshal() (dAtA []byte, err error)
- func (m *Input) MarshalTo(dAtA []byte) (int, error)
- func (*Input) ProtoMessage()
- func (m *Input) Reset()
- func (m *Input) Size() (n int)
- func (m *Input) String() string
- func (m *Input) Unmarshal(dAtA []byte) error
- func (m *Input) XXX_DiscardUnknown()
- func (m *Input) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *Input) XXX_Merge(src proto.Message)
- func (m *Input) XXX_Size() int
- func (m *Input) XXX_Unmarshal(b []byte) error
- type MergeState
- func (*MergeState) Descriptor() ([]byte, []int)
- func (m *MergeState) GetSizeBytes() uint64
- func (m *MergeState) GetState() State
- func (m *MergeState) GetStatsSizeBytes() uint64
- func (m *MergeState) GetStatsTree() *pfs.Object
- func (m *MergeState) GetTree() *pfs.Object
- func (m *MergeState) Marshal() (dAtA []byte, err error)
- func (m *MergeState) MarshalTo(dAtA []byte) (int, error)
- func (*MergeState) ProtoMessage()
- func (m *MergeState) Reset()
- func (m *MergeState) Size() (n int)
- func (m *MergeState) String() string
- func (m *MergeState) Unmarshal(dAtA []byte) error
- func (m *MergeState) XXX_DiscardUnknown()
- func (m *MergeState) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *MergeState) XXX_Merge(src proto.Message)
- func (m *MergeState) XXX_Size() int
- func (m *MergeState) XXX_Unmarshal(b []byte) error
- type Plan
- func (*Plan) Descriptor() ([]byte, []int)
- func (m *Plan) GetChunks() []int64
- func (m *Plan) GetMerges() int64
- func (m *Plan) Marshal() (dAtA []byte, err error)
- func (m *Plan) MarshalTo(dAtA []byte) (int, error)
- func (*Plan) ProtoMessage()
- func (m *Plan) Reset()
- func (m *Plan) Size() (n int)
- func (m *Plan) String() string
- func (m *Plan) Unmarshal(dAtA []byte) error
- func (m *Plan) XXX_DiscardUnknown()
- func (m *Plan) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *Plan) XXX_Merge(src proto.Message)
- func (m *Plan) XXX_Size() int
- func (m *Plan) XXX_Unmarshal(b []byte) error
- type State
- type WorkerClient
- type WorkerServer
Constants ¶
const (
// PrometheusPort is the port the aggregated metrics are served on for scraping
PrometheusPort = 9090
)
const (
// WorkerEtcdPrefix is the prefix in etcd that we use to store worker information.
WorkerEtcdPrefix = "workers"
)
Variables ¶
var (
	ErrInvalidLengthWorkerService = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowWorkerService   = fmt.Errorf("proto: integer overflow")
)
var State_name = map[int32]string{
0: "RUNNING",
1: "COMPLETE",
3: "FAILED",
}
var State_value = map[string]int32{
"RUNNING": 0,
"COMPLETE": 1,
"FAILED": 3,
}
Functions ¶
func Cancel ¶ added in v1.7.5
func Cancel(ctx context.Context, pipelineRcName string, etcdClient *etcd.Client, etcdPrefix string, workerGrpcPort uint16, jobID string, dataFilter []string) error
Cancel cancels a set of datums running on workers. pipelineRcName is the name of the pipeline's RC and can be obtained with ppsutil.PipelineRcName.
func Conns ¶ added in v1.7.5
func Conns(ctx context.Context, pipelineRcName string, etcdClient *etcd.Client, etcdPrefix string, workerGrpcPort uint16) ([]*grpc.ClientConn, error)
Conns returns a slice of connections to worker servers. pipelineRcName is the name of the pipeline's RC and can be obtained with ppsutil.PipelineRcName. You can also pass "" for pipelineRcName to get connections to all workers.
func HashDatum ¶
HashDatum computes and returns the hash of datum + pipeline, with a pipeline-specific prefix.
func HashDatum15 ¶
func HashDatum15(pipelineInfo *pps.PipelineInfo, data []*Input) (string, error)
HashDatum15 computes and returns the hash of datum + pipeline for version <= 1.5.0, with a pipeline-specific prefix.
func MatchDatum ¶
MatchDatum checks if a datum matches a filter. To match, each string in filter must match at least one datum's Path or Hash. The order of filter and data is irrelevant.
func RegisterWorkerServer ¶
func RegisterWorkerServer(s *grpc.Server, srv WorkerServer)
func Status ¶ added in v1.7.5
func Status(ctx context.Context, pipelineRcName string, etcdClient *etcd.Client, etcdPrefix string, workerGrpcPort uint16) ([]*pps.WorkerStatus, error)
Status returns the statuses of workers referenced by pipelineRcName. pipelineRcName is the name of the pipeline's RC and can be obtained with ppsutil.PipelineRcName. You can also pass "" for pipelineRcName to get the statuses of all workers.
Types ¶
type APIServer ¶
type APIServer struct {
// contains filtered or unexported fields
}
APIServer implements the worker API
func NewAPIServer ¶
func NewAPIServer(pachClient *client.APIClient, etcdClient *etcd.Client, etcdPrefix string, pipelineInfo *pps.PipelineInfo, workerName string, namespace string, hashtreeStorage string) (*APIServer, error)
NewAPIServer creates an APIServer for a given pipeline
func (*APIServer) Cancel ¶
func (a *APIServer) Cancel(ctx context.Context, request *CancelRequest) (*CancelResponse, error)
Cancel cancels the currently running datum
type CancelRequest ¶
type CancelRequest struct { JobID string `protobuf:"bytes,2,opt,name=job_id,json=jobId,proto3" json:"job_id,omitempty"` DataFilters []string `protobuf:"bytes,1,rep,name=data_filters,json=dataFilters,proto3" json:"data_filters,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*CancelRequest) Descriptor ¶
func (*CancelRequest) Descriptor() ([]byte, []int)
func (*CancelRequest) GetDataFilters ¶
func (m *CancelRequest) GetDataFilters() []string
func (*CancelRequest) GetJobID ¶
func (m *CancelRequest) GetJobID() string
func (*CancelRequest) Marshal ¶
func (m *CancelRequest) Marshal() (dAtA []byte, err error)
func (*CancelRequest) ProtoMessage ¶
func (*CancelRequest) ProtoMessage()
func (*CancelRequest) Reset ¶
func (m *CancelRequest) Reset()
func (*CancelRequest) Size ¶
func (m *CancelRequest) Size() (n int)
func (*CancelRequest) String ¶
func (m *CancelRequest) String() string
func (*CancelRequest) Unmarshal ¶
func (m *CancelRequest) Unmarshal(dAtA []byte) error
func (*CancelRequest) XXX_DiscardUnknown ¶ added in v1.7.12
func (m *CancelRequest) XXX_DiscardUnknown()
func (*CancelRequest) XXX_Marshal ¶ added in v1.7.12
func (m *CancelRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*CancelRequest) XXX_Merge ¶ added in v1.7.12
func (dst *CancelRequest) XXX_Merge(src proto.Message)
func (*CancelRequest) XXX_Size ¶ added in v1.7.12
func (m *CancelRequest) XXX_Size() int
func (*CancelRequest) XXX_Unmarshal ¶ added in v1.7.12
func (m *CancelRequest) XXX_Unmarshal(b []byte) error
type CancelResponse ¶
type CancelResponse struct { Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*CancelResponse) Descriptor ¶
func (*CancelResponse) Descriptor() ([]byte, []int)
func (*CancelResponse) GetSuccess ¶
func (m *CancelResponse) GetSuccess() bool
func (*CancelResponse) Marshal ¶
func (m *CancelResponse) Marshal() (dAtA []byte, err error)
func (*CancelResponse) ProtoMessage ¶
func (*CancelResponse) ProtoMessage()
func (*CancelResponse) Reset ¶
func (m *CancelResponse) Reset()
func (*CancelResponse) Size ¶
func (m *CancelResponse) Size() (n int)
func (*CancelResponse) String ¶
func (m *CancelResponse) String() string
func (*CancelResponse) Unmarshal ¶
func (m *CancelResponse) Unmarshal(dAtA []byte) error
func (*CancelResponse) XXX_DiscardUnknown ¶ added in v1.7.12
func (m *CancelResponse) XXX_DiscardUnknown()
func (*CancelResponse) XXX_Marshal ¶ added in v1.7.12
func (m *CancelResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*CancelResponse) XXX_Merge ¶ added in v1.7.12
func (dst *CancelResponse) XXX_Merge(src proto.Message)
func (*CancelResponse) XXX_Size ¶ added in v1.7.12
func (m *CancelResponse) XXX_Size() int
func (*CancelResponse) XXX_Unmarshal ¶ added in v1.7.12
func (m *CancelResponse) XXX_Unmarshal(b []byte) error
type ChunkState ¶ added in v1.6.6
type ChunkState struct { State State `protobuf:"varint,1,opt,name=state,proto3,enum=worker.State" json:"state,omitempty"` DatumID string `protobuf:"bytes,2,opt,name=datum_id,json=datumId,proto3" json:"datum_id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*ChunkState) Descriptor ¶ added in v1.6.6
func (*ChunkState) Descriptor() ([]byte, []int)
func (*ChunkState) GetDatumID ¶ added in v1.6.6
func (m *ChunkState) GetDatumID() string
func (*ChunkState) GetState ¶ added in v1.6.6
func (m *ChunkState) GetState() State
func (*ChunkState) Marshal ¶ added in v1.6.6
func (m *ChunkState) Marshal() (dAtA []byte, err error)
func (*ChunkState) MarshalTo ¶ added in v1.6.6
func (m *ChunkState) MarshalTo(dAtA []byte) (int, error)
func (*ChunkState) ProtoMessage ¶ added in v1.6.6
func (*ChunkState) ProtoMessage()
func (*ChunkState) Reset ¶ added in v1.6.6
func (m *ChunkState) Reset()
func (*ChunkState) Size ¶ added in v1.6.6
func (m *ChunkState) Size() (n int)
func (*ChunkState) String ¶ added in v1.6.6
func (m *ChunkState) String() string
func (*ChunkState) Unmarshal ¶ added in v1.6.6
func (m *ChunkState) Unmarshal(dAtA []byte) error
func (*ChunkState) XXX_DiscardUnknown ¶ added in v1.7.12
func (m *ChunkState) XXX_DiscardUnknown()
func (*ChunkState) XXX_Marshal ¶ added in v1.7.12
func (m *ChunkState) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*ChunkState) XXX_Merge ¶ added in v1.7.12
func (dst *ChunkState) XXX_Merge(src proto.Message)
func (*ChunkState) XXX_Size ¶ added in v1.7.12
func (m *ChunkState) XXX_Size() int
func (*ChunkState) XXX_Unmarshal ¶ added in v1.7.12
func (m *ChunkState) XXX_Unmarshal(b []byte) error
type Client ¶ added in v1.7.5
type Client struct {
	WorkerClient
	debug.DebugClient
}
Client combines the WorkerAPI and the DebugAPI into a single client.
func Clients ¶ added in v1.7.5
func Clients(ctx context.Context, pipelineRcName string, etcdClient *etcd.Client, etcdPrefix string, workerGrpcPort uint16) ([]Client, error)
Clients returns a slice of worker clients for a pipeline. pipelineRcName is the name of the pipeline's RC and can be obtained with ppsutil.PipelineRcName. You can also pass "" for pipelineRcName to get clients for all workers.
type DatumFactory ¶ added in v1.5.3
DatumFactory is an interface which allows you to iterate through the datums for a job.
func NewDatumFactory ¶ added in v1.5.3
NewDatumFactory creates a DatumFactory for an input.
type Input ¶
type Input struct { FileInfo *pfs.FileInfo `protobuf:"bytes,1,opt,name=file_info,json=fileInfo,proto3" json:"file_info,omitempty"` ParentCommit *pfs.Commit `protobuf:"bytes,5,opt,name=parent_commit,json=parentCommit,proto3" json:"parent_commit,omitempty"` Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` Lazy bool `protobuf:"varint,3,opt,name=lazy,proto3" json:"lazy,omitempty"` Branch string `protobuf:"bytes,4,opt,name=branch,proto3" json:"branch,omitempty"` GitURL string `protobuf:"bytes,6,opt,name=git_url,json=gitUrl,proto3" json:"git_url,omitempty"` EmptyFiles bool `protobuf:"varint,7,opt,name=empty_files,json=emptyFiles,proto3" json:"empty_files,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*Input) Descriptor ¶
func (*Input) GetEmptyFiles ¶ added in v1.6.8
func (*Input) GetFileInfo ¶
func (*Input) GetParentCommit ¶
func (*Input) ProtoMessage ¶
func (*Input) ProtoMessage()
func (*Input) XXX_DiscardUnknown ¶ added in v1.7.12
func (m *Input) XXX_DiscardUnknown()
func (*Input) XXX_Marshal ¶ added in v1.7.12
func (*Input) XXX_Unmarshal ¶ added in v1.7.12
type MergeState ¶ added in v1.8.0
type MergeState struct { State State `protobuf:"varint,1,opt,name=state,proto3,enum=worker.State" json:"state,omitempty"` Tree *pfs.Object `protobuf:"bytes,2,opt,name=tree,proto3" json:"tree,omitempty"` SizeBytes uint64 `protobuf:"varint,3,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"` StatsTree *pfs.Object `protobuf:"bytes,4,opt,name=stats_tree,json=statsTree,proto3" json:"stats_tree,omitempty"` StatsSizeBytes uint64 `protobuf:"varint,5,opt,name=stats_size_bytes,json=statsSizeBytes,proto3" json:"stats_size_bytes,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*MergeState) Descriptor ¶ added in v1.8.0
func (*MergeState) Descriptor() ([]byte, []int)
func (*MergeState) GetSizeBytes ¶ added in v1.8.0
func (m *MergeState) GetSizeBytes() uint64
func (*MergeState) GetState ¶ added in v1.8.0
func (m *MergeState) GetState() State
func (*MergeState) GetStatsSizeBytes ¶ added in v1.8.0
func (m *MergeState) GetStatsSizeBytes() uint64
func (*MergeState) GetStatsTree ¶ added in v1.8.0
func (m *MergeState) GetStatsTree() *pfs.Object
func (*MergeState) GetTree ¶ added in v1.8.0
func (m *MergeState) GetTree() *pfs.Object
func (*MergeState) Marshal ¶ added in v1.8.0
func (m *MergeState) Marshal() (dAtA []byte, err error)
func (*MergeState) MarshalTo ¶ added in v1.8.0
func (m *MergeState) MarshalTo(dAtA []byte) (int, error)
func (*MergeState) ProtoMessage ¶ added in v1.8.0
func (*MergeState) ProtoMessage()
func (*MergeState) Reset ¶ added in v1.8.0
func (m *MergeState) Reset()
func (*MergeState) Size ¶ added in v1.8.0
func (m *MergeState) Size() (n int)
func (*MergeState) String ¶ added in v1.8.0
func (m *MergeState) String() string
func (*MergeState) Unmarshal ¶ added in v1.8.0
func (m *MergeState) Unmarshal(dAtA []byte) error
func (*MergeState) XXX_DiscardUnknown ¶ added in v1.8.1
func (m *MergeState) XXX_DiscardUnknown()
func (*MergeState) XXX_Marshal ¶ added in v1.8.1
func (m *MergeState) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*MergeState) XXX_Merge ¶ added in v1.8.1
func (dst *MergeState) XXX_Merge(src proto.Message)
func (*MergeState) XXX_Size ¶ added in v1.8.1
func (m *MergeState) XXX_Size() int
func (*MergeState) XXX_Unmarshal ¶ added in v1.8.1
func (m *MergeState) XXX_Unmarshal(b []byte) error
type Plan ¶ added in v1.8.0
type Plan struct { Chunks []int64 `protobuf:"varint,1,rep,packed,name=chunks,proto3" json:"chunks,omitempty"` Merges int64 `protobuf:"varint,2,opt,name=merges,proto3" json:"merges,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*Plan) Descriptor ¶ added in v1.8.0
func (*Plan) ProtoMessage ¶ added in v1.8.0
func (*Plan) ProtoMessage()
func (*Plan) XXX_DiscardUnknown ¶ added in v1.8.1
func (m *Plan) XXX_DiscardUnknown()
func (*Plan) XXX_Marshal ¶ added in v1.8.1
func (*Plan) XXX_Unmarshal ¶ added in v1.8.1
type WorkerClient ¶
type WorkerClient interface {
	Status(ctx context.Context, in *types.Empty, opts ...grpc.CallOption) (*pps.WorkerStatus, error)
	Cancel(ctx context.Context, in *CancelRequest, opts ...grpc.CallOption) (*CancelResponse, error)
}
WorkerClient is the client API for Worker service.
For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
func NewWorkerClient ¶
func NewWorkerClient(cc *grpc.ClientConn) WorkerClient
type WorkerServer ¶
type WorkerServer interface {
	Status(context.Context, *types.Empty) (*pps.WorkerStatus, error)
	Cancel(context.Context, *CancelRequest) (*CancelResponse, error)
}
WorkerServer is the server API for Worker service.