pubsub

package
v0.0.0-...-949823d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2024 License: Apache-2.0 Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Metadata = "metadata"
	Entries  = "entries"
)
View Source
const (
	APIVersionV1alpha1 = "dapr.io/v1alpha1"
	APIVersionV2alpha1 = "dapr.io/v2alpha1"

	MetadataKeyPubSub = "pubsubName"
)

Variables

View Source
var ErrBulkPublishFailure = errors.New("bulk publish failed")
View Source
var ErrMessageDropped = errors.New("pubsub message dropped") // TODO: remove this and use apierrors.PubSubMsgDropped

Functions

func ExtractCloudEventExtensions

func ExtractCloudEventExtensions(cloudEvent map[string]any) (*structpb.Struct, error)

func ExtractCloudEventProperty

func ExtractCloudEventProperty(cloudEvent map[string]any, property string) string

func FetchEntry

func FetchEntry(rawPayload bool, entry *contribpubsub.BulkMessageEntry, cloudEvent map[string]interface{}) (*runtimev1pb.TopicEventBulkRequestEntry, error)

func GRPCEnvelopeFromSubscriptionMessage

func GRPCEnvelopeFromSubscriptionMessage(ctx context.Context, msg *SubscribedMessage, log logger.Logger, tracingSpec *config.TracingSpec) (*runtimev1pb.TopicEventRequest, trace.Span, error)

func IsOperationAllowed

func IsOperationAllowed(topic string, pubSub *PubsubItem, scopedTopics []string) bool

func NewBulkSubscribeEnvelope

func NewBulkSubscribeEnvelope(req *BulkSubscribeEnvelope) map[string]interface{}

func NewCloudEvent

func NewCloudEvent(req *CloudEvent, metadata map[string]string) (map[string]interface{}, error)

NewCloudEvent encapsulates the creation of a Dapr cloudevent from an existing cloudevent or a raw payload.

func NewDefaultBulkPublisher

func NewDefaultBulkPublisher(p contribPubsub.PubSub) contribPubsub.BulkPublisher

NewDefaultBulkPublisher returns a new defaultBulkPublisher from a PubSub.

func NewDefaultBulkSubscriber

func NewDefaultBulkSubscriber(p contribPubsub.PubSub) *defaultBulkSubscriber

NewDefaultBulkSubscriber returns a new defaultBulkSubscriber from a PubSub.

func NewOutbox

func NewOutbox(opts OptionsOutbox) outbox.Outbox

NewOutbox returns an instance of an Outbox.

Types

type Adapter

Adapter is the interface for message buses.

type BulkSubscribe

type BulkSubscribe struct {
	Enabled            bool  `json:"enabled"`
	MaxMessagesCount   int32 `json:"maxMessagesCount,omitempty"`
	MaxAwaitDurationMs int32 `json:"maxAwaitDurationMs,omitempty"`
}

type BulkSubscribeEnvelope

type BulkSubscribeEnvelope struct {
	ID        string
	Entries   []BulkSubscribeMessageItem
	Metadata  map[string]string
	Topic     string
	Pubsub    string
	EventType string
}

type BulkSubscribeJSON

type BulkSubscribeJSON struct {
	Enabled            bool  `json:"enabled"`
	MaxMessagesCount   int32 `json:"maxMessagesCount,omitempty"`
	MaxAwaitDurationMs int32 `json:"maxAwaitDurationMs,omitempty"`
}

type BulkSubscribeMessageItem

type BulkSubscribeMessageItem struct {
	EntryId     string            `json:"entryId"` //nolint:stylecheck
	Event       interface{}       `json:"event"`
	Metadata    map[string]string `json:"metadata"`
	ContentType string            `json:"contentType,omitempty"`
}

type CloudEvent

