notification

package
v0.5.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNoMessage = errors.New("no message found")

Functions

This section is empty.

Types

type Config

type Config struct {
	Queue          queues.Config `mapstructure:"queue" yaml:"queue"`
	MessageHandler HandlerConfig `mapstructure:"message_handler" yaml:"message_handler"`
	DLQHandler     HandlerConfig `mapstructure:"dlq_handler" yaml:"dlq_handler"`
}

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is a process to handle message publishing

func NewHandler

func NewHandler(cfg HandlerConfig, logger log.Logger, q Queuer, registry map[string]Notifier, opts ...HandlerOption) *Handler

NewHandler creates a new handler with some supported type of Notifiers

func (*Handler) MessageHandler

func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error

MessageHandler is a function to handler dequeued message

func (*Handler) Process

func (h *Handler) Process(ctx context.Context, runAt time.Time) error

type HandlerConfig

type HandlerConfig struct {
	Enabled       bool          `mapstructure:"enabled" yaml:"enabled" default:"true"`
	PollDuration  time.Duration `mapstructure:"poll_duration" yaml:"poll_duration" default:"5s"`
	ReceiverTypes []string      `mapstructure:"receiver_types" yaml:"receiver_types"`
	BatchSize     int           `mapstructure:"batch_size" yaml:"batch_size" default:"1"`
}

type HandlerOption

type HandlerOption func(*Handler)

HandlerOption is an option to customize handler creation

func HandlerWithBatchSize

func HandlerWithBatchSize(bs int) HandlerOption

HandlerWithBatchSize sets created handler with the specified batch size

func HandlerWithIdentifier

func HandlerWithIdentifier(identifier string) HandlerOption

HandlerWithIdentifier sets created handler with the specified batch size

type IdempotencyRepository added in v0.5.6

type IdempotencyRepository interface {
	InsertOnConflictReturning(context.Context, string, string) (*idempotency.Idempotency, error)
	UpdateSuccess(context.Context, uint64, bool) error
	Delete(context.Context, idempotency.Filter) error
}

type Message

type Message struct {
	ID     string
	Status MessageStatus

	ReceiverType string
	Configs      map[string]interface{} // the datasource to build vendor-specific configs
	Details      map[string]interface{} // the datasource to build vendor-specific message
	LastError    string

	MaxTries  int
	TryCount  int
	Retryable bool

	ExpiredAt time.Time
	CreatedAt time.Time
	UpdatedAt time.Time
	// contains filtered or unexported fields
}

Message is the model to be sent for a specific subscription's receiver

func (*Message) AddStringDetail

func (m *Message) AddStringDetail(key, value string)

AddDetail adds a custom kv string detail

func (*Message) Initialize

func (m *Message) Initialize(
	n Notification,
	receiverType string,
	notificationConfigs map[string]interface{},
	opts ...MessageOption,
)

Initialize initializes the message with some default value or the customized value

func (*Message) MarkFailed

func (m *Message) MarkFailed(updatedAt time.Time, retryable bool, err error)

MarkFailed update message to the failed state

func (*Message) MarkPending

func (m *Message) MarkPending(updatedAt time.Time)

MarkPending update message to the pending state

func (*Message) MarkPublished

func (m *Message) MarkPublished(updatedAt time.Time)

MarkPublished update message to the published state

type MessageOption

type MessageOption func(*Message)

MessageOption provides ability to configure the message initialization

func InitWithCreateTime

func InitWithCreateTime(timeNow time.Time) MessageOption

InitWithCreateTime initializes the message with custom create time

func InitWithExpiryDuration

func InitWithExpiryDuration(dur time.Duration) MessageOption

InitWithExpiryDuration initializes the message with the specified expiry duration

func InitWithID

func InitWithID(id string) MessageOption

InitWithID initializes the message with some ID

func InitWithMaxTries

func InitWithMaxTries(mt int) MessageOption

InitWithMaxTries initializes the message with custom max tries

type MessageStatus

type MessageStatus string

MessageStatus determines the state of the message

const (
	DefaultMaxTries = 3

	// additional details
	DetailsKeyRoutingMethod = "routing_method"

	MessageStatusEnqueued  MessageStatus = "enqueued"
	MessageStatusFailed    MessageStatus = "failed"
	MessageStatusPending   MessageStatus = "pending"
	MessageStatusPublished MessageStatus = "published"
)

func (MessageStatus) String added in v0.5.2

func (ms MessageStatus) String() string

type Notification

type Notification struct {
	ID                  string                 `json:"id"`
	Data                map[string]interface{} `json:"data"`
	Labels              map[string]string      `json:"labels"`
	ValidDurationString string                 `json:"valid_duration"`
	Template            string                 `json:"template"`
	CreatedAt           time.Time
}

Notification is a model of notification

func (Notification) ToMessage

func (n Notification) ToMessage(receiverType string, notificationConfigMap map[string]interface{}) (*Message, error)

ToMessage transforms Notification model to one or several Messages

type Notifier

type Notifier interface {
	PreHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]interface{}) (map[string]interface{}, error)
	PostHookQueueTransformConfigs(ctx context.Context, notificationConfigMap map[string]interface{}) (map[string]interface{}, error)
	GetSystemDefaultTemplate() string
	Send(ctx context.Context, message Message) (bool, error)
}

type Queuer

type Queuer interface {
	Enqueue(ctx context.Context, ms ...Message) error
	Dequeue(ctx context.Context, receiverTypes []string, batchSize int, handlerFn func(context.Context, []Message) error) error
	SuccessCallback(ctx context.Context, ms Message) error
	ErrorCallback(ctx context.Context, ms Message) error
	Type() string
	Cleanup(ctx context.Context, filter queues.FilterCleanup) error
	Stop(ctx context.Context) error
}

type ReceiverService

type ReceiverService interface {
	Get(ctx context.Context, id uint64, gopts ...receiver.GetOption) (*receiver.Receiver, error)
}

type RoutingMethod

type RoutingMethod string
const (
	RoutingMethodReceiver    RoutingMethod = "receiver"
	RoutingMethodSubscribers RoutingMethod = "subscribers"
)

func (RoutingMethod) String

func (rm RoutingMethod) String() string

type Service added in v0.5.6

type Service struct {
	// contains filtered or unexported fields
}

Service is a service for notification domain

func NewService

func NewService(
	logger log.Logger,
	q Queuer,
	idempotencyRepository IdempotencyRepository,
	receiverService ReceiverService,
	subscriptionService SubscriptionService,
	notifierPlugins map[string]Notifier,
) *Service

NewService creates a new notification service

func (*Service) CheckAndInsertIdempotency added in v0.5.6

func (s *Service) CheckAndInsertIdempotency(ctx context.Context, scope, key string) (uint64, error)

func (*Service) DispatchToReceiver added in v0.5.6

func (ns *Service) DispatchToReceiver(ctx context.Context, n Notification, receiverID uint64) error

func (*Service) DispatchToSubscribers added in v0.5.6

func (ns *Service) DispatchToSubscribers(ctx context.Context, namespaceID uint64, n Notification) error

func (*Service) MarkIdempotencyAsSuccess added in v0.5.6

func (s *Service) MarkIdempotencyAsSuccess(ctx context.Context, id uint64) error

func (*Service) RemoveIdempotencies added in v0.5.6

func (s *Service) RemoveIdempotencies(ctx context.Context, TTL time.Duration) error

type SubscriptionService

type SubscriptionService interface {
	MatchByLabels(ctx context.Context, namespaceID uint64, labels map[string]string) ([]subscription.Subscription, error)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL