pubsub_datasource

package
v2.0.0-rc.139 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 8, 2025 License: MIT Imports: 16 Imported by: 23

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Configuration

type Configuration struct {
	Events []EventConfiguration `json:"events"`
}

type EventConfiguration

type EventConfiguration struct {
	Metadata      *EventMetadata `json:"metadata"`
	Configuration any            `json:"configuration"`
}

type EventMetadata

type EventMetadata struct {
	ProviderID string    `json:"providerId"`
	Type       EventType `json:"type"`
	TypeName   string    `json:"typeName"`
	FieldName  string    `json:"fieldName"`
}

type EventType

type EventType string
const (
	EventTypePublish   EventType = "publish"
	EventTypeRequest   EventType = "request"
	EventTypeSubscribe EventType = "subscribe"
)

func EventTypeFromString

func EventTypeFromString(s string) (EventType, error)

type Factory

type Factory[T Configuration] struct {
	// contains filtered or unexported fields
}

func NewFactory

func NewFactory[T Configuration](executionContext context.Context, natsPubSubByProviderID map[string]NatsPubSub, kafkaPubSubByProviderID map[string]KafkaPubSub) *Factory[T]

func (*Factory[T]) Context

func (f *Factory[T]) Context() context.Context

func (*Factory[T]) Planner

func (f *Factory[T]) Planner(_ abstractlogger.Logger) plan.DataSourcePlanner[T]

func (*Factory[T]) UpstreamSchema

func (f *Factory[T]) UpstreamSchema(dataSourceConfig plan.DataSourceConfiguration[T]) (*ast.Document, bool)

type KafkaConnector

type KafkaConnector interface {
	New(ctx context.Context) KafkaPubSub
}

type KafkaEventConfiguration

type KafkaEventConfiguration struct {
	Topics []string `json:"topics"`
}

type KafkaEventManager

type KafkaEventManager struct {
	// contains filtered or unexported fields
}

type KafkaPubSub

type KafkaPubSub interface {
	// Subscribe starts listening on the given subjects and sends the received messages to the given next channel
	Subscribe(ctx context.Context, config KafkaSubscriptionEventConfiguration, updater resolve.SubscriptionUpdater) error
	// Publish sends the given data to the given subject
	Publish(ctx context.Context, config KafkaPublishEventConfiguration) error
}

KafkaPubSub describe the interface that implements the primitive operations for pubsub

type KafkaPublishDataSource

type KafkaPublishDataSource struct {
	// contains filtered or unexported fields
}

func (*KafkaPublishDataSource) Load

