notification

package
v0.5.9 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2023 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
// String constants shared across the notification package.
const (
	// ReceiverIDLabelKey is the label key ("receiver_id") that a notification
	// of type `receiver` is expected to carry in its Labels.
	ReceiverIDLabelKey      string = "receiver_id"
	// ValidDurationRequestKey is the key "valid_duration".
	// NOTE(review): presumably the request-payload key for a notification's
	// valid duration — confirm against callers.
	ValidDurationRequestKey string = "valid_duration"

	// TypeReceiver is the notification type value "receiver".
	TypeReceiver   string = "receiver"
	// TypeSubscriber is the notification type value "subscriber".
	TypeSubscriber string = "subscriber"
)

Variables

View Source
// ErrNoMessage is a sentinel error with the text "no message found".
// NOTE(review): where it is returned is not visible here; compare with errors.Is.
var ErrNoMessage = errors.New("no message found")

Functions

This section is empty.

Types

type AlertService added in v0.5.8

// AlertService is the alert-domain dependency of this package; it is supplied
// through Deps.AlertService.
type AlertService interface {
	// UpdateSilenceStatus receives the affected alert IDs and two flags.
	// NOTE(review): the exact meaning of hasSilenced / hasNonSilenced is
	// defined by the implementation, which is not visible here.
	UpdateSilenceStatus(ctx context.Context, alertIDs []int64, hasSilenced bool, hasNonSilenced bool) error
}

type Config

// Config is the notification configuration: one queue config plus a separate
// HandlerConfig for the message handler and for the DLQ handler. Fields are
// mapped from the `queue`, `message_handler` and `dlq_handler` keys.
type Config struct {
	Queue          queues.Config `mapstructure:"queue" yaml:"queue"`
	MessageHandler HandlerConfig `mapstructure:"message_handler" yaml:"message_handler"`
	// NOTE(review): DLQ presumably stands for dead-letter queue — confirm.
	DLQHandler     HandlerConfig `mapstructure:"dlq_handler" yaml:"dlq_handler"`
}

type Deps added in v0.5.8

// Deps bundles the collaborators that NewService accepts as its `deps`
// argument, so they do not each need their own constructor parameter.
type Deps struct {
	IdempotencyRepository     IdempotencyRepository
	LogService                LogService
	ReceiverService           ReceiverService
	SubscriptionService       SubscriptionService
	SilenceService            SilenceService
	AlertService              AlertService
	// One Dispatcher per notification flow.
	// NOTE(review): presumably selected by Notification.Type
	// (TypeReceiver / TypeSubscriber) — confirm in Service.Dispatch.
	DispatchReceiverService   Dispatcher
	DispatchSubscriberService Dispatcher
}

type DispatchReceiverService added in v0.5.8

type DispatchReceiverService struct {
	// contains filtered or unexported fields
}

func NewDispatchReceiverService added in v0.5.8

func NewDispatchReceiverService(receiverService ReceiverService, notifierPlugins map[string]Notifier) *DispatchReceiverService

func (*DispatchReceiverService) PrepareMessage added in v0.5.8

type DispatchSubscriberService added in v0.5.8

type DispatchSubscriberService struct {
	// contains filtered or unexported fields
}

func NewDispatchSubscriberService added in v0.5.8

func NewDispatchSubscriberService(
	logger saltlog.Logger,
	subscriptionService SubscriptionService,
	silenceService SilenceService,
	notifierPlugins map[string]Notifier) *DispatchSubscriberService

func (*DispatchSubscriberService) PrepareMessage added in v0.5.8

type Dispatcher added in v0.5.8

// Dispatcher prepares the outgoing messages for a Notification.
// DispatchReceiverService and DispatchSubscriberService both expose a
// PrepareMessage method.
type Dispatcher interface {
	// PrepareMessage returns the messages built for n together with the
	// notification log entries to record.
	// NOTE(review): the meaning of the bool result is not visible here —
	// confirm against the implementations.
	PrepareMessage(ctx context.Context, n Notification) ([]Message, []log.Notification, bool, error)
}

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is a process to handle message publishing

func NewHandler

func NewHandler(cfg HandlerConfig, logger log.Logger, q Queuer, registry map[string]Notifier, opts ...HandlerOption) *Handler

NewHandler creates a new handler with some supported type of Notifiers

func (*Handler) MessageHandler

func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error

MessageHandler is a function to handle dequeued messages

func (*Handler) Process

func (h *Handler) Process(ctx context.Context, runAt time.Time) error

type HandlerConfig

// HandlerConfig configures a Handler; it is the `cfg` argument of NewHandler.
// The `default` struct tags carry the default value for each field.
type HandlerConfig struct {
	// Enabled has the default tag "true".
	Enabled       bool          `mapstructure:"enabled" yaml:"enabled" default:"true"`
	// PollDuration has the default tag "5s".
	PollDuration  time.Duration `mapstructure:"poll_duration" yaml:"poll_duration" default:"5s"`
	// ReceiverTypes has no default tag.
	// NOTE(review): presumably forwarded to Queuer.Dequeue as receiverTypes — confirm.
	ReceiverTypes []string      `mapstructure:"receiver_types" yaml:"receiver_types"`
	// BatchSize has the default tag "1".
	// NOTE(review): presumably forwarded to Queuer.Dequeue as batchSize — confirm.
	BatchSize     int           `mapstructure:"batch_size" yaml:"batch_size" default:"1"`
}

type HandlerOption

type HandlerOption func(*Handler)

HandlerOption is an option to customize handler creation

func HandlerWithBatchSize

func HandlerWithBatchSize(bs int) HandlerOption

HandlerWithBatchSize sets created handler with the specified batch size

func HandlerWithIdentifier

func HandlerWithIdentifier(identifier string) HandlerOption

HandlerWithIdentifier sets created handler with the specified identifier

type Idempotency added in v0.5.8

// Idempotency is the record returned by
// IdempotencyRepository.InsertOnConflictReturning.
type Idempotency struct {
	ID        uint64
	// Scope and Key together name the operation being tracked.
	// NOTE(review): presumably a uniqueness constraint spans both — confirm in the repository.
	Scope     string
	Key       string
	// Success is the flag updated through IdempotencyRepository.UpdateSuccess.
	Success   bool
	CreatedAt time.Time
	UpdatedAt time.Time
}

type IdempotencyFilter added in v0.5.8

// IdempotencyFilter is the filter accepted by IdempotencyRepository.Delete.
type IdempotencyFilter struct {
	// TTL is a duration threshold.
	// NOTE(review): presumably records older than TTL are the ones deleted — confirm.
	TTL time.Duration
}

type IdempotencyRepository added in v0.5.6

// IdempotencyRepository is the storage interface for Idempotency records.
type IdempotencyRepository interface {
	// InsertOnConflictReturning takes two strings and returns an Idempotency record.
	// NOTE(review): the strings are presumably scope and key, in that order,
	// matching Service.CheckAndInsertIdempotency — confirm.
	InsertOnConflictReturning(context.Context, string, string) (*Idempotency, error)
	// UpdateSuccess takes a uint64 and a bool.
	// NOTE(review): presumably the record ID and the new Success value — confirm.
	UpdateSuccess(context.Context, uint64, bool) error
	// Delete removes records selected by the given filter.
	Delete(context.Context, IdempotencyFilter) error
}

type LogService added in v0.5.8

// LogService is the notification-log dependency of this package; it is
// supplied through Deps.LogService.
type LogService interface {
	// LogNotifications accepts zero or more notification log entries.
	LogNotifications(ctx context.Context, nlogs ...log.Notification) error
}

type Message

// Message is the model to be sent for a specific subscription's receiver.
type Message struct {
	ID           string
	// Status is one of the MessageStatus constants.
	Status       MessageStatus
	ReceiverType string
	Configs      map[string]interface{} // the datasource to build vendor-specific configs
	Details      map[string]interface{} // the datasource to build vendor-specific message
	MaxTries     int
	ExpiredAt    time.Time
	CreatedAt    time.Time
	UpdatedAt    time.Time

	// Delivery-attempt bookkeeping.
	// NOTE(review): presumably maintained by MarkFailed / MarkPending /
	// MarkPublished — their bodies are not visible here.
	LastError string
	TryCount  int
	Retryable bool
	// contains filtered or unexported fields
}

Message is the model to be sent for a specific subscription's receiver

func InitMessage added in v0.5.8

func InitMessage(
	ctx context.Context,
	notifierPlugin Notifier,
	n Notification,
	receiverType string,
	messageConfig map[string]interface{},
	opts ...MessageOption,
) (Message, error)

InitMessage initializes the message with some default value or the customized value

func (*Message) MarkFailed

func (m *Message) MarkFailed(updatedAt time.Time, retryable bool, err error)

MarkFailed updates the message to the failed state

func (*Message) MarkPending

func (m *Message) MarkPending(updatedAt time.Time)

MarkPending updates the message to the pending state

func (*Message) MarkPublished

func (m *Message) MarkPublished(updatedAt time.Time)

MarkPublished updates the message to the published state

type MessageOption

type MessageOption func(*Message)

MessageOption provides ability to configure the message initialization

func InitWithCreateTime

func InitWithCreateTime(timeNow time.Time) MessageOption

InitWithCreateTime initializes the message with custom create time

func InitWithExpiryDuration

func InitWithExpiryDuration(dur time.Duration) MessageOption

InitWithExpiryDuration initializes the message with the specified expiry duration

func InitWithID

func InitWithID(id string) MessageOption

InitWithID initializes the message with some ID

func InitWithMaxTries

func InitWithMaxTries(mt int) MessageOption

InitWithMaxTries initializes the message with custom max tries

type MessageStatus

type MessageStatus string

MessageStatus determines the state of the message

// Message detail keys and the possible MessageStatus values.
const (

	// additional details
	// DetailsKeyNotificationType is the key "notification_type".
	// NOTE(review): presumably a key inside Message.Details — confirm.
	DetailsKeyNotificationType = "notification_type"

	// States a Message can be in.
	MessageStatusEnqueued  MessageStatus = "enqueued"
	MessageStatusFailed    MessageStatus = "failed"
	MessageStatusPending   MessageStatus = "pending"
	MessageStatusPublished MessageStatus = "published"
)

func (MessageStatus) String added in v0.5.2

func (ms MessageStatus) String() string

type Notification

// Notification is a model of notification. If its type is `receiver`, the
// labels are expected to have receiver_id = int.
type Notification struct {
	ID            string                 `json:"id"`
	NamespaceID   uint64                 `json:"namespace_id"`
	// NOTE(review): Type is presumably TypeReceiver or TypeSubscriber — confirm in Validate.
	Type          string                 `json:"type"`
	Data          map[string]interface{} `json:"data"`
	Labels        map[string]string      `json:"labels"`
	ValidDuration time.Duration          `json:"valid_duration"`
	Template      string                 `json:"template"`
	UniqueKey     string                 `json:"unique_key"`
	CreatedAt     time.Time              `json:"created_at"`

	// AlertIDs won't be stored in the notification table; it is only used to
	// propagate the alert IDs to notification_subscriber.
	AlertIDs []int64
}

Notification is a model of notification. If its type is `receiver`, the labels are expected to have receiver_id = int.

func BuildFromAlerts added in v0.5.8

func BuildFromAlerts(
	alerts []alert.Alert,
	firingLen int,
	createdTime time.Time,
) ([]Notification, error)

BuildFromAlerts transforms alerts and populates Data and Labels to be interpolated into the system-default template.

.Data
  - id
  - status "FIRING"/"RESOLVED"
  - resource
  - template
  - metric_value
  - metric_name
  - generator_url
  - num_alerts_firing
  - dashboard
  - playbook
  - summary

.Labels
  - severity "WARNING"/"CRITICAL"
  - alertname
  - (other labels defined in rules)

func BuildTypeReceiver added in v0.5.8

func BuildTypeReceiver(receiverID uint64, payloadMap map[string]interface{}) (Notification, error)

BuildTypeReceiver builds a notification struct with receiver type flow

func (*Notification) EnrichID added in v0.5.8

func (n *Notification) EnrichID(id string)

func (Notification) Validate added in v0.5.8

func (n Notification) Validate() error

type Notifier

// Notifier is the per-receiver-type plugin interface. NewHandler and
// NewService each take a map[string]Notifier.
// NOTE(review): the map is presumably keyed by receiver type — confirm.
type Notifier interface {
	// PreHookQueueTransformConfigs and PostHookQueueTransformConfigs each take
	// a notification config map and return a config map.
	// NOTE(review): presumably applied before enqueueing and after dequeueing
	// respectively — confirm against callers.
	PreHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]interface{}) (map[string]interface{}, error)
	PostHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]interface{}) (map[string]interface{}, error)
	// GetSystemDefaultTemplate returns the plugin's system-default template.
	GetSystemDefaultTemplate() string
	// Send delivers a single message.
	// NOTE(review): the bool result presumably reports whether a failure is
	// retryable (compare Message.Retryable) — confirm.
	Send(ctx context.Context, message Message) (bool, error)
}

type Queuer

// Queuer is the message-queue abstraction that both NewHandler and NewService
// accept as their `q` argument.
type Queuer interface {
	// Enqueue accepts one or more messages.
	Enqueue(ctx context.Context, ms ...Message) error
	// Dequeue takes the receiver types, a batch size and a handler function
	// that receives a slice of messages.
	// NOTE(review): presumably handlerFn is called with up to batchSize
	// messages of the given receiver types — confirm in the implementation.
	Dequeue(ctx context.Context, receiverTypes []string, batchSize int, handlerFn func(context.Context, []Message) error) error
	// SuccessCallback and ErrorCallback each receive a single message.
	// NOTE(review): presumably invoked after a send attempt succeeds or
	// fails — confirm against Handler.
	SuccessCallback(ctx context.Context, ms Message) error
	ErrorCallback(ctx context.Context, ms Message) error
	// Type returns a string identifying the queue.
	// NOTE(review): exact values are implementation-defined — confirm.
	Type() string
	// Cleanup takes a queues.FilterCleanup.
	Cleanup(ctx context.Context, filter queues.FilterCleanup) error
	// Stop takes only a context and returns an error.
	Stop(ctx context.Context) error
}

type ReceiverService

// ReceiverService is the receiver-domain dependency of this package; it is
// passed to NewDispatchReceiverService and carried in Deps.
type ReceiverService interface {
	// Get returns the receiver with the given id; gopts are optional
	// receiver.GetOption values.
	Get(ctx context.Context, id uint64, gopts ...receiver.GetOption) (*receiver.Receiver, error)
}

type Repository added in v0.5.8

// Repository is the notification storage interface that NewService accepts.
type Repository interface {
	// Create takes a Notification and returns a Notification.
	// NOTE(review): presumably the returned value is the stored record with
	// generated fields filled in — confirm.
	Create(context.Context, Notification) (Notification, error)
}

type Service added in v0.5.6

type Service struct {
	// contains filtered or unexported fields
}

Service is a service for notification domain

func NewService

func NewService(
	logger saltlog.Logger,
	repository Repository,
	q Queuer,
	notifierPlugins map[string]Notifier,
	deps Deps,
) *Service

NewService creates a new notification service

func (*Service) CheckAndInsertIdempotency added in v0.5.6

func (s *Service) CheckAndInsertIdempotency(ctx context.Context, scope, key string) (uint64, error)

func (*Service) Dispatch added in v0.5.8

func (s *Service) Dispatch(ctx context.Context, n Notification) error

func (*Service) MarkIdempotencyAsSuccess added in v0.5.6

func (s *Service) MarkIdempotencyAsSuccess(ctx context.Context, id uint64) error

func (*Service) RemoveIdempotencies added in v0.5.6

func (s *Service) RemoveIdempotencies(ctx context.Context, TTL time.Duration) error

type SilenceService added in v0.5.8

// SilenceService is the silence-domain dependency of this package; it is
// passed to NewDispatchSubscriberService and carried in Deps.
type SilenceService interface {
	// List returns the silences selected by filter.
	List(ctx context.Context, filter silence.Filter) ([]silence.Silence, error)
}

type SubscriptionService

// SubscriptionService is the subscription-domain dependency of this package;
// it is passed to NewDispatchSubscriberService and carried in Deps.
type SubscriptionService interface {
	// MatchByLabels takes a namespace ID and a label map and returns subscriptions.
	// NOTE(review): the matching rules are defined by the implementation,
	// which is not visible here.
	MatchByLabels(ctx context.Context, namespaceID uint64, labels map[string]string) ([]subscription.Subscription, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL