handler

package
v0.16.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 8, 2020 License: Apache-2.0 Imports: 27 Imported by: 3

Documentation

Index

Constants

View Source
const (
	// DefaultHealthCheckPort is the default port for checking sync pool health.
	DefaultHealthCheckPort = 8080
)

Variables

View Source
var (
	DefaultCEClientOpts = []ceclient.Option{
		ceclient.WithUUIDs(),
		ceclient.WithTimeNow(),
		ceclient.WithTracePropagation(),
	}

	DefaultHTTPClient = &http.Client{
		Transport: &ochttp.Transport{
			Base: &http.Transport{
				MaxIdleConns:        1000,
				MaxIdleConnsPerHost: 500,
				MaxConnsPerHost:     500,
				IdleConnTimeout:     30 * time.Second,
			},
			Propagation: &tracecontext.HTTPFormat{},
		},
	}

	// ProviderSet provides the fanout and retry sync pools using the default client options. In
	// order to inject either pool, ProjectID, []Option, and config.ReadOnlyTargets must be
	// externally provided.
	ProviderSet = wire.NewSet(
		NewFanoutPool,
		NewRetryPool,
		clients.NewPubsubClient,
		NewRetryClient,
		wire.Value(DefaultHTTPClient),
		wire.Value(DefaultCEClientOpts),
	)
)

Functions

This section is empty.

Types

type FanoutPool added in v0.16.0

type FanoutPool struct {
	// contains filtered or unexported fields
}

FanoutPool is the sync pool for fanout handlers. For each broker in the config, it will attempt to create a handler. It will also stop/delete the handler if the corresponding broker is deleted in the config.

func InitializeTestFanoutPool added in v0.16.0

func InitializeTestFanoutPool(ctx context.Context, podName metrics.PodName, containerName metrics.ContainerName, targets config.ReadonlyTargets, pubsubClient *pubsub.Client, opts ...Option) (*FanoutPool, error)

func NewFanoutPool added in v0.16.0

func NewFanoutPool(
	targets config.ReadonlyTargets,
	pubsubClient *pubsub.Client,
	deliverClient *http.Client,
	retryClient RetryClient,
	statsReporter *metrics.DeliveryReporter,
	opts ...Option,
) (*FanoutPool, error)

NewFanoutPool creates a new fanout handler pool.

func (*FanoutPool) SyncOnce added in v0.16.0

func (p *FanoutPool) SyncOnce(ctx context.Context) error

SyncOnce syncs once the handler pool based on the targets config.

type Handler

type Handler struct {
	// PubsubEvents is the CloudEvents Pubsub protocol to pull
	// messages as events.
	Subscription *pubsub.Subscription

	// Processor is the processor to process events.
	Processor processors.Interface

	// Timeout is the timeout for processing each individual event.
	Timeout time.Duration
	// contains filtered or unexported fields
}

Handler pulls Pubsub messages as events and processes them with chain of processors.

func NewHandler added in v0.16.0

func NewHandler(
	sub *pubsub.Subscription,
	processor processors.Interface,
	timeout time.Duration,
	retryPolicy RetryPolicy,
) *Handler

NewHandler creates a new Handler.

func (*Handler) IsAlive added in v0.15.0

func (h *Handler) IsAlive() bool

IsAlive indicates whether the handler is alive.

func (*Handler) Start

func (h *Handler) Start(ctx context.Context, done func(error))

Start starts the handler. done func will be called if the pubsub inbound is closed.

func (*Handler) Stop

func (h *Handler) Stop()

Stop stops the handlers.

type Option added in v0.16.0

type Option func(*Options)

Option is for providing individual option.

func WithDeliveryTimeout added in v0.16.0

func WithDeliveryTimeout(t time.Duration) Option

WithDeliveryTimeout sets the DeliveryTimeout.

func WithHandlerConcurrency added in v0.16.0

func WithHandlerConcurrency(c int) Option

WithHandlerConcurrency sets HandlerConcurrency.

func WithMaxConcurrentPerEvent added in v0.16.0

func WithMaxConcurrentPerEvent(c int) Option

WithMaxConcurrentPerEvent sets MaxConcurrencyPerEvent.

func WithPubsubReceiveSettings added in v0.16.0

func WithPubsubReceiveSettings(s pubsub.ReceiveSettings) Option

WithPubsubReceiveSettings sets PubsubReceiveSettings.

func WithRetryPolicy added in v0.16.0

func WithRetryPolicy(r RetryPolicy) Option

WithRetryPolicy sets the RetryPolicy.

func WithTimeoutPerEvent added in v0.16.0

func WithTimeoutPerEvent(t time.Duration) Option

WithTimeoutPerEvent sets TimeoutPerEvent.

type Options added in v0.16.0

type Options struct {
	// HandlerConcurrency is the number of goroutines
	// will be spawned in each handler.
	HandlerConcurrency int
	// MaxConcurrencyPerEvent is the max number of goroutines
	// will be spawned to handle an event.
	MaxConcurrencyPerEvent int
	// TimeoutPerEvent is the timeout for handling an event.
	TimeoutPerEvent time.Duration
	// DeliveryTimeout is the timeout for delivering an event to a consumer.
	DeliveryTimeout time.Duration
	// PubsubReceiveSettings is the pubsub receive settings.
	PubsubReceiveSettings pubsub.ReceiveSettings
	// RetryPolicy defines the retry policy for pubsub messages.
	RetryPolicy RetryPolicy
}

Options holds all the options for create handler pool.

func NewOptions added in v0.16.0

func NewOptions(opts ...Option) (*Options, error)

NewOptions creates a Options.

type RetryClient added in v0.16.0

type RetryClient ceclient.Client

func NewRetryClient added in v0.16.0

func NewRetryClient(ctx context.Context, client *pubsub.Client, opts ...ceclient.Option) (RetryClient, error)

NewRetryClient provides a retry CE client from a PubSub client and list of CE client options.

type RetryPolicy added in v0.16.0

type RetryPolicy struct {
	MinBackoff, MaxBackoff time.Duration
}

RetryPolicy defines the retry policy for pubsub messages. TODO: https://github.com/google/knative-gcp/issues/1100#issuecomment-638304147

type RetryPool added in v0.16.0

type RetryPool struct {
	// contains filtered or unexported fields
}

RetryPool is the sync pool for retry handlers. For each trigger in the config, it will attempt to create a handler. It will also stop/delete the handler if the corresponding trigger is deleted in the config.

func InitializeTestRetryPool added in v0.16.0

func InitializeTestRetryPool(targets config.ReadonlyTargets, podName metrics.PodName, containerName metrics.ContainerName, pubsubClient *pubsub.Client, opts ...Option) (*RetryPool, error)

func NewRetryPool added in v0.16.0

func NewRetryPool(
	targets config.ReadonlyTargets,
	pubsubClient *pubsub.Client,
	deliverClient *http.Client,
	statsReporter *metrics.DeliveryReporter,
	opts ...Option) (*RetryPool, error)

NewRetryPool creates a new retry handler pool.

func (*RetryPool) SyncOnce added in v0.16.0

func (p *RetryPool) SyncOnce(ctx context.Context) error

SyncOnce syncs once the handler pool based on the targets config.

type SyncPool added in v0.16.0

type SyncPool interface {
	SyncOnce(ctx context.Context) error
}

func StartSyncPool added in v0.16.0

func StartSyncPool(
	ctx context.Context,
	syncPool SyncPool,
	syncSignal <-chan struct{},
	maxStaleDuration time.Duration,
	healthCheckPort int,
) (SyncPool, error)

StartSyncPool starts the sync pool.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL