Documentation ¶
Overview ¶
Package actions provides handlers for all the operations that can be done on the message bus
Index ¶
- Variables
- func CancelAnySubModifiedAwaiter(c AnySubModifiedNotifier)
- func CancelAnyTopicModifiedAwaiter(c AnyTopicModifiedNotifier)
- func CancelPublishAwaiter(subID uuid.UUID, c PublishNotifier)
- func CancelSubModifiedAwaiter(subID uuid.UUID, subName string, c SubModifiedNotifier)
- func CancelTopicModifiedAwaiter(topicID uuid.UUID, topicName string, c TopicModifiedNotifier)
- func NextDelayFor(sub *ent.Subscription, attempts int) (nominalDelay, fuzzedDelay time.Duration)
- func NotifyModifySubscription(tx *ent.Tx, subscriptionID uuid.UUID, subscriptionName string)
- func NotifyModifyTopic(tx *ent.Tx, topicID uuid.UUID, topicName string)
- func RemovePublishHook(handle PublishHookHandle) bool
- func RemoveSubscriptionModifyHook(handle SubscriptionModifyHookHandle) bool
- func RemoveTopicModifyHook(handle TopicModifyHookHandle) bool
- func WakeAllInternal()
- func WakePublishListeners(onlyInternal bool, subIDs ...uuid.UUID)
- func WakeSubscriptionListeners(onlyInternal bool, subscriptionID uuid.UUID, subscriptionName string)
- func WakeTopicListeners(onlyInternal bool, topicID uuid.UUID, topicName string)
- type AckDeliveries
- type AckDeliveriesParams
- type Action
- type ActionData
- type AnySubModifiedNotifier
- type AnyTopicModifiedNotifier
- type CreateSnapshot
- type CreateSnapshotParams
- type CreateSubscription
- type CreateSubscriptionParams
- type CreateTopic
- type CreateTopicParams
- type DeadLetterDeliveries
- type DeadLetterDeliveriesParams
- type DeadLetterDeliveriesResults
- type DelayDeliveries
- type DelayDeliveriesParams
- type DeleteExpiredSubscriptions
- type DeleteSubscription
- type DeleteTopic
- type FlowControl
- type GetSubscriptionMessages
- type GetSubscriptionMessagesParams
- type HttpPushStreamer
- type MessageStreamRequest
- type MessageStreamer
- type NackDeliveries
- type NackDeliveriesParams
- type PruneCommonParams
- type PruneCommonResults
- type PruneCompletedDeliveries
- type PruneCompletedMessages
- type PruneDeletedSubscriptionDeliveries
- type PruneDeletedSubscriptions
- type PruneDeletedTopics
- type PruneExpiredDeliveries
- type PublishHookHandle
- type PublishMessage
- type PublishMessageParams
- type PublishNotifier
- type SeekSubscriptionToSnapshot
- type SeekSubscriptionToSnapshotParams
- type SeekSubscriptionToTime
- type SeekSubscriptionToTimeParams
- type StreamConnection
- type StreamConnectionBatchSend
- type SubModifiedNotifier
- type SubscriptionMessageDelivery
- type SubscriptionModifyHookHandle
- type TopicModifiedNotifier
- type TopicModifyHookHandle
Constants ¶
This section is empty.
Variables ¶
var DeleteExpiredSubscriptionsCounter, DeleteExpiredSubscriptionsHistogram = pruneMetrics("expired_subscriptions")
var ErrExists = errors.New("Already exists")
var ErrNotFound = errors.New("Not found")
Functions ¶
func CancelAnySubModifiedAwaiter ¶
func CancelAnySubModifiedAwaiter(c AnySubModifiedNotifier)
this has to take a two-way channel so that it can find it in the map
func CancelAnyTopicModifiedAwaiter ¶
func CancelAnyTopicModifiedAwaiter(c AnyTopicModifiedNotifier)
this has to take a two-way channel so that it can find it in the map
func CancelPublishAwaiter ¶
func CancelPublishAwaiter(subID uuid.UUID, c PublishNotifier)
this has to take a two-way channel so that it can find it in the map
func CancelSubModifiedAwaiter ¶
func CancelSubModifiedAwaiter(subID uuid.UUID, subName string, c SubModifiedNotifier)
this has to take a two-way channel so that it can find it in the map
func CancelTopicModifiedAwaiter ¶
func CancelTopicModifiedAwaiter(topicID uuid.UUID, topicName string, c TopicModifiedNotifier)
this has to take a two-way channel so that it can find it in the map
func NextDelayFor ¶
func NextDelayFor(sub *ent.Subscription, attempts int) (nominalDelay, fuzzedDelay time.Duration)
delay after N attempts = floor(max, min * factor^N), AKA after first attempt delay is min, after each further attempt delay *= factor, until delay hits max
func RemovePublishHook ¶
func RemovePublishHook(handle PublishHookHandle) bool
func RemoveSubscriptionModifyHook ¶
func RemoveSubscriptionModifyHook(handle SubscriptionModifyHookHandle) bool
func RemoveTopicModifyHook ¶
func RemoveTopicModifyHook(handle TopicModifyHookHandle) bool
func WakeAllInternal ¶ added in v0.5.16
func WakeAllInternal()
func WakePublishListeners ¶
WakePublishListeners notifies about publishes to the given subscription IDs
Types ¶
type AckDeliveries ¶
type AckDeliveries struct {
// contains filtered or unexported fields
}
func NewAckDeliveries ¶
func NewAckDeliveries( ids ...uuid.UUID, ) *AckDeliveries
func (*AckDeliveries) Parameters ¶
func (a *AckDeliveries) Parameters() P
type AckDeliveriesParams ¶
type AckDeliveriesParams struct {
// contains filtered or unexported fields
}
type ActionData ¶ added in v0.11.0
type AnySubModifiedNotifier ¶
type AnySubModifiedNotifier notifier
func AnySubModifiedAwaiter ¶
func AnySubModifiedAwaiter() AnySubModifiedNotifier
this has to return a two-way channel so that it can be passed to CancelAnySubModifiedAwaiter
type AnyTopicModifiedNotifier ¶
type AnyTopicModifiedNotifier notifier
func AnyTopicModifiedAwaiter ¶
func AnyTopicModifiedAwaiter() AnyTopicModifiedNotifier
this has to return a two-way channel so that it can be passed to CancelAnyTopicModifiedAwaiter
type CreateSnapshot ¶ added in v0.12.0
type CreateSnapshot struct {
// contains filtered or unexported fields
}
func NewCreateSnapshot ¶ added in v0.12.0
func NewCreateSnapshot(params CreateSnapshotParams) *CreateSnapshot
func (*CreateSnapshot) Parameters ¶ added in v0.12.0
func (a *CreateSnapshot) Parameters() P
type CreateSnapshotParams ¶ added in v0.12.0
type CreateSubscription ¶
type CreateSubscription struct {
// contains filtered or unexported fields
}
func NewCreateSubscription ¶
func NewCreateSubscription(params CreateSubscriptionParams) *CreateSubscription
func (*CreateSubscription) Parameters ¶
func (a *CreateSubscription) Parameters() P
type CreateSubscriptionParams ¶
type CreateSubscriptionParams struct { TopicName string Name string TTL time.Duration MessageTTL time.Duration OrderedDelivery bool Labels map[string]string PushEndpoint string MinBackoff, MaxBackoff time.Duration Filter string // these two normally should both be present, or not MaxDeliveryAttempts int32 DeadLetterTopic string }
type CreateTopic ¶
type CreateTopic struct {
// contains filtered or unexported fields
}
func NewCreateTopic ¶
func NewCreateTopic(params CreateTopicParams) *CreateTopic
func (*CreateTopic) Parameters ¶
func (a *CreateTopic) Parameters() P
type CreateTopicParams ¶
type DeadLetterDeliveries ¶
type DeadLetterDeliveries struct {
// contains filtered or unexported fields
}
func NewDeadLetterDeliveries ¶
func NewDeadLetterDeliveries(params DeadLetterDeliveriesParams) *DeadLetterDeliveries
func (*DeadLetterDeliveries) Parameters ¶
func (a *DeadLetterDeliveries) Parameters() P
type DeadLetterDeliveriesParams ¶
type DeadLetterDeliveriesParams struct {
MaxDeliveries int `json:"maxDeliveries"`
}
type DeadLetterDeliveriesResults ¶
type DeadLetterDeliveriesResults struct {
NumDeadLettered int
}
type DelayDeliveries ¶
type DelayDeliveries struct {
// contains filtered or unexported fields
}
func NewDelayDeliveries ¶
func NewDelayDeliveries(params DelayDeliveriesParams) *DelayDeliveries
func (*DelayDeliveries) Parameters ¶
func (a *DelayDeliveries) Parameters() P
type DelayDeliveriesParams ¶
type DeleteExpiredSubscriptions ¶
type DeleteExpiredSubscriptions struct {
// contains filtered or unexported fields
}
func NewDeleteExpiredSubscriptions ¶
func NewDeleteExpiredSubscriptions(params PruneCommonParams) *DeleteExpiredSubscriptions
type DeleteSubscription ¶
type DeleteSubscription struct {
// contains filtered or unexported fields
}
func NewDeleteSubscription ¶
func NewDeleteSubscription(name string) *DeleteSubscription
func (*DeleteSubscription) Parameters ¶
func (a *DeleteSubscription) Parameters() P
type DeleteTopic ¶
type DeleteTopic struct {
// contains filtered or unexported fields
}
func NewDeleteTopic ¶
func NewDeleteTopic(name string) *DeleteTopic
func (*DeleteTopic) Parameters ¶
func (a *DeleteTopic) Parameters() P
type FlowControl ¶
type GetSubscriptionMessages ¶
type GetSubscriptionMessages struct {
// contains filtered or unexported fields
}
func NewGetSubscriptionMessages ¶
func NewGetSubscriptionMessages(params GetSubscriptionMessagesParams) *GetSubscriptionMessages
func (*GetSubscriptionMessages) Execute
deprecated
Deprecated: use ExecuteClient instead.
Using the standard Execute method will cause excessive consumption of database connections, as the transaction will be held open until message(s) are received, or the call times out. This also simply will not work correctly with SQLite. Using ExecuteClient will avoid holding a connection open while waiting for messages, and will work properly with SQLite.
func (*GetSubscriptionMessages) ExecuteClient ¶
func (*GetSubscriptionMessages) Parameters ¶
func (a *GetSubscriptionMessages) Parameters() P
type HttpPushStreamer ¶
type HttpPushStreamer struct {
// contains filtered or unexported fields
}
func NewHttpPusher ¶
func (*HttpPushStreamer) CurrentFlowControl ¶
func (p *HttpPushStreamer) CurrentFlowControl() FlowControl
TODO: this is only exposed for acceptance testing, which is bad, refactor tests to not need this
func (*HttpPushStreamer) LogContexter ¶
func (p *HttpPushStreamer) LogContexter(c zerolog.Context) zerolog.Context
type MessageStreamRequest ¶
type MessageStreamRequest struct { FlowControl *FlowControl `json:"flowControl"` Ack []uuid.UUID `json:"ack"` Nack []uuid.UUID `json:"nack"` Delay []uuid.UUID `json:"delay"` // do this as a number to avoid issues & cross-platform/language // inconsistencies with duration serialization DelaySeconds float64 `json:"delaySeconds"` }
type MessageStreamer ¶
type MessageStreamer struct { Client *ent.Client Logger *logging.Logger SubscriptionID *uuid.UUID SubscriptionName string AutomaticNack bool }
func (*MessageStreamer) Go ¶
func (ms *MessageStreamer) Go(ctx context.Context, conn StreamConnection) error
type NackDeliveries ¶
type NackDeliveries struct {
// contains filtered or unexported fields
}
func NewNackDeliveries ¶
func NewNackDeliveries( ids ...uuid.UUID, ) *NackDeliveries
func (*NackDeliveries) Parameters ¶
func (a *NackDeliveries) Parameters() P
type NackDeliveriesParams ¶
type PruneCommonParams ¶
type PruneCommonParams struct { MinAge time.Duration `json:"minAge"` MaxDelete int `json:"maxDelete"` }
func (*PruneCommonParams) Validate ¶
func (p *PruneCommonParams) Validate() error
type PruneCommonResults ¶
type PruneCommonResults struct {
NumDeleted int `json:"numDeleted"`
}
type PruneCompletedDeliveries ¶
type PruneCompletedDeliveries struct {
// contains filtered or unexported fields
}
func NewPruneCompletedDeliveries ¶
func NewPruneCompletedDeliveries(params PruneCommonParams) *PruneCompletedDeliveries
type PruneCompletedMessages ¶
type PruneCompletedMessages struct {
// contains filtered or unexported fields
}
func NewPruneCompletedMessages ¶
func NewPruneCompletedMessages(params PruneCommonParams) *PruneCompletedMessages
type PruneDeletedSubscriptionDeliveries ¶
type PruneDeletedSubscriptionDeliveries struct {
// contains filtered or unexported fields
}
func NewPruneDeletedSubscriptionDeliveries ¶
func NewPruneDeletedSubscriptionDeliveries(params PruneCommonParams) *PruneDeletedSubscriptionDeliveries
type PruneDeletedSubscriptions ¶
type PruneDeletedSubscriptions struct {
// contains filtered or unexported fields
}
func NewPruneDeletedSubscriptions ¶
func NewPruneDeletedSubscriptions(params PruneCommonParams) *PruneDeletedSubscriptions
type PruneDeletedTopics ¶
type PruneDeletedTopics struct {
// contains filtered or unexported fields
}
func NewPruneDeletedTopics ¶
func NewPruneDeletedTopics(params PruneCommonParams) *PruneDeletedTopics
type PruneExpiredDeliveries ¶
type PruneExpiredDeliveries struct {
// contains filtered or unexported fields
}
func NewPruneExpiredDeliveries ¶
func NewPruneExpiredDeliveries(params PruneCommonParams) *PruneExpiredDeliveries
type PublishHookHandle ¶
type PublishHookHandle *publishHook
func AddPublishHook ¶
func AddPublishHook(hook publishHook) PublishHookHandle
type PublishMessage ¶
type PublishMessage struct {
// contains filtered or unexported fields
}
func NewPublishMessage ¶
func NewPublishMessage(params PublishMessageParams) *PublishMessage
func (*PublishMessage) Parameters ¶
func (a *PublishMessage) Parameters() P
type PublishMessageParams ¶
type PublishNotifier ¶
type PublishNotifier notifier
func PublishAwaiter ¶
func PublishAwaiter(subID uuid.UUID) PublishNotifier
this has to return a two-way channel so that it can be passed to cancelAwaiter
type SeekSubscriptionToSnapshot ¶ added in v0.12.0
type SeekSubscriptionToSnapshot struct {
// contains filtered or unexported fields
}
func NewSeekSubscriptionToSnapshot ¶ added in v0.12.0
func NewSeekSubscriptionToSnapshot(params SeekSubscriptionToSnapshotParams) *SeekSubscriptionToSnapshot
func (*SeekSubscriptionToSnapshot) Parameters ¶ added in v0.12.0
func (a *SeekSubscriptionToSnapshot) Parameters() P
type SeekSubscriptionToSnapshotParams ¶ added in v0.12.0
type SeekSubscriptionToTime ¶
type SeekSubscriptionToTime struct {
// contains filtered or unexported fields
}
func NewSeekSubscriptionToTime ¶
func NewSeekSubscriptionToTime(params SeekSubscriptionToTimeParams) *SeekSubscriptionToTime
func (*SeekSubscriptionToTime) Parameters ¶
func (a *SeekSubscriptionToTime) Parameters() P
type StreamConnection ¶
type StreamConnection interface { Close() error Receive(context.Context) (*MessageStreamRequest, error) Send(context.Context, *SubscriptionMessageDelivery) error }
type StreamConnectionBatchSend ¶
type StreamConnectionBatchSend interface {
SendBatch(context.Context, []*SubscriptionMessageDelivery) error
}
type SubModifiedNotifier ¶
type SubModifiedNotifier notifier
func SubModifiedAwaiter ¶
func SubModifiedAwaiter(subID uuid.UUID, subName string) SubModifiedNotifier
this has to return a two-way channel so that it can be passed to cancelSubModifiedAwaiter
type SubscriptionMessageDelivery ¶
type SubscriptionMessageDelivery struct { ID uuid.UUID `json:"id"` MessageID uuid.UUID `json:"messageID"` PublishedAt time.Time `json:"publishedAt"` // NumAttempts is approximate NumAttempts int `json:"numAttempts"` NextAttemptAt time.Time `json:"nextAttemptAt"` OrderKey *string `json:"orderKey,omitempty"` Payload json.RawMessage `json:"payload"` Attributes map[string]string `json:"attributes"` // contains filtered or unexported fields }
type SubscriptionModifyHookHandle ¶
type SubscriptionModifyHookHandle *modifyHook
func AddSubscriptionModifyHook ¶
func AddSubscriptionModifyHook(hook modifyHook) SubscriptionModifyHookHandle
type TopicModifiedNotifier ¶
type TopicModifiedNotifier notifier
func TopicModifiedAwaiter ¶
func TopicModifiedAwaiter(topicID uuid.UUID, topicName string) TopicModifiedNotifier
this has to return a two-way channel so that it can be passed to cancelTopicModifiedAwaiter
type TopicModifyHookHandle ¶
type TopicModifyHookHandle *modifyHook
func AddTopicModifyHook ¶
func AddTopicModifyHook(hook modifyHook) TopicModifyHookHandle
Source Files ¶
- ack-deliveries.go
- action-metrics.go
- action.go
- create-snapshot.go
- create-subscription.go
- create-topic.go
- dead-letter-deliveries.go
- delay-deliveries.go
- delete-expired-subscriptions.go
- delete-subscription.go
- delete-topic.go
- delivery-utils.go
- doc.go
- errors.go
- errors_cgo.go
- flow-control.go
- get-subscription-messages-metrics.go
- get-subscription-messages.go
- http-push-metrics.go
- http-push-streamer.go
- message-streamer-metrics.go
- message-streamer.go
- nack-deliveries.go
- notify.go
- prune-common.go
- prune-completed-deliveries.go
- prune-completed-messages.go
- prune-deleted-subscription-deliveries.go
- prune-deleted-subscriptions.go
- prune-deleted-topics.go
- prune-expired-deliveries.go
- publish-message-metrics.go
- publish-message.go
- seek-subscription-to-snapshot.go
- seek-subscription-to-time.go