Documentation
¶
Index ¶
- Constants
- Variables
- func ExtractTopicMetaAndValidate(ctx context.Context, name string) (projectID string, topicName string, err error)
- func ExtractTopicMetaAndValidateForCreate(ctx context.Context, name string) (string, string, error)
- func GetTopicName(projectID string, name string) string
- func GetTopicNameOnly(topicName string) string
- func IsDLQTopic(topicName string) bool
- func IsRetentionPolicyUnchanged(existing, required map[string]string) bool
- type Core
- func (c *Core) CreateDeadLetterTopic(ctx context.Context, model *Model) error
- func (c *Core) CreateRetryTopic(ctx context.Context, model *Model) error
- func (c *Core) CreateSubscriptionTopic(ctx context.Context, model *Model) error
- func (c *Core) CreateTopic(ctx context.Context, m *Model) error
- func (c *Core) DeleteProjectTopics(ctx context.Context, projectID string) error
- func (c *Core) DeleteTopic(ctx context.Context, m *Model) error
- func (c *Core) Exists(ctx context.Context, key string) (bool, error)
- func (c *Core) ExistsWithName(ctx context.Context, name string) (bool, error)
- func (c *Core) Get(ctx context.Context, key string) (*Model, error)
- func (c *Core) List(ctx context.Context, prefix string) ([]*Model, error)
- func (c *Core) SetupTopicRetentionConfigs(ctx context.Context, names []string) ([]string, error)
- func (c *Core) UpdateTopic(ctx context.Context, m *Model) error
- type ICore
- type IRepo
- type Interval
- type Model
- func (m *Model) GetRetentionConfig() map[string]string
- func (m *Model) IsDeadLetterTopic() bool
- func (m *Model) IsDelayTopic() bool
- func (m *Model) IsPrimaryTopic() bool
- func (m *Model) IsRetryTopic() bool
- func (m *Model) IsSubscriptionInternalTopic() bool
- func (m *Model) Key() string
- func (m *Model) Prefix() string
- type Repo
Constants ¶
const ( // Prefix for all topic keys in the registry Prefix = "topics/" // RetryTopicSuffix every primary topic subscription will have a retry topic with this suffix as well RetryTopicSuffix = "-retry" // DeadLetterTopicSuffix every primary topic subscription will have a dlq topic with this suffix as well DeadLetterTopicSuffix = "-dlq" // SubscriptionSuffix is the suffix to be appended to the subscription specific topic SubscriptionSuffix = "-subscription-internal" // DefaultNumPartitions default no of partitions for a topic DefaultNumPartitions = 1 // MaxNumPartitions max number of partitions for a topic MaxNumPartitions = 100 // RetentionPeriodConfig is the topic level retention period config RetentionPeriodConfig = "retention.ms" // RetentionPeriod is the time after which messages will be deleted from the topic = 14 days RetentionPeriod = 1000 * 60 * 60 * 24 * 14 // RetentionSizeConfig is the partition retention size config RetentionSizeConfig = "retention.bytes" // RetentionSizePerPartition is the max no of bytes retained per partition = 10000MB RetentionSizePerPartition = 10000 * 1000000 )
const DelayConsumerGroupIDFormat = "%v-cg"
DelayConsumerGroupIDFormat ... -> subs.delay.30.seconds-cg
const DelayConsumerGroupInstanceIDFormat = "%v-%v"
DelayConsumerGroupInstanceIDFormat ... -> delayTopicName-subscriberID
const DelayTopicNameFormat = "%v.delay.%v.seconds"
DelayTopicNameFormat ... -> subs-delay-30-seconds, subs-delay-60-seconds ... subs-delay-600-seconds
const DelayTopicSuffix = "delay.%v.seconds"
DelayTopicSuffix ... -> delay-30-seconds, delay-60-seconds ... delay-600-seconds
const DelayTopicWithProjectNameFormat = "projects/%v/topics/%v.delay.%v.seconds"
DelayTopicWithProjectNameFormat ... -> projects/p1/topics/subs.delay.30.seconds
const (
// TopicNameFormat for public topic name "projects/{projectID}/topics/{topicName}
TopicNameFormat = "projects/%s/topics/%s"
)
Variables ¶
var ( // MinDelay ... MinDelay = Delay5sec // MaxDelay ... MaxDelay = Delay3600sec )
var Intervals = []Interval{Delay5sec, Delay30sec, Delay60sec, Delay150sec, Delay300sec, Delay600sec, Delay1800sec, Delay3600sec}
Intervals during subscription creation, query from the allowed intervals list, and create all the needed topics for retry.
Functions ¶
func ExtractTopicMetaAndValidate ¶
func ExtractTopicMetaAndValidate(ctx context.Context, name string) (projectID string, topicName string, err error)
ExtractTopicMetaAndValidate extracts topic metadata from its fully qualified name
func ExtractTopicMetaAndValidateForCreate ¶
ExtractTopicMetaAndValidateForCreate extracts and validates the topic details, additionally for topic create it checks if name can collide with dlq topics
func GetTopicName ¶
GetTopicName helper return the public topic name using project and topic name using format
func GetTopicNameOnly ¶
GetTopicNameOnly from the complete Topic Name
func IsDLQTopic ¶
IsDLQTopic helper checks if the topic is dlq topic
func IsRetentionPolicyUnchanged ¶
IsRetentionPolicyUnchanged checks if the existing and the required retention policy are same or not
Types ¶
type Core ¶
type Core struct {
// contains filtered or unexported fields
}
Core implements all business logic for a topic
func (*Core) CreateDeadLetterTopic ¶
CreateDeadLetterTopic creates a dead letter topic for the given primary topic and name
func (*Core) CreateRetryTopic ¶
CreateRetryTopic creates a retry topic for the given primary topic and name
func (*Core) CreateSubscriptionTopic ¶
CreateSubscriptionTopic creates a subscription topic for the given primary topic and name
func (*Core) CreateTopic ¶
CreateTopic implements topic creation
func (*Core) DeleteProjectTopics ¶
DeleteProjectTopics deletes all topics for a given projectID
func (*Core) DeleteTopic ¶
DeleteTopic deletes a topic and all resources associated with it
func (*Core) ExistsWithName ¶
ExistsWithName checks if the topic exists with a given name
func (*Core) SetupTopicRetentionConfigs ¶
SetupTopicRetentionConfigs sets up retention policy on top of dlq topics
type ICore ¶
type ICore interface { CreateTopic(ctx context.Context, model *Model) error UpdateTopic(ctx context.Context, model *Model) error SetupTopicRetentionConfigs(ctx context.Context, names []string) ([]string, error) Exists(ctx context.Context, key string) (bool, error) ExistsWithName(ctx context.Context, name string) (bool, error) DeleteTopic(ctx context.Context, m *Model) error DeleteProjectTopics(ctx context.Context, projectID string) error Get(ctx context.Context, key string) (*Model, error) CreateSubscriptionTopic(ctx context.Context, model *Model) error CreateRetryTopic(ctx context.Context, model *Model) error CreateDeadLetterTopic(ctx context.Context, model *Model) error List(ctx context.Context, prefix string) ([]*Model, error) }
ICore is an interface over topic core
func NewCore ¶
func NewCore(repo IRepo, projectCore project.ICore, brokerStore brokerstore.IBrokerStore) ICore
NewCore returns an instance of Core
type IRepo ¶
type IRepo interface { common.IRepo List(ctx context.Context, prefix string) ([]common.IModel, error) }
IRepo interface over database repository
type Interval ¶
type Interval uint
Interval is internal delay type per allowed interval
var ( // Delay5sec 5sec Delay5sec Interval = 5 // Delay30sec 30sec Delay30sec Interval = 30 // Delay60sec 1min Delay60sec Interval = 60 // Delay150sec 2.5min Delay150sec Interval = 150 // Delay300sec 5min Delay300sec Interval = 300 // Delay600sec 10min Delay600sec Interval = 600 // Delay1800sec 30min Delay1800sec Interval = 1800 // Delay3600sec 60min Delay3600sec Interval = 3600 )
type Model ¶
type Model struct { common.BaseModel Name string `json:"name"` Labels map[string]string `json:"labels"` ExtractedProjectID string `json:"extracted_project_id"` ExtractedTopicName string `json:"extracted_topic_name"` NumPartitions int `json:"num_partitions"` }
Model for a topic
func GetValidatedModel ¶
GetValidatedModel validates an incoming proto request and returns the model
func GetValidatedTopicForAdminUpdate ¶
GetValidatedTopicForAdminUpdate validates an incoming proto request and returns the model
func (*Model) GetRetentionConfig ¶
GetRetentionConfig returns the retention policy for a given topic
func (*Model) IsDeadLetterTopic ¶
IsDeadLetterTopic checks if the topic is a dead letter topic created for dlq support on subscription
func (*Model) IsDelayTopic ¶
IsDelayTopic checks if the topic is a delay topic created for delay support in subscription
func (*Model) IsPrimaryTopic ¶
IsPrimaryTopic checks if the topic is primary topic or not
func (*Model) IsRetryTopic ¶
IsRetryTopic checks if the topic is a retry topic created for retry support in subscription
func (*Model) IsSubscriptionInternalTopic ¶
IsSubscriptionInternalTopic checks if the topic is subscription's internal topic or not