Documentation ¶
Index ¶
Constants ¶
const (
// DefaultHealthCheckPort is the default port for checking sync pool health.
DefaultHealthCheckPort = 8080
)
Variables ¶
var ( DefaultCEClientOpts = []ceclient.Option{ ceclient.WithUUIDs(), ceclient.WithTimeNow(), ceclient.WithTracePropagation(), } DefaultHTTPClient = &http.Client{ Transport: &ochttp.Transport{ Base: &http.Transport{ MaxIdleConns: 1000, MaxIdleConnsPerHost: 500, MaxConnsPerHost: 500, IdleConnTimeout: 30 * time.Second, }, Propagation: &tracecontext.HTTPFormat{}, }, } // ProviderSet provides the fanout and retry sync pools using the default client options. In // order to inject either pool, ProjectID, []Option, and config.ReadOnlyTargets must be // externally provided. ProviderSet = wire.NewSet( NewFanoutPool, NewRetryPool, clients.NewPubsubClient, NewRetryClient, wire.Value(DefaultHTTPClient), wire.Value(DefaultCEClientOpts), ) )
Functions ¶
This section is empty.
Types ¶
type FanoutPool ¶ added in v0.16.0
type FanoutPool struct {
// contains filtered or unexported fields
}
FanoutPool is the sync pool for fanout handlers. For each broker in the config, it will attempt to create a handler. It will also stop/delete the handler if the corresponding broker is deleted in the config.
func InitializeTestFanoutPool ¶ added in v0.16.0
func InitializeTestFanoutPool(ctx context.Context, podName metrics.PodName, containerName metrics.ContainerName, targets config.ReadonlyTargets, pubsubClient *pubsub.Client, opts ...Option) (*FanoutPool, error)
func NewFanoutPool ¶ added in v0.16.0
func NewFanoutPool( targets config.ReadonlyTargets, pubsubClient *pubsub.Client, deliverClient *http.Client, retryClient RetryClient, statsReporter *metrics.DeliveryReporter, opts ...Option, ) (*FanoutPool, error)
NewFanoutPool creates a new fanout handler pool.
type Handler ¶
type Handler struct { // PubsubEvents is the CloudEvents Pubsub protocol to pull // messages as events. Subscription *pubsub.Subscription // Processor is the processor to process events. Processor processors.Interface // Timeout is the timeout for processing each individual event. Timeout time.Duration // contains filtered or unexported fields }
Handler pulls Pubsub messages as events and processes them with chain of processors.
func NewHandler ¶ added in v0.16.0
func NewHandler( sub *pubsub.Subscription, processor processors.Interface, timeout time.Duration, retryPolicy RetryPolicy, ) *Handler
NewHandler creates a new Handler.
type Option ¶ added in v0.16.0
type Option func(*Options)
Option is for providing individual option.
func WithDeliveryTimeout ¶ added in v0.16.0
WithDeliveryTimeout sets the DeliveryTimeout.
func WithHandlerConcurrency ¶ added in v0.16.0
WithHandlerConcurrency sets HandlerConcurrency.
func WithMaxConcurrentPerEvent ¶ added in v0.16.0
WithMaxConcurrentPerEvent sets MaxConcurrencyPerEvent.
func WithPubsubReceiveSettings ¶ added in v0.16.0
func WithPubsubReceiveSettings(s pubsub.ReceiveSettings) Option
WithPubsubReceiveSettings sets PubsubReceiveSettings.
func WithRetryPolicy ¶ added in v0.16.0
func WithRetryPolicy(r RetryPolicy) Option
WithRetryPolicy sets the RetryPolicy.
func WithTimeoutPerEvent ¶ added in v0.16.0
WithTimeoutPerEvent sets TimeoutPerEvent.
type Options ¶ added in v0.16.0
type Options struct { // HandlerConcurrency is the number of goroutines // will be spawned in each handler. HandlerConcurrency int // MaxConcurrencyPerEvent is the max number of goroutines // will be spawned to handle an event. MaxConcurrencyPerEvent int // TimeoutPerEvent is the timeout for handling an event. TimeoutPerEvent time.Duration // DeliveryTimeout is the timeout for delivering an event to a consumer. DeliveryTimeout time.Duration // PubsubReceiveSettings is the pubsub receive settings. PubsubReceiveSettings pubsub.ReceiveSettings // RetryPolicy defines the retry policy for pubsub messages. RetryPolicy RetryPolicy }
Options holds all the options for create handler pool.
func NewOptions ¶ added in v0.16.0
NewOptions creates a Options.
type RetryClient ¶ added in v0.16.0
func NewRetryClient ¶ added in v0.16.0
func NewRetryClient(ctx context.Context, client *pubsub.Client, opts ...ceclient.Option) (RetryClient, error)
NewRetryClient provides a retry CE client from a PubSub client and list of CE client options.
type RetryPolicy ¶ added in v0.16.0
RetryPolicy defines the retry policy for pubsub messages. TODO: https://github.com/google/knative-gcp/issues/1100#issuecomment-638304147
type RetryPool ¶ added in v0.16.0
type RetryPool struct {
// contains filtered or unexported fields
}
RetryPool is the sync pool for retry handlers. For each trigger in the config, it will attempt to create a handler. It will also stop/delete the handler if the corresponding trigger is deleted in the config.
func InitializeTestRetryPool ¶ added in v0.16.0
func NewRetryPool ¶ added in v0.16.0
func NewRetryPool( targets config.ReadonlyTargets, pubsubClient *pubsub.Client, deliverClient *http.Client, statsReporter *metrics.DeliveryReporter, opts ...Option) (*RetryPool, error)
NewRetryPool creates a new retry handler pool.