actions

package
v0.13.50 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 1, 2024 License: MIT Imports: 39 Imported by: 0

Documentation

Overview

Package actions provides handlers for all the operations that can be performed on the message bus.

Index

Constants

This section is empty.

Variables

View Source
var DeleteExpiredSubscriptionsCounter, DeleteExpiredSubscriptionsHistogram = pruneMetrics("expired_subscriptions")
View Source
var ErrExists = errors.New("Already exists")
View Source
var ErrNotFound = errors.New("Not found")

Functions

func CancelAnySubModifiedAwaiter

func CancelAnySubModifiedAwaiter(c AnySubModifiedNotifier)

This has to take a two-way (bidirectional) channel so that the function can find that channel in the map.

func CancelAnyTopicModifiedAwaiter

func CancelAnyTopicModifiedAwaiter(c AnyTopicModifiedNotifier)

This has to take a two-way (bidirectional) channel so that the function can find that channel in the map.

func CancelPublishAwaiter

func CancelPublishAwaiter(subID uuid.UUID, c PublishNotifier)

This has to take a two-way (bidirectional) channel so that the function can find that channel in the map.

func CancelSubModifiedAwaiter

func CancelSubModifiedAwaiter(subID uuid.UUID, subName string, c SubModifiedNotifier)

This has to take a two-way (bidirectional) channel so that the function can find that channel in the map.

func CancelTopicModifiedAwaiter

func CancelTopicModifiedAwaiter(topicID uuid.UUID, topicName string, c TopicModifiedNotifier)

This has to take a two-way (bidirectional) channel so that the function can find that channel in the map.

func NextDelayFor

func NextDelayFor(sub *ent.Subscription, attempts int) (nominalDelay, fuzzedDelay time.Duration)

The delay after N attempts is the lesser of max and min * factor^N. In other words, after the first attempt the delay is min, and after each further attempt the delay is multiplied by factor, until the delay reaches max.

func NotifyModifySubscription

func NotifyModifySubscription(tx *ent.Tx, subscriptionID uuid.UUID, subscriptionName string)

func NotifyModifyTopic

func NotifyModifyTopic(tx *ent.Tx, topicID uuid.UUID, topicName string)

func RemovePublishHook

func RemovePublishHook(handle PublishHookHandle) bool

func RemoveSubscriptionModifyHook

func RemoveSubscriptionModifyHook(handle SubscriptionModifyHookHandle) bool

func RemoveTopicModifyHook

func RemoveTopicModifyHook(handle TopicModifyHookHandle) bool

func WakeAllInternal added in v0.5.16

func WakeAllInternal()

func WakePublishListeners

func WakePublishListeners(onlyInternal bool, subIDs ...uuid.UUID)

WakePublishListeners notifies about publishes to the given subscription IDs

func WakeSubscriptionListeners

func WakeSubscriptionListeners(
	onlyInternal bool,
	subscriptionID uuid.UUID,
	subscriptionName string,
)

func WakeTopicListeners

func WakeTopicListeners(onlyInternal bool, topicID uuid.UUID, topicName string)

Types

type AckDeliveries

type AckDeliveries struct {
	// contains filtered or unexported fields
}

func NewAckDeliveries

func NewAckDeliveries(
	ids ...uuid.UUID,
) *AckDeliveries

func (*AckDeliveries) Execute

func (a *AckDeliveries) Execute(ctx context.Context, tx *ent.Tx) error

func (*AckDeliveries) Parameters

func (a *AckDeliveries) Parameters() P

func (*AckDeliveries) Results

func (a *AckDeliveries) Results() (results R, ok bool)

type AckDeliveriesParams

type AckDeliveriesParams struct {
	// contains filtered or unexported fields
}

type Action

type Action[P any, R any] interface {
	ActionData[P, R]
	Execute(context.Context, *ent.Tx) error
}

type ActionData added in v0.11.0

type ActionData[P any, R any] interface {
	Parameters() P
	Results() (results R, ok bool)
}

type AnySubModifiedNotifier

type AnySubModifiedNotifier notifier

func AnySubModifiedAwaiter

func AnySubModifiedAwaiter() AnySubModifiedNotifier

This has to return a two-way (bidirectional) channel so that it can be passed to CancelAnySubModifiedAwaiter.

type AnyTopicModifiedNotifier

type AnyTopicModifiedNotifier notifier

func AnyTopicModifiedAwaiter

func AnyTopicModifiedAwaiter() AnyTopicModifiedNotifier

This has to return a two-way (bidirectional) channel so that it can be passed to CancelAnyTopicModifiedAwaiter.

type CreateSnapshot added in v0.12.0

type CreateSnapshot struct {
	// contains filtered or unexported fields
}

func NewCreateSnapshot added in v0.12.0

func NewCreateSnapshot(params CreateSnapshotParams) *CreateSnapshot

func (*CreateSnapshot) Execute added in v0.12.0

func (a *CreateSnapshot) Execute(ctx context.Context, tx *ent.Tx) error

func (*CreateSnapshot) Parameters added in v0.12.0

func (a *CreateSnapshot) Parameters() P

func (*CreateSnapshot) Results added in v0.12.0

func (a *CreateSnapshot) Results() (results R, ok bool)

type CreateSnapshotParams added in v0.12.0

type CreateSnapshotParams struct {
	SubscriptionName string
	Name             string
	Labels           map[string]string
}

type CreateSubscription

type CreateSubscription struct {
	// contains filtered or unexported fields
}

func NewCreateSubscription

func NewCreateSubscription(params CreateSubscriptionParams) *CreateSubscription

func (*CreateSubscription) Execute

func (a *CreateSubscription) Execute(ctx context.Context, tx *ent.Tx) error

func (*CreateSubscription) Parameters

func (a *CreateSubscription) Parameters() P

func (*CreateSubscription) Results

func (a *CreateSubscription) Results() (results R, ok bool)

type CreateSubscriptionParams

type CreateSubscriptionParams struct {
	TopicName              string
	Name                   string
	TTL                    time.Duration
	MessageTTL             time.Duration
	OrderedDelivery        bool
	Labels                 map[string]string
	PushEndpoint           string
	MinBackoff, MaxBackoff time.Duration
	Filter                 string

	// these two normally should both be present, or not
	MaxDeliveryAttempts int32
	DeadLetterTopic     string
}

type CreateTopic

type CreateTopic struct {
	// contains filtered or unexported fields
}

func NewCreateTopic

func NewCreateTopic(params CreateTopicParams) *CreateTopic

func (*CreateTopic) Execute

func (a *CreateTopic) Execute(ctx context.Context, tx *ent.Tx) error

func (*CreateTopic) Parameters

func (a *CreateTopic) Parameters() P

func (*CreateTopic) Results

func (a *CreateTopic) Results() (results R, ok bool)

type CreateTopicParams

type CreateTopicParams struct {
	Name   string
	Labels map[string]string
}

type DeadLetterDeliveries

type DeadLetterDeliveries struct {
	// contains filtered or unexported fields
}

func NewDeadLetterDeliveries

func NewDeadLetterDeliveries(params DeadLetterDeliveriesParams) *DeadLetterDeliveries

func (*DeadLetterDeliveries) Execute

func (a *DeadLetterDeliveries) Execute(ctx context.Context, tx *ent.Tx) error

func (*DeadLetterDeliveries) Parameters

func (a *DeadLetterDeliveries) Parameters() P

func (*DeadLetterDeliveries) Results

func (a *DeadLetterDeliveries) Results() (results R, ok bool)

type DeadLetterDeliveriesParams

type DeadLetterDeliveriesParams struct {
	MaxDeliveries int `json:"maxDeliveries"`
}

type DeadLetterDeliveriesResults

type DeadLetterDeliveriesResults struct {
	NumDeadLettered int
}

type DelayDeliveries

type DelayDeliveries struct {
	// contains filtered or unexported fields
}

func NewDelayDeliveries

func NewDelayDeliveries(params DelayDeliveriesParams) *DelayDeliveries

func (*DelayDeliveries) Execute

func (a *DelayDeliveries) Execute(ctx context.Context, tx *ent.Tx) error

func (*DelayDeliveries) Parameters

func (a *DelayDeliveries) Parameters() P

func (*DelayDeliveries) Results

func (a *DelayDeliveries) Results() (results R, ok bool)

type DelayDeliveriesParams

type DelayDeliveriesParams struct {
	IDs   []uuid.UUID
	Delay time.Duration
}

type DeleteExpiredSubscriptions

type DeleteExpiredSubscriptions struct {
	// contains filtered or unexported fields
}

func NewDeleteExpiredSubscriptions

func NewDeleteExpiredSubscriptions(params PruneCommonParams) *DeleteExpiredSubscriptions

func (*DeleteExpiredSubscriptions) Execute

func (a *DeleteExpiredSubscriptions) Execute(ctx context.Context, tx *ent.Tx) error

type DeleteSubscription

type DeleteSubscription struct {
	// contains filtered or unexported fields
}

func NewDeleteSubscription

func NewDeleteSubscription(name string) *DeleteSubscription

func (*DeleteSubscription) Execute

func (a *DeleteSubscription) Execute(ctx context.Context, tx *ent.Tx) error

func (*DeleteSubscription) Parameters

func (a *DeleteSubscription) Parameters() P

func (*DeleteSubscription) Results

func (a *DeleteSubscription) Results() (results R, ok bool)

type DeleteTopic

type DeleteTopic struct {
	// contains filtered or unexported fields
}

func NewDeleteTopic

func NewDeleteTopic(name string) *DeleteTopic

func (*DeleteTopic) Execute

func (a *DeleteTopic) Execute(ctx context.Context, tx *ent.Tx) error

func (*DeleteTopic) Parameters

func (a *DeleteTopic) Parameters() P

func (*DeleteTopic) Results

func (a *DeleteTopic) Results() (results R, ok bool)

type FlowControl

type FlowControl struct {
	MaxMessages int `json:"maxMessages" form:"maxMessages,default=1" binding:"gte=1"`
	MaxBytes    int `json:"maxBytes" form:"maxBytes,default=1" binding:"gte=1"`
}

type GetSubscriptionMessages

type GetSubscriptionMessages struct {
	// contains filtered or unexported fields
}

func (*GetSubscriptionMessages) Execute deprecated

func (a *GetSubscriptionMessages) Execute(ctx context.Context, tx *ent.Tx) error

Deprecated: use ExecuteClient instead.

Using the standard Execute method will cause excessive consumption of database connections, as the transaction will be held open until message(s) are received, or the call times out. This also simply will not work correctly with SQLite. Using ExecuteClient will avoid holding a connection open while waiting for messages, and will work properly with SQLite.

func (*GetSubscriptionMessages) ExecuteClient

func (a *GetSubscriptionMessages) ExecuteClient(ctx context.Context, client *ent.Client) error

func (*GetSubscriptionMessages) Parameters

func (a *GetSubscriptionMessages) Parameters() P

func (*GetSubscriptionMessages) Results

func (a *GetSubscriptionMessages) Results() (results R, ok bool)

type GetSubscriptionMessagesParams

type GetSubscriptionMessagesParams struct {
	Name           string
	ID             *uuid.UUID
	MaxMessages    int
	MaxBytes       int
	MaxBytesStrict bool
	MaxWait        time.Duration
	// Waiting, if non-nil, will be closed when the action first goes into a delay
	Waiting chan<- struct{} `json:"-"`
}

type HttpPushStreamer

type HttpPushStreamer struct {
	// contains filtered or unexported fields
}

func NewHttpPusher

func NewHttpPusher(
	subscriptionName string,
	subscriptionID uuid.UUID,
	endpoint string,
	httpClient *http.Client,
	entClient *ent.Client,
) *HttpPushStreamer

func (*HttpPushStreamer) CurrentFlowControl

func (p *HttpPushStreamer) CurrentFlowControl() FlowControl

TODO: This is only exposed for acceptance testing, which is bad; refactor the tests so that they do not need it.

func (*HttpPushStreamer) Go

func (*HttpPushStreamer) LogContexter

func (p *HttpPushStreamer) LogContexter(c zerolog.Context) zerolog.Context

type MessageStreamRequest

type MessageStreamRequest struct {
	FlowControl *FlowControl `json:"flowControl"`
	Ack         []uuid.UUID  `json:"ack"`
	Nack        []uuid.UUID  `json:"nack"`
	Delay       []uuid.UUID  `json:"delay"`
	// do this as a number to avoid issues & cross-platform/language
	// inconsistencies with duration serialization
	DelaySeconds float64 `json:"delaySeconds"`
}

type MessageStreamer

type MessageStreamer struct {
	Client           *ent.Client
	Logger           *logging.Logger
	SubscriptionID   *uuid.UUID
	SubscriptionName string
	AutomaticNack    bool
}

func (*MessageStreamer) Go

type NackDeliveries

type NackDeliveries struct {
	// contains filtered or unexported fields
}

func NewNackDeliveries

func NewNackDeliveries(
	ids ...uuid.UUID,
) *NackDeliveries

func (*NackDeliveries) Execute

func (a *NackDeliveries) Execute(ctx context.Context, tx *ent.Tx) error

func (*NackDeliveries) Parameters

func (a *NackDeliveries) Parameters() P

func (*NackDeliveries) Results

func (a *NackDeliveries) Results() (results R, ok bool)

type NackDeliveriesParams

type NackDeliveriesParams struct {
	IDs []uuid.UUID
}

type PruneCommonParams

type PruneCommonParams struct {
	MinAge    time.Duration `json:"minAge"`
	MaxDelete int           `json:"maxDelete"`
}

func (*PruneCommonParams) Validate

func (p *PruneCommonParams) Validate() error

type PruneCommonResults

type PruneCommonResults struct {
	NumDeleted int `json:"numDeleted"`
}

type PruneCompletedDeliveries

type PruneCompletedDeliveries struct {
	// contains filtered or unexported fields
}

func NewPruneCompletedDeliveries

func NewPruneCompletedDeliveries(params PruneCommonParams) *PruneCompletedDeliveries

func (*PruneCompletedDeliveries) Execute

func (pcd *PruneCompletedDeliveries) Execute(ctx context.Context, tx *ent.Tx) error

type PruneCompletedMessages

type PruneCompletedMessages struct {
	// contains filtered or unexported fields
}

func NewPruneCompletedMessages

func NewPruneCompletedMessages(params PruneCommonParams) *PruneCompletedMessages

func (*PruneCompletedMessages) Execute

func (a *PruneCompletedMessages) Execute(ctx context.Context, tx *ent.Tx) error

type PruneDeletedSubscriptionDeliveries

type PruneDeletedSubscriptionDeliveries struct {
	// contains filtered or unexported fields
}

func NewPruneDeletedSubscriptionDeliveries

func NewPruneDeletedSubscriptionDeliveries(params PruneCommonParams) *PruneDeletedSubscriptionDeliveries

func (*PruneDeletedSubscriptionDeliveries) Execute

type PruneDeletedSubscriptions

type PruneDeletedSubscriptions struct {
	// contains filtered or unexported fields
}

func NewPruneDeletedSubscriptions

func NewPruneDeletedSubscriptions(params PruneCommonParams) *PruneDeletedSubscriptions

func (*PruneDeletedSubscriptions) Execute

func (a *PruneDeletedSubscriptions) Execute(ctx context.Context, tx *ent.Tx) error

type PruneDeletedTopics

type PruneDeletedTopics struct {
	// contains filtered or unexported fields
}

func NewPruneDeletedTopics

func NewPruneDeletedTopics(params PruneCommonParams) *PruneDeletedTopics

func (*PruneDeletedTopics) Execute

func (a *PruneDeletedTopics) Execute(ctx context.Context, tx *ent.Tx) error

type PruneExpiredDeliveries

type PruneExpiredDeliveries struct {
	// contains filtered or unexported fields
}

func NewPruneExpiredDeliveries

func NewPruneExpiredDeliveries(params PruneCommonParams) *PruneExpiredDeliveries

func (*PruneExpiredDeliveries) Execute

func (a *PruneExpiredDeliveries) Execute(ctx context.Context, tx *ent.Tx) error

type PublishHookHandle

type PublishHookHandle *publishHook

func AddPublishHook

func AddPublishHook(hook publishHook) PublishHookHandle

type PublishMessage

type PublishMessage struct {
	// contains filtered or unexported fields
}

func NewPublishMessage

func NewPublishMessage(params PublishMessageParams) *PublishMessage

func (*PublishMessage) Execute

func (a *PublishMessage) Execute(ctx context.Context, tx *ent.Tx) error

func (*PublishMessage) Parameters

func (a *PublishMessage) Parameters() P

func (*PublishMessage) Results

func (a *PublishMessage) Results() (results R, ok bool)

type PublishMessageParams

type PublishMessageParams struct {
	TopicName  string
	TopicID    *uuid.UUID
	Payload    json.RawMessage
	Attributes map[string]string
	OrderKey   string
}

type PublishNotifier

type PublishNotifier notifier

func PublishAwaiter

func PublishAwaiter(subID uuid.UUID) PublishNotifier

This has to return a two-way (bidirectional) channel so that it can be passed to CancelPublishAwaiter.

type PushMessage added in v0.13.0

type PushMessage = pubsubv1.PubsubMessage

type PushRequest added in v0.13.0

type PushRequest struct {
	Message         PushMessage `json:"message"`
	Subscription    string      `json:"subscription"`
	DeliveryAttempt int         `json:"deliveryAttempt"`
}

PushRequest represents the type of the POST payload for a push subscription operating in Wrapped mode.

Surprisingly, Google doesn't export a type for this.

See: https://cloud.google.com/pubsub/docs/push#receive_push

type SeekSubscriptionToSnapshot added in v0.12.0

type SeekSubscriptionToSnapshot struct {
	// contains filtered or unexported fields
}

func NewSeekSubscriptionToSnapshot added in v0.12.0

func NewSeekSubscriptionToSnapshot(params SeekSubscriptionToSnapshotParams) *SeekSubscriptionToSnapshot

func (*SeekSubscriptionToSnapshot) Execute added in v0.12.0

func (a *SeekSubscriptionToSnapshot) Execute(ctx context.Context, tx *ent.Tx) error

func (*SeekSubscriptionToSnapshot) Parameters added in v0.12.0

func (a *SeekSubscriptionToSnapshot) Parameters() P

func (*SeekSubscriptionToSnapshot) Results added in v0.12.0

func (a *SeekSubscriptionToSnapshot) Results() (results R, ok bool)

type SeekSubscriptionToSnapshotParams added in v0.12.0

type SeekSubscriptionToSnapshotParams struct {
	SubscriptionName string
	SubscriptionID   *uuid.UUID
	SnapshotName     string
	SnapshotID       *uuid.UUID
}

type SeekSubscriptionToTime

type SeekSubscriptionToTime struct {
	// contains filtered or unexported fields
}

func (*SeekSubscriptionToTime) Execute

func (a *SeekSubscriptionToTime) Execute(ctx context.Context, tx *ent.Tx) error

func (*SeekSubscriptionToTime) Parameters

func (a *SeekSubscriptionToTime) Parameters() P

func (*SeekSubscriptionToTime) Results

func (a *SeekSubscriptionToTime) Results() (results R, ok bool)

type SeekSubscriptionToTimeParams

type SeekSubscriptionToTimeParams struct {
	Name string
	ID   *uuid.UUID
	Time time.Time
}

type StreamConnection

type StreamConnection interface {
	Close() error
	Receive(context.Context) (*MessageStreamRequest, error)
	Send(context.Context, *SubscriptionMessageDelivery) error
}

type StreamConnectionBatchSend

type StreamConnectionBatchSend interface {
	SendBatch(context.Context, []*SubscriptionMessageDelivery) error
}

type SubModifiedNotifier

type SubModifiedNotifier notifier

func SubModifiedAwaiter

func SubModifiedAwaiter(subID uuid.UUID, subName string) SubModifiedNotifier

This has to return a two-way (bidirectional) channel so that it can be passed to CancelSubModifiedAwaiter.

type SubscriptionMessageDelivery

type SubscriptionMessageDelivery struct {
	ID          uuid.UUID `json:"id"`
	MessageID   uuid.UUID `json:"messageID"`
	PublishedAt time.Time `json:"publishedAt"`
	// NumAttempts is approximate
	NumAttempts   int       `json:"numAttempts"`
	NextAttemptAt time.Time `json:"nextAttemptAt"`

	OrderKey   *string           `json:"orderKey,omitempty"`
	Payload    json.RawMessage   `json:"payload"`
	Attributes map[string]string `json:"attributes"`
	// contains filtered or unexported fields
}

type SubscriptionModifyHookHandle

type SubscriptionModifyHookHandle *modifyHook

func AddSubscriptionModifyHook

func AddSubscriptionModifyHook(hook modifyHook) SubscriptionModifyHookHandle

type TopicModifiedNotifier

type TopicModifiedNotifier notifier

func TopicModifiedAwaiter

func TopicModifiedAwaiter(topicID uuid.UUID, topicName string) TopicModifiedNotifier

This has to return a two-way (bidirectional) channel so that it can be passed to CancelTopicModifiedAwaiter.

type TopicModifyHookHandle

type TopicModifyHookHandle *modifyHook

func AddTopicModifyHook

func AddTopicModifyHook(hook modifyHook) TopicModifyHookHandle

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL