Documentation ¶
Index ¶
- Constants
- Variables
- type AlertService
- type Config
- type Deps
- type DispatchReceiverService
- type DispatchSubscriberService
- type Dispatcher
- type Handler
- type HandlerConfig
- type HandlerOption
- type Idempotency
- type IdempotencyFilter
- type IdempotencyRepository
- type LogService
- type Message
- type MessageOption
- type MessageStatus
- type Notification
- type Notifier
- type Queuer
- type ReceiverService
- type Repository
- type Service
- func (s *Service) CheckAndInsertIdempotency(ctx context.Context, scope, key string) (uint64, error)
- func (s *Service) Dispatch(ctx context.Context, n Notification) error
- func (s *Service) MarkIdempotencyAsSuccess(ctx context.Context, id uint64) error
- func (s *Service) RemoveIdempotencies(ctx context.Context, TTL time.Duration) error
- type SilenceService
- type SubscriptionService
Constants ¶
const ( ReceiverIDLabelKey string = "receiver_id" ValidDurationRequestKey string = "valid_duration" TypeReceiver string = "receiver" TypeSubscriber string = "subscriber" )
Variables ¶
var ErrNoMessage = errors.New("no message found")
Functions ¶
This section is empty.
Types ¶
type AlertService ¶
type Config ¶
type Config struct { Queue queues.Config `mapstructure:"queue" yaml:"queue"` MessageHandler HandlerConfig `mapstructure:"message_handler" yaml:"message_handler"` DLQHandler HandlerConfig `mapstructure:"dlq_handler" yaml:"dlq_handler"` }
type Deps ¶
type Deps struct { IdempotencyRepository IdempotencyRepository LogService LogService ReceiverService ReceiverService SubscriptionService SubscriptionService SilenceService SilenceService AlertService AlertService DispatchReceiverService Dispatcher DispatchSubscriberService Dispatcher }
type DispatchReceiverService ¶
type DispatchReceiverService struct {
// contains filtered or unexported fields
}
func NewDispatchReceiverService ¶
func NewDispatchReceiverService(receiverService ReceiverService, notifierPlugins map[string]Notifier) *DispatchReceiverService
func (*DispatchReceiverService) PrepareMessage ¶
func (s *DispatchReceiverService) PrepareMessage(ctx context.Context, n Notification) ([]Message, []log.Notification, bool, error)
type DispatchSubscriberService ¶
type DispatchSubscriberService struct {
// contains filtered or unexported fields
}
func NewDispatchSubscriberService ¶
func NewDispatchSubscriberService( logger saltlog.Logger, subscriptionService SubscriptionService, silenceService SilenceService, notifierPlugins map[string]Notifier) *DispatchSubscriberService
func (*DispatchSubscriberService) PrepareMessage ¶
func (s *DispatchSubscriberService) PrepareMessage(ctx context.Context, n Notification) ([]Message, []log.Notification, bool, error)
type Dispatcher ¶
type Dispatcher interface {
PrepareMessage(ctx context.Context, n Notification) ([]Message, []log.Notification, bool, error)
}
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler is a process to handle message publishing
func NewHandler ¶
func NewHandler(cfg HandlerConfig, logger log.Logger, q Queuer, registry map[string]Notifier, opts ...HandlerOption) *Handler
NewHandler creates a new handler with the supported types of Notifiers
func (*Handler) MessageHandler ¶
MessageHandler is a function to handle dequeued messages
type HandlerConfig ¶
type HandlerConfig struct { Enabled bool `mapstructure:"enabled" yaml:"enabled" default:"true"` PollDuration time.Duration `mapstructure:"poll_duration" yaml:"poll_duration" default:"5s"` ReceiverTypes []string `mapstructure:"receiver_types" yaml:"receiver_types"` BatchSize int `mapstructure:"batch_size" yaml:"batch_size" default:"1"` }
type HandlerOption ¶
type HandlerOption func(*Handler)
HandlerOption is an option to customize handler creation
func HandlerWithBatchSize ¶
func HandlerWithBatchSize(bs int) HandlerOption
HandlerWithBatchSize sets created handler with the specified batch size
func HandlerWithIdentifier ¶
func HandlerWithIdentifier(identifier string) HandlerOption
HandlerWithIdentifier sets created handler with the specified identifier
type Idempotency ¶
type IdempotencyFilter ¶
type IdempotencyRepository ¶
type LogService ¶
type LogService interface {
LogNotifications(ctx context.Context, nlogs ...log.Notification) error
}
type Message ¶
type Message struct { ID string Status MessageStatus ReceiverType string Configs map[string]interface{} // the datasource to build vendor-specific configs Details map[string]interface{} // the datasource to build vendor-specific message MaxTries int ExpiredAt time.Time CreatedAt time.Time UpdatedAt time.Time LastError string TryCount int Retryable bool // contains filtered or unexported fields }
Message is the model to be sent for a specific subscription's receiver
func InitMessage ¶
func InitMessage( ctx context.Context, notifierPlugin Notifier, n Notification, receiverType string, messageConfig map[string]interface{}, opts ...MessageOption, ) (Message, error)
InitMessage initializes the message with default values or the customized values supplied via options
func (*Message) MarkFailed ¶
MarkFailed updates the message to the failed state
func (*Message) MarkPending ¶
MarkPending updates the message to the pending state
func (*Message) MarkPublished ¶
MarkPublished updates the message to the published state
type MessageOption ¶
type MessageOption func(*Message)
MessageOption provides ability to configure the message initialization
func InitWithCreateTime ¶
func InitWithCreateTime(timeNow time.Time) MessageOption
InitWithCreateTime initializes the message with custom create time
func InitWithExpiryDuration ¶
func InitWithExpiryDuration(dur time.Duration) MessageOption
InitWithExpiryDuration initializes the message with the specified expiry duration
func InitWithID ¶
func InitWithID(id string) MessageOption
InitWithID initializes the message with some ID
func InitWithMaxTries ¶
func InitWithMaxTries(mt int) MessageOption
InitWithMaxTries initializes the message with custom max tries
type MessageStatus ¶
type MessageStatus string
MessageStatus determines the state of the message
const ( // additional details DetailsKeyNotificationType = "notification_type" MessageStatusEnqueued MessageStatus = "enqueued" MessageStatusFailed MessageStatus = "failed" MessageStatusPending MessageStatus = "pending" MessageStatusPublished MessageStatus = "published" )
func (MessageStatus) String ¶
func (ms MessageStatus) String() string
type Notification ¶
type Notification struct { ID string `json:"id"` NamespaceID uint64 `json:"namespace_id"` Type string `json:"type"` Data map[string]interface{} `json:"data"` Labels map[string]string `json:"labels"` ValidDuration time.Duration `json:"valid_duration"` Template string `json:"template"` UniqueKey string `json:"unique_key"` CreatedAt time.Time `json:"created_at"` // won't be stored in notification table, only to propagate this to notification_subscriber AlertIDs []int64 }
Notification is a model of notification. If type is `receiver`, the labels are expected to have receiver_id = int
func BuildFromAlerts ¶
func BuildFromAlerts( alerts []alert.Alert, firingLen int, createdTime time.Time, ) ([]Notification, error)
BuildFromAlerts transforms alerts and populates Data and Labels to be interpolated into the system-default template. .Data: id, status ("FIRING"/"RESOLVED"), resource, template, metric_value, metric_name, generator_url, num_alerts_firing, dashboard, playbook, summary. .Labels: severity ("WARNING"/"CRITICAL"), alertname, (other labels defined in rules)
func BuildTypeReceiver ¶
func BuildTypeReceiver(receiverID uint64, payloadMap map[string]interface{}) (Notification, error)
BuildTypeReceiver builds a notification struct with receiver type flow
func (*Notification) EnrichID ¶
func (n *Notification) EnrichID(id string)
func (Notification) Validate ¶
func (n Notification) Validate() error
type Notifier ¶
type Notifier interface { PreHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]interface{}) (map[string]interface{}, error) PostHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]interface{}) (map[string]interface{}, error) GetSystemDefaultTemplate() string Send(ctx context.Context, message Message) (bool, error) }
type Queuer ¶
type Queuer interface { Enqueue(ctx context.Context, ms ...Message) error Dequeue(ctx context.Context, receiverTypes []string, batchSize int, handlerFn func(context.Context, []Message) error) error SuccessCallback(ctx context.Context, ms Message) error ErrorCallback(ctx context.Context, ms Message) error Type() string Cleanup(ctx context.Context, filter queues.FilterCleanup) error Stop(ctx context.Context) error }
type ReceiverService ¶
type Repository ¶
type Repository interface {
Create(context.Context, Notification) (Notification, error)
}
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is a service for notification domain
func NewService ¶
func NewService( logger saltlog.Logger, repository Repository, q Queuer, notifierPlugins map[string]Notifier, deps Deps, ) *Service
NewService creates a new notification service
func (*Service) CheckAndInsertIdempotency ¶
func (*Service) MarkIdempotencyAsSuccess ¶
type SilenceService ¶
type SubscriptionService ¶
type SubscriptionService interface {
MatchByLabels(ctx context.Context, namespaceID uint64, labels map[string]string) ([]subscription.Subscription, error)
}