Documentation ¶
Overview ¶
Package ppsutil contains utilities for various PPS-related tasks, which are shared by both the PPS API and the worker binary. These utilities include:
- Getting the RC name and querying k8s regarding pipelines
- Reading and writing pipeline resource requests and limits
- Reading and writing EtcdPipelineInfos and PipelineInfos[1]
[1] Note that PipelineInfo in particular is complicated because it contains fields that are not always set or that are stored in multiple places. For example, 'job_state' is not stored in PFS along with the rest of each PipelineInfo, because that field is volatile and we cannot commit to PFS every time it changes; 'job_counts' behaves the same way, and 'reason' lives in etcd because it is only updated alongside 'job_state'. As of 12/7/2017, these are the only fields not stored in PFS.
Index ¶
- func DescribeSyntaxError(originalErr error, parsedBuffer bytes.Buffer) error
- func FailPipeline(ctx context.Context, etcdClient *etcd.Client, ...) error
- func GetExpectedNumHashtrees(spec *ppsclient.HashtreeSpec) (int64, error)
- func GetExpectedNumWorkers(kubeClient *kube.Clientset, spec *ppsclient.ParallelismSpec) (int, error)
- func GetLimitsResourceListFromPipeline(pipelineInfo *pps.PipelineInfo) (*v1.ResourceList, error)
- func GetPipelineInfo(pachClient *client.APIClient, ptr *pps.EtcdPipelineInfo) (*pps.PipelineInfo, error)
- func GetRequestsResourceListFromPipeline(pipelineInfo *pps.PipelineInfo) (*v1.ResourceList, error)
- func IsTerminal(state pps.JobState) bool
- func JobInput(pipelineInfo *pps.PipelineInfo, outputCommitInfo *pfs.CommitInfo) *pps.Input
- func PipelineRcName(name string, version uint64) string
- func PipelineRepo(pipeline *ppsclient.Pipeline) *pfs.Repo
- func PipelineReqFromInfo(pipelineInfo *ppsclient.PipelineInfo) *ppsclient.CreatePipelineRequest
- func UpdateJobState(pipelines col.ReadWriteCollection, jobs col.ReadWriteCollection, ...) error
- type PipelineManifestReader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DescribeSyntaxError ¶
func DescribeSyntaxError(originalErr error, parsedBuffer bytes.Buffer) error
DescribeSyntaxError describes a syntax error encountered while parsing JSON.
func FailPipeline ¶
func FailPipeline(ctx context.Context, etcdClient *etcd.Client, pipelinesCollection col.Collection, pipelineName string, reason string) error
FailPipeline updates the pipeline's state to failed and sets the failure reason.
func GetExpectedNumHashtrees ¶ added in v1.8.0
func GetExpectedNumHashtrees(spec *ppsclient.HashtreeSpec) (int64, error)
GetExpectedNumHashtrees computes the expected number of hashtrees that Pachyderm will create given the HashtreeSpec 'spec'.
func GetExpectedNumWorkers ¶
func GetExpectedNumWorkers(kubeClient *kube.Clientset, spec *ppsclient.ParallelismSpec) (int, error)
GetExpectedNumWorkers computes the expected number of workers that Pachyderm will start given the ParallelismSpec 'spec'.
This is only exported for testing.
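The shape of this computation can be sketched as follows. This is a simplified, self-contained stand-in, not the package's implementation: the `parallelismSpec` struct and the exact rules (constant count, coefficient scaled by node count, default of one worker) are assumptions for illustration, and the real function consults the k8s API via `kubeClient` rather than taking a node count directly.

```go
package main

import (
	"fmt"
	"math"
)

// parallelismSpec is a hypothetical simplification of
// ppsclient.ParallelismSpec, with only the fields this sketch needs.
type parallelismSpec struct {
	Constant    uint64
	Coefficient float64
}

// expectedNumWorkers sketches the assumed logic: a Constant spec requests
// exactly that many workers; a Coefficient spec requests coefficient ×
// (number of k8s nodes), rounded up; an empty spec defaults to one worker.
func expectedNumWorkers(numNodes int, spec *parallelismSpec) (int, error) {
	switch {
	case spec == nil || (spec.Constant == 0 && spec.Coefficient == 0):
		return 1, nil
	case spec.Constant > 0 && spec.Coefficient == 0:
		return int(spec.Constant), nil
	case spec.Constant == 0 && spec.Coefficient > 0:
		return int(math.Ceil(spec.Coefficient * float64(numNodes))), nil
	default:
		return 0, fmt.Errorf("parallelism spec sets both constant and coefficient")
	}
}

func main() {
	n, _ := expectedNumWorkers(4, &parallelismSpec{Coefficient: 1.5})
	fmt.Println(n) // 6: ceil(1.5 × 4 nodes)
}
```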
func GetLimitsResourceListFromPipeline ¶
func GetLimitsResourceListFromPipeline(pipelineInfo *pps.PipelineInfo) (*v1.ResourceList, error)
GetLimitsResourceListFromPipeline returns the list of resources that the pipeline is maximally limited to.
func GetPipelineInfo ¶
func GetPipelineInfo(pachClient *client.APIClient, ptr *pps.EtcdPipelineInfo) (*pps.PipelineInfo, error)
GetPipelineInfo retrieves and returns a valid PipelineInfo from PFS. It performs the PFS read and unmarshalling of bytes, as well as filling in missing fields.
func GetRequestsResourceListFromPipeline ¶
func GetRequestsResourceListFromPipeline(pipelineInfo *pps.PipelineInfo) (*v1.ResourceList, error)
GetRequestsResourceListFromPipeline returns the list of resources that the pipeline minimally requires.
func IsTerminal ¶
func IsTerminal(state pps.JobState) bool
IsTerminal returns 'true' if 'state' indicates that the job is done (i.e. the state will not change later: SUCCESS, FAILURE, KILLED) and 'false' otherwise.
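The described behavior amounts to a membership check over the three terminal states. The sketch below uses a local `jobState` enum standing in for pps.JobState, since the real enum values live in the pps package:

```go
package main

import "fmt"

// jobState is a stand-in for pps.JobState (an enum in the real package).
type jobState int

const (
	jobStarting jobState = iota
	jobRunning
	jobSuccess
	jobFailure
	jobKilled
)

// isTerminal mirrors the documented behavior: only SUCCESS, FAILURE, and
// KILLED are terminal; any other state may still change later.
func isTerminal(state jobState) bool {
	switch state {
	case jobSuccess, jobFailure, jobKilled:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(isTerminal(jobRunning), isTerminal(jobSuccess)) // false true
}
```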
func JobInput ¶
func JobInput(pipelineInfo *pps.PipelineInfo, outputCommitInfo *pfs.CommitInfo) *pps.Input
JobInput fills in the commits for a JobInfo.
func PipelineRcName ¶
func PipelineRcName(name string, version uint64) string
PipelineRcName generates the name of the k8s replication controller that manages a pipeline's workers.
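Since the RC name must be stable across calls and unique per pipeline version, it is derived from both inputs. The `"pipeline-<name>-v<version>"` format below is purely an assumption for illustration; the authoritative scheme is whatever PipelineRcName itself produces:

```go
package main

import "fmt"

// pipelineRcName sketches a deterministic RC name derived from the pipeline
// name and version. The exact format here is assumed, not taken from the
// package; only the (name, version) → string shape matches the real API.
func pipelineRcName(name string, version uint64) string {
	return fmt.Sprintf("pipeline-%s-v%d", name, version)
}

func main() {
	fmt.Println(pipelineRcName("edges", 1))
	fmt.Println(pipelineRcName("edges", 2)) // new version, new RC name
}
```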
func PipelineRepo ¶
func PipelineRepo(pipeline *ppsclient.Pipeline) *pfs.Repo
PipelineRepo creates a pfs repo for a given pipeline.
func PipelineReqFromInfo ¶
func PipelineReqFromInfo(pipelineInfo *ppsclient.PipelineInfo) *ppsclient.CreatePipelineRequest
PipelineReqFromInfo converts a PipelineInfo into a CreatePipelineRequest.
func UpdateJobState ¶ added in v1.8.0
func UpdateJobState(pipelines col.ReadWriteCollection, jobs col.ReadWriteCollection, jobPtr *pps.EtcdJobInfo, state pps.JobState, reason string) error
UpdateJobState performs the operations involved with a job state transition.
Types ¶
type PipelineManifestReader ¶
type PipelineManifestReader struct {
// contains filtered or unexported fields
}
PipelineManifestReader helps with unmarshalling pipeline configs from JSON. It's used by 'create pipeline' and 'update pipeline'.
Note that the JSON decoder can parse input that gopkg.in/yaml.v3 cannot (multiple JSON documents), so we currently guess whether a document is JSON by looking at its first non-space character and checking whether it is '{'. (We originally tried parsing the pipeline spec with both parsers, but that approach made it hard to return sensible errors.) We may fail to parse valid YAML documents this way, so hopefully the yaml parser will gain multi-document support and we can rely on it fully.
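The first-character guess described above can be sketched in a few lines. The helper name `looksLikeJSON` is hypothetical, not part of the package's API:

```go
package main

import (
	"fmt"
	"strings"
)

// looksLikeJSON implements the documented heuristic: skip leading
// whitespace and check whether the first character of the document is '{'.
// Anything else is treated as (possibly multi-document) YAML.
func looksLikeJSON(doc string) bool {
	return strings.HasPrefix(strings.TrimSpace(doc), "{")
}

func main() {
	fmt.Println(looksLikeJSON("  {\"pipeline\": {\"name\": \"edges\"}}")) // true
	fmt.Println(looksLikeJSON("pipeline:\n  name: edges"))               // false
}
```

As the note warns, this misclassifies valid YAML whose first token happens to be a flow mapping starting with '{'.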
func NewPipelineManifestReader ¶
func NewPipelineManifestReader(path string) (result *PipelineManifestReader, retErr error)
NewPipelineManifestReader creates a new manifest reader from a path.
func (*PipelineManifestReader) NextCreatePipelineRequest ¶
func (r *PipelineManifestReader) NextCreatePipelineRequest() (*ppsclient.CreatePipelineRequest, error)
NextCreatePipelineRequest gets the next request from the manifest reader.
The implementation of this is currently somewhat wasteful: it parses the whole request into a map, modifies the map, serializes the map back into JSON, and then deserializes the JSON into a CreatePipelineRequest struct. This is to manage embedded data (currently just TFJobs, but could later be spark jobs or some such), which are represented as serialized JSON in the CreatePipelineRequest struct, but which we parse as structured data.
No effort is made to bypass this parse-serialize-parse process even in the common case where the pipeline contains no TFJob, because all of this happens only in 'pachctl', and only when a pipeline is created or updated.
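The parse-serialize-parse round trip can be sketched as below. The `createPipelineRequest` struct is a cut-down, hypothetical stand-in for ppsclient.CreatePipelineRequest (in particular, modeling the embedded job as a plain string field is an assumption for illustration):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// createPipelineRequest is a hypothetical simplification of
// ppsclient.CreatePipelineRequest: the embedded TFJob is kept as a
// serialized-JSON string, as described in the text.
type createPipelineRequest struct {
	Name  string `json:"name,omitempty"`
	TFJob string `json:"tf_job,omitempty"`
}

// decodeRequest sketches the round trip: parse the whole document into a
// generic map, flatten any embedded structured field back to a JSON string,
// re-serialize the map, and finally deserialize into the request struct.
func decodeRequest(doc []byte) (*createPipelineRequest, error) {
	var m map[string]interface{}
	if err := json.Unmarshal(doc, &m); err != nil {
		return nil, err
	}
	// If tf_job arrived as structured JSON, re-serialize it so that it fits
	// the string-typed field on the request struct.
	if tfJob, ok := m["tf_job"]; ok {
		flat, err := json.Marshal(tfJob)
		if err != nil {
			return nil, err
		}
		m["tf_job"] = string(flat)
	}
	buf, err := json.Marshal(m)
	if err != nil {
		return nil, err
	}
	var req createPipelineRequest
	if err := json.Unmarshal(buf, &req); err != nil {
		return nil, err
	}
	return &req, nil
}

func main() {
	req, err := decodeRequest([]byte(`{"name": "train", "tf_job": {"replicas": 2}}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Name, req.TFJob)
}
```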