Documentation ¶
Overview ¶
Package retry provides retry mechanisms for the forwarder.
Index ¶
- Variables
- type DiskUsageLimit
- type EndpointProto
- func (*EndpointProto) Descriptor() ([]byte, []int)
- func (m *EndpointProto) GetName() string
- func (m *EndpointProto) GetRoute() string
- func (m *EndpointProto) Marshal() (dAtA []byte, err error)
- func (m *EndpointProto) MarshalTo(dAtA []byte) (int, error)
- func (m *EndpointProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*EndpointProto) ProtoMessage()
- func (m *EndpointProto) Reset()
- func (m *EndpointProto) Size() (n int)
- func (m *EndpointProto) String() string
- func (m *EndpointProto) Unmarshal(dAtA []byte) error
- func (m *EndpointProto) XXX_DiscardUnknown()
- func (m *EndpointProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *EndpointProto) XXX_Merge(src proto.Message)
- func (m *EndpointProto) XXX_Size() int
- func (m *EndpointProto) XXX_Unmarshal(b []byte) error
- type FileRemovalPolicy
- type FileRemovalPolicyTelemetry
- type HTTPTransactionsSerializer
- type HeaderValuesProto
- func (*HeaderValuesProto) Descriptor() ([]byte, []int)
- func (m *HeaderValuesProto) GetValues() []string
- func (m *HeaderValuesProto) Marshal() (dAtA []byte, err error)
- func (m *HeaderValuesProto) MarshalTo(dAtA []byte) (int, error)
- func (m *HeaderValuesProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*HeaderValuesProto) ProtoMessage()
- func (m *HeaderValuesProto) Reset()
- func (m *HeaderValuesProto) Size() (n int)
- func (m *HeaderValuesProto) String() string
- func (m *HeaderValuesProto) Unmarshal(dAtA []byte) error
- func (m *HeaderValuesProto) XXX_DiscardUnknown()
- func (m *HeaderValuesProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *HeaderValuesProto) XXX_Merge(src proto.Message)
- func (m *HeaderValuesProto) XXX_Size() int
- func (m *HeaderValuesProto) XXX_Unmarshal(b []byte) error
- type HttpTransactionProto
- func (*HttpTransactionProto) Descriptor() ([]byte, []int)
- func (m *HttpTransactionProto) GetCreatedAt() int64
- func (m *HttpTransactionProto) GetDomain() string
- func (m *HttpTransactionProto) GetEndpoint() *EndpointProto
- func (m *HttpTransactionProto) GetErrorCount() int64
- func (m *HttpTransactionProto) GetHeaders() map[string]*HeaderValuesProto
- func (m *HttpTransactionProto) GetPayload() []byte
- func (m *HttpTransactionProto) GetPointCount() int32
- func (m *HttpTransactionProto) GetPriority() TransactionPriorityProto
- func (m *HttpTransactionProto) GetRetryable() bool
- func (m *HttpTransactionProto) Marshal() (dAtA []byte, err error)
- func (m *HttpTransactionProto) MarshalTo(dAtA []byte) (int, error)
- func (m *HttpTransactionProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*HttpTransactionProto) ProtoMessage()
- func (m *HttpTransactionProto) Reset()
- func (m *HttpTransactionProto) Size() (n int)
- func (m *HttpTransactionProto) String() string
- func (m *HttpTransactionProto) Unmarshal(dAtA []byte) error
- func (m *HttpTransactionProto) XXX_DiscardUnknown()
- func (m *HttpTransactionProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *HttpTransactionProto) XXX_Merge(src proto.Message)
- func (m *HttpTransactionProto) XXX_Size() int
- func (m *HttpTransactionProto) XXX_Unmarshal(b []byte) error
- type HttpTransactionProtoCollection
- func (*HttpTransactionProtoCollection) Descriptor() ([]byte, []int)
- func (m *HttpTransactionProtoCollection) GetValues() []*HttpTransactionProto
- func (m *HttpTransactionProtoCollection) GetVersion() int32
- func (m *HttpTransactionProtoCollection) Marshal() (dAtA []byte, err error)
- func (m *HttpTransactionProtoCollection) MarshalTo(dAtA []byte) (int, error)
- func (m *HttpTransactionProtoCollection) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*HttpTransactionProtoCollection) ProtoMessage()
- func (m *HttpTransactionProtoCollection) Reset()
- func (m *HttpTransactionProtoCollection) Size() (n int)
- func (m *HttpTransactionProtoCollection) String() string
- func (m *HttpTransactionProtoCollection) Unmarshal(dAtA []byte) error
- func (m *HttpTransactionProtoCollection) XXX_DiscardUnknown()
- func (m *HttpTransactionProtoCollection) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *HttpTransactionProtoCollection) XXX_Merge(src proto.Message)
- func (m *HttpTransactionProtoCollection) XXX_Size() int
- func (m *HttpTransactionProtoCollection) XXX_Unmarshal(b []byte) error
- type PointCountTelemetry
- type QueueCapacityStats
- type QueueDurationCapacity
- type TransactionDiskStorage
- type TransactionPriorityProto
- type TransactionPrioritySorter
- type TransactionRetryQueue
- func (tc *TransactionRetryQueue) Add(t transaction.Transaction) (int, error)
- func (tc *TransactionRetryQueue) ExtractTransactions() ([]transaction.Transaction, error)
- func (tc *TransactionRetryQueue) FlushToDisk() error
- func (tc *TransactionRetryQueue) GetDiskSpaceUsed() int64
- func (tc *TransactionRetryQueue) GetMaxMemSizeInBytes() int
- func (tc *TransactionRetryQueue) GetTransactionCount() int
- type TransactionRetryQueueTelemetry
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthHttpTransactionProto = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowHttpTransactionProto = fmt.Errorf("proto: integer overflow") ErrUnexpectedEndOfGroupHttpTransactionProto = fmt.Errorf("proto: unexpected end of group") )
var TransactionPriorityProto_name = map[int32]string{
0: "NORMAL",
1: "HIGH",
}
var TransactionPriorityProto_value = map[string]int32{
"NORMAL": 0,
"HIGH": 1,
}
Functions ¶
This section is empty.
Types ¶
type DiskUsageLimit ¶
type DiskUsageLimit struct {
// contains filtered or unexported fields
}
DiskUsageLimit provides `computeAvailableSpace` which returns the amount of disk space that can be used to store transactions.
func NewDiskUsageLimit ¶
func NewDiskUsageLimit( diskPath string, disk diskUsageRetriever, maxSizeInBytes int64, maxDiskRatio float64) *DiskUsageLimit
NewDiskUsageLimit creates a new instance of DiskUsageLimit
type EndpointProto ¶
type EndpointProto struct { Route string `protobuf:"bytes,1,opt,name=route,proto3" json:"route,omitempty"` Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*EndpointProto) Descriptor ¶
func (*EndpointProto) Descriptor() ([]byte, []int)
func (*EndpointProto) GetName ¶
func (m *EndpointProto) GetName() string
func (*EndpointProto) GetRoute ¶
func (m *EndpointProto) GetRoute() string
func (*EndpointProto) Marshal ¶
func (m *EndpointProto) Marshal() (dAtA []byte, err error)
func (*EndpointProto) MarshalToSizedBuffer ¶
func (m *EndpointProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*EndpointProto) ProtoMessage ¶
func (*EndpointProto) ProtoMessage()
func (*EndpointProto) Reset ¶
func (m *EndpointProto) Reset()
func (*EndpointProto) Size ¶
func (m *EndpointProto) Size() (n int)
func (*EndpointProto) String ¶
func (m *EndpointProto) String() string
func (*EndpointProto) Unmarshal ¶
func (m *EndpointProto) Unmarshal(dAtA []byte) error
func (*EndpointProto) XXX_DiscardUnknown ¶
func (m *EndpointProto) XXX_DiscardUnknown()
func (*EndpointProto) XXX_Marshal ¶
func (m *EndpointProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*EndpointProto) XXX_Merge ¶
func (m *EndpointProto) XXX_Merge(src proto.Message)
func (*EndpointProto) XXX_Size ¶
func (m *EndpointProto) XXX_Size() int
func (*EndpointProto) XXX_Unmarshal ¶
func (m *EndpointProto) XXX_Unmarshal(b []byte) error
type FileRemovalPolicy ¶
type FileRemovalPolicy struct {
// contains filtered or unexported fields
}
FileRemovalPolicy handles the removal policy for `.retry` files.
func NewFileRemovalPolicy ¶
func NewFileRemovalPolicy( rootPath string, outdatedFileDayCount int, telemetry FileRemovalPolicyTelemetry) (*FileRemovalPolicy, error)
NewFileRemovalPolicy creates a new instance of FileRemovalPolicy
func (*FileRemovalPolicy) RegisterDomain ¶
func (p *FileRemovalPolicy) RegisterDomain(domainName string) (string, error)
RegisterDomain registers a domain name.
func (*FileRemovalPolicy) RemoveOutdatedFiles ¶
func (p *FileRemovalPolicy) RemoveOutdatedFiles() ([]string, error)
RemoveOutdatedFiles removes the files that are older than `outdatedFileDayCount` days.
func (*FileRemovalPolicy) RemoveUnknownDomains ¶
func (p *FileRemovalPolicy) RemoveUnknownDomains() ([]string, error)
RemoveUnknownDomains removes unknown domains.
type FileRemovalPolicyTelemetry ¶
type FileRemovalPolicyTelemetry struct{}
FileRemovalPolicyTelemetry handles the telemetry for FileRemovalPolicy.
type HTTPTransactionsSerializer ¶
type HTTPTransactionsSerializer struct {
// contains filtered or unexported fields
}
HTTPTransactionsSerializer serializes Transaction instances. To support a new Transaction implementation, add a new method `func (s *HTTPTransactionsSerializer) Add(transaction NEW_TYPE) error {`
func NewHTTPTransactionsSerializer ¶
func NewHTTPTransactionsSerializer(log log.Component, resolver resolver.DomainResolver) *HTTPTransactionsSerializer
NewHTTPTransactionsSerializer creates a new instance of HTTPTransactionsSerializer
func (*HTTPTransactionsSerializer) Add ¶
func (s *HTTPTransactionsSerializer) Add(transaction *transaction.HTTPTransaction) error
Add adds a transaction to the serializer. This function keeps references to HTTPTransaction.Payload and HTTPTransaction.Headers, so the transaction must not be updated until a call to `GetBytesAndReset`.
func (*HTTPTransactionsSerializer) Deserialize ¶
func (s *HTTPTransactionsSerializer) Deserialize(bytes []byte) ([]transaction.Transaction, int, error)
Deserialize deserializes from bytes.
func (*HTTPTransactionsSerializer) GetBytesAndReset ¶
func (s *HTTPTransactionsSerializer) GetBytesAndReset() ([]byte, error)
GetBytesAndReset returns the serialized transactions as bytes and resets the transaction collection.
type HeaderValuesProto ¶
type HeaderValuesProto struct { Values []string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*HeaderValuesProto) Descriptor ¶
func (*HeaderValuesProto) Descriptor() ([]byte, []int)
func (*HeaderValuesProto) GetValues ¶
func (m *HeaderValuesProto) GetValues() []string
func (*HeaderValuesProto) Marshal ¶
func (m *HeaderValuesProto) Marshal() (dAtA []byte, err error)
func (*HeaderValuesProto) MarshalTo ¶
func (m *HeaderValuesProto) MarshalTo(dAtA []byte) (int, error)
func (*HeaderValuesProto) MarshalToSizedBuffer ¶
func (m *HeaderValuesProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*HeaderValuesProto) ProtoMessage ¶
func (*HeaderValuesProto) ProtoMessage()
func (*HeaderValuesProto) Reset ¶
func (m *HeaderValuesProto) Reset()
func (*HeaderValuesProto) Size ¶
func (m *HeaderValuesProto) Size() (n int)
func (*HeaderValuesProto) String ¶
func (m *HeaderValuesProto) String() string
func (*HeaderValuesProto) Unmarshal ¶
func (m *HeaderValuesProto) Unmarshal(dAtA []byte) error
func (*HeaderValuesProto) XXX_DiscardUnknown ¶
func (m *HeaderValuesProto) XXX_DiscardUnknown()
func (*HeaderValuesProto) XXX_Marshal ¶
func (m *HeaderValuesProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*HeaderValuesProto) XXX_Merge ¶
func (m *HeaderValuesProto) XXX_Merge(src proto.Message)
func (*HeaderValuesProto) XXX_Size ¶
func (m *HeaderValuesProto) XXX_Size() int
func (*HeaderValuesProto) XXX_Unmarshal ¶
func (m *HeaderValuesProto) XXX_Unmarshal(b []byte) error
type HttpTransactionProto ¶
type HttpTransactionProto struct { Domain string `protobuf:"bytes,1,opt,name=Domain,proto3" json:"Domain,omitempty"` Endpoint *EndpointProto `protobuf:"bytes,2,opt,name=Endpoint,proto3" json:"Endpoint,omitempty"` Headers map[string]*HeaderValuesProto `` /* 155-byte string literal not displayed */ Payload []byte `protobuf:"bytes,4,opt,name=Payload,proto3" json:"Payload,omitempty"` ErrorCount int64 `protobuf:"varint,5,opt,name=ErrorCount,proto3" json:"ErrorCount,omitempty"` CreatedAt int64 `protobuf:"varint,6,opt,name=CreatedAt,proto3" json:"CreatedAt,omitempty"` Retryable bool `protobuf:"varint,7,opt,name=Retryable,proto3" json:"Retryable,omitempty"` Priority TransactionPriorityProto `protobuf:"varint,8,opt,name=priority,proto3,enum=retry.TransactionPriorityProto" json:"priority,omitempty"` PointCount int32 `protobuf:"varint,9,opt,name=PointCount,proto3" json:"PointCount,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*HttpTransactionProto) Descriptor ¶
func (*HttpTransactionProto) Descriptor() ([]byte, []int)
func (*HttpTransactionProto) GetCreatedAt ¶
func (m *HttpTransactionProto) GetCreatedAt() int64
func (*HttpTransactionProto) GetDomain ¶
func (m *HttpTransactionProto) GetDomain() string
func (*HttpTransactionProto) GetEndpoint ¶
func (m *HttpTransactionProto) GetEndpoint() *EndpointProto
func (*HttpTransactionProto) GetErrorCount ¶
func (m *HttpTransactionProto) GetErrorCount() int64
func (*HttpTransactionProto) GetHeaders ¶
func (m *HttpTransactionProto) GetHeaders() map[string]*HeaderValuesProto
func (*HttpTransactionProto) GetPayload ¶
func (m *HttpTransactionProto) GetPayload() []byte
func (*HttpTransactionProto) GetPointCount ¶
func (m *HttpTransactionProto) GetPointCount() int32
func (*HttpTransactionProto) GetPriority ¶
func (m *HttpTransactionProto) GetPriority() TransactionPriorityProto
func (*HttpTransactionProto) GetRetryable ¶
func (m *HttpTransactionProto) GetRetryable() bool
func (*HttpTransactionProto) Marshal ¶
func (m *HttpTransactionProto) Marshal() (dAtA []byte, err error)
func (*HttpTransactionProto) MarshalTo ¶
func (m *HttpTransactionProto) MarshalTo(dAtA []byte) (int, error)
func (*HttpTransactionProto) MarshalToSizedBuffer ¶
func (m *HttpTransactionProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*HttpTransactionProto) ProtoMessage ¶
func (*HttpTransactionProto) ProtoMessage()
func (*HttpTransactionProto) Reset ¶
func (m *HttpTransactionProto) Reset()
func (*HttpTransactionProto) Size ¶
func (m *HttpTransactionProto) Size() (n int)
func (*HttpTransactionProto) String ¶
func (m *HttpTransactionProto) String() string
func (*HttpTransactionProto) Unmarshal ¶
func (m *HttpTransactionProto) Unmarshal(dAtA []byte) error
func (*HttpTransactionProto) XXX_DiscardUnknown ¶
func (m *HttpTransactionProto) XXX_DiscardUnknown()
func (*HttpTransactionProto) XXX_Marshal ¶
func (m *HttpTransactionProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*HttpTransactionProto) XXX_Merge ¶
func (m *HttpTransactionProto) XXX_Merge(src proto.Message)
func (*HttpTransactionProto) XXX_Size ¶
func (m *HttpTransactionProto) XXX_Size() int
func (*HttpTransactionProto) XXX_Unmarshal ¶
func (m *HttpTransactionProto) XXX_Unmarshal(b []byte) error
type HttpTransactionProtoCollection ¶
type HttpTransactionProtoCollection struct { Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` Values []*HttpTransactionProto `protobuf:"bytes,2,rep,name=values,proto3" json:"values,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*HttpTransactionProtoCollection) Descriptor ¶
func (*HttpTransactionProtoCollection) Descriptor() ([]byte, []int)
func (*HttpTransactionProtoCollection) GetValues ¶
func (m *HttpTransactionProtoCollection) GetValues() []*HttpTransactionProto
func (*HttpTransactionProtoCollection) GetVersion ¶
func (m *HttpTransactionProtoCollection) GetVersion() int32
func (*HttpTransactionProtoCollection) Marshal ¶
func (m *HttpTransactionProtoCollection) Marshal() (dAtA []byte, err error)
func (*HttpTransactionProtoCollection) MarshalTo ¶
func (m *HttpTransactionProtoCollection) MarshalTo(dAtA []byte) (int, error)
func (*HttpTransactionProtoCollection) MarshalToSizedBuffer ¶
func (m *HttpTransactionProtoCollection) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*HttpTransactionProtoCollection) ProtoMessage ¶
func (*HttpTransactionProtoCollection) ProtoMessage()
func (*HttpTransactionProtoCollection) Reset ¶
func (m *HttpTransactionProtoCollection) Reset()
func (*HttpTransactionProtoCollection) Size ¶
func (m *HttpTransactionProtoCollection) Size() (n int)
func (*HttpTransactionProtoCollection) String ¶
func (m *HttpTransactionProtoCollection) String() string
func (*HttpTransactionProtoCollection) Unmarshal ¶
func (m *HttpTransactionProtoCollection) Unmarshal(dAtA []byte) error
func (*HttpTransactionProtoCollection) XXX_DiscardUnknown ¶
func (m *HttpTransactionProtoCollection) XXX_DiscardUnknown()
func (*HttpTransactionProtoCollection) XXX_Marshal ¶
func (m *HttpTransactionProtoCollection) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*HttpTransactionProtoCollection) XXX_Merge ¶
func (m *HttpTransactionProtoCollection) XXX_Merge(src proto.Message)
func (*HttpTransactionProtoCollection) XXX_Size ¶
func (m *HttpTransactionProtoCollection) XXX_Size() int
func (*HttpTransactionProtoCollection) XXX_Unmarshal ¶
func (m *HttpTransactionProtoCollection) XXX_Unmarshal(b []byte) error
type PointCountTelemetry ¶
type PointCountTelemetry struct {
// contains filtered or unexported fields
}
PointCountTelemetry sends the number of points successfully sent and the number of points dropped.
func NewPointCountTelemetry ¶
func NewPointCountTelemetry(domain string) *PointCountTelemetry
NewPointCountTelemetry creates a new instance of PointCountTelemetry.
func (*PointCountTelemetry) OnPointDropped ¶
func (t *PointCountTelemetry) OnPointDropped(count int)
OnPointDropped increases the telemetry that counts the number of points dropped.
func (*PointCountTelemetry) OnPointSuccessfullySent ¶
func (t *PointCountTelemetry) OnPointSuccessfullySent(count int)
OnPointSuccessfullySent increases the telemetry that counts the number of points successfully sent.
type QueueCapacityStats ¶
QueueCapacityStats represents statistics about the capacity of the retry queue.
type QueueDurationCapacity ¶
type QueueDurationCapacity struct {
// contains filtered or unexported fields
}
QueueDurationCapacity provides a method to know how much data (expressed as a duration) the in-memory retry queue and the disk storage retry queue can store. For each domain, the capacity in bytes is the sum of: - the in-memory retry queue capacity. We assume there is enough memory for all in-memory retry queues (one per domain) - the available disk storage * `domain relative speed` where `domain relative speed` is the number of bytes per second for this domain, divided by the total number of bytes per second for all domains. If a domain receives twice the traffic compared to another one, twice the disk storage capacity is allocated to this domain. Disk storage is shared across domains. If there is no traffic during the time period for a domain, no statistic is reported.
func NewQueueDurationCapacity ¶
func NewQueueDurationCapacity( historyDuration time.Duration, bucketDuration time.Duration, maxMemSizeInBytes int, optionalDiskSpace diskSpace) *QueueDurationCapacity
NewQueueDurationCapacity creates a new instance of *QueueDurationCapacity. If `optionalDiskSpace` is not nil, the capacity also uses the storage on disk. `historyDuration` is the duration used to compute the number of bytes received per second. `bucketDuration` is the size of a bucket.
func (*QueueDurationCapacity) ComputeCapacity ¶
func (r *QueueDurationCapacity) ComputeCapacity(t time.Time) (map[string]QueueCapacityStats, error)
ComputeCapacity computes the capacity of the retry queue expressed as a duration. Returns statistics by domain name.
func (*QueueDurationCapacity) OnTransaction ¶
func (r *QueueDurationCapacity) OnTransaction( transaction *transaction.HTTPTransaction, mainDomain string, now time.Time) error
OnTransaction must be called for each transaction. Note: because of alternateDomains, `mainDomain` is not necessarily the same as transaction.Domain.
type TransactionDiskStorage ¶
type TransactionDiskStorage interface { Store([]transaction.Transaction) error ExtractLast() ([]transaction.Transaction, error) GetDiskSpaceUsed() int64 }
TransactionDiskStorage is an interface to store and load transactions from disk
type TransactionPriorityProto ¶
type TransactionPriorityProto int32
const ( TransactionPriorityProto_NORMAL TransactionPriorityProto = 0 TransactionPriorityProto_HIGH TransactionPriorityProto = 1 )
func (TransactionPriorityProto) EnumDescriptor ¶
func (TransactionPriorityProto) EnumDescriptor() ([]byte, []int)
func (TransactionPriorityProto) String ¶
func (x TransactionPriorityProto) String() string
type TransactionPrioritySorter ¶
type TransactionPrioritySorter interface {
Sort([]transaction.Transaction)
}
TransactionPrioritySorter is an interface to sort transactions.
type TransactionRetryQueue ¶
type TransactionRetryQueue struct {
// contains filtered or unexported fields
}
TransactionRetryQueue stores transactions in memory and flushes them to disk when the memory limit is exceeded.
func BuildTransactionRetryQueue ¶
func BuildTransactionRetryQueue( log log.Component, maxMemSizeInBytes int, flushToStorageRatio float64, optionalDomainFolderPath string, optionalDiskUsageLimit *DiskUsageLimit, dropPrioritySorter TransactionPrioritySorter, resolver resolver.DomainResolver, pointCountTelemetry *PointCountTelemetry) *TransactionRetryQueue
BuildTransactionRetryQueue builds a new instance of TransactionRetryQueue
func NewTransactionRetryQueue ¶
func NewTransactionRetryQueue( dropPrioritySorter TransactionPrioritySorter, optionalTransactionStorage TransactionDiskStorage, maxMemSizeInBytes int, flushToStorageRatio float64, telemetry TransactionRetryQueueTelemetry, pointCountTelemetry *PointCountTelemetry) *TransactionRetryQueue
NewTransactionRetryQueue creates a new instance of TransactionRetryQueue
func (*TransactionRetryQueue) Add ¶
func (tc *TransactionRetryQueue) Add(t transaction.Transaction) (int, error)
Add adds a new transaction and flushes transactions to disk if the memory limit is exceeded. The amount of transactions flushed to disk is controlled by `flushToStorageRatio` which is the ratio of the transactions to be flushed. Consider the following payload sizes 10, 20, 30, 40, 15 with `maxMemSizeInBytes=100` and `flushToStorageRatio=0.6`. When adding the last payload `15`, the buffer becomes full (10+20+30+40+15 > 100) and 100*0.6=60 bytes must be flushed to disk. The first 3 transactions are flushed to the disk as 10 + 20 + 30 >= 60. If disk serialization failed or is not enabled, remove old transactions such that `currentMemSizeInBytes` <= `maxMemSizeInBytes`.
func (*TransactionRetryQueue) ExtractTransactions ¶
func (tc *TransactionRetryQueue) ExtractTransactions() ([]transaction.Transaction, error)
ExtractTransactions extracts transactions from the container. If some transactions exist in memory, extract them; otherwise, extract transactions from the disk. No transactions are in memory after calling this method.
func (*TransactionRetryQueue) FlushToDisk ¶
func (tc *TransactionRetryQueue) FlushToDisk() error
FlushToDisk is called on shutdown and persists all transactions to disk. The normal limits on capacity still apply, and the same rules are followed as during normal operation in terms of which transactions are dropped and which are persisted.
func (*TransactionRetryQueue) GetDiskSpaceUsed ¶
func (tc *TransactionRetryQueue) GetDiskSpaceUsed() int64
GetDiskSpaceUsed returns the current disk space used for storing transactions.
func (*TransactionRetryQueue) GetMaxMemSizeInBytes ¶
func (tc *TransactionRetryQueue) GetMaxMemSizeInBytes() int
GetMaxMemSizeInBytes gets the maximum memory usage for storing transactions
func (*TransactionRetryQueue) GetTransactionCount ¶
func (tc *TransactionRetryQueue) GetTransactionCount() int
GetTransactionCount gets the number of transactions in the container
type TransactionRetryQueueTelemetry ¶
type TransactionRetryQueueTelemetry struct {
// contains filtered or unexported fields
}
TransactionRetryQueueTelemetry handles the telemetry for TransactionRetryQueue
func NewTransactionRetryQueueTelemetry ¶
func NewTransactionRetryQueueTelemetry(domainName string) TransactionRetryQueueTelemetry
NewTransactionRetryQueueTelemetry creates a new TransactionRetryQueueTelemetry