Documentation ¶
Index ¶
- Variables
- type EndpointProto
- func (*EndpointProto) Descriptor() ([]byte, []int)
- func (m *EndpointProto) GetName() string
- func (m *EndpointProto) GetRoute() string
- func (m *EndpointProto) Marshal() (dAtA []byte, err error)
- func (m *EndpointProto) MarshalTo(dAtA []byte) (int, error)
- func (m *EndpointProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*EndpointProto) ProtoMessage()
- func (m *EndpointProto) Reset()
- func (m *EndpointProto) Size() (n int)
- func (m *EndpointProto) String() string
- func (m *EndpointProto) Unmarshal(dAtA []byte) error
- func (m *EndpointProto) XXX_DiscardUnknown()
- func (m *EndpointProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *EndpointProto) XXX_Merge(src proto.Message)
- func (m *EndpointProto) XXX_Size() int
- func (m *EndpointProto) XXX_Unmarshal(b []byte) error
- type FileRemovalPolicy
- type FileRemovalPolicyTelemetry
- type HTTPTransactionsSerializer
- type HeaderValuesProto
- func (*HeaderValuesProto) Descriptor() ([]byte, []int)
- func (m *HeaderValuesProto) GetValues() []string
- func (m *HeaderValuesProto) Marshal() (dAtA []byte, err error)
- func (m *HeaderValuesProto) MarshalTo(dAtA []byte) (int, error)
- func (m *HeaderValuesProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*HeaderValuesProto) ProtoMessage()
- func (m *HeaderValuesProto) Reset()
- func (m *HeaderValuesProto) Size() (n int)
- func (m *HeaderValuesProto) String() string
- func (m *HeaderValuesProto) Unmarshal(dAtA []byte) error
- func (m *HeaderValuesProto) XXX_DiscardUnknown()
- func (m *HeaderValuesProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *HeaderValuesProto) XXX_Merge(src proto.Message)
- func (m *HeaderValuesProto) XXX_Size() int
- func (m *HeaderValuesProto) XXX_Unmarshal(b []byte) error
- type HttpTransactionProto
- func (*HttpTransactionProto) Descriptor() ([]byte, []int)
- func (m *HttpTransactionProto) GetCreatedAt() int64
- func (m *HttpTransactionProto) GetDomain() string
- func (m *HttpTransactionProto) GetEndpoint() *EndpointProto
- func (m *HttpTransactionProto) GetErrorCount() int64
- func (m *HttpTransactionProto) GetHeaders() map[string]*HeaderValuesProto
- func (m *HttpTransactionProto) GetPayload() []byte
- func (m *HttpTransactionProto) GetPriority() TransactionPriorityProto
- func (m *HttpTransactionProto) GetRetryable() bool
- func (m *HttpTransactionProto) Marshal() (dAtA []byte, err error)
- func (m *HttpTransactionProto) MarshalTo(dAtA []byte) (int, error)
- func (m *HttpTransactionProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*HttpTransactionProto) ProtoMessage()
- func (m *HttpTransactionProto) Reset()
- func (m *HttpTransactionProto) Size() (n int)
- func (m *HttpTransactionProto) String() string
- func (m *HttpTransactionProto) Unmarshal(dAtA []byte) error
- func (m *HttpTransactionProto) XXX_DiscardUnknown()
- func (m *HttpTransactionProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *HttpTransactionProto) XXX_Merge(src proto.Message)
- func (m *HttpTransactionProto) XXX_Size() int
- func (m *HttpTransactionProto) XXX_Unmarshal(b []byte) error
- type HttpTransactionProtoCollection
- func (*HttpTransactionProtoCollection) Descriptor() ([]byte, []int)
- func (m *HttpTransactionProtoCollection) GetValues() []*HttpTransactionProto
- func (m *HttpTransactionProtoCollection) GetVersion() int32
- func (m *HttpTransactionProtoCollection) Marshal() (dAtA []byte, err error)
- func (m *HttpTransactionProtoCollection) MarshalTo(dAtA []byte) (int, error)
- func (m *HttpTransactionProtoCollection) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*HttpTransactionProtoCollection) ProtoMessage()
- func (m *HttpTransactionProtoCollection) Reset()
- func (m *HttpTransactionProtoCollection) Size() (n int)
- func (m *HttpTransactionProtoCollection) String() string
- func (m *HttpTransactionProtoCollection) Unmarshal(dAtA []byte) error
- func (m *HttpTransactionProtoCollection) XXX_DiscardUnknown()
- func (m *HttpTransactionProtoCollection) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *HttpTransactionProtoCollection) XXX_Merge(src proto.Message)
- func (m *HttpTransactionProtoCollection) XXX_Size() int
- func (m *HttpTransactionProtoCollection) XXX_Unmarshal(b []byte) error
- type TransactionPriorityProto
- type TransactionPrioritySorter
- type TransactionRetryQueue
- type TransactionRetryQueueTelemetry
- type TransactionSerializer
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthHttpTransactionProto = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowHttpTransactionProto = fmt.Errorf("proto: integer overflow") ErrUnexpectedEndOfGroupHttpTransactionProto = fmt.Errorf("proto: unexpected end of group") )
var TransactionPriorityProto_name = map[int32]string{
0: "NORMAL",
1: "HIGH",
}
var TransactionPriorityProto_value = map[string]int32{
"NORMAL": 0,
"HIGH": 1,
}
Functions ¶
This section is empty.
Types ¶
type EndpointProto ¶
type EndpointProto struct { Route string `protobuf:"bytes,1,opt,name=route,proto3" json:"route,omitempty"` Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*EndpointProto) Descriptor ¶
func (*EndpointProto) Descriptor() ([]byte, []int)
func (*EndpointProto) GetName ¶
func (m *EndpointProto) GetName() string
func (*EndpointProto) GetRoute ¶
func (m *EndpointProto) GetRoute() string
func (*EndpointProto) Marshal ¶
func (m *EndpointProto) Marshal() (dAtA []byte, err error)
func (*EndpointProto) MarshalToSizedBuffer ¶
func (m *EndpointProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*EndpointProto) ProtoMessage ¶
func (*EndpointProto) ProtoMessage()
func (*EndpointProto) Reset ¶
func (m *EndpointProto) Reset()
func (*EndpointProto) Size ¶
func (m *EndpointProto) Size() (n int)
func (*EndpointProto) String ¶
func (m *EndpointProto) String() string
func (*EndpointProto) Unmarshal ¶
func (m *EndpointProto) Unmarshal(dAtA []byte) error
func (*EndpointProto) XXX_DiscardUnknown ¶
func (m *EndpointProto) XXX_DiscardUnknown()
func (*EndpointProto) XXX_Marshal ¶
func (m *EndpointProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*EndpointProto) XXX_Merge ¶
func (m *EndpointProto) XXX_Merge(src proto.Message)
func (*EndpointProto) XXX_Size ¶
func (m *EndpointProto) XXX_Size() int
func (*EndpointProto) XXX_Unmarshal ¶
func (m *EndpointProto) XXX_Unmarshal(b []byte) error
type FileRemovalPolicy ¶
type FileRemovalPolicy struct {
// contains filtered or unexported fields
}
FileRemovalPolicy handles the removal policy for `.retry` files.
func NewFileRemovalPolicy ¶
func NewFileRemovalPolicy( rootPath string, outdatedFileDayCount int, telemetry FileRemovalPolicyTelemetry) (*FileRemovalPolicy, error)
NewFileRemovalPolicy creates a new instance of FileRemovalPolicy
func (*FileRemovalPolicy) RegisterDomain ¶
func (p *FileRemovalPolicy) RegisterDomain(domainName string) (string, error)
RegisterDomain registers a domain name.
func (*FileRemovalPolicy) RemoveOutdatedFiles ¶
func (p *FileRemovalPolicy) RemoveOutdatedFiles() ([]string, error)
RemoveOutdatedFiles removes outdated files, i.e. files that are older than outdatedFileDayCount days.
func (*FileRemovalPolicy) RemoveUnknownDomains ¶
func (p *FileRemovalPolicy) RemoveUnknownDomains() ([]string, error)
RemoveUnknownDomains removes unknown domains.
type FileRemovalPolicyTelemetry ¶
type FileRemovalPolicyTelemetry struct{}
FileRemovalPolicyTelemetry handles the telemetry for FileRemovalPolicy.
type HTTPTransactionsSerializer ¶
type HTTPTransactionsSerializer struct {
// contains filtered or unexported fields
}
HTTPTransactionsSerializer serializes Transaction instances. To support a new Transaction implementation, add a new method `func (s *HTTPTransactionsSerializer) Add(transaction NEW_TYPE) error {`
func NewHTTPTransactionsSerializer ¶
func NewHTTPTransactionsSerializer(domain string, apiKeys []string) *HTTPTransactionsSerializer
NewHTTPTransactionsSerializer creates a new instance of HTTPTransactionsSerializer
func (*HTTPTransactionsSerializer) Add ¶
func (s *HTTPTransactionsSerializer) Add(transaction *transaction.HTTPTransaction) error
Add adds a transaction to the serializer. This function keeps references to HTTPTransaction.Payload and HTTPTransaction.Headers, so the transaction must not be updated until `GetBytesAndReset` is called.
func (*HTTPTransactionsSerializer) Deserialize ¶
func (s *HTTPTransactionsSerializer) Deserialize(bytes []byte) ([]transaction.Transaction, int, error)
Deserialize deserializes from bytes.
func (*HTTPTransactionsSerializer) GetBytesAndReset ¶
func (s *HTTPTransactionsSerializer) GetBytesAndReset() ([]byte, error)
GetBytesAndReset returns the serialized transactions as bytes and resets the transaction collection.
type HeaderValuesProto ¶
type HeaderValuesProto struct { Values []string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*HeaderValuesProto) Descriptor ¶
func (*HeaderValuesProto) Descriptor() ([]byte, []int)
func (*HeaderValuesProto) GetValues ¶
func (m *HeaderValuesProto) GetValues() []string
func (*HeaderValuesProto) Marshal ¶
func (m *HeaderValuesProto) Marshal() (dAtA []byte, err error)
func (*HeaderValuesProto) MarshalTo ¶
func (m *HeaderValuesProto) MarshalTo(dAtA []byte) (int, error)
func (*HeaderValuesProto) MarshalToSizedBuffer ¶
func (m *HeaderValuesProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*HeaderValuesProto) ProtoMessage ¶
func (*HeaderValuesProto) ProtoMessage()
func (*HeaderValuesProto) Reset ¶
func (m *HeaderValuesProto) Reset()
func (*HeaderValuesProto) Size ¶
func (m *HeaderValuesProto) Size() (n int)
func (*HeaderValuesProto) String ¶
func (m *HeaderValuesProto) String() string
func (*HeaderValuesProto) Unmarshal ¶
func (m *HeaderValuesProto) Unmarshal(dAtA []byte) error
func (*HeaderValuesProto) XXX_DiscardUnknown ¶
func (m *HeaderValuesProto) XXX_DiscardUnknown()
func (*HeaderValuesProto) XXX_Marshal ¶
func (m *HeaderValuesProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*HeaderValuesProto) XXX_Merge ¶
func (m *HeaderValuesProto) XXX_Merge(src proto.Message)
func (*HeaderValuesProto) XXX_Size ¶
func (m *HeaderValuesProto) XXX_Size() int
func (*HeaderValuesProto) XXX_Unmarshal ¶
func (m *HeaderValuesProto) XXX_Unmarshal(b []byte) error
type HttpTransactionProto ¶
type HttpTransactionProto struct { Domain string `protobuf:"bytes,1,opt,name=Domain,proto3" json:"Domain,omitempty"` Endpoint *EndpointProto `protobuf:"bytes,2,opt,name=Endpoint,proto3" json:"Endpoint,omitempty"` Headers map[string]*HeaderValuesProto `` /* 155-byte string literal not displayed */ Payload []byte `protobuf:"bytes,4,opt,name=Payload,proto3" json:"Payload,omitempty"` ErrorCount int64 `protobuf:"varint,5,opt,name=ErrorCount,proto3" json:"ErrorCount,omitempty"` CreatedAt int64 `protobuf:"varint,6,opt,name=CreatedAt,proto3" json:"CreatedAt,omitempty"` Retryable bool `protobuf:"varint,7,opt,name=Retryable,proto3" json:"Retryable,omitempty"` Priority TransactionPriorityProto `protobuf:"varint,8,opt,name=priority,proto3,enum=forwarder.TransactionPriorityProto" json:"priority,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*HttpTransactionProto) Descriptor ¶
func (*HttpTransactionProto) Descriptor() ([]byte, []int)
func (*HttpTransactionProto) GetCreatedAt ¶
func (m *HttpTransactionProto) GetCreatedAt() int64
func (*HttpTransactionProto) GetDomain ¶
func (m *HttpTransactionProto) GetDomain() string
func (*HttpTransactionProto) GetEndpoint ¶
func (m *HttpTransactionProto) GetEndpoint() *EndpointProto
func (*HttpTransactionProto) GetErrorCount ¶
func (m *HttpTransactionProto) GetErrorCount() int64
func (*HttpTransactionProto) GetHeaders ¶
func (m *HttpTransactionProto) GetHeaders() map[string]*HeaderValuesProto
func (*HttpTransactionProto) GetPayload ¶
func (m *HttpTransactionProto) GetPayload() []byte
func (*HttpTransactionProto) GetPriority ¶
func (m *HttpTransactionProto) GetPriority() TransactionPriorityProto
func (*HttpTransactionProto) GetRetryable ¶
func (m *HttpTransactionProto) GetRetryable() bool
func (*HttpTransactionProto) Marshal ¶
func (m *HttpTransactionProto) Marshal() (dAtA []byte, err error)
func (*HttpTransactionProto) MarshalTo ¶
func (m *HttpTransactionProto) MarshalTo(dAtA []byte) (int, error)
func (*HttpTransactionProto) MarshalToSizedBuffer ¶
func (m *HttpTransactionProto) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*HttpTransactionProto) ProtoMessage ¶
func (*HttpTransactionProto) ProtoMessage()
func (*HttpTransactionProto) Reset ¶
func (m *HttpTransactionProto) Reset()
func (*HttpTransactionProto) Size ¶
func (m *HttpTransactionProto) Size() (n int)
func (*HttpTransactionProto) String ¶
func (m *HttpTransactionProto) String() string
func (*HttpTransactionProto) Unmarshal ¶
func (m *HttpTransactionProto) Unmarshal(dAtA []byte) error
func (*HttpTransactionProto) XXX_DiscardUnknown ¶
func (m *HttpTransactionProto) XXX_DiscardUnknown()
func (*HttpTransactionProto) XXX_Marshal ¶
func (m *HttpTransactionProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*HttpTransactionProto) XXX_Merge ¶
func (m *HttpTransactionProto) XXX_Merge(src proto.Message)
func (*HttpTransactionProto) XXX_Size ¶
func (m *HttpTransactionProto) XXX_Size() int
func (*HttpTransactionProto) XXX_Unmarshal ¶
func (m *HttpTransactionProto) XXX_Unmarshal(b []byte) error
type HttpTransactionProtoCollection ¶
type HttpTransactionProtoCollection struct { Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` Values []*HttpTransactionProto `protobuf:"bytes,2,rep,name=values,proto3" json:"values,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*HttpTransactionProtoCollection) Descriptor ¶
func (*HttpTransactionProtoCollection) Descriptor() ([]byte, []int)
func (*HttpTransactionProtoCollection) GetValues ¶
func (m *HttpTransactionProtoCollection) GetValues() []*HttpTransactionProto
func (*HttpTransactionProtoCollection) GetVersion ¶
func (m *HttpTransactionProtoCollection) GetVersion() int32
func (*HttpTransactionProtoCollection) Marshal ¶
func (m *HttpTransactionProtoCollection) Marshal() (dAtA []byte, err error)
func (*HttpTransactionProtoCollection) MarshalTo ¶
func (m *HttpTransactionProtoCollection) MarshalTo(dAtA []byte) (int, error)
func (*HttpTransactionProtoCollection) MarshalToSizedBuffer ¶
func (m *HttpTransactionProtoCollection) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*HttpTransactionProtoCollection) ProtoMessage ¶
func (*HttpTransactionProtoCollection) ProtoMessage()
func (*HttpTransactionProtoCollection) Reset ¶
func (m *HttpTransactionProtoCollection) Reset()
func (*HttpTransactionProtoCollection) Size ¶
func (m *HttpTransactionProtoCollection) Size() (n int)
func (*HttpTransactionProtoCollection) String ¶
func (m *HttpTransactionProtoCollection) String() string
func (*HttpTransactionProtoCollection) Unmarshal ¶
func (m *HttpTransactionProtoCollection) Unmarshal(dAtA []byte) error
func (*HttpTransactionProtoCollection) XXX_DiscardUnknown ¶
func (m *HttpTransactionProtoCollection) XXX_DiscardUnknown()
func (*HttpTransactionProtoCollection) XXX_Marshal ¶
func (m *HttpTransactionProtoCollection) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*HttpTransactionProtoCollection) XXX_Merge ¶
func (m *HttpTransactionProtoCollection) XXX_Merge(src proto.Message)
func (*HttpTransactionProtoCollection) XXX_Size ¶
func (m *HttpTransactionProtoCollection) XXX_Size() int
func (*HttpTransactionProtoCollection) XXX_Unmarshal ¶
func (m *HttpTransactionProtoCollection) XXX_Unmarshal(b []byte) error
type TransactionPriorityProto ¶
type TransactionPriorityProto int32
const ( TransactionPriorityProto_NORMAL TransactionPriorityProto = 0 TransactionPriorityProto_HIGH TransactionPriorityProto = 1 )
func (TransactionPriorityProto) EnumDescriptor ¶
func (TransactionPriorityProto) EnumDescriptor() ([]byte, []int)
func (TransactionPriorityProto) String ¶
func (x TransactionPriorityProto) String() string
type TransactionPrioritySorter ¶
type TransactionPrioritySorter interface {
Sort([]transaction.Transaction)
}
TransactionPrioritySorter is an interface to sort transactions.
type TransactionRetryQueue ¶
type TransactionRetryQueue struct {
// contains filtered or unexported fields
}
TransactionRetryQueue stores transactions in memory and flushes them to disk when the memory limit is exceeded.
func BuildTransactionRetryQueue ¶
func BuildTransactionRetryQueue( maxMemSizeInBytes int, flushToStorageRatio float64, optionalDomainFolderPath string, storageMaxSize int64, dropPrioritySorter TransactionPrioritySorter, domain string, apiKeys []string) *TransactionRetryQueue
BuildTransactionRetryQueue builds a new instance of TransactionRetryQueue
func NewTransactionRetryQueue ¶
func NewTransactionRetryQueue( dropPrioritySorter TransactionPrioritySorter, optionalTransactionSerializer TransactionSerializer, maxMemSizeInBytes int, flushToStorageRatio float64, telemetry TransactionRetryQueueTelemetry) *TransactionRetryQueue
NewTransactionRetryQueue creates a new instance of TransactionRetryQueue
func (*TransactionRetryQueue) Add ¶
func (tc *TransactionRetryQueue) Add(t transaction.Transaction) (int, error)
Add adds a new transaction and flushes transactions to disk if the memory limit is exceeded. The amount of transactions flushed to disk is controlled by `flushToStorageRatio`, which is the ratio of the transactions to be flushed. Consider the following payload sizes: 10, 20, 30, 40, 15 with `maxMemSizeInBytes=100` and `flushToStorageRatio=0.6`. When adding the last payload `15`, the buffer becomes full (10+20+30+40+15 > 100) and 100*0.6=60 bytes must be flushed to disk. The first 3 transactions are flushed to disk as 10 + 20 + 30 >= 60. If disk serialization fails or is not enabled, old transactions are removed so that `currentMemSizeInBytes` <= `maxMemSizeInBytes`.
func (*TransactionRetryQueue) ExtractTransactions ¶
func (tc *TransactionRetryQueue) ExtractTransactions() ([]transaction.Transaction, error)
ExtractTransactions extracts transactions from the container. If some transactions exist in memory, it extracts them; otherwise, it extracts transactions from the disk. No transactions are in memory after calling this method.
func (*TransactionRetryQueue) GetMaxMemSizeInBytes ¶
func (tc *TransactionRetryQueue) GetMaxMemSizeInBytes() int
GetMaxMemSizeInBytes gets the maximum memory usage for storing transactions
func (*TransactionRetryQueue) GetTransactionCount ¶
func (tc *TransactionRetryQueue) GetTransactionCount() int
GetTransactionCount gets the number of transactions in the container
type TransactionRetryQueueTelemetry ¶
type TransactionRetryQueueTelemetry struct {
// contains filtered or unexported fields
}
TransactionRetryQueueTelemetry handles the telemetry for TransactionRetryQueue
func NewTransactionRetryQueueTelemetry ¶
func NewTransactionRetryQueueTelemetry(domainName string) TransactionRetryQueueTelemetry
NewTransactionRetryQueueTelemetry creates a new TransactionRetryQueueTelemetry
type TransactionSerializer ¶
type TransactionSerializer interface { Serialize([]transaction.Transaction) error Deserialize() ([]transaction.Transaction, error) }
TransactionSerializer is an interface to serialize / deserialize transactions