notification

package
v0.7.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ValidDurationRequestKey string = "valid_duration"

	FlowReceiver   string = "receiver"
	FlowSubscriber string = "subscriber"

	TypeAlert string = "alert"
	TypeEvent string = "event"
)

Variables

View Source
var ErrNoMessage = errors.New("no message found")

Functions

This section is empty.

Types

type AlertService

type AlertService interface {
	UpdateSilenceStatus(ctx context.Context, alertIDs []int64, hasSilenced bool, hasNonSilenced bool) error
}

type Config

type Config struct {
	GroupBy        []string      `mapstructure:"group_by" yaml:"group_by"`
	Queue          queues.Config `mapstructure:"queue" yaml:"queue"`
	MessageHandler HandlerConfig `mapstructure:"message_handler" yaml:"message_handler"`
	DLQHandler     HandlerConfig `mapstructure:"dlq_handler" yaml:"dlq_handler"`
}

type Deps

type Deps struct {
	IdempotencyRepository     IdempotencyRepository
	LogService                LogService
	ReceiverService           ReceiverService
	TemplateService           TemplateService
	SubscriptionService       SubscriptionService
	SilenceService            SilenceService
	AlertService              AlertService
	DispatchReceiverService   Dispatcher
	DispatchSubscriberService Dispatcher
}

type DispatchReceiverService

type DispatchReceiverService struct {
	// contains filtered or unexported fields
}

func NewDispatchReceiverService

func NewDispatchReceiverService(
	receiverService ReceiverService,
	templateService TemplateService,
	notifierPlugins map[string]Notifier) *DispatchReceiverService

func (*DispatchReceiverService) PrepareMessage

type DispatchSubscriberService

type DispatchSubscriberService struct {
	// contains filtered or unexported fields
}

func NewDispatchSubscriberService

func NewDispatchSubscriberService(
	logger saltlog.Logger,
	subscriptionService SubscriptionService,
	silenceService SilenceService,
	templateService TemplateService,
	notifierPlugins map[string]Notifier,
	enableSilenceFeature bool,
) *DispatchSubscriberService

func (*DispatchSubscriberService) PrepareMessage

type Dispatcher

type Dispatcher interface {
	PrepareMessage(ctx context.Context, n Notification) ([]Message, []log.Notification, bool, error)
}

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is a process to handle message publishing

func NewHandler

func NewHandler(cfg HandlerConfig, logger log.Logger, q Queuer, registry map[string]Notifier, opts ...HandlerOption) *Handler

NewHandler creates a new handler with some supported type of Notifiers

func (*Handler) MessageHandler

func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error

MessageHandler is a function to handler dequeued message

func (*Handler) Process

func (h *Handler) Process(ctx context.Context, runAt time.Time) error

type HandlerConfig

type HandlerConfig struct {
	Enabled       bool          `mapstructure:"enabled" yaml:"enabled" default:"true"`
	PollDuration  time.Duration `mapstructure:"poll_duration" yaml:"poll_duration" default:"5s"`
	ReceiverTypes []string      `mapstructure:"receiver_types" yaml:"receiver_types"`
	BatchSize     int           `mapstructure:"batch_size" yaml:"batch_size" default:"1"`
}

type HandlerOption

type HandlerOption func(*Handler)

HandlerOption is an option to customize handler creation

func HandlerWithBatchSize

func HandlerWithBatchSize(bs int) HandlerOption

HandlerWithBatchSize sets created handler with the specified batch size

func HandlerWithIdentifier

func HandlerWithIdentifier(identifier string) HandlerOption

HandlerWithIdentifier sets created handler with the specified batch size

type Idempotency

type Idempotency struct {
	ID             uint64    `json:"id"`
	Scope          string    `json:"scope"`
	Key            string    `json:"key"`
	NotificationID string    `json:"notification_id"`
	CreatedAt      time.Time `json:"created_at"`
	UpdatedAt      time.Time `json:"updated_at"`
}

type IdempotencyFilter

type IdempotencyFilter struct {
	TTL time.Duration
}

type IdempotencyRepository

type IdempotencyRepository interface {
	Create(ctx context.Context, scope, key, notificationID string) (*Idempotency, error)
	Check(ctx context.Context, scope, key string) (*Idempotency, error)
	Delete(context.Context, IdempotencyFilter) error
}

type LogService

type LogService interface {
	LogNotifications(ctx context.Context, nlogs ...log.Notification) error
}

type Message

type Message struct {
	ID             string
	NotificationID string
	Status         MessageStatus
	ReceiverType   string
	Configs        map[string]any // the datasource to build vendor-specific configs
	Details        map[string]any // the datasource to build vendor-specific message
	MaxTries       int
	ExpiredAt      time.Time
	CreatedAt      time.Time
	UpdatedAt      time.Time

	LastError string
	TryCount  int
	Retryable bool
	// contains filtered or unexported fields
}

Message is the model to be sent for a specific subscription's receiver

func InitMessage

func InitMessage(
	ctx context.Context,
	notifierPlugin Notifier,
	templateService TemplateService,
	n Notification,
	receiverType string,
	messageConfig map[string]any,
	opts ...MessageOption,
) (Message, error)

Initialize initializes the message with some default value or the customized value

func (*Message) MarkFailed

func (m *Message) MarkFailed(updatedAt time.Time, retryable bool, err error)

MarkFailed update message to the failed state

func (*Message) MarkPending

func (m *Message) MarkPending(updatedAt time.Time)

MarkPending update message to the pending state

func (*Message) MarkPublished

func (m *Message) MarkPublished(updatedAt time.Time)

MarkPublished update message to the published state

func (*Message) ToV1beta1Proto added in v0.7.3

func (m *Message) ToV1beta1Proto() (*sirenv1beta1.NotificationMessage, error)

type MessageOption

type MessageOption func(*Message)

MessageOption provides ability to configure the message initialization

func InitWithCreateTime

func InitWithCreateTime(timeNow time.Time) MessageOption

InitWithCreateTime initializes the message with custom create time

func InitWithExpiryDuration

func InitWithExpiryDuration(dur time.Duration) MessageOption

InitWithExpiryDuration initializes the message with the specified expiry duration

func InitWithID

func InitWithID(id string) MessageOption

InitWithID initializes the message with some ID

func InitWithMaxTries

func InitWithMaxTries(mt int) MessageOption

InitWithMaxTries initializes the message with custom max tries

type MessageStatus

type MessageStatus string

MessageStatus determines the state of the message

const (

	// additional details
	DetailsKeyNotificationType = "notification_type"

	MessageStatusEnqueued  MessageStatus = "enqueued"
	MessageStatusFailed    MessageStatus = "failed"
	MessageStatusPending   MessageStatus = "pending"
	MessageStatusPublished MessageStatus = "published"
)

func (MessageStatus) String

func (ms MessageStatus) String() string

type Notification

type Notification struct {
	ID                string              `json:"id"`
	NamespaceID       uint64              `json:"namespace_id"`
	Type              string              `json:"type"`
	Data              map[string]any      `json:"data"`
	Labels            map[string]string   `json:"labels"`
	ValidDuration     time.Duration       `json:"valid_duration"`
	Template          string              `json:"template"`
	UniqueKey         string              `json:"unique_key"`
	ReceiverSelectors []map[string]string `json:"receiver_selectors"`
	CreatedAt         time.Time           `json:"created_at"`

	// won't be stored in notification table, only to propagate this to notification_subscriber
	AlertIDs []int64
}

Notification is a model of notification

func (*Notification) EnrichID

func (n *Notification) EnrichID(id string)

func (Notification) Validate

func (n Notification) Validate(flow string) error

type Notifier

type Notifier interface {
	PreHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]any) (map[string]any, error)
	PostHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]any) (map[string]any, error)
	GetSystemDefaultTemplate() string
	Send(ctx context.Context, message Message) (bool, error)
}

type Queuer

type Queuer interface {
	ListMessages(ctx context.Context, notificationID string) ([]Message, error)
	Enqueue(ctx context.Context, ms ...Message) error
	Dequeue(ctx context.Context, receiverTypes []string, batchSize int, handlerFn func(context.Context, []Message) error) error
	SuccessCallback(ctx context.Context, ms Message) error
	ErrorCallback(ctx context.Context, ms Message) error
	Cleanup(ctx context.Context, filter queues.FilterCleanup) error
	Stop(ctx context.Context) error
}

type ReceiverService

type ReceiverService interface {
	List(ctx context.Context, flt receiver.Filter) ([]receiver.Receiver, error)
}

type Repository

type Repository interface {
	Transactor
	Create(context.Context, Notification) (Notification, error)
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is a service for notification domain

func NewService

func NewService(
	logger saltlog.Logger,
	cfg Config,
	repository Repository,
	q Queuer,
	notifierPlugins map[string]Notifier,
	deps Deps,
	enableSilenceFeature bool,
) *Service

NewService creates a new notification service

func (*Service) BuildFromAlerts added in v0.6.1

func (s *Service) BuildFromAlerts(
	alerts []alert.Alert,
	firingLen int,
	createdTime time.Time,
) ([]Notification, error)

Transform alerts and populate Data and Labels to be interpolated to the system-default template .Data - id - status "FIRING"/"RESOLVED" - resource - template - metric_value - metric_name - generator_url - num_alerts_firing - dashboard - playbook - summary .Labels - severity "WARNING"/"CRITICAL" - alertname - (others labels defined in rules)

func (*Service) CheckIdempotency added in v0.7.0

func (s *Service) CheckIdempotency(ctx context.Context, scope, key string) (string, error)

func (*Service) Dispatch

func (s *Service) Dispatch(ctx context.Context, n Notification) (string, error)

func (*Service) InsertIdempotency added in v0.7.0

func (s *Service) InsertIdempotency(ctx context.Context, scope, key, notificationID string) error

func (*Service) ListNotificationMessages added in v0.7.3

func (s *Service) ListNotificationMessages(ctx context.Context, notificationID string) ([]Message, error)

func (*Service) RemoveIdempotencies

func (s *Service) RemoveIdempotencies(ctx context.Context, TTL time.Duration) error

type SilenceService

type SilenceService interface {
	List(ctx context.Context, filter silence.Filter) ([]silence.Silence, error)
}

type SubscriptionService

type SubscriptionService interface {
	MatchByLabels(ctx context.Context, namespaceID uint64, labels map[string]string) ([]subscription.Subscription, error)
}

type TemplateService added in v0.7.1

type TemplateService interface {
	GetByName(ctx context.Context, name string) (*template.Template, error)
}

type Transactor added in v0.7.0

type Transactor interface {
	WithTransaction(ctx context.Context) context.Context
	Rollback(ctx context.Context, err error) error
	Commit(ctx context.Context) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL