gpubsub

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
// Subscription type identifiers. The string values match the ones documented
// for the SubscriptionConfig.Type field.
const (
	// SubTypeShared is the "shared" subscription type.
	SubTypeShared = "shared"
	// SubTypeUnique is the "unique" subscription type.
	SubTypeUnique = "unique"
)

// ALREADY_EXISTS is the numeric code 409.
// Defined here due to lack of proper other place in GCP libs.
const ALREADY_EXISTS = 409

Variables

View Source
// Sentinel errors exposed by this package. Compare against them with errors.Is.
var (
	// ErrClientNotProvided reports that a client must be provided.
	ErrClientNotProvided = errors.New("a client must be provided")

	// ErrClienNotProvided is the original, misspelled name of ErrClientNotProvided.
	// It holds the very same error value, so existing comparisons keep working.
	//
	// Deprecated: use ErrClientNotProvided instead.
	ErrClienNotProvided = ErrClientNotProvided

	// ErrStreamSpecNotProvided reports that the stream spec must be provided.
	ErrStreamSpecNotProvided = errors.New("the stream spec must be provided")

	// ErrTopicNotProvided reports that a topic name is required.
	ErrTopicNotProvided = errors.New("a topic name is required")

	// ErrSubNotProvided reports that a valid subscription must be provided.
	ErrSubNotProvided = errors.New("a valid subscription must be provided")
)

Functions

func NewExtractorFactory

func NewExtractorFactory(ctx context.Context, config PubsubConfig) (entity.ExtractorFactory, error)

NewExtractorFactory creates a Pubsub extractor factory.

Types

type DefaultSubConfigurator

// DefaultSubConfigurator is an empty struct type; any methods it has are not
// shown in this listing.
// NOTE(review): judging by the name it is presumably the default
// SubConfigurator implementation — confirm against the package source.
type DefaultSubConfigurator struct{}

type MsgAckFunc

// MsgAckFunc is a function type that takes a *pubsub.Message and returns nothing.
// NOTE(review): presumably invoked to acknowledge (or nack) a received
// message — confirm against the call sites.
type MsgAckFunc func(*pubsub.Message)

type PubsubClient

// PubsubClient is the set of Pub/Sub client operations this package depends on.
// NOTE(review): the method set looks like a subset of *pubsub.Client, which
// would let the real client be swapped for a test double — confirm.
type PubsubClient interface {
	// Topic returns the *pubsub.Topic for the given topic id.
	Topic(id string) *pubsub.Topic
	// CreateSubscription creates a subscription with the given id and config,
	// returning the resulting *pubsub.Subscription or an error.
	CreateSubscription(ctx context.Context, id string, cfg pubsub.SubscriptionConfig) (*pubsub.Subscription, error)
	// Subscription returns the *pubsub.Subscription for the given subscription id.
	Subscription(id string) *pubsub.Subscription
}

type PubsubConfig

// PubsubConfig is the external config provided by the geist client to the
// factory when starting up, to be used during stream creation.
type PubsubConfig struct {

	// ProjectId (required) specifies the GCP project ID for this deployment.
	ProjectId string

	// Env only needs to be filled in if the stream specs for this use of Geist use different
	// topic specs for different environments, typically "dev", "stage", and "prod".
	// Any string is allowed as long as it matches the ones used in the stream specs.
	Env string

	// The following (optional) fields set the default values used when they are not specified
	// in the stream spec. See entity.Spec for more info.
	MaxOutstandingMessages int
	MaxOutstandingBytes    int
}

PubsubConfig is the external config provided by the geist client to the factory when starting up, which is to be used during stream creation.

TODO: Check if non-nil default values should be added to MaxOutstandingXxx

type SourceConfig added in v0.4.0

// SourceConfig specifies the schema for the "customConfig" field in the "source"
// section of the stream spec. It enables arbitrary connector-specific fields to
// be present in the stream spec. All fields are optional in the JSON encoding
// (every tag carries omitempty).
type SourceConfig struct {

	// Topics and Subscription are required for extraction/consumption from PubSub.
	Topics       []Topics            `json:"topics,omitempty"`
	Subscription *SubscriptionConfig `json:"subscription,omitempty"`

	// MaxOutstandingMessages is a PubSub consumer-specific property, specifying the max number of
	// fetched but not yet acknowledged messages in the pubsub consumer. If this is omitted the value
	// will be set to the loaded Pubsub entity config default.
	// For time-consuming transform/sink streams, decrease this value while increasing ops.streamsPerPod.
	MaxOutstandingMessages *int `json:"maxOutstandingMessages,omitempty"`

	// MaxOutstandingBytes is a PubSub consumer-specific property, specifying the max size of fetched
	// but not yet acknowledged messages.
	MaxOutstandingBytes *int `json:"maxOutstandingBytes,omitempty"`

	// Synchronous can be used to tune certain types of streams (e.g. spiky input flow of messages with
	// very heavy transforms or slow sinks), where setting this to true could reduce the number of
	// expired messages. It is optional for a source connector to implement.
	// Default is false.
	Synchronous *bool `json:"synchronous,omitempty"`

	// NumGoroutines is a PubSub consumer-specific property used for increasing the rate of incoming
	// messages in case downstream ETL is not CPU starved or blocked on sink ops, while the Extractor
	// cannot keep up with consuming incoming messages. Depending on the type of Sink/Loader, a
	// better/alternative approach is to increase ops.streamsPerPod.
	// If omitted it is set to 1.
	NumGoroutines *int `json:"numGoroutines,omitempty"`
}

SourceConfig specifies the schema for the "customConfig" field in the "source" section of the stream spec. It enables arbitrary connector specific fields to be present in the stream spec.

func NewSourceConfig added in v0.4.0

// NewSourceConfig returns a SourceConfig for the given stream spec, or an error.
// NOTE(review): the body is not visible here; presumably it decodes the
// "customConfig" field of the spec's "source" section — confirm.
func NewSourceConfig(spec *entity.Spec) (sc SourceConfig, err error)

type SubConfigurator

// SubConfigurator is implemented by types that can update a Subscription
// using a set of pubsub.ReceiveSettings.
type SubConfigurator interface {
	// Update is given the subscription and the receive settings.
	// NOTE(review): presumably it applies rs to sub; nothing is returned, so
	// failures cannot be reported to the caller — confirm the intended contract.
	Update(sub Subscription, rs pubsub.ReceiveSettings)
}

type Subscription

// Subscription is the set of subscription operations this package depends on.
// NOTE(review): the method signatures look like a subset of
// *pubsub.Subscription, which would allow substituting a test double — confirm.
type Subscription interface {
	// Receive hands incoming messages to the callback f and returns an error.
	Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error
	// String returns a string representation of the subscription.
	String() string
	// Delete deletes the subscription, returning an error on failure.
	Delete(ctx context.Context) error
}

type SubscriptionConfig added in v0.4.0

// SubscriptionConfig describes the subscription to consume from: its type and,
// for shared subscriptions, its name.
type SubscriptionConfig struct {
	// Type can be:
	//
	//	"shared" - meaning multiple consumers share this subscription in a competing consumer
	//	           pattern. Only one of the subscribers will receive each event.
	//	           If this is set, the name of the subscription needs to be present in the
	//	           "Name" field.
	//
	//	"unique" - meaning each transloading stream instance will have its own unique
	//	           subscription. All instances will thus get all events from the topic.
	//	           If this is set, a unique subscription name will be created and the Name
	//	           field is ignored. This one is used internally by each pod's Supervisor to
	//	           receive notifications about registry updates from other Supervisors'
	//	           registry instances.
	Type string `json:"type,omitempty"`

	// Name of the subscription (needed for "shared", ignored for "unique").
	Name string `json:"name,omitempty"`
}

type Topic

// Topic is the single topic operation this package depends on.
// NOTE(review): the signature looks like that of (*pubsub.Topic).Publish,
// which would allow substituting a test double — confirm.
type Topic interface {
	// Publish publishes msg and returns a *pubsub.PublishResult for the operation.
	Publish(ctx context.Context, msg *pubsub.Message) *pubsub.PublishResult
}

type Topics added in v0.4.0

// Topics holds the topic names config for one environment.
type Topics struct {
	// Env specifies for which environment/stage the topic names config should be used.
	// Allowed values are "all" or any string matching the config provided to registered entity factories.
	// Examples: "dev", "staging", and "prod", etc.
	Env string `json:"env,omitempty"`
	// Names lists the topic names to use for the environment given in Env.
	Names []string `json:"names,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL