retry

package
v0.55.0
Published: Jul 11, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

README

retry package

Introduction

If the network becomes unavailable, the Agent stores the metrics in memory. The maximum memory usage for storing the metrics is defined by the forwarder_retry_queue_payloads_max_size configuration setting. When this limit is reached and the Agent's on-disk transaction storage is not enabled (see below), the metrics are dropped.

The Agent can also store the metrics that have not yet been sent on disk when the memory limit is reached. Enable this capability by setting forwarder_storage_max_size_in_bytes to a positive value indicating the maximum amount of storage space, in bytes, that the Agent can use to store the metrics on disk.

On-disk metrics are stored in the folder defined by the forwarder_storage_path setting, which defaults to /opt/datadog-agent/run/transactions_to_retry on Unix systems and C:\ProgramData\Datadog\run\transactions_to_retry on Windows.

To avoid running out of storage space, by default the Agent stores the metrics on disk only if the target disk has not reached 95% capacity. This limit can be adjusted via the forwarder_storage_max_disk_ratio setting.
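
The two limits above (a byte cap and a disk-usage ratio) can be combined into a single "how much may still be written" figure. The following is a minimal sketch of that idea, not the Agent's code: the function name `diskBudget`, its parameters, and the exact formula are assumptions made for illustration.

```go
package main

import "fmt"

// diskBudget returns how many more bytes could be written to disk.
// maxSizeInBytes plays the role of forwarder_storage_max_size_in_bytes and
// maxDiskRatio the role of forwarder_storage_max_disk_ratio. The formula is
// an illustrative assumption, not the Agent's implementation.
func diskBudget(totalBytes, usedBytes, storedBytes, maxSizeInBytes int64, maxDiskRatio float64) int64 {
	// Highest disk usage tolerated before on-disk storage stops.
	ratioLimit := int64(float64(totalBytes) * maxDiskRatio)
	byRatio := ratioLimit - usedBytes
	// Room left under the configured byte cap.
	bySize := maxSizeInBytes - storedBytes

	budget := byRatio
	if bySize < budget {
		budget = bySize
	}
	if budget < 0 {
		budget = 0
	}
	return budget
}

func main() {
	// A 1000-byte disk that is 40% full, a 200-byte cap, and a 50% ratio.
	fmt.Println(diskBudget(1000, 400, 0, 200, 0.5)) // prints 100
}
```

Whichever limit is tighter wins: in the call above the ratio leaves 100 bytes while the byte cap would allow 200.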

How does it work?

When the retry queue in memory is full and a new transaction needs to be added, some transactions are removed from the retry queue and serialized into a new file on disk. The amount of transaction data serialized at a time is controlled by the option forwarder_flush_to_disk_mem_ratio.

When the forwarder attempts to retry previously failed transactions, it first retries the HTTP transactions stored in memory. Once the in-memory retry queue is empty, the forwarder then retries the transactions stored in the newest on-disk transaction file and removes it.

Adding transactions to the retry queue

Consider the following example where the in-memory retry queue can store up to 4 transactions.

At the beginning, the retry queue in memory is empty. When the transactions tr1, tr2, tr3 and tr4 fail, they are added sequentially to the retry queue, which then becomes full. When trying to add tr5, the oldest transactions (tr1 and tr2) are removed from the retry queue in memory and serialized into the file File 1. The number of transactions to serialize to disk is defined as a percentage of the size of the retry queue. In this example, when the retry queue is full, 50% of the transactions (in terms of payload size) are serialized on disk. When adding the transactions tr6 and tr7, the retry queue becomes full again and tr4 and tr3 are serialized to the on-disk transaction file File 2.

Removing transactions from the retry queue

Once the Agent is able to send data again, tr7, tr6 and tr5 are returned first since they reside in the in-memory queue. When those transactions are sent, the content of File 2 is read and the file is removed, sending the transactions tr4 and tr3. Finally, File 1 is read and removed, sending tr2 and tr1 to the intake.

Implementation notes
  • There is a single retry queue for all the endpoints.
  • The files are read and written as a whole, which is efficient because few reads and writes are performed on disk.
  • At agent startup, previous files are reloaded. Unknown domains and old files are removed.
  • Protobuf is used to serialize on disk. See Retry file dump to dump the content of a .retry file.

Documentation

Overview

Package retry provides retry mechanisms for the forwarder.

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidLengthHttpTransactionProto        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowHttpTransactionProto          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupHttpTransactionProto = fmt.Errorf("proto: unexpected end of group")
)
var TransactionPriorityProto_name = map[int32]string{
	0: "NORMAL",
	1: "HIGH",
}
var TransactionPriorityProto_value = map[string]int32{
	"NORMAL": 0,
	"HIGH":   1,
}

Functions

This section is empty.

Types

type DiskUsageLimit

type DiskUsageLimit struct {
	// contains filtered or unexported fields
}

DiskUsageLimit provides `computeAvailableSpace` which returns the amount of disk space that can be used to store transactions.

func NewDiskUsageLimit

func NewDiskUsageLimit(
	diskPath string,
	disk diskUsageRetriever,
	maxSizeInBytes int64,
	maxDiskRatio float64) *DiskUsageLimit

NewDiskUsageLimit creates a new instance of DiskUsageLimit.

type EndpointProto

type EndpointProto struct {
	Route                string   `protobuf:"bytes,1,opt,name=route,proto3" json:"route,omitempty"`
	Name                 string   `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*EndpointProto) Descriptor

func (*EndpointProto) Descriptor() ([]byte, []int)

func (*EndpointProto) GetName

func (m *EndpointProto) GetName() string

func (*EndpointProto) GetRoute

func (m *EndpointProto) GetRoute() string

func (*EndpointProto) Marshal

func (m *EndpointProto) Marshal() (dAtA []byte, err error)

func (*EndpointProto) MarshalTo

func (m *EndpointProto) MarshalTo(dAtA []byte) (int, error)

func (*EndpointProto) MarshalToSizedBuffer

func (m *EndpointProto) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*EndpointProto) ProtoMessage

func (*EndpointProto) ProtoMessage()

func (*EndpointProto) Reset

func (m *EndpointProto) Reset()

func (*EndpointProto) Size

func (m *EndpointProto) Size() (n int)

func (*EndpointProto) String

func (m *EndpointProto) String() string

func (*EndpointProto) Unmarshal

func (m *EndpointProto) Unmarshal(dAtA []byte) error

func (*EndpointProto) XXX_DiscardUnknown

func (m *EndpointProto) XXX_DiscardUnknown()

func (*EndpointProto) XXX_Marshal

func (m *EndpointProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*EndpointProto) XXX_Merge

func (m *EndpointProto) XXX_Merge(src proto.Message)

func (*EndpointProto) XXX_Size

func (m *EndpointProto) XXX_Size() int

func (*EndpointProto) XXX_Unmarshal

func (m *EndpointProto) XXX_Unmarshal(b []byte) error

type FileRemovalPolicy

type FileRemovalPolicy struct {
	// contains filtered or unexported fields
}

FileRemovalPolicy handles the removal policy for `.retry` files.

func NewFileRemovalPolicy

func NewFileRemovalPolicy(
	rootPath string,
	outdatedFileDayCount int,
	telemetry FileRemovalPolicyTelemetry) (*FileRemovalPolicy, error)

NewFileRemovalPolicy creates a new instance of FileRemovalPolicy

func (*FileRemovalPolicy) RegisterDomain

func (p *FileRemovalPolicy) RegisterDomain(domainName string) (string, error)

RegisterDomain registers a domain name.

func (*FileRemovalPolicy) RemoveOutdatedFiles

func (p *FileRemovalPolicy) RemoveOutdatedFiles() ([]string, error)

RemoveOutdatedFiles removes the files that are older than outdatedFileDayCount days.

func (*FileRemovalPolicy) RemoveUnknownDomains

func (p *FileRemovalPolicy) RemoveUnknownDomains() ([]string, error)

RemoveUnknownDomains removes unknown domains.

type FileRemovalPolicyTelemetry

type FileRemovalPolicyTelemetry struct{}

FileRemovalPolicyTelemetry handles the telemetry for FileRemovalPolicy.

type HTTPTransactionsSerializer

type HTTPTransactionsSerializer struct {
	// contains filtered or unexported fields
}

HTTPTransactionsSerializer serializes Transaction instances. To support a new Transaction implementation, add a new method `func (s *HTTPTransactionsSerializer) Add(transaction NEW_TYPE) error {`

func NewHTTPTransactionsSerializer

func NewHTTPTransactionsSerializer(log log.Component, resolver resolver.DomainResolver) *HTTPTransactionsSerializer

NewHTTPTransactionsSerializer creates a new instance of HTTPTransactionsSerializer

func (*HTTPTransactionsSerializer) Add

Add adds a transaction to the serializer. This function keeps references to HTTPTransaction.Payload and HTTPTransaction.Headers, so the transaction must not be updated until `GetBytesAndReset` is called.

func (*HTTPTransactionsSerializer) Deserialize

func (s *HTTPTransactionsSerializer) Deserialize(bytes []byte) ([]transaction.Transaction, int, error)

Deserialize deserializes from bytes.

func (*HTTPTransactionsSerializer) GetBytesAndReset

func (s *HTTPTransactionsSerializer) GetBytesAndReset() ([]byte, error)

GetBytesAndReset returns the serialized transactions as bytes and resets the transaction collection.

type HeaderValuesProto

type HeaderValuesProto struct {
	Values               []string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*HeaderValuesProto) Descriptor

func (*HeaderValuesProto) Descriptor() ([]byte, []int)

func (*HeaderValuesProto) GetValues

func (m *HeaderValuesProto) GetValues() []string

func (*HeaderValuesProto) Marshal

func (m *HeaderValuesProto) Marshal() (dAtA []byte, err error)

func (*HeaderValuesProto) MarshalTo

func (m *HeaderValuesProto) MarshalTo(dAtA []byte) (int, error)

func (*HeaderValuesProto) MarshalToSizedBuffer

func (m *HeaderValuesProto) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*HeaderValuesProto) ProtoMessage

func (*HeaderValuesProto) ProtoMessage()

func (*HeaderValuesProto) Reset

func (m *HeaderValuesProto) Reset()

func (*HeaderValuesProto) Size

func (m *HeaderValuesProto) Size() (n int)

func (*HeaderValuesProto) String

func (m *HeaderValuesProto) String() string

func (*HeaderValuesProto) Unmarshal

func (m *HeaderValuesProto) Unmarshal(dAtA []byte) error

func (*HeaderValuesProto) XXX_DiscardUnknown

func (m *HeaderValuesProto) XXX_DiscardUnknown()

func (*HeaderValuesProto) XXX_Marshal

func (m *HeaderValuesProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*HeaderValuesProto) XXX_Merge

func (m *HeaderValuesProto) XXX_Merge(src proto.Message)

func (*HeaderValuesProto) XXX_Size

func (m *HeaderValuesProto) XXX_Size() int

func (*HeaderValuesProto) XXX_Unmarshal

func (m *HeaderValuesProto) XXX_Unmarshal(b []byte) error

type HttpTransactionProto

type HttpTransactionProto struct {
	Domain               string                        `protobuf:"bytes,1,opt,name=Domain,proto3" json:"Domain,omitempty"`
	Endpoint             *EndpointProto                `protobuf:"bytes,2,opt,name=Endpoint,proto3" json:"Endpoint,omitempty"`
	Headers              map[string]*HeaderValuesProto `` /* 155-byte string literal not displayed */
	Payload              []byte                        `protobuf:"bytes,4,opt,name=Payload,proto3" json:"Payload,omitempty"`
	ErrorCount           int64                         `protobuf:"varint,5,opt,name=ErrorCount,proto3" json:"ErrorCount,omitempty"`
	CreatedAt            int64                         `protobuf:"varint,6,opt,name=CreatedAt,proto3" json:"CreatedAt,omitempty"`
	Retryable            bool                          `protobuf:"varint,7,opt,name=Retryable,proto3" json:"Retryable,omitempty"`
	Priority             TransactionPriorityProto      `protobuf:"varint,8,opt,name=priority,proto3,enum=retry.TransactionPriorityProto" json:"priority,omitempty"`
	PointCount           int32                         `protobuf:"varint,9,opt,name=PointCount,proto3" json:"PointCount,omitempty"`
	XXX_NoUnkeyedLiteral struct{}                      `json:"-"`
	XXX_unrecognized     []byte                        `json:"-"`
	XXX_sizecache        int32                         `json:"-"`
}

func (*HttpTransactionProto) Descriptor

func (*HttpTransactionProto) Descriptor() ([]byte, []int)

func (*HttpTransactionProto) GetCreatedAt

func (m *HttpTransactionProto) GetCreatedAt() int64

func (*HttpTransactionProto) GetDomain

func (m *HttpTransactionProto) GetDomain() string

func (*HttpTransactionProto) GetEndpoint

func (m *HttpTransactionProto) GetEndpoint() *EndpointProto

func (*HttpTransactionProto) GetErrorCount

func (m *HttpTransactionProto) GetErrorCount() int64

func (*HttpTransactionProto) GetHeaders

func (m *HttpTransactionProto) GetHeaders() map[string]*HeaderValuesProto

func (*HttpTransactionProto) GetPayload

func (m *HttpTransactionProto) GetPayload() []byte

func (*HttpTransactionProto) GetPointCount

func (m *HttpTransactionProto) GetPointCount() int32

func (*HttpTransactionProto) GetPriority

func (*HttpTransactionProto) GetRetryable

func (m *HttpTransactionProto) GetRetryable() bool

func (*HttpTransactionProto) Marshal

func (m *HttpTransactionProto) Marshal() (dAtA []byte, err error)

func (*HttpTransactionProto) MarshalTo

func (m *HttpTransactionProto) MarshalTo(dAtA []byte) (int, error)

func (*HttpTransactionProto) MarshalToSizedBuffer

func (m *HttpTransactionProto) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*HttpTransactionProto) ProtoMessage

func (*HttpTransactionProto) ProtoMessage()

func (*HttpTransactionProto) Reset

func (m *HttpTransactionProto) Reset()

func (*HttpTransactionProto) Size

func (m *HttpTransactionProto) Size() (n int)

func (*HttpTransactionProto) String

func (m *HttpTransactionProto) String() string

func (*HttpTransactionProto) Unmarshal

func (m *HttpTransactionProto) Unmarshal(dAtA []byte) error

func (*HttpTransactionProto) XXX_DiscardUnknown

func (m *HttpTransactionProto) XXX_DiscardUnknown()

func (*HttpTransactionProto) XXX_Marshal

func (m *HttpTransactionProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*HttpTransactionProto) XXX_Merge

func (m *HttpTransactionProto) XXX_Merge(src proto.Message)

func (*HttpTransactionProto) XXX_Size

func (m *HttpTransactionProto) XXX_Size() int

func (*HttpTransactionProto) XXX_Unmarshal

func (m *HttpTransactionProto) XXX_Unmarshal(b []byte) error

type HttpTransactionProtoCollection

type HttpTransactionProtoCollection struct {
	Version              int32                   `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
	Values               []*HttpTransactionProto `protobuf:"bytes,2,rep,name=values,proto3" json:"values,omitempty"`
	XXX_NoUnkeyedLiteral struct{}                `json:"-"`
	XXX_unrecognized     []byte                  `json:"-"`
	XXX_sizecache        int32                   `json:"-"`
}

func (*HttpTransactionProtoCollection) Descriptor

func (*HttpTransactionProtoCollection) Descriptor() ([]byte, []int)

func (*HttpTransactionProtoCollection) GetValues

func (*HttpTransactionProtoCollection) GetVersion

func (m *HttpTransactionProtoCollection) GetVersion() int32

func (*HttpTransactionProtoCollection) Marshal

func (m *HttpTransactionProtoCollection) Marshal() (dAtA []byte, err error)

func (*HttpTransactionProtoCollection) MarshalTo

func (m *HttpTransactionProtoCollection) MarshalTo(dAtA []byte) (int, error)

func (*HttpTransactionProtoCollection) MarshalToSizedBuffer

func (m *HttpTransactionProtoCollection) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*HttpTransactionProtoCollection) ProtoMessage

func (*HttpTransactionProtoCollection) ProtoMessage()

func (*HttpTransactionProtoCollection) Reset

func (m *HttpTransactionProtoCollection) Reset()

func (*HttpTransactionProtoCollection) Size

func (m *HttpTransactionProtoCollection) Size() (n int)

func (*HttpTransactionProtoCollection) String

func (*HttpTransactionProtoCollection) Unmarshal

func (m *HttpTransactionProtoCollection) Unmarshal(dAtA []byte) error

func (*HttpTransactionProtoCollection) XXX_DiscardUnknown

func (m *HttpTransactionProtoCollection) XXX_DiscardUnknown()

func (*HttpTransactionProtoCollection) XXX_Marshal

func (m *HttpTransactionProtoCollection) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*HttpTransactionProtoCollection) XXX_Merge

func (m *HttpTransactionProtoCollection) XXX_Merge(src proto.Message)

func (*HttpTransactionProtoCollection) XXX_Size

func (m *HttpTransactionProtoCollection) XXX_Size() int

func (*HttpTransactionProtoCollection) XXX_Unmarshal

func (m *HttpTransactionProtoCollection) XXX_Unmarshal(b []byte) error

type PointCountTelemetry

type PointCountTelemetry struct {
	// contains filtered or unexported fields
}

PointCountTelemetry sends the number of points successfully sent and the number of points dropped.

func NewPointCountTelemetry

func NewPointCountTelemetry(domain string) *PointCountTelemetry

NewPointCountTelemetry creates a new instance of PointCountTelemetry.

func (*PointCountTelemetry) OnPointDropped

func (t *PointCountTelemetry) OnPointDropped(count int)

OnPointDropped increases the telemetry that counts the number of points dropped.

func (*PointCountTelemetry) OnPointSuccessfullySent

func (t *PointCountTelemetry) OnPointSuccessfullySent(count int)

OnPointSuccessfullySent increases the telemetry that counts the number of points successfully sent.

type QueueCapacityStats

type QueueCapacityStats struct {
	Capacity       time.Duration
	BytesPerSec    float64
	AvailableSpace int64
}

QueueCapacityStats represents statistics about the capacity of the retry queue.

type QueueDurationCapacity

type QueueDurationCapacity struct {
	// contains filtered or unexported fields
}

QueueDurationCapacity provides a method to know how much data (expressed as a duration) the in-memory retry queue and the disk storage retry queue can store. For each domain, the capacity in bytes is the sum of:
  • the in-memory retry queue capacity. We assume there is enough memory for all in-memory retry queues (one per domain).
  • the available disk storage * `domain relative speed`, where `domain relative speed` is the number of bytes per second for this domain divided by the total number of bytes per second for all domains.

If a domain receives twice the traffic of another one, twice the disk storage capacity is allocated to it. Disk storage is shared across domains. If there is no traffic during the time period for a domain, no statistic is reported.

func NewQueueDurationCapacity

func NewQueueDurationCapacity(
	historyDuration time.Duration,
	bucketDuration time.Duration,
	maxMemSizeInBytes int,
	optionalDiskSpace diskSpace) *QueueDurationCapacity

NewQueueDurationCapacity creates a new instance of *QueueDurationCapacity. If `optionalDiskSpace` is not nil, the capacity also uses the storage on disk. `historyDuration` is the duration used to compute the number of bytes received per second. `bucketDuration` is the size of a bucket.

func (*QueueDurationCapacity) ComputeCapacity

func (r *QueueDurationCapacity) ComputeCapacity(t time.Time) (map[string]QueueCapacityStats, error)

ComputeCapacity computes the capacity of the retry queue expressed as a duration. It returns statistics by domain name.

func (*QueueDurationCapacity) OnTransaction

func (r *QueueDurationCapacity) OnTransaction(
	transaction *transaction.HTTPTransaction,
	mainDomain string,
	now time.Time) error

OnTransaction must be called for each transaction. Note: because of alternateDomains, `mainDomain` is not necessarily the same as transaction.Domain.

type TransactionDiskStorage

type TransactionDiskStorage interface {
	Store([]transaction.Transaction) error
	ExtractLast() ([]transaction.Transaction, error)
	GetDiskSpaceUsed() int64
}

TransactionDiskStorage is an interface to store and load transactions from disk

type TransactionPriorityProto

type TransactionPriorityProto int32
const (
	TransactionPriorityProto_NORMAL TransactionPriorityProto = 0
	TransactionPriorityProto_HIGH   TransactionPriorityProto = 1
)

func (TransactionPriorityProto) EnumDescriptor

func (TransactionPriorityProto) EnumDescriptor() ([]byte, []int)

func (TransactionPriorityProto) String

func (x TransactionPriorityProto) String() string

type TransactionPrioritySorter

type TransactionPrioritySorter interface {
	Sort([]transaction.Transaction)
}

TransactionPrioritySorter is an interface to sort transactions.

type TransactionRetryQueue

type TransactionRetryQueue struct {
	// contains filtered or unexported fields
}

TransactionRetryQueue stores transactions in memory and flushes them to disk when the memory limit is exceeded.

func BuildTransactionRetryQueue

func BuildTransactionRetryQueue(
	log log.Component,
	maxMemSizeInBytes int,
	flushToStorageRatio float64,
	optionalDomainFolderPath string,
	optionalDiskUsageLimit *DiskUsageLimit,
	dropPrioritySorter TransactionPrioritySorter,
	resolver resolver.DomainResolver,
	pointCountTelemetry *PointCountTelemetry) *TransactionRetryQueue

BuildTransactionRetryQueue builds a new instance of TransactionRetryQueue

func NewTransactionRetryQueue

func NewTransactionRetryQueue(
	dropPrioritySorter TransactionPrioritySorter,
	optionalTransactionStorage TransactionDiskStorage,
	maxMemSizeInBytes int,
	flushToStorageRatio float64,
	telemetry TransactionRetryQueueTelemetry,
	pointCountTelemetry *PointCountTelemetry) *TransactionRetryQueue

NewTransactionRetryQueue creates a new instance of TransactionRetryQueue.

func (*TransactionRetryQueue) Add

Add adds a new transaction and flushes transactions to disk if the memory limit is exceeded. The amount of transactions flushed to disk is controlled by `flushToStorageRatio`, which is the ratio of the transactions to be flushed.

Consider the payload sizes 10, 20, 30, 40, 15 with `maxMemSizeInBytes=100` and `flushToStorageRatio=0.6`. When adding the last payload `15`, the buffer becomes full (10+20+30+40+15 > 100) and 100*0.6=60 bytes must be flushed to disk. The first 3 transactions are flushed to disk as 10 + 20 + 30 >= 60.

If disk serialization fails or is not enabled, old transactions are removed such that `currentMemSizeInBytes` <= `maxMemSizeInBytes`.
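
The arithmetic of this example can be checked with a few lines of Go. The helper `flushCount` is hypothetical and covers only the counting step described above; it is not the method's implementation.

```go
package main

import "fmt"

// flushCount returns how many of the oldest payloads would be flushed so that
// at least maxMemSizeInBytes*flushToStorageRatio bytes leave memory. It
// returns 0 when the payloads fit within maxMemSizeInBytes.
func flushCount(sizes []int, maxMemSizeInBytes int, flushToStorageRatio float64) int {
	total := 0
	for _, s := range sizes {
		total += s
	}
	if total <= maxMemSizeInBytes {
		return 0
	}
	target := float64(maxMemSizeInBytes) * flushToStorageRatio
	flushed := 0
	for i, s := range sizes {
		flushed += s
		if float64(flushed) >= target {
			return i + 1
		}
	}
	return len(sizes)
}

func main() {
	sizes := []int{10, 20, 30, 40, 15}
	fmt.Println(flushCount(sizes, 100, 0.6)) // prints 3
}
```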

func (*TransactionRetryQueue) ExtractTransactions

func (tc *TransactionRetryQueue) ExtractTransactions() ([]transaction.Transaction, error)

ExtractTransactions extracts transactions from the container. If some transactions exist in memory, it extracts them; otherwise, it extracts transactions from the disk. No transactions are in memory after calling this method.

func (*TransactionRetryQueue) FlushToDisk

func (tc *TransactionRetryQueue) FlushToDisk() error

FlushToDisk is called on shutdown and persists all transactions to disk. The normal limits on capacity still apply, and the same rules are followed as during normal operation in terms of which transactions are dropped and which are persisted.

func (*TransactionRetryQueue) GetDiskSpaceUsed

func (tc *TransactionRetryQueue) GetDiskSpaceUsed() int64

GetDiskSpaceUsed returns the current disk space used for storing transactions.

func (*TransactionRetryQueue) GetMaxMemSizeInBytes

func (tc *TransactionRetryQueue) GetMaxMemSizeInBytes() int

GetMaxMemSizeInBytes gets the maximum memory usage for storing transactions

func (*TransactionRetryQueue) GetTransactionCount

func (tc *TransactionRetryQueue) GetTransactionCount() int

GetTransactionCount gets the number of transactions in the container

type TransactionRetryQueueTelemetry

type TransactionRetryQueueTelemetry struct {
	// contains filtered or unexported fields
}

TransactionRetryQueueTelemetry handles the telemetry for TransactionRetryQueue

func NewTransactionRetryQueueTelemetry

func NewTransactionRetryQueueTelemetry(domainName string) TransactionRetryQueueTelemetry

NewTransactionRetryQueueTelemetry creates a new TransactionRetryQueueTelemetry
