Documentation ¶
Index ¶
- type BucketCounter
- type EventsCache
- func (ec *EventsCache) Close() error
- func (ec *EventsCache) Error(cacheDisabled bool, destinationID, originEvent string, errMsg string)
- func (ec *EventsCache) Get(namespace, id, status string, limit int) ([]meta.Event, uint64)
- func (ec *EventsCache) GetCacheCapacityAndIntervalWindow() (int, int)
- func (ec *EventsCache) GetTotal(namespace, id, status string) int
- func (ec *EventsCache) RawErrorEvent(disabled bool, tokenID string, serializedMalformedPayload []byte, err error)
- func (ec *EventsCache) RawEvent(disabled bool, tokenID string, serializedPayload []byte, skipMsg string)
- func (ec *EventsCache) Skip(cacheDisabled bool, destinationID, originEvent string, errMsg string)
- func (ec *EventsCache) Succeed(eventContext *adapters.EventContext)
- type RateLimiter
- type RefillableRateLimiter
- type SucceedDBEvent
- type SucceedHTTPEvent
- type SucceedSynchronousEvent
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BucketCounter ¶
type BucketCounter struct {
// contains filtered or unexported fields
}
BucketCounter is a counter for one particular second.
func (*BucketCounter) Get ¶
func (bc *BucketCounter) Get(now time.Time) uint64
Get returns the current value if now falls in the current minute or if secondsID is within the last-minute interval.
func (*BucketCounter) Increment ¶
func (bc *BucketCounter) Increment(now time.Time)
Increment increments the value if now.Minute is equal to the current minute. It clears the counter if the current minute has changed.
type EventsCache ¶
type EventsCache struct {
// contains filtered or unexported fields
}
EventsCache is an event cache based on meta.Storage (Redis).
func NewEventsCache ¶
func NewEventsCache(enabled bool, storage meta.Storage, capacityPerTokenOrDestination, poolSize, trimIntervalMs, timeWindowSeconds int) *EventsCache
NewEventsCache returns an EventsCache and starts a goroutine for async operations.
func (*EventsCache) Close ¶
func (ec *EventsCache) Close() error
Close stops all underlying goroutines
func (*EventsCache) Error ¶
func (ec *EventsCache) Error(cacheDisabled bool, destinationID, originEvent string, errMsg string)
Error puts the value into a channel, from which it is read and updated in storage.
func (*EventsCache) Get ¶
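func (ec *EventsCache) Get(namespace, id, status string, limit int) ([]meta.Event, uint64)
Get returns two values: (1) the latest raw JSON events for the given namespace (token or destination), up to limit; (2) the number of rate-limited events.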
func (*EventsCache) GetCacheCapacityAndIntervalWindow ¶
func (ec *EventsCache) GetCacheCapacityAndIntervalWindow() (int, int)
GetCacheCapacityAndIntervalWindow returns the cache capacity and the interval window in seconds.
func (*EventsCache) GetTotal ¶
func (ec *EventsCache) GetTotal(namespace, id, status string) int
GetTotal returns the total number of destination events in storage.
func (*EventsCache) RawErrorEvent ¶
func (ec *EventsCache) RawErrorEvent(disabled bool, tokenID string, serializedMalformedPayload []byte, err error)
RawErrorEvent puts the value into a channel, from which it is read and written to storage.
func (*EventsCache) RawEvent ¶
func (ec *EventsCache) RawEvent(disabled bool, tokenID string, serializedPayload []byte, skipMsg string)
RawEvent puts the value into a channel, from which it is read and written to storage.
func (*EventsCache) Skip ¶
func (ec *EventsCache) Skip(cacheDisabled bool, destinationID, originEvent string, errMsg string)
Skip puts the value into a channel, from which it is read and updated in storage.
func (*EventsCache) Succeed ¶
func (ec *EventsCache) Succeed(eventContext *adapters.EventContext)
Succeed puts the value into a channel, from which it is read and updated in storage.
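The methods above share one pattern: the caller puts a value into a channel, and a background goroutine reads it and writes it to storage. A minimal sketch of that pattern follows. All names here (`statusEvent`, `asyncCache`, `put`, `count`) are hypothetical, an in-memory slice stands in for the Redis-backed storage, and dropping events when the buffer is full is an assumption of this sketch rather than documented behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// statusEvent and asyncCache are hypothetical names used only for this sketch.
type statusEvent struct {
	destinationID string
	status        string
}

type asyncCache struct {
	events chan statusEvent
	done   sync.WaitGroup
	mu     sync.Mutex
	stored []statusEvent // stands in for the storage backend
}

// newAsyncCache starts one worker goroutine that drains the channel.
func newAsyncCache(buffer int) *asyncCache {
	c := &asyncCache{events: make(chan statusEvent, buffer)}
	c.done.Add(1)
	go func() {
		defer c.done.Done()
		for ev := range c.events { // runs until Close closes the channel
			c.mu.Lock()
			c.stored = append(c.stored, ev)
			c.mu.Unlock()
		}
	}()
	return c
}

// put enqueues without blocking; it reports false if the buffer is full
// (assumption: the event is dropped in that case).
func (c *asyncCache) put(ev statusEvent) bool {
	select {
	case c.events <- ev:
		return true
	default:
		return false
	}
}

// Close stops the worker after it has drained the queued events.
// Calling put after Close would panic in this sketch.
func (c *asyncCache) Close() error {
	close(c.events)
	c.done.Wait()
	return nil
}

// count returns how many events have reached the storage stand-in.
func (c *asyncCache) count() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.stored)
}

func main() {
	c := newAsyncCache(16)
	c.put(statusEvent{"dest1", "error"})
	c.put(statusEvent{"dest1", "skip"})
	_ = c.Close()
	fmt.Println(c.count()) // 2
}
```

The point of the design is that producers never wait on storage latency; the cost is that writes become visible only after the worker has processed them.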
type RateLimiter ¶
func NewRefillableRateLimiter ¶
func NewRefillableRateLimiter(capacity uint64, timeWindow time.Duration) RateLimiter
type RefillableRateLimiter ¶
type RefillableRateLimiter struct {
// contains filtered or unexported fields
}
RefillableRateLimiter is a refillable-bucket RateLimiter with a capacity and a currently available value. The available value is refilled (incremented by 1) every timeWindow / capacity. For example, with capacity = 100 and timeWindow = 60 seconds, the available value is incremented by 1 every 60/100 = 0.6 seconds. The refill happens on the Allow() call.
func (*RefillableRateLimiter) Allow ¶
func (brl *RefillableRateLimiter) Allow() bool
Allow checks whether available > 0; if so, it decrements available and returns true. If available == 0, it checks how much can be refilled based on the last refill time; if nothing can be refilled, it returns false and counts the call as limited.
func (*RefillableRateLimiter) GetLastMinuteLimited ¶
func (brl *RefillableRateLimiter) GetLastMinuteLimited() uint64
GetLastMinuteLimited returns the number of rate-limited calls in the last minute.
type SucceedDBEvent ¶
type SucceedDBEvent struct {
    DestinationID string                 `json:"destination_id,omitempty"`
    Table         string                 `json:"table,omitempty"`
    Record        []*adapters.TableField `json:"record,omitempty"`
}
SucceedDBEvent is an entity for the cached events response for database events.
type SucceedHTTPEvent ¶
type SucceedHTTPEvent struct {
    DestinationID string            `json:"destination_id,omitempty"`
    URL           string            `json:"url,omitempty"`
    Method        string            `json:"method,omitempty"`
    Headers       map[string]string `json:"headers,omitempty"`
    Body          string            `json:"body,omitempty"`
}
SucceedHTTPEvent is an entity for cached events response for HTTP events
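To show what the `omitempty` tags mean for the serialized form, the sketch below copies the struct definition locally and marshals it with `encoding/json`. The helper `encode` and the sample values (destination ID, URL) are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SucceedHTTPEvent is copied from the type definition above so that the
// example compiles on its own.
type SucceedHTTPEvent struct {
	DestinationID string            `json:"destination_id,omitempty"`
	URL           string            `json:"url,omitempty"`
	Method        string            `json:"method,omitempty"`
	Headers       map[string]string `json:"headers,omitempty"`
	Body          string            `json:"body,omitempty"`
}

// encode is a hypothetical helper that returns the JSON form of ev,
// or an empty string if marshalling fails.
func encode(ev SucceedHTTPEvent) string {
	b, err := json.Marshal(ev)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	ev := SucceedHTTPEvent{
		DestinationID: "dest1",
		URL:           "https://example.com/hook",
		Method:        "POST",
	}
	// Headers (nil map) and Body (empty string) are left out of the output.
	fmt.Println(encode(ev))
	// {"destination_id":"dest1","url":"https://example.com/hook","method":"POST"}
}
```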