func (s *KafkaPublishDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error

func (*KafkaPublishDataSource) LoadWithFiles

func (s *KafkaPublishDataSource) LoadWithFiles(ctx context.Context, input []byte, files []httpclient.File, out *bytes.Buffer) (err error)

type KafkaPublishEventConfiguration

type KafkaPublishEventConfiguration struct {
	ProviderID string          `json:"providerId"`
	Topic      string          `json:"topic"`
	Data       json.RawMessage `json:"data"`
}

func (*KafkaPublishEventConfiguration) MarshalJSONTemplate

func (s *KafkaPublishEventConfiguration) MarshalJSONTemplate() string

type KafkaSubscriptionEventConfiguration

type KafkaSubscriptionEventConfiguration struct {
	ProviderID string   `json:"providerId"`
	Topics     []string `json:"topics"`
}

type KafkaSubscriptionSource

type KafkaSubscriptionSource struct {
	// contains filtered or unexported fields
}

func (*KafkaSubscriptionSource) Start

func (s *KafkaSubscriptionSource) Start(ctx *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error

func (*KafkaSubscriptionSource) UniqueRequestID

func (s *KafkaSubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte, xxh *xxhash.Digest) error

type NatsConnector

type NatsConnector interface {
	New(ctx context.Context) NatsPubSub
}

type NatsEventConfiguration

type NatsEventConfiguration struct {
	StreamConfiguration *NatsStreamConfiguration `json:"streamConfiguration,omitempty"`
	Subjects            []string                 `json:"subjects"`
}

type NatsEventManager

type NatsEventManager struct {
	// contains filtered or unexported fields
}

type NatsPubSub

type NatsPubSub interface {
	// Subscribe starts listening on the given subjects and sends the received messages to the given next channel
	Subscribe(ctx context.Context, event NatsSubscriptionEventConfiguration, updater resolve.SubscriptionUpdater) error
	// Publish sends the given data to the given subject
	Publish(ctx context.Context, event NatsPublishAndRequestEventConfiguration) error
	// Request sends a request on the given subject and writes the response to the given writer
	Request(ctx context.Context, event NatsPublishAndRequestEventConfiguration, w io.Writer) error
}

NatsPubSub describe the interface that implements the primitive operations for pubsub

type NatsPublishAndRequestEventConfiguration

type NatsPublishAndRequestEventConfiguration struct {
	ProviderID string          `json:"providerId"`
	Subject    string          `json:"subject"`
	Data       json.RawMessage `json:"data"`
}

func (*NatsPublishAndRequestEventConfiguration) MarshalJSONTemplate

func (s *NatsPublishAndRequestEventConfiguration) MarshalJSONTemplate() string

type NatsPublishDataSource

type NatsPublishDataSource struct {
	// contains filtered or unexported fields
}

func (*NatsPublishDataSource) Load

func (s *NatsPublishDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error

func (*NatsPublishDataSource) LoadWithFiles

func (s *NatsPublishDataSource) LoadWithFiles(ctx context.Context, input []byte, files []httpclient.File, out *bytes.Buffer) error

type NatsRequestDataSource

type NatsRequestDataSource struct {
	// contains filtered or unexported fields
}

func (*NatsRequestDataSource) Load

func (s *NatsRequestDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error

func (*NatsRequestDataSource) LoadWithFiles

func (s *NatsRequestDataSource) LoadWithFiles(ctx context.Context, input []byte, files []httpclient.File, out *bytes.Buffer) error

type NatsStreamConfiguration

type NatsStreamConfiguration struct {
	Consumer                  string `json:"consumer"`
	ConsumerInactiveThreshold int32  `json:"consumerInactiveThreshold"`
	StreamName                string `json:"streamName"`
}

type NatsSubscriptionEventConfiguration

type NatsSubscriptionEventConfiguration struct {
	ProviderID          string                   `json:"providerId"`
	Subjects            []string                 `json:"subjects"`
	StreamConfiguration *NatsStreamConfiguration `json:"streamConfiguration,omitempty"`
}

type NatsSubscriptionSource

type NatsSubscriptionSource struct {
	// contains filtered or unexported fields
}

func (*NatsSubscriptionSource) Start

func (s *NatsSubscriptionSource) Start(ctx *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error

func (*NatsSubscriptionSource) UniqueRequestID

func (s *NatsSubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte, xxh *xxhash.Digest) error

type Planner

type Planner[T Configuration] struct {
	// contains filtered or unexported fields
}

func (*Planner[T]) ConfigureFetch

func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration

func (*Planner[T]) ConfigureSubscription

func (p *Planner[T]) ConfigureSubscription() plan.SubscriptionConfiguration

func (*Planner[T]) DataSourcePlanningBehavior

func (p *Planner[T]) DataSourcePlanningBehavior() plan.DataSourcePlanningBehavior

func (*Planner[T]) DownstreamResponseFieldAlias

func (p *Planner[T]) DownstreamResponseFieldAlias(_ int) (alias string, exists bool)

func (*Planner[T]) EnterDocument

func (p *Planner[T]) EnterDocument(_, _ *ast.Document)

func (*Planner[T]) EnterField

func (p *Planner[T]) EnterField(ref int)

func (*Planner[T]) ID

func (p *Planner[T]) ID() (id int)

func (*Planner[T]) Register

func (p *Planner[T]) Register(visitor *plan.Visitor, configuration plan.DataSourceConfiguration[T], dataSourcePlannerConfiguration plan.DataSourcePlannerConfiguration) error

func (*Planner[T]) SetID

func (p *Planner[T]) SetID(id int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL