notification

package
v0.6.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 4, 2023 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ReceiverIDLabelKey      string = "receiver_id"
	ValidDurationRequestKey string = "valid_duration"

	TypeReceiver   string = "receiver"
	TypeSubscriber string = "subscriber"
)

Variables

View Source
var ErrNoMessage = errors.New("no message found")

Functions

This section is empty.

Types

type AlertService

type AlertService interface {
	UpdateSilenceStatus(ctx context.Context, alertIDs []int64, hasSilenced bool, hasNonSilenced bool) error
}

type Config

type Config struct {
	GroupBy        []string      `mapstructure:"group_by" yaml:"group_by"`
	Queue          queues.Config `mapstructure:"queue" yaml:"queue"`
	MessageHandler HandlerConfig `mapstructure:"message_handler" yaml:"message_handler"`
	DLQHandler     HandlerConfig `mapstructure:"dlq_handler" yaml:"dlq_handler"`
}

type Deps

type Deps struct {
	IdempotencyRepository     IdempotencyRepository
	LogService                LogService
	ReceiverService           ReceiverService
	SubscriptionService       SubscriptionService
	SilenceService            SilenceService
	AlertService              AlertService
	DispatchReceiverService   Dispatcher
	DispatchSubscriberService Dispatcher
}

type DispatchReceiverService

type DispatchReceiverService struct {
	// contains filtered or unexported fields
}

func NewDispatchReceiverService

func NewDispatchReceiverService(receiverService ReceiverService, notifierPlugins map[string]Notifier) *DispatchReceiverService

func (*DispatchReceiverService) PrepareMessage

type DispatchSubscriberService

type DispatchSubscriberService struct {
	// contains filtered or unexported fields
}

func NewDispatchSubscriberService

func NewDispatchSubscriberService(
	logger saltlog.Logger,
	subscriptionService SubscriptionService,
	silenceService SilenceService,
	notifierPlugins map[string]Notifier) *DispatchSubscriberService

func (*DispatchSubscriberService) PrepareMessage

type Dispatcher

type Dispatcher interface {
	PrepareMessage(ctx context.Context, n Notification) ([]Message, []log.Notification, bool, error)
}

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is a process to handle message publishing

func NewHandler

func NewHandler(cfg HandlerConfig, logger log.Logger, q Queuer, registry map[string]Notifier, opts ...HandlerOption) *Handler

NewHandler creates a new handler with some supported type of Notifiers

func (*Handler) MessageHandler

func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error

MessageHandler is a function to handler dequeued message

func (*Handler) Process

func (h *Handler) Process(ctx context.Context, runAt time.Time) error

type HandlerConfig

type HandlerConfig struct {
	Enabled       bool          `mapstructure:"enabled" yaml:"enabled" default:"true"`
	PollDuration  time.Duration `mapstructure:"poll_duration" yaml:"poll_duration" default:"5s"`
	ReceiverTypes []string      `mapstructure:"receiver_types" yaml:"receiver_types"`
	BatchSize     int           `mapstructure:"batch_size" yaml:"batch_size" default:"1"`
}

type HandlerOption

type HandlerOption func(*Handler)

HandlerOption is an option to customize handler creation

func HandlerWithBatchSize

func HandlerWithBatchSize(bs int) HandlerOption

HandlerWithBatchSize sets created handler with the specified batch size

func HandlerWithIdentifier

func HandlerWithIdentifier(identifier string) HandlerOption

HandlerWithIdentifier sets created handler with the specified batch size

type Idempotency

type Idempotency struct {
	ID        uint64
	Scope     string
	Key       string
	Success   bool
	CreatedAt time.Time
	UpdatedAt time.Time
}

type IdempotencyFilter

type IdempotencyFilter struct {
	TTL time.Duration
}

type IdempotencyRepository

type IdempotencyRepository interface {
	InsertOnConflictReturning(context.Context, string, string) (*Idempotency, error)
	UpdateSuccess(context.Context, uint64, bool) error
	Delete(context.Context, IdempotencyFilter) error
}

type LogService

type LogService interface {
	LogNotifications(ctx context.Context, nlogs ...log.Notification) error
}

type Message

type Message struct {
	ID           string
	Status       MessageStatus
	ReceiverType string
	Configs      map[string]any // the datasource to build vendor-specific configs
	Details      map[string]any // the datasource to build vendor-specific message
	MaxTries     int
	ExpiredAt    time.Time
	CreatedAt    time.Time
	UpdatedAt    time.Time

	LastError string
	TryCount  int
	Retryable bool
	// contains filtered or unexported fields
}

Message is the model to be sent for a specific subscription's receiver

func InitMessage

func InitMessage(
	ctx context.Context,
	notifierPlugin Notifier,
	n Notification,
	receiverType string,
	messageConfig map[string]any,
	opts ...MessageOption,
) (Message, error)

Initialize initializes the message with some default value or the customized value

func (*Message) MarkFailed

func (m *Message) MarkFailed(updatedAt time.Time, retryable bool, err error)

MarkFailed update message to the failed state

func (*Message) MarkPending

func (m *Message) MarkPending(updatedAt time.Time)

MarkPending update message to the pending state

func (*Message) MarkPublished

func (m *Message) MarkPublished(updatedAt time.Time)

MarkPublished update message to the published state

type MessageOption

type MessageOption func(*Message)

MessageOption provides ability to configure the message initialization

func InitWithCreateTime

func InitWithCreateTime(timeNow time.Time) MessageOption

InitWithCreateTime initializes the message with custom create time

func InitWithExpiryDuration

func InitWithExpiryDuration(dur time.Duration) MessageOption

InitWithExpiryDuration initializes the message with the specified expiry duration

func InitWithID

func InitWithID(id string) MessageOption

InitWithID initializes the message with some ID

func InitWithMaxTries

func InitWithMaxTries(mt int) MessageOption

InitWithMaxTries initializes the message with custom max tries

type MessageStatus

type MessageStatus string

MessageStatus determines the state of the message

const (

	// additional details
	DetailsKeyNotificationType = "notification_type"

	MessageStatusEnqueued  MessageStatus = "enqueued"
	MessageStatusFailed    MessageStatus = "failed"
	MessageStatusPending   MessageStatus = "pending"
	MessageStatusPublished MessageStatus = "published"
)

func (MessageStatus) String

func (ms MessageStatus) String() string

type Notification

type Notification struct {
	ID            string            `json:"id"`
	NamespaceID   uint64            `json:"namespace_id"`
	Type          string            `json:"type"`
	Data          map[string]any    `json:"data"`
	Labels        map[string]string `json:"labels"`
	ValidDuration time.Duration     `json:"valid_duration"`
	Template      string            `json:"template"`
	UniqueKey     string            `json:"unique_key"`
	CreatedAt     time.Time         `json:"created_at"`

	// won't be stored in notification table, only to propaget this to notification_subscriber
	AlertIDs []int64
}

Notification is a model of notification if type is `receiver`, it is expected for the labels to have receiver_id = int

func BuildTypeReceiver

func BuildTypeReceiver(receiverID uint64, payloadMap map[string]any) (Notification, error)

BuildTypeReceiver builds a notification struct with receiver type flow

func (*Notification) EnrichID

func (n *Notification) EnrichID(id string)

func (Notification) Validate

func (n Notification) Validate() error

type Notifier

type Notifier interface {
	PreHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]any) (map[string]any, error)
	PostHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]any) (map[string]any, error)
	GetSystemDefaultTemplate() string
	Send(ctx context.Context, message Message) (bool, error)
}

type Queuer

type Queuer interface {
	Enqueue(ctx context.Context, ms ...Message) error
	Dequeue(ctx context.Context, receiverTypes []string, batchSize int, handlerFn func(context.Context, []Message) error) error
	SuccessCallback(ctx context.Context, ms Message) error
	ErrorCallback(ctx context.Context, ms Message) error
	Type() string
	Cleanup(ctx context.Context, filter queues.FilterCleanup) error
	Stop(ctx context.Context) error
}

type ReceiverService

type ReceiverService interface {
	Get(ctx context.Context, id uint64, gopts ...receiver.GetOption) (*receiver.Receiver, error)
}

type Repository

type Repository interface {
	Create(context.Context, Notification) (Notification, error)
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is a service for notification domain

func NewService

func NewService(
	logger saltlog.Logger,
	cfg Config,
	repository Repository,
	q Queuer,
	notifierPlugins map[string]Notifier,
	deps Deps,
) *Service

NewService creates a new notification service

func (*Service) BuildFromAlerts added in v0.6.1

func (s *Service) BuildFromAlerts(
	alerts []alert.Alert,
	firingLen int,
	createdTime time.Time,
) ([]Notification, error)

Transform alerts and populate Data and Labels to be interpolated to the system-default template .Data - id - status "FIRING"/"RESOLVED" - resource - template - metric_value - metric_name - generator_url - num_alerts_firing - dashboard - playbook - summary .Labels - severity "WARNING"/"CRITICAL" - alertname - (others labels defined in rules)

func (*Service) CheckAndInsertIdempotency

func (s *Service) CheckAndInsertIdempotency(ctx context.Context, scope, key string) (uint64, error)

func (*Service) Dispatch

func (s *Service) Dispatch(ctx context.Context, n Notification) error

func (*Service) MarkIdempotencyAsSuccess

func (s *Service) MarkIdempotencyAsSuccess(ctx context.Context, id uint64) error

func (*Service) RemoveIdempotencies

func (s *Service) RemoveIdempotencies(ctx context.Context, TTL time.Duration) error

type SilenceService

type SilenceService interface {
	List(ctx context.Context, filter silence.Filter) ([]silence.Silence, error)
}

type SubscriptionService

type SubscriptionService interface {
	MatchByLabels(ctx context.Context, namespaceID uint64, labels map[string]string) ([]subscription.Subscription, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL