extended

package
v1.7.14

This package is not in the latest version of its module.

Published: May 23, 2019 License: Apache-2.0 Imports: 14 Imported by: 3

Documentation

Index

Constants

const (
	// TraceCtxKey is the grpc metadata key whose value is an ExtendedTrace
	// identifying the current RPC/commit
	TraceCtxKey = "commit-trace"

	// TracesCollectionPrefix is the prefix associated with the 'traces'
	// collection in etcd (which maps pipelines and commits to extended traces)
	TracesCollectionPrefix = "commit_traces"

	// TargetRepoEnvVar determines how long extended traces are updated until
	// they're deleted from the cluster
	TargetRepoEnvVar = "PACH_TRACING_TARGET_REPO"
)

Variables

var (
	// CommitIDIndex is a secondary index for extended traces by the set of
	// commit IDs watched by the trace
	CommitIDIndex = &col.Index{
		Field: "CommitIDs",
		Multi: true,
	}

	// PipelineIndex is a secondary index for extended traces by the pipeline
	// watched by the trace
	PipelineIndex = &col.Index{
		Field: "Pipeline",
	}

	// TraceGetOpts are the default options for retrieving a trace from
	// 'TracesCol'
	TraceGetOpts = &col.Options{Target: etcd.SortByKey, Order: etcd.SortNone, SelfSort: false}
)
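
The difference between the two indexes can be illustrated with an in-memory sketch. It assumes that `Multi: true` means the indexed field is a slice and that each element gets its own index entry; the `trace` type and `buildIndexes` helper are hypothetical stand-ins and are not part of this package or of `col`.

```go
package main

import (
	"fmt"
	"sort"
)

// trace is a hypothetical, pared-down stand-in for TraceProto.
type trace struct {
	Pipeline  string
	CommitIDs []string
}

// buildIndexes returns two lookup tables from an indexed value to the keys of
// the traces that carry it. byCommit mimics a multi-valued index (one entry
// per element of CommitIDs); byPipeline mimics a single-valued index.
func buildIndexes(traces map[string]trace) (byCommit, byPipeline map[string][]string) {
	byCommit = make(map[string][]string)
	byPipeline = make(map[string][]string)
	for key, t := range traces {
		// Multi-valued: index the record once per commit ID it watches.
		for _, id := range t.CommitIDs {
			byCommit[id] = append(byCommit[id], key)
		}
		// Single-valued: index the record once under its pipeline name.
		byPipeline[t.Pipeline] = append(byPipeline[t.Pipeline], key)
	}
	// Map iteration order is unspecified, so sort for deterministic results.
	for _, keys := range byCommit {
		sort.Strings(keys)
	}
	for _, keys := range byPipeline {
		sort.Strings(keys)
	}
	return byCommit, byPipeline
}

func main() {
	traces := map[string]trace{
		"t1": {Pipeline: "edges", CommitIDs: []string{"c1", "c2"}},
		"t2": {Pipeline: "montage", CommitIDs: []string{"c2"}},
	}
	byCommit, byPipeline := buildIndexes(traces)
	fmt.Println(byCommit["c2"])      // [t1 t2]
	fmt.Println(byPipeline["edges"]) // [t1]
}
```

Under this reading, a trace watching several commits can be found from any one of them, while each trace is listed under exactly one pipeline.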
var (
	ErrInvalidLengthExtendedTrace = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowExtendedTrace   = fmt.Errorf("proto: integer overflow")
)

Functions

func AddJobSpanToAnyTrace

func AddJobSpanToAnyTrace(ctx context.Context, c *etcd.Client, commit *pfs.CommitInfo) (opentracing.Span, context.Context)

AddJobSpanToAnyTrace is like AddPipelineSpanToAnyTrace but looks for traces associated with the output commit 'commit'. Unlike AddPipelineSpanToAnyTrace, it may also delete extended traces that it finds.

In general, AddJobSpanToAnyTrace will:

  • Retrieve any trace associated with the pipeline identified by 'commit's repo.
  • Delete (from etcd, but not Jaeger) any extended trace that it finds where all commits associated with the trace are in 'commit's provenance. This covers the following two cases:
      1. A job is being traced through 'commit's output commit. Once the commit is finished, no more spans should be added to the trace.
      2. The trace has no commits associated with it. This means the trace is for initial pipeline creation, but a worker is now starting jobs for the pipeline, so the pipeline is up and it's safe to stop tracing it.
  • Add a span to any trace associated with 'commit's ID (case 1 above).
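
The deletion rule in the second step can be sketched as a simple predicate. This is a minimal illustration, assuming provenance is available as a set of commit IDs; `shouldDelete` is a hypothetical helper and not a function exported by this package.

```go
package main

import "fmt"

// shouldDelete reports whether every commit watched by a trace appears in the
// provenance set. It is vacuously true for a trace with no commits, which
// corresponds to the "initial pipeline creation" case.
func shouldDelete(traceCommitIDs []string, provenance map[string]bool) bool {
	for _, id := range traceCommitIDs {
		if !provenance[id] {
			return false
		}
	}
	return true
}

func main() {
	// Hypothetical provenance of the output commit, as a set of commit IDs.
	provenance := map[string]bool{"c1": true, "c2": true}

	fmt.Println(shouldDelete([]string{"c1", "c2"}, provenance)) // true: all commits are in provenance
	fmt.Println(shouldDelete(nil, provenance))                  // true: trace has no commits
	fmt.Println(shouldDelete([]string{"c1", "c9"}, provenance)) // false: c9 is not in provenance
}
```

Treating the empty commit list as a match is what lets one check cover both cases listed above.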

func AddPipelineSpanToAnyTrace

func AddPipelineSpanToAnyTrace(ctx context.Context, c *etcd.Client,
	pipeline, operation string, kvs ...interface{}) (opentracing.Span, context.Context)

AddPipelineSpanToAnyTrace finds any extended traces associated with 'pipeline' and, if any such trace exists, creates a new span associated with that trace and returns it.

func TraceIn2Out

func TraceIn2Out(ctx context.Context) context.Context

TraceIn2Out copies any extended traces from the incoming RPC context in 'ctx' into the outgoing RPC context in 'ctx'. Currently, this is only called by CreatePipeline, when it forwards extended contexts to the PutFile RPC with the new commit info.

func TracesCol

func TracesCol(c *etcd.Client) col.Collection

TracesCol returns the etcd collection of extended traces.

Types

type TraceProto

type TraceProto struct {
	// branch specifies a target branch of this trace; this would be set for a
	// trace created by 'pachctl finish-commit' (or some other call that would
	// spawn jobs) and if that new commit spawns a downstream commit in 'branch',
	// then this trace will cover RPCs until that downstream commit is finished
	Branch *pfs.Branch `protobuf:"bytes,1,opt,name=branch,proto3" json:"branch,omitempty"`
	// pipeline specifies the target pipeline of this trace; this would be set for
	// a trace created by 'pachctl create-pipeline' or 'pachctl update-pipeline'
	// and would include the kubernetes RPCs involved in creating a pipeline
	Pipeline string `protobuf:"bytes,4,opt,name=pipeline,proto3" json:"pipeline,omitempty"`
	// commit_ids are the new commits on which this trace's target commit is
	// provenant (which will be covered by this trace)
	CommitIDs []string `protobuf:"bytes,3,rep,name=commit_ids,json=commitIds,proto3" json:"commit_ids,omitempty"`
	// serialized_trace contains the info identifying a trace in Jaeger (a
	// (trace ID, span ID, sampled) tuple, basically)
	SerializedTrace      map[string]string `` /* 194-byte string literal not displayed */
	XXX_NoUnkeyedLiteral struct{}          `json:"-"`
	XXX_unrecognized     []byte            `json:"-"`
	XXX_sizecache        int32             `json:"-"`
}

TraceProto contains information identifying a Jaeger trace. It's used to propagate traces that follow the lifetime of a long operation (e.g. creating a pipeline or running a job), and which live longer than any single RPC.

func GetTraceFromCtx

func GetTraceFromCtx(ctx context.Context) (*TraceProto, error)

GetTraceFromCtx extracts any extended trace embedded in 'ctx's metadata.

func (*TraceProto) Descriptor

func (*TraceProto) Descriptor() ([]byte, []int)

func (*TraceProto) GetBranch

func (m *TraceProto) GetBranch() *pfs.Branch

func (*TraceProto) GetCommitIDs

func (m *TraceProto) GetCommitIDs() []string

func (*TraceProto) GetPipeline

func (m *TraceProto) GetPipeline() string

func (*TraceProto) GetSerializedTrace

func (m *TraceProto) GetSerializedTrace() map[string]string

func (*TraceProto) Marshal

func (m *TraceProto) Marshal() (dAtA []byte, err error)

func (*TraceProto) MarshalTo

func (m *TraceProto) MarshalTo(dAtA []byte) (int, error)

func (*TraceProto) ProtoMessage

func (*TraceProto) ProtoMessage()

func (*TraceProto) Reset

func (m *TraceProto) Reset()

func (*TraceProto) Size

func (m *TraceProto) Size() (n int)

func (*TraceProto) String

func (m *TraceProto) String() string

func (*TraceProto) Unmarshal

func (m *TraceProto) Unmarshal(dAtA []byte) error

func (*TraceProto) XXX_DiscardUnknown

func (m *TraceProto) XXX_DiscardUnknown()

func (*TraceProto) XXX_Marshal

func (m *TraceProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*TraceProto) XXX_Merge

func (m *TraceProto) XXX_Merge(src proto.Message)

func (*TraceProto) XXX_Size

func (m *TraceProto) XXX_Size() int

func (*TraceProto) XXX_Unmarshal

func (m *TraceProto) XXX_Unmarshal(b []byte) error
