Documentation ¶
Index ¶
- Constants
- Variables
- func ModelToSubscriptionProtoV1(m *Model) *metrov1.Subscription
- func ValidateUpdateSubscriptionRequest(_ context.Context, req *metrov1.UpdateSubscriptionRequest) error
- type Backoff
- type BackoffPolicy
- type Core
- func (c *Core) CreateSubscription(ctx context.Context, m *Model) error
- func (c *Core) DeleteProjectSubscriptions(ctx context.Context, projectID string) error
- func (c *Core) DeleteSubscription(ctx context.Context, m *Model) error
- func (c *Core) Exists(ctx context.Context, key string) (bool, error)
- func (c *Core) Get(ctx context.Context, key string) (*Model, error)
- func (c *Core) GetTopicFromSubscriptionName(ctx context.Context, subscription string) (string, error)
- func (c *Core) List(ctx context.Context, prefix string) ([]*Model, error)
- func (c *Core) ListKeys(ctx context.Context, prefix string) ([]string, error)
- func (c *Core) Migrate(ctx context.Context, names []string) error
- func (c *Core) RescaleSubTopics(ctx context.Context, topicModel *topic.Model) error
- func (c *Core) UpdateSubscription(ctx context.Context, uModel *Model) error
- func (c *Core) UpdateSubscriptionStatus(ctx context.Context, names []string, detached bool) ([]string, error)
- type DeadLetterPolicy
- type ExpirationPolicy
- type Filter
- type ICore
- type IRepo
- type IntervalFinder
- type IntervalFinderParams
- type Model
- func (m *Model) GetBackoff() Backoff
- func (m *Model) GetCredentials() *credentials.Model
- func (m *Model) GetDeadLetterTopic() string
- func (m *Model) GetDelay150secTopic() string
- func (m *Model) GetDelay1800secTopic() string
- func (m *Model) GetDelay300secTopic() string
- func (m *Model) GetDelay30secTopic() string
- func (m *Model) GetDelay3600secTopic() string
- func (m *Model) GetDelay5secTopic() string
- func (m *Model) GetDelay600secTopic() string
- func (m *Model) GetDelay60secTopic() string
- func (m *Model) GetDelayConsumerGroupID(delayTopic string) string
- func (m *Model) GetDelayConsumerGroupInstanceID(subscriberID, delayTopic string) string
- func (m *Model) GetDelayTopics() []string
- func (m *Model) GetDelayTopicsByBackoff() []string
- func (m *Model) GetDelayTopicsMap() map[topic.Interval]string
- func (m *Model) GetFilterExpressionAsStruct() (*Filter, error)
- func (m *Model) GetIntervalFinder() IntervalFinder
- func (m *Model) GetRedactedPushEndpoint() string
- func (m *Model) GetRetryTopic() string
- func (m *Model) GetSubscriptionTopic() string
- func (m *Model) GetSubscriptionType() string
- func (m *Model) GetTopic() string
- func (m *Model) HasCredentials() bool
- func (m *Model) IsFilteringEnabled() bool
- func (m *Model) IsPush() bool
- func (m *Model) Key() string
- func (m *Model) Prefix() string
- func (m *Model) SetFilterExpression(Filter string)
- type PushConfig
- type Repo
- type RetryPolicy
Constants ¶
const ( // Prefix for all subscriptions keys in the registry Prefix = "subscriptions/" // SubscriptionTypePush ... SubscriptionTypePush = "Push" // SubscriptionTypePull ... SubscriptionTypePull = "Pull" )
const ( // MinAckDeadlineSeconds minimum value for ack deadline in seconds MinAckDeadlineSeconds = 10 // MaxAckDeadlineSeconds maximum value for ack deadline in seconds MaxAckDeadlineSeconds = 600 // DefaultAckDeadlineSeconds default value for ack deadline in seconds if not specified or 0 DefaultAckDeadlineSeconds = 10 // MinBackOffSeconds minimum supported backoff for retry MinBackOffSeconds = 0 // MaxBackOffSeconds maximum supported backoff for retry MaxBackOffSeconds = 3600 // DefaultMinBackOffSeconds default minimum backoff for retry DefaultMinBackOffSeconds = 10 // DefaultMaxBackoffSeconds default maximum backoff for retry DefaultMaxBackoffSeconds = 600 // MinDeliveryAttempts minimum delivery attempts before deadlettering MinDeliveryAttempts = 1 // MaxDeliveryAttempts maximum delivery attempts before deadlettering MaxDeliveryAttempts = 100 // DefaultDeliveryAttempts default delivery attempts before deadlettering DefaultDeliveryAttempts = 5 // Timeout to check the push endpoint is reachable or not URLValidationTimeout = 1 * time.Second )
Values related to config validation
const DefaultBackoffExponential = 2
DefaultBackoffExponential is the default exponential value for backoff
Variables ¶
var ( // ErrInvalidSubscriptionName ... ErrInvalidSubscriptionName = errors.New("subscription name cannot start with goog") // ErrInvalidTopicName ... ErrInvalidTopicName = errors.New("subscription topic name cannot ends with " + topic.RetryTopicSuffix) // ErrInvalidMinBackoff ... ErrInvalidMinBackoff = errors.New(fmt.Sprintf("min backoff should be greater than %v seconds", MinBackOffSeconds)) // ErrInvalidMaxBackoff ... ErrInvalidMaxBackoff = errors.New(fmt.Sprintf("max backoff should be less than %v seconds", MaxBackOffSeconds)) // ErrInvalidMinAndMaxBackoff ... ErrInvalidMinAndMaxBackoff = errors.New("min backoff should be less or equal to max backoff") // ErrInvalidMaxDeliveryAttempt ... ErrInvalidMaxDeliveryAttempt = errors.New(fmt.Sprintf("max delivery attempt should be between %v and %v", MinDeliveryAttempts, MaxDeliveryAttempts)) // ErrInvalidAckDeadline ... ErrInvalidAckDeadline = errors.New(fmt.Sprintf("ack deadline should be between %v and %v", MinAckDeadlineSeconds, MaxAckDeadlineSeconds)) // ErrInvalidPushEndpointURL ... ErrInvalidPushEndpointURL = errors.New("invalid push_endpoint url") // ErrInvalidPushEndpointUsername ... ErrInvalidPushEndpointUsername = errors.New("invalid push_endpoint username attribute") // ErrInvalidPushEndpointPassword ... ErrInvalidPushEndpointPassword = errors.New("invalid push_endpoint password attribute") // ErrPushEndpointNotReachable ... ErrPushEndpointNotReachable = errors.New("push_endpoint not reachable") )
Functions ¶
func ModelToSubscriptionProtoV1 ¶
func ModelToSubscriptionProtoV1(m *Model) *metrov1.Subscription
ModelToSubscriptionProtoV1 - Convert internal model into protov1 model
func ValidateUpdateSubscriptionRequest ¶
func ValidateUpdateSubscriptionRequest(_ context.Context, req *metrov1.UpdateSubscriptionRequest) error
ValidateUpdateSubscriptionRequest - Validates the update subscription request
Types ¶
type Backoff ¶
type Backoff interface {
Next(BackoffPolicy) float64
}
Backoff defines the backoff calculation logic
func NewExponentialWindowBackoff ¶
func NewExponentialWindowBackoff() Backoff
NewExponentialWindowBackoff returns a backoff policy that grows exponentially.
func NewFixedWindowBackoff ¶
func NewFixedWindowBackoff() Backoff
NewFixedWindowBackoff returns a backoff policy that always returns the same backoff delay.
type BackoffPolicy ¶
type BackoffPolicy struct {
// contains filtered or unexported fields
}
BackoffPolicy is a backoff policy for retrying an operation.
func NewBackoffPolicy ¶
func NewBackoffPolicy(startInterval, lastInterval, count, exponential float64) BackoffPolicy
NewBackoffPolicy returns a backoff policy instance.
type Core ¶
type Core struct {
// contains filtered or unexported fields
}
Core implements all business logic for a subscription
func (*Core) CreateSubscription ¶
CreateSubscription creates a subscription for a given topic
func (*Core) DeleteProjectSubscriptions ¶
DeleteProjectSubscriptions deletes all subscriptions for the given projectID
func (*Core) DeleteSubscription ¶
DeleteSubscription deletes a subscription
func (*Core) GetTopicFromSubscriptionName ¶
func (c *Core) GetTopicFromSubscriptionName(ctx context.Context, subscription string) (string, error)
GetTopicFromSubscriptionName returns topic from subscription
func (*Core) Migrate ¶
Migrate takes care of backfilling subscription topics for existing subscriptions. This is an idempotent operation that creates topics for each subscription. Migrate can be modified in the future for other use-cases as well.
func (*Core) RescaleSubTopics ¶
RescaleSubTopics - Get all the subs and rescale all the Retry/Delay/DLQ topics
func (*Core) UpdateSubscription ¶
UpdateSubscription - Updates a given subscription
type DeadLetterPolicy ¶
type DeadLetterPolicy struct { // DeadLetterTopic keeps the topic name used for dead lettering, this will be created with subscription and // will be visible to subscriber, subscriber can create subscription over this topic to read messages from this DeadLetterTopic string `json:"dead_letter_topic,omitempty"` MaxDeliveryAttempts int32 `json:"max_delivery_attempts,omitempty"` }
DeadLetterPolicy defines the dead letter policy
type ExpirationPolicy ¶
type ExpirationPolicy struct {
TTL uint `json:"ttl,omitempty"`
}
ExpirationPolicy defines the expiration policy
type ICore ¶
type ICore interface { CreateSubscription(ctx context.Context, subscription *Model) error UpdateSubscription(ctx context.Context, uModel *Model) error Exists(ctx context.Context, key string) (bool, error) DeleteSubscription(ctx context.Context, m *Model) error DeleteProjectSubscriptions(ctx context.Context, projectID string) error GetTopicFromSubscriptionName(ctx context.Context, subscription string) (string, error) ListKeys(ctx context.Context, prefix string) ([]string, error) List(ctx context.Context, prefix string) ([]*Model, error) Get(ctx context.Context, key string) (*Model, error) Migrate(ctx context.Context, names []string) error RescaleSubTopics(ctx context.Context, topicModel *topic.Model) error UpdateSubscriptionStatus(ctx context.Context, names []string, detached bool) ([]string, error) }
ICore is an interface over subscription core
func NewCore ¶
func NewCore(repo IRepo, projectCore project.ICore, topicCore topic.ICore, brokerStore brokerstore.IBrokerStore) ICore
NewCore returns an instance of Core
type IRepo ¶
type IRepo interface { common.IRepo List(ctx context.Context, prefix string) ([]common.IModel, error) }
IRepo interface over database repository
type IntervalFinder ¶
type IntervalFinder interface {
Next(IntervalFinderParams) topic.Interval
}
IntervalFinder defines the next interval identification logic
func NewClosestIntervalWithCeil ¶
func NewClosestIntervalWithCeil() IntervalFinder
NewClosestIntervalWithCeil returns the closest interval window finder
type IntervalFinderParams ¶
type IntervalFinderParams struct {
// contains filtered or unexported fields
}
IntervalFinderParams defines the constraints to be used to identify the next interval
func NewIntervalFinderParams ¶
func NewIntervalFinderParams(min, max uint, delayInterval float64, intervals []topic.Interval) IntervalFinderParams
NewIntervalFinderParams returns an instance of the parameters used by the next interval finder
type Model ¶
type Model struct { common.BaseModel Name string `json:"name,omitempty"` Topic string `json:"topic,omitempty"` PushConfig *PushConfig `json:"push_config,omitempty"` AckDeadlineSeconds int32 `json:"ack_deadline_seconds,omitempty"` RetainAckedMessages bool `json:"retain_acked_messages,omitempty"` MessageRetentionDuration uint `json:"message_retention_duration,omitempty"` Labels map[string]string `json:"labels,omitempty"` EnableMessageOrdering bool `json:"enable_message_ordering,omitempty"` ExpirationPolicy *ExpirationPolicy `json:"expiration_policy,omitempty"` // use SetFilterExpression function for setting FilterExpression field FilterExpression string `json:"filter,omitempty"` RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"` DeadLetterPolicy *DeadLetterPolicy `json:"dead_letter_policy,omitempty"` Detached bool `json:"detached,omitempty"` ExtractedTopicProjectID string `json:"extracted_topic_project_id"` ExtractedSubscriptionProjectID string `json:"extracted_subscription_project_id"` ExtractedTopicName string `json:"extracted_topic_name"` ExtractedSubscriptionName string `json:"extracted_subscription_name"` // contains filtered or unexported fields }
Model for a subscription
func GetValidatedModelForCreate ¶
GetValidatedModelForCreate validates an incoming proto request and returns the model for create requests
func GetValidatedModelForDelete ¶
GetValidatedModelForDelete validates an incoming proto request and returns the model for delete requests
func GetValidatedModelForUpdate ¶
GetValidatedModelForUpdate - validates the subscription model for update operation and returns the parsed model
func (*Model) GetCredentials ¶
func (m *Model) GetCredentials() *credentials.Model
GetCredentials returns the credentials for the push endpoint
func (*Model) GetDeadLetterTopic ¶
GetDeadLetterTopic returns the topic used for dead lettering for subscription
func (*Model) GetDelay150secTopic ¶
GetDelay150secTopic returns the formatted delay topic name for 150sec interval
func (*Model) GetDelay1800secTopic ¶
GetDelay1800secTopic returns the formatted delay topic name for 1800sec interval
func (*Model) GetDelay300secTopic ¶
GetDelay300secTopic returns the formatted delay topic name for 300sec interval
func (*Model) GetDelay30secTopic ¶
GetDelay30secTopic returns the formatted delay topic name for 30sec interval
func (*Model) GetDelay3600secTopic ¶
GetDelay3600secTopic returns the formatted delay topic name for 3600sec interval
func (*Model) GetDelay5secTopic ¶
GetDelay5secTopic returns the formatted delay topic name for 5sec interval
func (*Model) GetDelay600secTopic ¶
GetDelay600secTopic returns the formatted delay topic name for 600sec interval
func (*Model) GetDelay60secTopic ¶
GetDelay60secTopic returns the formatted delay topic name for 60sec interval
func (*Model) GetDelayConsumerGroupID ¶
GetDelayConsumerGroupID returns the consumer group ID to be used by the delay consumers
func (*Model) GetDelayConsumerGroupInstanceID ¶
GetDelayConsumerGroupInstanceID returns the consumer group instance ID to be used by the specific delay consumer
func (*Model) GetDelayTopics ¶
GetDelayTopics returns all the delay topic names for this subscription
func (*Model) GetDelayTopicsByBackoff ¶
GetDelayTopicsByBackoff returns delay topic based on retry policy
func (*Model) GetDelayTopicsMap ¶
GetDelayTopicsMap returns a map from each delay interval to its delay topic name
func (*Model) GetFilterExpressionAsStruct ¶
GetFilterExpressionAsStruct parses the filter expression and returns it as a Go struct
func (*Model) GetIntervalFinder ¶
func (m *Model) GetIntervalFinder() IntervalFinder
GetIntervalFinder returns the interval window finder
func (*Model) GetRedactedPushEndpoint ¶
GetRedactedPushEndpoint returns the push endpoint but replaces any password with "xxxxx".
func (*Model) GetRetryTopic ¶
GetRetryTopic returns the topic used for subscription retries
func (*Model) GetSubscriptionTopic ¶
GetSubscriptionTopic returns the fanout topic used for the subscription
func (*Model) GetSubscriptionType ¶
GetSubscriptionType returns subscription type i.e. Push or Pull
func (*Model) HasCredentials ¶
HasCredentials returns true if a subscription has credentials for push endpoint
func (*Model) IsFilteringEnabled ¶
IsFilteringEnabled checks if the subscription has filter criteria or not
func (*Model) SetFilterExpression ¶
SetFilterExpression sets the filter expression to the new input. It also resets the filter struct to nil; the struct is set again whenever it is next required
type PushConfig ¶
type PushConfig struct { PushEndpoint string `json:"push_endpoint,omitempty"` Attributes map[string]string `json:"attributes,omitempty"` Credentials *credentials.Model `json:"credentials,omitempty"` }
PushConfig defines the push endpoint
type RetryPolicy ¶
type RetryPolicy struct { MinimumBackoff uint `json:"minimum_backoff,omitempty"` MaximumBackoff uint `json:"maximum_backoff,omitempty"` }
RetryPolicy defines the retry policy