type CloudEvent struct {
	ID              string `mapstructure:"cloudevent.id"`
	Data            []byte `mapstructure:"-"` // cannot be overridden
	Topic           string `mapstructure:"-"` // cannot be overridden
	Pubsub          string `mapstructure:"-"` // cannot be overridden
	DataContentType string `mapstructure:"-"` // cannot be overridden
	TraceID         string `mapstructure:"cloudevent.traceid"`
	TraceState      string `mapstructure:"cloudevent.tracestate"`
	Source          string `mapstructure:"cloudevent.source"`
	Type            string `mapstructure:"cloudevent.type"`
	TraceParent     string `mapstructure:"cloudevent.traceparent"`
}

CloudEvent is a request object to create a Dapr compliant cloudevent. The cloud event properties can manually be overwritten by using metadata beginning with "cloudevent." as prefix.

type Expr

type Expr interface {
	fmt.Stringer

	Eval(variables map[string]interface{}) (interface{}, error)
}

type NotAllowedError

type NotAllowedError struct {
	Topic string
	ID    string
}

pubsub.NotAllowedError is returned by the runtime when publishing is forbidden.

func (NotAllowedError) Error

func (e NotAllowedError) Error() string

type NotFoundError

type NotFoundError struct {
	PubsubName string
}

pubsub.NotFoundError is returned by the runtime when the pubsub does not exist.

func (NotFoundError) Error

func (e NotFoundError) Error() string

type OptionsOutbox

type OptionsOutbox struct {
	Publisher             Adapter
	GetPubsubFn           func(string) (contribPubsub.PubSub, bool)
	GetStateFn            func(string) (state.Store, bool)
	CloudEventExtractorFn func(map[string]any, string) string
	Namespace             string
}

type PubsubItem

type PubsubItem struct {
	Component           contribPubsub.PubSub
	ScopedSubscriptions []string
	ScopedPublishings   []string
	AllowedTopics       []string
	ProtectedTopics     []string
	NamespaceScoped     bool
}

PubsubItem is a pubsub component with its scoped subscriptions and publishings.

type RoutesJSON

type RoutesJSON struct {
	Rules   []*RuleJSON `json:"rules,omitempty"`
	Default string      `json:"default,omitempty"`
}

type Rule

type Rule struct {
	Match Expr   `json:"match"`
	Path  string `json:"path"`
}

func CreateRoutingRule

func CreateRoutingRule(match, path string) (*Rule, error)

type RuleJSON

type RuleJSON struct {
	Match string `json:"match"`
	Path  string `json:"path"`
}

type Streamer

type Streamer interface {
	Subscribe(rtv1pb.Dapr_SubscribeTopicEventsAlpha1Server) error
}

type SubscribedMessage

type SubscribedMessage struct {
	CloudEvent map[string]interface{}
	Data       []byte
	Topic      string
	Metadata   map[string]string
	Path       string
	PubSub     string
}

type Subscription

type Subscription struct {
	PubsubName      string            `json:"pubsubname"`
	Topic           string            `json:"topic"`
	DeadLetterTopic string            `json:"deadLetterTopic"`
	Metadata        map[string]string `json:"metadata"`
	Rules           []*Rule           `json:"rules,omitempty"`
	Scopes          []string          `json:"scopes"`
	BulkSubscribe   *BulkSubscribe    `json:"bulkSubscribe"`
}

func GetSubscriptionsHTTP

func GetSubscriptionsHTTP(ctx context.Context, channel channel.AppChannel, log logger.Logger, r resiliency.Provider) ([]Subscription, error)

type SubscriptionJSON

type SubscriptionJSON struct {
	PubsubName      string            `json:"pubsubname"`
	Topic           string            `json:"topic"`
	DeadLetterTopic string            `json:"deadLetterTopic"`
	Metadata        map[string]string `json:"metadata,omitempty"`
	Route           string            `json:"route"`  // Single route from v1alpha1
	Routes          RoutesJSON        `json:"routes"` // Multiple routes from v2alpha1
	BulkSubscribe   BulkSubscribeJSON `json:"bulkSubscribe,omitempty"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL