retry

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2021 License: Apache-2.0 Imports: 25 Imported by: 0

README

retry package

Introduction

If the network becomes unavailable, the Agent stores the metrics in memory. The maximum memory usage for storing the metrics is defined by the forwarder_retry_queue_payloads_max_size configuration setting. When this limit is reached and Agent's on-disk transaction storage is not enabled (see below), the metrics are dropped.

The Agent can also store the metrics that have not yet been sent on disk when the memory limit is reached. Enable this capability by setting forwarder_storage_max_size_in_bytes to a positive value indicating the maximum amount of storage space, in bytes, that the Agent can use to store the metrics on disk.

On-disk metrics are stored in the folder defined by the forwarder_storage_path setting, which is by default /opt/datadog-agent/run/transactions_to_retry on Unix systems and C:\ProgramData\Datadog\run\transactions_to_retry on Windows.

To avoid running out of storage space, by default the Agent stores the metrics on disk only if the target disk has not reached 95% capacity. This limit can be adjusted via forwarder_storage_max_disk_ratio setting.

How does it work?

When the retry queue in memory is full and a new transaction need to be added, some transactions from the retry queue are removed and serialized into a new file on disk. The amount of transaction data serialized at a time from the Agent is controlled by the option forwarder_flush_to_disk_mem_ratio.

When the forwarder attempts to retry previously failed transactions, it first retries the HTTP transactions stored in memory. Once the in-memory retry queue is empty, the forwarder then retries the transactions stored in the newest on-disk transaction file and removes it.

Adding transactions to the retry queue

Consider the following example where the in-memory retry queue can store up to 4 transactions.

At the beginning, the retry queue in memory is empty. When the transactions tr1, tr2, tr3 and tr4 fail, they are added sequentially to the retry queue which then becomes full. When trying to add tr5, the oldest transactions are removed from the retry queue in memory and serialized into the file File 1. The number of transactions to serialize to disk is defined as a percent of the size of the retry queue. In this example, when the retry queue is full, 50% of the transactions (in terms of payload size) are serialized on disk. When adding the transactions tr6 and tr7, the retry queue becomes full again and the tr4 and tr3 are serialized to the on-disk transaction file File 2.

Adding transactions to the retry queue

Removing transactions from the retry queue

Once the Agent is able to send data again, initially tr7, tr6 and tr5 are returned since they are the ones residing in the in-memory queue. When those transactions are sent, File 2 content is read and removed, sending the transactions tr4 and tr3. Finally, File 1 is read and removed, sending the tr2 and tr1 to the intake.

Removing transactions from the retry queue

Implementations notes
  • There is a single retry queue for all the endpoints.
  • The files are read and written as a whole which is efficient as few reads and writes on disk are performed.
  • At agent startup, previous files are reloaded. Unknown domains and old files are removed.
  • Protobuf is used to serialize on disk. See Retry file dump to dump the content of a .retry file.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidLengthHttpTransactionProto        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowHttpTransactionProto          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupHttpTransactionProto = fmt.Errorf("proto: unexpected end of group")
)
View Source
var TransactionPriorityProto_name = map[int32]string{
	0: "NORMAL",
	1: "HIGH",
}
View Source
var TransactionPriorityProto_value = map[string]int32{
	"NORMAL": 0,
	"HIGH":   1,
}

Functions

This section is empty.

Types

type EndpointProto

type EndpointProto struct {
	Route                string   `protobuf:"bytes,1,opt,name=route,proto3" json:"route,omitempty"`
	Name                 string   `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*EndpointProto) Descriptor

func (*EndpointProto) Descriptor() ([]byte, []int)

func (*EndpointProto) GetName

func (m *EndpointProto) GetName() string

func (*EndpointProto) GetRoute

func (m *EndpointProto) GetRoute() string

func (*EndpointProto) Marshal

func (m *EndpointProto) Marshal() (dAtA []byte, err error)

func (*EndpointProto) MarshalTo

func (m *EndpointProto) MarshalTo(dAtA []byte) (int, error)

func (*EndpointProto) MarshalToSizedBuffer

func (m *EndpointProto) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*EndpointProto) ProtoMessage

func (*EndpointProto) ProtoMessage()

func (*EndpointProto) Reset

func (m *EndpointProto) Reset()

func (*EndpointProto) Size

func (m *EndpointProto) Size() (n int)

func (*EndpointProto) String

func (m *EndpointProto) String() string

func (*EndpointProto) Unmarshal

func (m *EndpointProto) Unmarshal(dAtA []byte) error

func (*EndpointProto) XXX_DiscardUnknown

func (m *EndpointProto) XXX_DiscardUnknown()

func (*EndpointProto) XXX_Marshal

func (m *EndpointProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*EndpointProto) XXX_Merge

func (m *EndpointProto) XXX_Merge(src proto.Message)

func (*EndpointProto) XXX_Size

func (m *EndpointProto) XXX_Size() int

func (*EndpointProto) XXX_Unmarshal

func (m *EndpointProto) XXX_Unmarshal(b []byte) error

type FileRemovalPolicy

type FileRemovalPolicy struct {
	// contains filtered or unexported fields
}

FileRemovalPolicy handles the removal policy for `.retry` files.

func NewFileRemovalPolicy

func NewFileRemovalPolicy(
	rootPath string,
	outdatedFileDayCount int,
	telemetry FileRemovalPolicyTelemetry) (*FileRemovalPolicy, error)

NewFileRemovalPolicy creates a new instance of FileRemovalPolicy

func (*FileRemovalPolicy) RegisterDomain

func (p *FileRemovalPolicy) RegisterDomain(domainName string) (string, error)

RegisterDomain registers a domain name.

func (*FileRemovalPolicy) RemoveOutdatedFiles

func (p *FileRemovalPolicy) RemoveOutdatedFiles() ([]string, error)

RemoveOutdatedFiles removes the outdated files when a file is older than outDatedFileDayCount days.

func (*FileRemovalPolicy) RemoveUnknownDomains

func (p *FileRemovalPolicy) RemoveUnknownDomains() ([]string, error)

RemoveUnknownDomains remove unknown domains.

type FileRemovalPolicyTelemetry

type FileRemovalPolicyTelemetry struct{}

FileRemovalPolicyTelemetry handles the telemetry for FileRemovalPolicy.

type HTTPTransactionsSerializer

type HTTPTransactionsSerializer struct {
	// contains filtered or unexported fields
}

HTTPTransactionsSerializer serializes Transaction instances. To support a new Transaction implementation, add a new method `func (s *HTTPTransactionsSerializer) Add(transaction NEW_TYPE) error {`

func NewHTTPTransactionsSerializer

func NewHTTPTransactionsSerializer(domain string, apiKeys []string) *HTTPTransactionsSerializer

NewHTTPTransactionsSerializer creates a new instance of HTTPTransactionsSerializer

func (*HTTPTransactionsSerializer) Add

Add adds a transaction to the serializer. This function uses references on HTTPTransaction.Payload and HTTPTransaction.Headers and so the transaction must not be updated until a call to `GetBytesAndReset`.

func (*HTTPTransactionsSerializer) Deserialize

func (s *HTTPTransactionsSerializer) Deserialize(bytes []byte) ([]transaction.Transaction, int, error)

Deserialize deserializes from bytes.

func (*HTTPTransactionsSerializer) GetBytesAndReset

func (s *HTTPTransactionsSerializer) GetBytesAndReset() ([]byte, error)

GetBytesAndReset returns as bytes the serialized transactions and reset the transaction collection.

type HeaderValuesProto

type HeaderValuesProto struct {
	Values               []string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*HeaderValuesProto) Descriptor

func (*HeaderValuesProto) Descriptor() ([]byte, []int)

func (*HeaderValuesProto) GetValues

func (m *HeaderValuesProto) GetValues() []string

func (*HeaderValuesProto) Marshal

func (m *HeaderValuesProto) Marshal() (dAtA []byte, err error)

func (*HeaderValuesProto) MarshalTo

func (m *HeaderValuesProto) MarshalTo(dAtA []byte) (int, error)

func (*HeaderValuesProto) MarshalToSizedBuffer

func (m *HeaderValuesProto) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*HeaderValuesProto) ProtoMessage

func (*HeaderValuesProto) ProtoMessage()

func (*HeaderValuesProto) Reset

func (m *HeaderValuesProto) Reset()

func (*HeaderValuesProto) Size

func (m *HeaderValuesProto) Size() (n int)

func (*HeaderValuesProto) String

func (m *HeaderValuesProto) String() string

func (*HeaderValuesProto) Unmarshal

func (m *HeaderValuesProto) Unmarshal(dAtA []byte) error

func (*HeaderValuesProto) XXX_DiscardUnknown

func (m *HeaderValuesProto) XXX_DiscardUnknown()

func (*HeaderValuesProto) XXX_Marshal

func (m *HeaderValuesProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*HeaderValuesProto) XXX_Merge

func (m *HeaderValuesProto) XXX_Merge(src proto.Message)

func (*HeaderValuesProto) XXX_Size

func (m *HeaderValuesProto) XXX_Size() int

func (*HeaderValuesProto) XXX_Unmarshal

func (m *HeaderValuesProto) XXX_Unmarshal(b []byte) error

type HttpTransactionProto

type HttpTransactionProto struct {
	Domain               string                        `protobuf:"bytes,1,opt,name=Domain,proto3" json:"Domain,omitempty"`
	Endpoint             *EndpointProto                `protobuf:"bytes,2,opt,name=Endpoint,proto3" json:"Endpoint,omitempty"`
	Headers              map[string]*HeaderValuesProto `` /* 155-byte string literal not displayed */
	Payload              []byte                        `protobuf:"bytes,4,opt,name=Payload,proto3" json:"Payload,omitempty"`
	ErrorCount           int64                         `protobuf:"varint,5,opt,name=ErrorCount,proto3" json:"ErrorCount,omitempty"`
	CreatedAt            int64                         `protobuf:"varint,6,opt,name=CreatedAt,proto3" json:"CreatedAt,omitempty"`
	Retryable            bool                          `protobuf:"varint,7,opt,name=Retryable,proto3" json:"Retryable,omitempty"`
	Priority             TransactionPriorityProto      `protobuf:"varint,8,opt,name=priority,proto3,enum=forwarder.TransactionPriorityProto" json:"priority,omitempty"`
	XXX_NoUnkeyedLiteral struct{}                      `json:"-"`
	XXX_unrecognized     []byte                        `json:"-"`
	XXX_sizecache        int32                         `json:"-"`
}

func (*HttpTransactionProto) Descriptor

func (*HttpTransactionProto) Descriptor() ([]byte, []int)

func (*HttpTransactionProto) GetCreatedAt

func (m *HttpTransactionProto) GetCreatedAt() int64

func (*HttpTransactionProto) GetDomain

func (m *HttpTransactionProto) GetDomain() string

func (*HttpTransactionProto) GetEndpoint

func (m *HttpTransactionProto) GetEndpoint() *EndpointProto

func (*HttpTransactionProto) GetErrorCount

func (m *HttpTransactionProto) GetErrorCount() int64

func (*HttpTransactionProto) GetHeaders

func (m *HttpTransactionProto) GetHeaders() map[string]*HeaderValuesProto

func (*HttpTransactionProto) GetPayload

func (m *HttpTransactionProto) GetPayload() []byte

func (*HttpTransactionProto) GetPriority

func (*HttpTransactionProto) GetRetryable

func (m *HttpTransactionProto) GetRetryable() bool

func (*HttpTransactionProto) Marshal

func (m *HttpTransactionProto) Marshal() (dAtA []byte, err error)

func (*HttpTransactionProto) MarshalTo

func (m *HttpTransactionProto) MarshalTo(dAtA []byte) (int, error)

func (*HttpTransactionProto) MarshalToSizedBuffer

func (m *HttpTransactionProto) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*HttpTransactionProto) ProtoMessage

func (*HttpTransactionProto) ProtoMessage()

func (*HttpTransactionProto) Reset

func (m *HttpTransactionProto) Reset()

func (*HttpTransactionProto) Size

func (m *HttpTransactionProto) Size() (n int)

func (*HttpTransactionProto) String

func (m *HttpTransactionProto) String() string

func (*HttpTransactionProto) Unmarshal

func (m *HttpTransactionProto) Unmarshal(dAtA []byte) error

func (*HttpTransactionProto) XXX_DiscardUnknown

func (m *HttpTransactionProto) XXX_DiscardUnknown()

func (*HttpTransactionProto) XXX_Marshal

func (m *HttpTransactionProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*HttpTransactionProto) XXX_Merge

func (m *HttpTransactionProto) XXX_Merge(src proto.Message)

func (*HttpTransactionProto) XXX_Size

func (m *HttpTransactionProto) XXX_Size() int

func (*HttpTransactionProto) XXX_Unmarshal

func (m *HttpTransactionProto) XXX_Unmarshal(b []byte) error

type HttpTransactionProtoCollection

type HttpTransactionProtoCollection struct {
	Version              int32                   `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
	Values               []*HttpTransactionProto `protobuf:"bytes,2,rep,name=values,proto3" json:"values,omitempty"`
	XXX_NoUnkeyedLiteral struct{}                `json:"-"`
	XXX_unrecognized     []byte                  `json:"-"`
	XXX_sizecache        int32                   `json:"-"`
}

func (*HttpTransactionProtoCollection) Descriptor

func (*HttpTransactionProtoCollection) Descriptor() ([]byte, []int)

func (*HttpTransactionProtoCollection) GetValues

func (*HttpTransactionProtoCollection) GetVersion

func (m *HttpTransactionProtoCollection) GetVersion() int32

func (*HttpTransactionProtoCollection) Marshal

func (m *HttpTransactionProtoCollection) Marshal() (dAtA []byte, err error)

func (*HttpTransactionProtoCollection) MarshalTo

func (m *HttpTransactionProtoCollection) MarshalTo(dAtA []byte) (int, error)

func (*HttpTransactionProtoCollection) MarshalToSizedBuffer

func (m *HttpTransactionProtoCollection) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*HttpTransactionProtoCollection) ProtoMessage

func (*HttpTransactionProtoCollection) ProtoMessage()

func (*HttpTransactionProtoCollection) Reset

func (m *HttpTransactionProtoCollection) Reset()

func (*HttpTransactionProtoCollection) Size

func (m *HttpTransactionProtoCollection) Size() (n int)

func (*HttpTransactionProtoCollection) String

func (*HttpTransactionProtoCollection) Unmarshal

func (m *HttpTransactionProtoCollection) Unmarshal(dAtA []byte) error

func (*HttpTransactionProtoCollection) XXX_DiscardUnknown

func (m *HttpTransactionProtoCollection) XXX_DiscardUnknown()

func (*HttpTransactionProtoCollection) XXX_Marshal

func (m *HttpTransactionProtoCollection) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*HttpTransactionProtoCollection) XXX_Merge

func (m *HttpTransactionProtoCollection) XXX_Merge(src proto.Message)

func (*HttpTransactionProtoCollection) XXX_Size

func (m *HttpTransactionProtoCollection) XXX_Size() int

func (*HttpTransactionProtoCollection) XXX_Unmarshal

func (m *HttpTransactionProtoCollection) XXX_Unmarshal(b []byte) error

type TransactionPriorityProto

type TransactionPriorityProto int32
const (
	TransactionPriorityProto_NORMAL TransactionPriorityProto = 0
	TransactionPriorityProto_HIGH   TransactionPriorityProto = 1
)

func (TransactionPriorityProto) EnumDescriptor

func (TransactionPriorityProto) EnumDescriptor() ([]byte, []int)

func (TransactionPriorityProto) String

func (x TransactionPriorityProto) String() string

type TransactionPrioritySorter

type TransactionPrioritySorter interface {
	Sort([]transaction.Transaction)
}

TransactionPrioritySorter is an interface to sort transactions.

type TransactionRetryQueue

type TransactionRetryQueue struct {
	// contains filtered or unexported fields
}

TransactionRetryQueue stores transactions in memory and flush them to disk when the memory limit is exceeded.

func BuildTransactionRetryQueue

func BuildTransactionRetryQueue(
	maxMemSizeInBytes int,
	flushToStorageRatio float64,
	optionalDomainFolderPath string,
	storageMaxSize int64,
	dropPrioritySorter TransactionPrioritySorter,
	domain string,
	apiKeys []string) *TransactionRetryQueue

BuildTransactionRetryQueue builds a new instance of TransactionRetryQueue

func NewTransactionRetryQueue

func NewTransactionRetryQueue(
	dropPrioritySorter TransactionPrioritySorter,
	optionalTransactionSerializer TransactionSerializer,
	maxMemSizeInBytes int,
	flushToStorageRatio float64,
	telemetry TransactionRetryQueueTelemetry) *TransactionRetryQueue

NewTransactionRetryQueue creates a new instance of NewTransactionRetryQueue

func (*TransactionRetryQueue) Add

Add adds a new transaction and flush transactions to disk if the memory limit is exceeded. The amount of transactions flushed to disk is control by `flushToStorageRatio` which is the ratio of the transactions to be flushed. Consider the following payload sizes 10, 20, 30, 40, 15 with `maxMemSizeInBytes=100` and `flushToStorageRatio=0.6` When adding the last payload `15`, the buffer becomes full (10+20+30+40+15 > 100) and 100*0.6=60 bytes must be flushed on disk. The first 3 transactions are flushed to the disk as 10 + 20 + 30 >= 60 If disk serialization failed or is not enabled, remove old transactions such as `currentMemSizeInBytes` <= `maxMemSizeInBytes`

func (*TransactionRetryQueue) ExtractTransactions

func (tc *TransactionRetryQueue) ExtractTransactions() ([]transaction.Transaction, error)

ExtractTransactions extracts transactions from the container. If some transactions exist in memory extract them otherwise extract transactions from the disk. No transactions are in memory after calling this method.

func (*TransactionRetryQueue) GetMaxMemSizeInBytes

func (tc *TransactionRetryQueue) GetMaxMemSizeInBytes() int

GetMaxMemSizeInBytes gets the maximum memory usage for storing transactions

func (*TransactionRetryQueue) GetTransactionCount

func (tc *TransactionRetryQueue) GetTransactionCount() int

GetTransactionCount gets the number of transactions in the container

type TransactionRetryQueueTelemetry

type TransactionRetryQueueTelemetry struct {
	// contains filtered or unexported fields
}

TransactionRetryQueueTelemetry handles the telemetry for TransactionRetryQueue

func NewTransactionRetryQueueTelemetry

func NewTransactionRetryQueueTelemetry(domainName string) TransactionRetryQueueTelemetry

NewTransactionRetryQueueTelemetry creates a new TransactionRetryQueueTelemetry

type TransactionSerializer

type TransactionSerializer interface {
	Serialize([]transaction.Transaction) error
	Deserialize() ([]transaction.Transaction, error)
}

TransactionSerializer is an interface to serialize / deserialize transactions

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL