notification

package
Version: v0.7.7-rc1
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ValidDurationRequestKey string = "valid_duration"

	RouterReceiver   string = "receiver"
	RouterSubscriber string = "subscriber"

	TypeAlert string = "alert"
	TypeEvent string = "event"

	DispatchKindBulkNotification   = "bulk_notification"
	DispatchKindSingleNotification = "single_notification"
)

Variables

View Source
var (
	ErrNoMessage                   = errors.New("no message found")
	ErrRouteSubscriberNoMatchFound = errors.New("not matching any subscription")
)

Functions

This section is empty.

Types

type AlertRepository added in v0.7.6

type AlertRepository interface {
	BulkUpdateSilence(context.Context, []int64, string) error
}

type Config

type Config struct {
	MaxNumReceiverSelectors int           `mapstructure:"max_num_receiver_selectors" yaml:"max_num_receiver_selectors" default:"10"`
	MaxMessagesReceiverFlow int           `mapstructure:"max_messages_receiver_flow" yaml:"max_messages_receiver_flow" default:"10"`
	Queue                   queues.Config `mapstructure:"queue" yaml:"queue"`
	MessageHandler          HandlerConfig `mapstructure:"message_handler" yaml:"message_handler"`
	DLQHandler              HandlerConfig `mapstructure:"dlq_handler" yaml:"dlq_handler"`
	GroupBy                 []string      `mapstructure:"group_by" yaml:"group_by"`

	// experimental: derived from service.Config
	SubscriptionV2Enabled bool
	EnableSilenceFeature  bool
}

type Deps

type Deps struct {
	Cfg                   Config
	Logger                saltlog.Logger
	Repository            Repository
	Q                     Queuer
	IdempotencyRepository IdempotencyRepository
	AlertRepository       AlertRepository
	LogService            LogService
	ReceiverService       ReceiverService
	TemplateService       TemplateService
	SubscriptionService   SubscriptionService
	SilenceService        SilenceService
}

type DispatchBulkNotificationService added in v0.7.6

type DispatchBulkNotificationService struct {
	// contains filtered or unexported fields
}

DispatchBulkNotificationService supports only subscriber routing; it does not support direct receiver routing.

func NewDispatchBulkNotificationService added in v0.7.6

func NewDispatchBulkNotificationService(
	deps Deps,
	notifierPlugins map[string]Notifier,
	routerMap map[string]Router,
) *DispatchBulkNotificationService

func (*DispatchBulkNotificationService) Dispatch added in v0.7.6

func (s *DispatchBulkNotificationService) Dispatch(ctx context.Context, ns []Notification) (notificationIDs []string, err error)

func (*DispatchBulkNotificationService) RenderMessages added in v0.7.6

func (s *DispatchBulkNotificationService) RenderMessages(ctx context.Context, metaMessages []MetaMessage) (messages []Message, err error)

type DispatchSingleNotificationService added in v0.7.6

type DispatchSingleNotificationService struct {
	// contains filtered or unexported fields
}

DispatchSingleNotificationService supports subscriber routing and receiver routing at the same time

func NewDispatchSingleNotificationService added in v0.7.6

func NewDispatchSingleNotificationService(
	deps Deps,
	notifierPlugins map[string]Notifier,
	routerMap map[string]Router,
) *DispatchSingleNotificationService

func (*DispatchSingleNotificationService) Dispatch added in v0.7.6

type Dispatcher

type Dispatcher interface {
	Dispatch(ctx context.Context, ns []Notification) ([]string, error)
}

type Filter added in v0.7.4

type Filter struct {
	Type             string
	Template         string
	Labels           map[string]string
	ReceiverSelector map[string]string
}

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is a process to handle message publishing

func NewHandler

func NewHandler(cfg HandlerConfig, logger log.Logger, q Queuer, registry map[string]Notifier, opts ...HandlerOption) *Handler

NewHandler creates a new handler with the supported types of Notifiers.

func (*Handler) MessageHandler

func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error

MessageHandler is a function to handle dequeued messages.

func (*Handler) Process

func (h *Handler) Process(ctx context.Context, runAt time.Time) error

func (*Handler) SingleMessageHandler added in v0.7.6

func (h *Handler) SingleMessageHandler(ctx context.Context, msg *Message) error

type HandlerConfig

type HandlerConfig struct {
	Enabled       bool          `mapstructure:"enabled" yaml:"enabled" default:"true"`
	PollDuration  time.Duration `mapstructure:"poll_duration" yaml:"poll_duration" default:"5s"`
	ReceiverTypes []string      `mapstructure:"receiver_types" yaml:"receiver_types"`
	BatchSize     int           `mapstructure:"batch_size" yaml:"batch_size" default:"1"`
}

type HandlerOption

type HandlerOption func(*Handler)

HandlerOption is an option to customize handler creation

func HandlerWithBatchSize

func HandlerWithBatchSize(bs int) HandlerOption

HandlerWithBatchSize configures the created handler with the specified batch size.

func HandlerWithIdentifier

func HandlerWithIdentifier(identifier string) HandlerOption

HandlerWithIdentifier configures the created handler with the specified identifier.

type Idempotency

type Idempotency struct {
	ID             uint64    `json:"id"`
	Scope          string    `json:"scope"`
	Key            string    `json:"key"`
	NotificationID string    `json:"notification_id"`
	CreatedAt      time.Time `json:"created_at"`
	UpdatedAt      time.Time `json:"updated_at"`
}

type IdempotencyFilter

type IdempotencyFilter struct {
	TTL time.Duration
}

type IdempotencyRepository

type IdempotencyRepository interface {
	Create(ctx context.Context, scope, key, notificationID string) (*Idempotency, error)
	Check(ctx context.Context, scope, key string) (*Idempotency, error)
	Delete(context.Context, IdempotencyFilter) error
}

type LogService

type LogService interface {
	LogNotifications(ctx context.Context, nlogs ...log.Notification) error
}

type Message

type Message struct {
	ID              string
	NotificationIDs []string
	Status          MessageStatus
	ReceiverType    string
	Configs         map[string]any // the datasource to build vendor-specific configs
	Details         map[string]any // the datasource to build vendor-specific message
	MaxTries        int
	ExpiredAt       time.Time
	CreatedAt       time.Time
	UpdatedAt       time.Time

	LastError string
	TryCount  int
	Retryable bool
	// contains filtered or unexported fields
}

Message is the model to be sent for a specific subscription's receiver

func InitMessage

func InitMessage(
	ctx context.Context,
	notifierPlugin Notifier,
	templateService TemplateService,
	n Notification,
	receiverType string,
	messageConfig map[string]any,
	opts ...MessageOption,
) (Message, error)

InitMessage initializes the message with default values or with the customized values supplied through the options.

func InitMessageByMetaMessage added in v0.7.6

func InitMessageByMetaMessage(
	ctx context.Context,
	notifierPlugin Notifier,
	templateService TemplateService,
	mm MetaMessage,
	opts ...MessageOption,
) (Message, error)

func (*Message) MarkFailed

func (m *Message) MarkFailed(updatedAt time.Time, retryable bool, err error)

MarkFailed updates the message to the failed state.

func (*Message) MarkPending

func (m *Message) MarkPending(updatedAt time.Time)

MarkPending updates the message to the pending state.

func (*Message) MarkPublished

func (m *Message) MarkPublished(updatedAt time.Time)

MarkPublished updates the message to the published state.

func (*Message) ToV1beta1Proto added in v0.7.3

func (m *Message) ToV1beta1Proto() (*sirenv1beta1.NotificationMessage, error)

type MessageOption

type MessageOption func(*Message)

MessageOption provides ability to configure the message initialization

func InitWithCreateTime

func InitWithCreateTime(timeNow time.Time) MessageOption

InitWithCreateTime initializes the message with custom create time

func InitWithExpiryDuration

func InitWithExpiryDuration(dur time.Duration) MessageOption

InitWithExpiryDuration initializes the message with the specified expiry duration

func InitWithID

func InitWithID(id string) MessageOption

InitWithID initializes the message with some ID

func InitWithMaxTries

func InitWithMaxTries(mt int) MessageOption

InitWithMaxTries initializes the message with custom max tries

type MessageStatus

type MessageStatus string

MessageStatus determines the state of the message

const (

	// additional details
	DetailsKeyNotificationType = "notification_type"

	MessageStatusEnqueued  MessageStatus = "enqueued"
	MessageStatusFailed    MessageStatus = "failed"
	MessageStatusPending   MessageStatus = "pending"
	MessageStatusPublished MessageStatus = "published"
)

func (MessageStatus) String

func (ms MessageStatus) String() string

type MetaMessage added in v0.7.6

type MetaMessage struct {
	ReceiverID       uint64
	SubscriptionIDs  []uint64
	ReceiverType     string
	NotificationIDs  []string
	NotificationType string
	ReceiverConfigs  map[string]any
	Data             map[string]any
	ValidDuration    time.Duration
	Template         string
	Labels           map[string]string
	MergedLabels     map[string][]string
}

func MergeMetaMessage added in v0.7.6

func MergeMetaMessage(from MetaMessage, to MetaMessage) MetaMessage

func ReduceMetaMessages added in v0.7.6

func ReduceMetaMessages(metaMessages []MetaMessage, groupBy []string) ([]MetaMessage, error)

type Notification

type Notification struct {
	ID                string              `json:"id"`
	NamespaceID       uint64              `json:"namespace_id"`
	Type              string              `json:"type"`
	Data              map[string]any      `json:"data"`
	Labels            map[string]string   `json:"labels"`
	ValidDuration     time.Duration       `json:"valid_duration"`
	Template          string              `json:"template"`
	UniqueKey         string              `json:"unique_key"`
	ReceiverSelectors []map[string]string `json:"receiver_selectors"`
	CreatedAt         time.Time           `json:"created_at"`

	// won't be stored in notification table, only to propagate this to notification_subscriber
	AlertIDs []int64
}

Notification is a model of notification

func (*Notification) EnrichID

func (n *Notification) EnrichID(id string)

func (Notification) MetaMessage added in v0.7.6

func (n Notification) MetaMessage(receiverView subscription.ReceiverView) MetaMessage

func (Notification) Validate

func (n Notification) Validate(routerKind string) error

type Notifier

type Notifier interface {
	PreHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]any) (map[string]any, error)
	PostHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]any) (map[string]any, error)
	GetSystemDefaultTemplate() string
	Send(ctx context.Context, message Message) (bool, error)
}

type Queuer

type Queuer interface {
	ListMessages(ctx context.Context, notificationID string) ([]Message, error)
	Enqueue(ctx context.Context, ms ...Message) error
	Dequeue(ctx context.Context, receiverTypes []string, batchSize int, handlerFn func(context.Context, []Message) error) error
	SuccessCallback(ctx context.Context, ms Message) error
	ErrorCallback(ctx context.Context, ms Message) error
	Cleanup(ctx context.Context, filter queues.FilterCleanup) error
	Stop(ctx context.Context) error
}

type ReceiverService

type ReceiverService interface {
	List(ctx context.Context, flt receiver.Filter) ([]receiver.Receiver, error)
}

type Repository

type Repository interface {
	Transactor
	BulkCreate(context.Context, []Notification) ([]Notification, error)
	Create(context.Context, Notification) (Notification, error)
	List(context.Context, Filter) ([]Notification, error)
}

type Router added in v0.7.6

type Router interface {
	PrepareMessage(ctx context.Context, n Notification) ([]Message, []log.Notification, bool, error)
	PrepareMessageV2(ctx context.Context, n Notification) ([]Message, []log.Notification, bool, error)
	PrepareMetaMessages(ctx context.Context, n Notification) (metaMessages []MetaMessage, notificationLogs []log.Notification, err error)
}

type RouterReceiverService added in v0.7.6

type RouterReceiverService struct {
	// contains filtered or unexported fields
}

func NewRouterReceiverService added in v0.7.6

func NewRouterReceiverService(
	deps Deps,
	notifierPlugins map[string]Notifier,
) *RouterReceiverService

func (*RouterReceiverService) PrepareMessage added in v0.7.6

func (s *RouterReceiverService) PrepareMessage(ctx context.Context, n Notification) ([]Message, []log.Notification, bool, error)

func (*RouterReceiverService) PrepareMessageV2 added in v0.7.6

func (s *RouterReceiverService) PrepareMessageV2(ctx context.Context, n Notification) ([]Message, []log.Notification, bool, error)

func (*RouterReceiverService) PrepareMetaMessages added in v0.7.6

func (s *RouterReceiverService) PrepareMetaMessages(ctx context.Context, n Notification) (metaMessages []MetaMessage, notificationLogs []log.Notification, err error)

type RouterSubscriberService added in v0.7.6

type RouterSubscriberService struct {
	// contains filtered or unexported fields
}

func NewRouterSubscriberService added in v0.7.6

func NewRouterSubscriberService(
	deps Deps,
	notifierPlugins map[string]Notifier,
) *RouterSubscriberService

func (*RouterSubscriberService) PrepareMessage added in v0.7.6

func (*RouterSubscriberService) PrepareMessageV2 added in v0.7.6

func (s *RouterSubscriberService) PrepareMessageV2(ctx context.Context, n Notification) (messages []Message, notificationLogs []log.Notification, hasSilenced bool, err error)

func (*RouterSubscriberService) PrepareMetaMessages added in v0.7.6

func (s *RouterSubscriberService) PrepareMetaMessages(ctx context.Context, n Notification) (metaMessages []MetaMessage, notificationLogs []log.Notification, err error)

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is a service for notification domain

func NewService

func NewService(
	deps Deps,
	dispatchServiceMap map[string]Dispatcher,
) *Service

NewService creates a new notification service

func (*Service) CheckIdempotency added in v0.7.0

func (s *Service) CheckIdempotency(ctx context.Context, scope, key string) (string, error)

func (*Service) Dispatch

func (s *Service) Dispatch(ctx context.Context, ns []Notification, dispatcherKind string) ([]string, error)

func (*Service) InsertIdempotency added in v0.7.0

func (s *Service) InsertIdempotency(ctx context.Context, scope, key, notificationID string) error

func (*Service) List added in v0.7.4

func (s *Service) List(ctx context.Context, flt Filter) ([]Notification, error)

func (*Service) ListNotificationMessages added in v0.7.3

func (s *Service) ListNotificationMessages(ctx context.Context, notificationID string) ([]Message, error)

func (*Service) RemoveIdempotencies

func (s *Service) RemoveIdempotencies(ctx context.Context, TTL time.Duration) error

type SilenceService

type SilenceService interface {
	List(ctx context.Context, filter silence.Filter) ([]silence.Silence, error)
}

type SubscriptionService

type SubscriptionService interface {
	MatchByLabels(ctx context.Context, namespaceID uint64, labels map[string]string) ([]subscription.Subscription, error)
	MatchByLabelsV2(ctx context.Context, namespaceID uint64, labels map[string]string) ([]subscription.ReceiverView, error)
}

type TemplateService added in v0.7.1

type TemplateService interface {
	GetByName(ctx context.Context, name string) (*template.Template, error)
}

type Transactor added in v0.7.0

type Transactor interface {
	WithTransaction(ctx context.Context) context.Context
	Rollback(ctx context.Context, err error) error
	Commit(ctx context.Context) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL