Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultSourceConfig = SourceConfig{ LostSegmentTimeout: 24 * time.Hour, LateSegmentRetries: 16, FirstSegmentDelay: 30 * time.Minute, BatchSize: 10000, WorkerInitDelay: 10 * time.Second, WorkerChanSize: 100, WorkerStopTimeout: 10 * time.Second, }
DefaultSourceConfig is the default ingress source configuration.
Functions ¶
This section is empty.
Types ¶
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller represents the ingress controller
func New ¶
func New(config ControllerConfig) (*Controller, error)
New returns a new ingress controller instance
type ControllerConfig ¶
type ControllerConfig struct { // Consumer is used to provide group membership functionality used to distribute // work across multiple instances. It ensures that only one instance is allowed to // process a certain source topic partition at any given moment. Consumer core.Factory `required:"true"` // Producer is used for writing messages to destination topic. Producer core.Factory `required:"true"` // SegmentStore provides access to segment events and contents. SegmentStore core.Factory `required:"true"` // CheckpointStore is used for offset tracking. CheckpointStore core.Factory `required:"true"` // Unique name that identifies the local region/data center/cloud. // // Field value is required. LocalRegion string `required:"true"` // List of sources to ingress from. // // Will use DefaultSourceConfig if source config was not set. // // Field value is required. Sources map[Source]*SourceConfig `required:"true"` }
ControllerConfig represents the ingress controller configuration
type Source ¶
type Source struct { // Region name. // // Field value is required. Region string `required:"true"` // Kafka topic name. // // Field value is required. Topic string `required:"true"` }
Source represents the ingress source.
type SourceConfig ¶
type SourceConfig struct { // Topic name where segment messages will be produced. // // Default value appends '.ingress' suffix to source topic name. DestinationTopic string // Maximum time to wait for a late segment before it is declared lost and skipped. // // WARNING: has the potential to break the at-least-once delivery promise. // // The ingress worker detects missing segments and will reload bucket state up to // LateSegmentRetries times using exponential backoff until LostSegmentTimeout is reached. // // Possible root causes for this scenario: // - problems with cross-region AWS S3 bucket sync operation. // - problems related to AWS S3 eventual consistency model. // - dropped AWS S3 notification event en-route from S3 -> SQS -> SQSSegmentEventSource -> ConsistentSegmentStore. // // The default value is currently set to 24h which should give enough time to // operations/SRE team to react and fix the problem. LostSegmentTimeout time.Duration `min:"1ms"` // Number of retry attempts for a late segment before it is declared lost and skipped. // // Uses exponential backoff. // The first computed backoff interval should be at least 1s. LateSegmentRetries int `min:"1" max:"50"` // Avoids issues related to S3 eventual consistency model. // // A higher value results in smaller chance of missing first segment. FirstSegmentDelay time.Duration `min:"1ms"` // Number of segment messages to read/produce in each request. // // A higher value usually results in better throughput. // A checkpoint is performed after each successful batch. BatchSize int `min:"1"` // Allows last checkpoint to propagate and avoids thundering herd effects during Kafka group rebalance. WorkerInitDelay time.Duration `min:"1ms"` // Size of ingress worker buffered channel. WorkerChanSize int `min:"1"` // Ingress worker shutdown grace period. WorkerStopTimeout time.Duration `min:"1ms"` // Retrier instance used for Producer operations ProducerRetrier core.Retrier // Retrier instance used for SegmentStore operations SegmentStoreRetrier core.Retrier // Retrier instance used for CheckpointStore operations CheckpointStoreRetrier core.Retrier // contains filtered or unexported fields }
SourceConfig represents the ingress source configuration.
Click to show internal directories.
Click to hide internal directories.