Documentation ¶
Index ¶
- Constants
- func ContextWithPathway(ctx context.Context, p Pathway) context.Context
- type Backlog
- type CommitOffset
- type Pathway
- type Processor
- func (p *Processor) Flush()
- func (p *Processor) SetCheckpoint(ctx context.Context, edgeTags ...string) context.Context
- func (p *Processor) SetCheckpointWithParams(ctx context.Context, params options.CheckpointParams, edgeTags ...string) context.Context
- func (p *Processor) Start()
- func (p *Processor) Stop()
- func (p *Processor) TrackKafkaCommitOffset(group string, topic string, partition int32, offset int64)
- func (p *Processor) TrackKafkaHighWatermarkOffset(_ string, topic string, partition int32, offset int64)
- func (p *Processor) TrackKafkaProduceOffset(topic string, partition int32, offset int64)
- type ProduceOffset
- type StatsBucket
- type StatsPayload
- type StatsPoint
- type TimestampType
Constants ¶
const (
// PropagationKeyBase64 is the key to use to propagate the pathway between services.
PropagationKeyBase64 = "dd-pathway-ctx-base64"
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Backlog ¶
type Backlog struct { // Tags that identify the backlog Tags []string // Value of the backlog Value int64 }
Backlog represents the size of a queue that hasn't been yet read by the consumer.
type CommitOffset ¶
func (*CommitOffset) DecodeMsg ¶
func (z *CommitOffset) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*CommitOffset) EncodeMsg ¶
func (z *CommitOffset) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*CommitOffset) Msgsize ¶
func (z *CommitOffset) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
type Pathway ¶
type Pathway struct {
// contains filtered or unexported fields
}
Pathway is used to monitor how payloads are sent across different services. An example Pathway would be: service A -- edge 1 --> service B -- edge 2 --> service C So it's a branch of services (we also call them "nodes") connected via edges. As the payload is sent around, we save the start time (start of service A), and the start time of the previous service. This allows us to measure the latency of each edge, as well as the latency from origin of any service.
func DecodeBase64 ¶
DecodeBase64 decodes a pathway context from a string using base64 encoding.
func Merge ¶
Merge merges multiple pathways into one. The current implementation samples one resulting Pathway. A future implementation could be more clever and actually merge the Pathways.
func PathwayFromContext ¶
PathwayFromContext returns the pathway contained in the given context, and whether a pathway is found in ctx.
func (Pathway) EncodeBase64 ¶
EncodeBase64 encodes a pathway context into a string using base64 encoding.
func (Pathway) PathwayStart ¶
PathwayStart returns the start timestamp of the pathway
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
func NewProcessor ¶
func (*Processor) Flush ¶
func (p *Processor) Flush()
Flush triggers a flush and waits for it to complete.
func (*Processor) SetCheckpoint ¶
func (*Processor) SetCheckpointWithParams ¶
func (*Processor) TrackKafkaCommitOffset ¶
func (*Processor) TrackKafkaHighWatermarkOffset ¶ added in v1.61.0
func (p *Processor) TrackKafkaHighWatermarkOffset(_ string, topic string, partition int32, offset int64)
TrackKafkaHighWatermarkOffset should be used in the consumer, to track the high watermark offsets of each partition. The first argument is the Kafka cluster ID, and will be used later.
type ProduceOffset ¶
func (*ProduceOffset) DecodeMsg ¶
func (z *ProduceOffset) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (ProduceOffset) EncodeMsg ¶
func (z ProduceOffset) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (ProduceOffset) Msgsize ¶
func (z ProduceOffset) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
type StatsBucket ¶
type StatsBucket struct { // Start specifies the beginning of this bucket in unix nanoseconds. Start uint64 // Duration specifies the duration of this bucket in nanoseconds. Duration uint64 // Stats contains a set of statistics computed for the duration of this bucket. Stats []StatsPoint // Backlogs store information used to compute queue backlog Backlogs []Backlog }
StatsBucket specifies a set of stats computed over a duration.
func (*StatsBucket) DecodeMsg ¶
func (z *StatsBucket) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*StatsBucket) EncodeMsg ¶
func (z *StatsBucket) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*StatsBucket) Msgsize ¶
func (z *StatsBucket) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
type StatsPayload ¶
type StatsPayload struct { // Env specifies the env. of the application, as defined by the user. Env string // Service is the service of the application Service string // Stats holds all stats buckets computed within this payload. Stats []StatsBucket // TracerVersion is the version of the tracer TracerVersion string // Lang is the language of the tracer Lang string // Version is the version of the service Version string }
StatsPayload stores client computed stats.
func (*StatsPayload) DecodeMsg ¶
func (z *StatsPayload) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*StatsPayload) EncodeMsg ¶
func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*StatsPayload) Msgsize ¶
func (z *StatsPayload) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
type StatsPoint ¶
type StatsPoint struct { // These fields indicate the properties under which the stats were aggregated. Service string // deprecated EdgeTags []string Hash uint64 ParentHash uint64 // These fields specify the stats for the above aggregation. // those are distributions of latency in seconds. PathwayLatency []byte EdgeLatency []byte PayloadSize []byte TimestampType TimestampType }
StatsPoint contains a set of statistics grouped under various aggregation keys.
func (*StatsPoint) DecodeMsg ¶
func (z *StatsPoint) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*StatsPoint) EncodeMsg ¶
func (z *StatsPoint) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*StatsPoint) Msgsize ¶
func (z *StatsPoint) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
type TimestampType ¶
type TimestampType string
TimestampType can be either current or origin.
const ( // TimestampTypeCurrent is for when the recorded timestamp is based on the // timestamp of the current StatsPoint. TimestampTypeCurrent TimestampType = "current" // TimestampTypeOrigin is for when the recorded timestamp is based on the // time that the first StatsPoint in the pathway is sent out. TimestampTypeOrigin TimestampType = "origin" )
func (*TimestampType) DecodeMsg ¶
func (z *TimestampType) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (TimestampType) EncodeMsg ¶
func (z TimestampType) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (TimestampType) Msgsize ¶
func (z TimestampType) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message