datastreams

package
v1.70.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2025 License: Apache-2.0, BSD-3-Clause, Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// PropagationKeyBase64 is the key to use to propagate the pathway between services.
	PropagationKeyBase64 = "dd-pathway-ctx-base64"
)

Variables

This section is empty.

Functions

func ContextWithPathway

func ContextWithPathway(ctx context.Context, p Pathway) context.Context

ContextWithPathway returns a copy of the given context which includes the pathway p.

Types

type Backlog

type Backlog struct {
	// Tags that identify the backlog
	Tags []string
	// Value of the backlog
	Value int64
}

Backlog represents the size of a queue that hasn't been yet read by the consumer.

func (*Backlog) DecodeMsg

func (z *Backlog) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*Backlog) EncodeMsg

func (z *Backlog) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Backlog) Msgsize

func (z *Backlog) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

type CommitOffset

type CommitOffset struct {
	ConsumerGroup string
	Topic         string
	Partition     int32
	Offset        int64
}

func (*CommitOffset) DecodeMsg

func (z *CommitOffset) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*CommitOffset) EncodeMsg

func (z *CommitOffset) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*CommitOffset) Msgsize

func (z *CommitOffset) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

type Pathway

type Pathway struct {
	// contains filtered or unexported fields
}

Pathway is used to monitor how payloads are sent across different services. An example Pathway would be: service A -- edge 1 --> service B -- edge 2 --> service C So it's a branch of services (we also call them "nodes") connected via edges. As the payload is sent around, we save the start time (start of service A), and the start time of the previous service. This allows us to measure the latency of each edge, as well as the latency from origin of any service.

func Decode

func Decode(ctx context.Context, data []byte) (p Pathway, outCtx context.Context, err error)

Decode decodes a pathway

func DecodeBase64

func DecodeBase64(ctx context.Context, str string) (p Pathway, outCtx context.Context, err error)

DecodeBase64 decodes a pathway context from a string using base64 encoding.

func Merge

func Merge(pathways []Pathway) Pathway

Merge merges multiple pathways into one. The current implementation samples one resulting Pathway. A future implementation could be more clever and actually merge the Pathways.

func PathwayFromContext

func PathwayFromContext(ctx context.Context) (p Pathway, ok bool)

PathwayFromContext returns the pathway contained in the given context, and whether a pathway is found in ctx.

func (Pathway) EdgeStart

func (p Pathway) EdgeStart() time.Time

func (Pathway) Encode

func (p Pathway) Encode() []byte

Encode encodes the pathway

func (Pathway) EncodeBase64

func (p Pathway) EncodeBase64() string

EncodeBase64 encodes a pathway context into a string using base64 encoding.

func (Pathway) GetHash

func (p Pathway) GetHash() uint64

GetHash gets the hash of a pathway.

func (Pathway) PathwayStart

func (p Pathway) PathwayStart() time.Time

PathwayStart returns the start timestamp of the pathway

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(statsd internal.StatsdClient, env, service, version string, agentURL *url.URL, httpClient *http.Client) *Processor

func (*Processor) Flush

func (p *Processor) Flush()

Flush triggers a flush and waits for it to complete.

func (*Processor) SetCheckpoint

func (p *Processor) SetCheckpoint(ctx context.Context, edgeTags ...string) context.Context

func (*Processor) SetCheckpointWithParams

func (p *Processor) SetCheckpointWithParams(ctx context.Context, params options.CheckpointParams, edgeTags ...string) context.Context

func (*Processor) Start

func (p *Processor) Start()

func (*Processor) Stop

func (p *Processor) Stop()

func (*Processor) TrackKafkaCommitOffset

func (p *Processor) TrackKafkaCommitOffset(group string, topic string, partition int32, offset int64)

func (*Processor) TrackKafkaHighWatermarkOffset added in v1.61.0

func (p *Processor) TrackKafkaHighWatermarkOffset(_ string, topic string, partition int32, offset int64)

TrackKafkaHighWatermarkOffset should be used in the consumer, to track the high watermark offsets of each partition. The first argument is the Kafka cluster ID, and will be used later.

func (*Processor) TrackKafkaProduceOffset

func (p *Processor) TrackKafkaProduceOffset(topic string, partition int32, offset int64)

type ProduceOffset

type ProduceOffset struct {
	Topic     string
	Partition int32
	Offset    int64
}

func (*ProduceOffset) DecodeMsg

func (z *ProduceOffset) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (ProduceOffset) EncodeMsg

func (z ProduceOffset) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (ProduceOffset) Msgsize

func (z ProduceOffset) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

type StatsBucket

type StatsBucket struct {
	// Start specifies the beginning of this bucket in unix nanoseconds.
	Start uint64
	// Duration specifies the duration of this bucket in nanoseconds.
	Duration uint64
	// Stats contains a set of statistics computed for the duration of this bucket.
	Stats []StatsPoint
	// Backlogs store information used to compute queue backlog
	Backlogs []Backlog
}

StatsBucket specifies a set of stats computed over a duration.

func (*StatsBucket) DecodeMsg

func (z *StatsBucket) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*StatsBucket) EncodeMsg

func (z *StatsBucket) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*StatsBucket) Msgsize

func (z *StatsBucket) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

type StatsPayload

type StatsPayload struct {
	// Env specifies the env. of the application, as defined by the user.
	Env string
	// Service is the service of the application
	Service string
	// Stats holds all stats buckets computed within this payload.
	Stats []StatsBucket
	// TracerVersion is the version of the tracer
	TracerVersion string
	// Lang is the language of the tracer
	Lang string
	// Version is the version of the service
	Version string
}

StatsPayload stores client computed stats.

func (*StatsPayload) DecodeMsg

func (z *StatsPayload) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*StatsPayload) EncodeMsg

func (z *StatsPayload) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*StatsPayload) Msgsize

func (z *StatsPayload) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

type StatsPoint

type StatsPoint struct {
	// These fields indicate the properties under which the stats were aggregated.
	Service    string // deprecated
	EdgeTags   []string
	Hash       uint64
	ParentHash uint64
	// These fields specify the stats for the above aggregation.
	// those are distributions of latency in seconds.
	PathwayLatency []byte
	EdgeLatency    []byte
	PayloadSize    []byte
	TimestampType  TimestampType
}

StatsPoint contains a set of statistics grouped under various aggregation keys.

func (*StatsPoint) DecodeMsg

func (z *StatsPoint) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*StatsPoint) EncodeMsg

func (z *StatsPoint) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*StatsPoint) Msgsize

func (z *StatsPoint) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

type TimestampType

type TimestampType string

TimestampType can be either current or origin.

const (
	// TimestampTypeCurrent is for when the recorded timestamp is based on the
	// timestamp of the current StatsPoint.
	TimestampTypeCurrent TimestampType = "current"
	// TimestampTypeOrigin is for when the recorded timestamp is based on the
	// time that the first StatsPoint in the pathway is sent out.
	TimestampTypeOrigin TimestampType = "origin"
)

func (*TimestampType) DecodeMsg

func (z *TimestampType) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (TimestampType) EncodeMsg

func (z TimestampType) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (TimestampType) Msgsize

func (z TimestampType) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL