Documentation ¶
Index ¶
- Constants
- Variables
- func AddJobSpanToAnyTrace(ctx context.Context, c *etcd.Client, commit *pfs.CommitInfo) (opentracing.Span, context.Context)
- func AddPipelineSpanToAnyTrace(ctx context.Context, c *etcd.Client, pipeline, operation string, ...) (opentracing.Span, context.Context)
- func TraceIn2Out(ctx context.Context) context.Context
- func TracesCol(c *etcd.Client) col.Collection
- type TraceProto
- func (*TraceProto) Descriptor() ([]byte, []int)
- func (m *TraceProto) GetBranch() *pfs.Branch
- func (m *TraceProto) GetCommitIDs() []string
- func (m *TraceProto) GetPipeline() string
- func (m *TraceProto) GetSerializedTrace() map[string]string
- func (m *TraceProto) Marshal() (dAtA []byte, err error)
- func (m *TraceProto) MarshalTo(dAtA []byte) (int, error)
- func (*TraceProto) ProtoMessage()
- func (m *TraceProto) Reset()
- func (m *TraceProto) Size() (n int)
- func (m *TraceProto) String() string
- func (m *TraceProto) Unmarshal(dAtA []byte) error
- func (m *TraceProto) XXX_DiscardUnknown()
- func (m *TraceProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *TraceProto) XXX_Merge(src proto.Message)
- func (m *TraceProto) XXX_Size() int
- func (m *TraceProto) XXX_Unmarshal(b []byte) error
Constants ¶
const ( // TraceCtxKey is the grpc metadata key whose value is a ExtendedTrace // identifying the current RPC/commit TraceCtxKey = "commit-trace" // TracesCollectionPrefix is the prefix associated with the 'traces' // collection in etcd (which maps pipelines and commits to extended traces) TracesCollectionPrefix = "commit_traces" // TargetRepoEnvVar determines how long extended traces are updated until // they're deleted from the cluster TargetRepoEnvVar = "PACH_TRACING_TARGET_REPO" )
Variables ¶
var ( // CommitIDIndex is a secondary index for extended traces by the set of // commit IDs watched by the trace CommitIDIndex = &col.Index{ Field: "CommitIDs", Multi: true, } // PipelineIndex is a secondary index for extended traces by the pipelint // watched by the trace PipelineIndex = &col.Index{ Field: "Pipeline", } // TraceGetOpts are the default options for retrieving a trace from // 'TracesCol' TraceGetOpts = &col.Options{Target: etcd.SortByKey, Order: etcd.SortNone, SelfSort: false} )
var ( ErrInvalidLengthExtendedTrace = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowExtendedTrace = fmt.Errorf("proto: integer overflow") )
Functions ¶
func AddJobSpanToAnyTrace ¶
func AddJobSpanToAnyTrace(ctx context.Context, c *etcd.Client, commit *pfs.CommitInfo) (opentracing.Span, context.Context)
AddJobSpanToAnyTrace is like AddPipelineSpanToAnyTrace but looks for traces associated with the output commit 'commit'. Unlike AddPipelineSpan, this also may delete extended traces that it finds.
In general, AddJobSpan will
- Retrieve any trace associated with the pipeline identified with 'commit's repo
- Delete (from etcd, but not Jaeger) any extended trace that it finds where all commits associated with the trace are in 'commit's provenance. This covers the following two cases: 1. A job is being traced through 'commit's output commit. Once commit is finished, no more spans should be added to the trace 2. The trace has no commits associated with it. This means the trace is for initial pipeline creation, but a worker is now starting jobs for the pipeline, so the pipeline is up and it's safe to stop tracing it
- Add a span to any trace associated with 'commit's ID (case 2. above)
func AddPipelineSpanToAnyTrace ¶
func AddPipelineSpanToAnyTrace(ctx context.Context, c *etcd.Client, pipeline, operation string, kvs ...interface{}) (opentracing.Span, context.Context)
AddPipelineSpanToAnyTrace finds any extended traces associated with 'pipeline', and if any such trace exists, it creates a new span associated with that trace and returns it
func TraceIn2Out ¶
TraceIn2Out copies any extended traces from the incoming RPC context in 'ctx' into the outgoing RPC context in 'ctx'. Currently, this is only called by CreatePipeline, when it forwards extended contexts to the PutFile RPC with the new commit info.
Types ¶
type TraceProto ¶
type TraceProto struct { // branch specifies a target branch of this trace; this would be set for a // trace created by 'pachctl finish-commit' (or some other call that would // spawn jobs) and if that new commit spawns a downstream commit in 'branch', // then this trace will cover RPCs util that downstream commit is finished Branch *pfs.Branch `protobuf:"bytes,1,opt,name=branch,proto3" json:"branch,omitempty"` // pipeline specifies the target pipeline of this trace; this would be set for // a trace created by 'pachctl create-pipeline' or 'pachctl update-pipeline' // and would include the kubernetes RPCs involved in creating a pipeline Pipeline string `protobuf:"bytes,4,opt,name=pipeline,proto3" json:"pipeline,omitempty"` // commit_ids are the new commits on which this trace's target commit is // provenant (which will be covered by this trace) CommitIDs []string `protobuf:"bytes,3,rep,name=commit_ids,json=commitIds,proto3" json:"commit_ids,omitempty"` // serialized_trace contains the info identifying a trace in Jaeger (a // (trace ID, span ID, sampled) tuple, basically) SerializedTrace map[string]string `` /* 194-byte string literal not displayed */ XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
TraceProto contains information identifying a Jaeger trace. It's used to propagate traces that follow the lifetime of a long operation (e.g. creating a pipeline or running a job), and which live longer than any single RPC.
func GetTraceFromCtx ¶
func GetTraceFromCtx(ctx context.Context) (*TraceProto, error)
GetTraceFromCtx extracts any extended trace embeded in 'ctx's metadata
func (*TraceProto) Descriptor ¶
func (*TraceProto) Descriptor() ([]byte, []int)
func (*TraceProto) GetBranch ¶
func (m *TraceProto) GetBranch() *pfs.Branch
func (*TraceProto) GetCommitIDs ¶
func (m *TraceProto) GetCommitIDs() []string
func (*TraceProto) GetPipeline ¶
func (m *TraceProto) GetPipeline() string
func (*TraceProto) GetSerializedTrace ¶
func (m *TraceProto) GetSerializedTrace() map[string]string
func (*TraceProto) Marshal ¶
func (m *TraceProto) Marshal() (dAtA []byte, err error)
func (*TraceProto) ProtoMessage ¶
func (*TraceProto) ProtoMessage()
func (*TraceProto) Reset ¶
func (m *TraceProto) Reset()
func (*TraceProto) Size ¶
func (m *TraceProto) Size() (n int)
func (*TraceProto) String ¶
func (m *TraceProto) String() string
func (*TraceProto) Unmarshal ¶
func (m *TraceProto) Unmarshal(dAtA []byte) error
func (*TraceProto) XXX_DiscardUnknown ¶
func (m *TraceProto) XXX_DiscardUnknown()
func (*TraceProto) XXX_Marshal ¶
func (m *TraceProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*TraceProto) XXX_Merge ¶
func (m *TraceProto) XXX_Merge(src proto.Message)
func (*TraceProto) XXX_Size ¶
func (m *TraceProto) XXX_Size() int
func (*TraceProto) XXX_Unmarshal ¶
func (m *TraceProto) XXX_Unmarshal(b []byte) error