subscription

package
v0.0.0-...-58c0a64 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2024 License: MIT Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Prefix is the prefix for all subscription keys in the registry.
	Prefix = "subscriptions/"

	// SubscriptionTypePush is the subscription type label for push subscriptions.
	SubscriptionTypePush = "Push"

	// SubscriptionTypePull is the subscription type label for pull subscriptions.
	SubscriptionTypePull = "Pull"
)
View Source
const (
	// MinAckDeadlineSeconds is the minimum value for the ack deadline, in seconds.
	MinAckDeadlineSeconds = 10
	// MaxAckDeadlineSeconds is the maximum value for the ack deadline, in seconds.
	MaxAckDeadlineSeconds = 600
	// DefaultAckDeadlineSeconds is the default ack deadline, in seconds, used if not specified or 0.
	DefaultAckDeadlineSeconds = 10

	// MinBackOffSeconds is the minimum supported backoff for retry.
	MinBackOffSeconds = 0
	// MaxBackOffSeconds is the maximum supported backoff for retry.
	MaxBackOffSeconds = 3600
	// DefaultMinBackOffSeconds is the default minimum backoff for retry.
	DefaultMinBackOffSeconds = 10
	// DefaultMaxBackoffSeconds is the default maximum backoff for retry.
	DefaultMaxBackoffSeconds = 600

	// MinDeliveryAttempts is the minimum number of delivery attempts before deadlettering.
	MinDeliveryAttempts = 1
	// MaxDeliveryAttempts is the maximum number of delivery attempts before deadlettering.
	MaxDeliveryAttempts = 100
	// DefaultDeliveryAttempts is the default number of delivery attempts before deadlettering.
	DefaultDeliveryAttempts = 5
	// URLValidationTimeout is the timeout for checking whether the push endpoint is reachable.
	URLValidationTimeout = 1 * time.Second
)

Values used for config validation

View Source
// DefaultBackoffExponential is the default exponential value for backoff.
const DefaultBackoffExponential = 2

DefaultBackoffExponential is the default exponential value for backoff

Variables

View Source
var (
	// ErrInvalidSubscriptionName reports a subscription name that starts with "goog".
	ErrInvalidSubscriptionName = errors.New("subscription name cannot start with goog")

	// ErrInvalidTopicName reports a subscription topic name that ends with topic.RetryTopicSuffix.
	ErrInvalidTopicName = errors.New("subscription topic name cannot ends with " + topic.RetryTopicSuffix)

	// ErrInvalidMinBackoff reports a minimum backoff that violates the MinBackOffSeconds bound.
	ErrInvalidMinBackoff = fmt.Errorf("min backoff should be greater than %v seconds", MinBackOffSeconds)

	// ErrInvalidMaxBackoff reports a maximum backoff that violates the MaxBackOffSeconds bound.
	ErrInvalidMaxBackoff = fmt.Errorf("max backoff should be less than %v seconds", MaxBackOffSeconds)

	// ErrInvalidMinAndMaxBackoff reports a minimum backoff that is larger than the maximum backoff.
	ErrInvalidMinAndMaxBackoff = errors.New("min backoff should be less or equal to max backoff")

	// ErrInvalidMaxDeliveryAttempt reports a max delivery attempt value outside
	// the range MinDeliveryAttempts to MaxDeliveryAttempts.
	ErrInvalidMaxDeliveryAttempt = fmt.Errorf("max delivery attempt should be between %v and %v", MinDeliveryAttempts, MaxDeliveryAttempts)

	// ErrInvalidAckDeadline reports an ack deadline outside the range
	// MinAckDeadlineSeconds to MaxAckDeadlineSeconds.
	ErrInvalidAckDeadline = fmt.Errorf("ack deadline should be between %v and %v", MinAckDeadlineSeconds, MaxAckDeadlineSeconds)

	// ErrInvalidPushEndpointURL reports an invalid push_endpoint URL.
	ErrInvalidPushEndpointURL = errors.New("invalid push_endpoint url")

	// ErrInvalidPushEndpointUsername reports an invalid push_endpoint username attribute.
	ErrInvalidPushEndpointUsername = errors.New("invalid push_endpoint username attribute")

	// ErrInvalidPushEndpointPassword reports an invalid push_endpoint password attribute.
	ErrInvalidPushEndpointPassword = errors.New("invalid push_endpoint password attribute")

	// ErrPushEndpointNotReachable reports a push_endpoint that could not be reached.
	ErrPushEndpointNotReachable = errors.New("push_endpoint not reachable")
)

Functions

func ModelToSubscriptionProtoV1

func ModelToSubscriptionProtoV1(m *Model) *metrov1.Subscription

ModelToSubscriptionProtoV1 - Convert internal model into protov1 model

func ValidateUpdateSubscriptionRequest

func ValidateUpdateSubscriptionRequest(_ context.Context, req *metrov1.UpdateSubscriptionRequest) error

ValidateUpdateSubscriptionRequest - Validates the update subscription request

Types

type Backoff

// Backoff defines the backoff calculation logic.
type Backoff interface {
	// Next returns the next backoff value computed from the given policy.
	// NOTE(review): the unit of the returned float64 is presumably seconds — confirm against the implementations.
	Next(BackoffPolicy) float64
}

Backoff defines the backoff calculation logic

func NewExponentialWindowBackoff

func NewExponentialWindowBackoff() Backoff

NewExponentialWindowBackoff returns a backoff policy that grows exponentially.

func NewFixedWindowBackoff

func NewFixedWindowBackoff() Backoff

NewFixedWindowBackoff returns a backoff policy that always returns the same backoff delay.

type BackoffPolicy

type BackoffPolicy struct {
	// contains filtered or unexported fields
}

BackoffPolicy is a backoff policy for retrying an operation.

func NewBackoffPolicy

func NewBackoffPolicy(startInterval, lastInterval, count, exponential float64) BackoffPolicy

NewBackoffPolicy returns a backoff policy instance.

type Core

type Core struct {
	// contains filtered or unexported fields
}

Core implements all business logic for a subscription

func (*Core) CreateSubscription

func (c *Core) CreateSubscription(ctx context.Context, m *Model) error

CreateSubscription creates a subscription for a given topic

func (*Core) DeleteProjectSubscriptions

func (c *Core) DeleteProjectSubscriptions(ctx context.Context, projectID string) error

DeleteProjectSubscriptions deletes all subscriptions for the given projectID

func (*Core) DeleteSubscription

func (c *Core) DeleteSubscription(ctx context.Context, m *Model) error

DeleteSubscription deletes a subscription

func (*Core) Exists

func (c *Core) Exists(ctx context.Context, key string) (bool, error)

Exists checks if subscription exists for a given key

func (*Core) Get

func (c *Core) Get(ctx context.Context, key string) (*Model, error)

Get returns subscription with the given key

func (*Core) GetTopicFromSubscriptionName

func (c *Core) GetTopicFromSubscriptionName(ctx context.Context, subscription string) (string, error)

GetTopicFromSubscriptionName returns topic from subscription

func (*Core) List

func (c *Core) List(ctx context.Context, prefix string) ([]*Model, error)

List gets slice of subscriptions starting with given prefix

func (*Core) ListKeys

func (c *Core) ListKeys(ctx context.Context, prefix string) ([]string, error)

ListKeys gets all subscription keys

func (*Core) Migrate

func (c *Core) Migrate(ctx context.Context, names []string) error

Migrate takes care of backfilling subscription topics for existing subscriptions. This is an idempotent operation that creates topics for each subscription. Migrate can be modified in the future for other use-cases as well.

func (*Core) RescaleSubTopics

func (c *Core) RescaleSubTopics(ctx context.Context, topicModel *topic.Model) error

RescaleSubTopics - Get all the subs and rescale all the Retry/Delay/DLQ topics

func (*Core) UpdateSubscription

func (c *Core) UpdateSubscription(ctx context.Context, uModel *Model) error

UpdateSubscription - Updates a given subscription

func (*Core) UpdateSubscriptionStatus

func (c *Core) UpdateSubscriptionStatus(ctx context.Context, names []string, detached bool) ([]string, error)

UpdateSubscriptionStatus updates subscription status

type DeadLetterPolicy

// DeadLetterPolicy defines the dead letter policy: the topic used for dead
// lettering and the max delivery attempts setting.
// ErrInvalidMaxDeliveryAttempt's message gives the range MinDeliveryAttempts
// to MaxDeliveryAttempts for max delivery attempt.
type DeadLetterPolicy struct {
	// DeadLetterTopic keeps the topic name used for dead lettering. This topic is created with the subscription
	// and is visible to the subscriber, who can create a subscription over it to read the dead-lettered messages.
	DeadLetterTopic     string `json:"dead_letter_topic,omitempty"`
	MaxDeliveryAttempts int32  `json:"max_delivery_attempts,omitempty"`
}

DeadLetterPolicy defines the dead letter policy

type ExpirationPolicy

// ExpirationPolicy defines the expiration policy.
type ExpirationPolicy struct {
	// TTL is serialized as "ttl" and omitted from JSON when zero.
	// NOTE(review): the unit of TTL is not visible here — confirm.
	TTL uint `json:"ttl,omitempty"`
}

ExpirationPolicy defines the expiration policy

type Filter

// Filter defines the filter criteria for messages. It is an alias of filter.Condition.
type Filter = filter.Condition

Filter defines the Filter criteria for messages

type ICore

// ICore is an interface over subscription core.
type ICore interface {
	// CreateSubscription creates a subscription for a given topic.
	CreateSubscription(ctx context.Context, subscription *Model) error
	// UpdateSubscription updates a given subscription.
	UpdateSubscription(ctx context.Context, uModel *Model) error
	// Exists checks if a subscription exists for a given key.
	Exists(ctx context.Context, key string) (bool, error)
	// DeleteSubscription deletes a subscription.
	DeleteSubscription(ctx context.Context, m *Model) error
	// DeleteProjectSubscriptions deletes all subscriptions for the given projectID.
	DeleteProjectSubscriptions(ctx context.Context, projectID string) error
	// GetTopicFromSubscriptionName returns the topic for the named subscription.
	GetTopicFromSubscriptionName(ctx context.Context, subscription string) (string, error)
	// ListKeys gets the subscription keys.
	// NOTE(review): presumably restricted to keys starting with prefix — confirm.
	ListKeys(ctx context.Context, prefix string) ([]string, error)
	// List gets a slice of subscriptions starting with the given prefix.
	List(ctx context.Context, prefix string) ([]*Model, error)
	// Get returns the subscription with the given key.
	Get(ctx context.Context, key string) (*Model, error)
	// Migrate backfills subscription topics for existing subscriptions.
	Migrate(ctx context.Context, names []string) error
	// RescaleSubTopics gets all the subscriptions and rescales all the Retry/Delay/DLQ topics.
	RescaleSubTopics(ctx context.Context, topicModel *topic.Model) error
	// UpdateSubscriptionStatus updates subscription status.
	// NOTE(review): the meaning of the returned []string is not visible here — confirm.
	UpdateSubscriptionStatus(ctx context.Context, names []string, detached bool) ([]string, error)
}

ICore is an interface over subscription core

func NewCore

func NewCore(repo IRepo, projectCore project.ICore, topicCore topic.ICore, brokerStore brokerstore.IBrokerStore) ICore

NewCore returns an instance of Core

type IRepo

// IRepo is an interface over the database repository.
type IRepo interface {
	// common.IRepo is embedded; its method set is declared in the common package.
	common.IRepo
	// List returns a slice of subscriptions matching prefix.
	List(ctx context.Context, prefix string) ([]common.IModel, error)
}

IRepo interface over database repository

func NewRepo

func NewRepo(registry registry.IRegistry) IRepo

NewRepo returns IRepo

type IntervalFinder

// IntervalFinder defines the next interval identification logic.
type IntervalFinder interface {
	// Next returns the topic.Interval identified for the given params.
	Next(IntervalFinderParams) topic.Interval
}

IntervalFinder defines the next interval identification logic

func NewClosestIntervalWithCeil

func NewClosestIntervalWithCeil() IntervalFinder

NewClosestIntervalWithCeil returns the closest interval window finder

type IntervalFinderParams

type IntervalFinderParams struct {
	// contains filtered or unexported fields
}

IntervalFinderParams defines the constraints to be used to identify the next interval

func NewIntervalFinderParams

func NewIntervalFinderParams(min, max uint, delayInterval float64, intervals []topic.Interval) IntervalFinderParams

NewIntervalFinderParams returns an instance of IntervalFinderParams.

type Model

// Model is the model for a subscription.
//
// Each exported field is serialized to JSON under the name in its tag. Fields
// tagged omitempty are left out of the JSON when they hold their zero value;
// the four Extracted* fields carry no omitempty and are always emitted.
// ErrInvalidAckDeadline's message gives the range MinAckDeadlineSeconds to
// MaxAckDeadlineSeconds for the ack deadline.
// NOTE(review): the Extracted* fields are presumably parsed out of Name and
// Topic — confirm against the code that populates them.
type Model struct {
	common.BaseModel
	Name                     string            `json:"name,omitempty"`
	Topic                    string            `json:"topic,omitempty"`
	PushConfig               *PushConfig       `json:"push_config,omitempty"`
	AckDeadlineSeconds       int32             `json:"ack_deadline_seconds,omitempty"`
	RetainAckedMessages      bool              `json:"retain_acked_messages,omitempty"`
	MessageRetentionDuration uint              `json:"message_retention_duration,omitempty"`
	Labels                   map[string]string `json:"labels,omitempty"`
	EnableMessageOrdering    bool              `json:"enable_message_ordering,omitempty"`
	ExpirationPolicy         *ExpirationPolicy `json:"expiration_policy,omitempty"`
	// Use the SetFilterExpression method to set the FilterExpression field.
	FilterExpression string `json:"filter,omitempty"`

	RetryPolicy                    *RetryPolicy      `json:"retry_policy,omitempty"`
	DeadLetterPolicy               *DeadLetterPolicy `json:"dead_letter_policy,omitempty"`
	Detached                       bool              `json:"detached,omitempty"`
	ExtractedTopicProjectID        string            `json:"extracted_topic_project_id"`
	ExtractedSubscriptionProjectID string            `json:"extracted_subscription_project_id"`
	ExtractedTopicName             string            `json:"extracted_topic_name"`
	ExtractedSubscriptionName      string            `json:"extracted_subscription_name"`
	// contains filtered or unexported fields
}

Model for a subscription

func GetValidatedModelForCreate

func GetValidatedModelForCreate(ctx context.Context, req *metrov1.Subscription) (*Model, error)

GetValidatedModelForCreate validates an incoming proto request and returns the model for create requests

func GetValidatedModelForDelete

func GetValidatedModelForDelete(ctx context.Context, req *metrov1.Subscription) (*Model, error)

GetValidatedModelForDelete validates an incoming proto request and returns the model for delete requests

func GetValidatedModelForUpdate

func GetValidatedModelForUpdate(ctx context.Context, req *metrov1.Subscription) (*Model, error)

GetValidatedModelForUpdate - validates the subscription model for update operation and returns the parsed model

func (*Model) GetBackoff

func (m *Model) GetBackoff() Backoff

GetBackoff returns backoff policy

func (*Model) GetCredentials

func (m *Model) GetCredentials() *credentials.Model

GetCredentials returns the credentials for the push endpoint

func (*Model) GetDeadLetterTopic

func (m *Model) GetDeadLetterTopic() string

GetDeadLetterTopic returns the topic used for dead lettering for subscription

func (*Model) GetDelay150secTopic

func (m *Model) GetDelay150secTopic() string

GetDelay150secTopic returns the formatted delay topic name for 150sec interval

func (*Model) GetDelay1800secTopic

func (m *Model) GetDelay1800secTopic() string

GetDelay1800secTopic returns the formatted delay topic name for 1800sec interval

func (*Model) GetDelay300secTopic

func (m *Model) GetDelay300secTopic() string

GetDelay300secTopic returns the formatted delay topic name for 300sec interval

func (*Model) GetDelay30secTopic

func (m *Model) GetDelay30secTopic() string

GetDelay30secTopic returns the formatted delay topic name for 30sec interval

func (*Model) GetDelay3600secTopic

func (m *Model) GetDelay3600secTopic() string

GetDelay3600secTopic returns the formatted delay topic name for 3600sec interval

func (*Model) GetDelay5secTopic

func (m *Model) GetDelay5secTopic() string

GetDelay5secTopic returns the formatted delay topic name for 5sec interval

func (*Model) GetDelay600secTopic

func (m *Model) GetDelay600secTopic() string

GetDelay600secTopic returns the formatted delay topic name for 600sec interval

func (*Model) GetDelay60secTopic

func (m *Model) GetDelay60secTopic() string

GetDelay60secTopic returns the formatted delay topic name for 60sec interval

func (*Model) GetDelayConsumerGroupID

func (m *Model) GetDelayConsumerGroupID(delayTopic string) string

GetDelayConsumerGroupID returns the consumer group ID to be used by the delay consumers

func (*Model) GetDelayConsumerGroupInstanceID

func (m *Model) GetDelayConsumerGroupInstanceID(subscriberID, delayTopic string) string

GetDelayConsumerGroupInstanceID returns the consumer group instance ID to be used by the specific delay consumer

func (*Model) GetDelayTopics

func (m *Model) GetDelayTopics() []string

GetDelayTopics returns all the delay topic names for this subscription

func (*Model) GetDelayTopicsByBackoff

func (m *Model) GetDelayTopicsByBackoff() []string

GetDelayTopicsByBackoff returns delay topic based on retry policy

func (*Model) GetDelayTopicsMap

func (m *Model) GetDelayTopicsMap() map[topic.Interval]string

GetDelayTopicsMap returns a map from each interval to its delay topic name

func (*Model) GetFilterExpressionAsStruct

func (m *Model) GetFilterExpressionAsStruct() (*Filter, error)

GetFilterExpressionAsStruct parses the filter expression and returns it as a Go struct

func (*Model) GetIntervalFinder

func (m *Model) GetIntervalFinder() IntervalFinder

GetIntervalFinder returns the interval window finder

func (*Model) GetRedactedPushEndpoint

func (m *Model) GetRedactedPushEndpoint() string

GetRedactedPushEndpoint returns the push endpoint but replaces any password with "xxxxx".

func (*Model) GetRetryTopic

func (m *Model) GetRetryTopic() string

GetRetryTopic returns the topic used for subscription retries

func (*Model) GetSubscriptionTopic

func (m *Model) GetSubscriptionTopic() string

GetSubscriptionTopic returns the topic used for subscription fanout topic

func (*Model) GetSubscriptionType

func (m *Model) GetSubscriptionType() string

GetSubscriptionType returns subscription type i.e. Push or Pull

func (*Model) GetTopic

func (m *Model) GetTopic() string

GetTopic returns the primary subscription topic

func (*Model) HasCredentials

func (m *Model) HasCredentials() bool

HasCredentials returns true if a subscription has credentials for push endpoint

func (*Model) IsFilteringEnabled

func (m *Model) IsFilteringEnabled() bool

IsFilteringEnabled checks if the subscription has filter criteria or not

func (*Model) IsPush

func (m *Model) IsPush() bool

IsPush returns true if a subscription is a push subscription

func (*Model) Key

func (m *Model) Key() string

Key returns the Key for storing subscriptions in the registry

func (*Model) Prefix

func (m *Model) Prefix() string

Prefix returns the Key prefix

func (*Model) SetFilterExpression

func (m *Model) SetFilterExpression(Filter string)

SetFilterExpression sets filter expression to the new input. It also sets filter struct to nil. It will be set greedily whenever required

type PushConfig

// PushConfig defines the push endpoint.
//
// PushEndpoint, Attributes and Credentials are serialized as push_endpoint,
// attributes and credentials respectively; each is omitted from JSON when empty.
// NOTE(review): ErrInvalidPushEndpointUsername and ErrInvalidPushEndpointPassword
// mention username/password attributes — presumably keys of Attributes; confirm.
type PushConfig struct {
	PushEndpoint string             `json:"push_endpoint,omitempty"`
	Attributes   map[string]string  `json:"attributes,omitempty"`
	Credentials  *credentials.Model `json:"credentials,omitempty"`
}

PushConfig defines the push endpoint

type Repo

// Repo implements various repository methods.
type Repo struct {
	// BaseRepo is embedded from the common package.
	// NOTE(review): presumably it supplies the common.IRepo methods — confirm.
	common.BaseRepo
}

Repo implements various repository methods

func (*Repo) List

func (r *Repo) List(ctx context.Context, prefix string) ([]common.IModel, error)

List returns a slice of subscriptions matching prefix

type RetryPolicy

// RetryPolicy defines the retry policy as a minimum and a maximum backoff.
// ErrInvalidMinAndMaxBackoff's message says the minimum should be less than
// or equal to the maximum.
// NOTE(review): both values are presumably in seconds (compare
// MinBackOffSeconds and MaxBackOffSeconds) — confirm.
type RetryPolicy struct {
	MinimumBackoff uint `json:"minimum_backoff,omitempty"`
	MaximumBackoff uint `json:"maximum_backoff,omitempty"`
}

RetryPolicy defines the retry policy

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL