Documentation ¶
Overview ¶
Package conduit wires up everything under the hood of a Conduit instance, including metrics, telemetry, logging, and server construction. It should only ever interact with the Orchestrator, never with individual services; managing those services is the Orchestrator's responsibility.
Constants ¶
const (
	DBTypeBadger   = "badger"
	DBTypePostgres = "postgres"
	DBTypeInMemory = "inmemory"
	DBTypeSQLite   = "sqlite"

	SchemaRegistryTypeConfluent = "confluent"
	SchemaRegistryTypeBuiltin   = "builtin"
)
Types ¶
type Config ¶
type Config struct {
	ConduitCfg struct {
		Path string `long:"config.path" usage:"global conduit configuration file" default:"./conduit.yaml"`
	} `mapstructure:"config"`

	DB struct {
		// When Driver is specified it takes precedence over other DB related
		// fields.
		Driver database.DB

		Type string `long:"db.type" usage:"database type; accepts badger,postgres,inmemory,sqlite"`
		Badger struct {
			Path string `long:"db.badger.path" usage:"path to badger DB"`
		}
		Postgres struct {
			ConnectionString string `` /* 132-byte string literal not displayed */
			Table string `long:"db.postgres.table" usage:"postgres table in which to store data (will be created if it does not exist)"`
		}
		SQLite struct {
			Path string `long:"db.sqlite.path" usage:"path to sqlite3 DB"`
			Table string `long:"db.sqlite.table" usage:"sqlite3 table in which to store data (will be created if it does not exist)"`
		}
	}

	API struct {
		Enabled bool `long:"api.enabled" usage:"enable HTTP and gRPC API"`
		HTTP struct {
			Address string `long:"api.http.address" usage:"address for serving the HTTP API"`
		}
		GRPC struct {
			// This is the address where the gRPC API will be served which is shared as a global flag
			// for the Conduit CLI.
			Address string `long:"api.grpc.address" usage:"address for serving the gRPC API"`
		}
	}

	Log struct {
		NewLogger func(level, format string) log.CtxLogger
		Level string `long:"log.level" usage:"sets logging level; accepts debug, info, warn, error, trace"`
		Format string `long:"log.format" usage:"sets the format of the logging; accepts json, cli"`
	}

	Connectors struct {
		Path string `long:"connectors.path" usage:"path to standalone connectors' directory"`
	}

	Processors struct {
		Path string `long:"processors.path" usage:"path to standalone processors' directory"`
	}

	Pipelines struct {
		Path string `long:"pipelines.path" usage:"path to pipelines' directory"`
		ExitOnDegraded bool `long:"pipelines.exit-on-degraded" mapstructure:"exit-on-degraded" usage:"exit Conduit if a pipeline is degraded"`
		ErrorRecovery struct {
			// MinDelay is the minimum delay before restart: Default: 1 second
			MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" mapstructure:"min-delay" usage:"minimum delay before restart"`
			// MaxDelay is the maximum delay before restart: Default: 10 minutes
			MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" mapstructure:"max-delay" usage:"maximum delay before restart"`
			// BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2
			BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" mapstructure:"backoff-factor" usage:"backoff factor applied to the last delay"`
			// MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite)
			MaxRetries int64 `long:"pipelines.error-recovery.max-retries" mapstructure:"max-retries" usage:"maximum number of retries"`
			// MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes
			MaxRetriesWindow time.Duration `` /* 179-byte string literal not displayed */
		} `mapstructure:"error-recovery"`
	}

	ConnectorPlugins map[string]sdk.Connector

	SchemaRegistry struct {
		Type string `long:"schema-registry.type" usage:"schema registry type; accepts builtin,confluent"`
		Confluent struct {
			ConnectionString string `` /* 135-byte string literal not displayed */
		}
	} `mapstructure:"schema-registry"`

	Preview struct {
		// PipelineArchV2 enables the new pipeline architecture.
		PipelineArchV2 bool `` /* 182-byte string literal not displayed */
	}

	Dev struct {
		CPUProfile string `long:"dev.cpuprofile" usage:"write CPU profile to file"`
		MemProfile string `long:"dev.memprofile" usage:"write memory profile to file"`
		BlockProfile string `long:"dev.blockprofile" usage:"write block profile to file"`
	}
}
Config holds all configurable values for Conduit.
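The ErrorRecovery fields describe a restart delay that starts at MinDelay, is multiplied by BackoffFactor after each restart, and never exceeds MaxDelay. The sketch below illustrates how those three values could interact, using the defaults named in the field comments (1 second, 10 minutes, factor 2). The helper backoffDelays is hypothetical and is not part of the package; how Conduit actually computes the delay is an assumption here.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelays is a hypothetical helper (not part of package conduit).
// It returns the delay before each of the first n restarts, assuming the
// delay starts at minDelay, is multiplied by factor after every restart,
// and is capped at maxDelay.
func backoffDelays(minDelay, maxDelay time.Duration, factor, n int) []time.Duration {
	if n <= 0 {
		return nil
	}
	delays := make([]time.Duration, 0, n)
	d := minDelay
	for i := 0; i < n; i++ {
		if d > maxDelay {
			d = maxDelay // never wait longer than the configured maximum
		}
		delays = append(delays, d)
		d *= time.Duration(factor)
	}
	return delays
}

func main() {
	// Defaults taken from the field comments: 1 second, 10 minutes, factor 2.
	for i, d := range backoffDelays(time.Second, 10*time.Minute, 2, 12) {
		fmt.Printf("restart %d: wait %s\n", i+1, d)
	}
}
```

Under these assumptions the delay reaches the 10-minute cap at the eleventh restart, since the tenth delay is 512 seconds and doubling it would exceed the maximum.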
func DefaultConfig ¶ added in v0.7.2
func DefaultConfig() Config
func DefaultConfigWithBasePath ¶ added in v0.12.2
type Entrypoint ¶ added in v0.7.2
type Entrypoint struct{}
Entrypoint provides methods related to the Conduit entrypoint (parsing config, managing interrupt signals, etc.).
func (*Entrypoint) CancelOnInterrupt ¶ added in v0.7.2
func (*Entrypoint) CancelOnInterrupt(ctx context.Context) context.Context
CancelOnInterrupt returns a context that is canceled when the interrupt signal is received.
- After the first signal, the function continues to listen.
- On the second signal, it executes a hard exit without waiting for a graceful shutdown.
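The two-signal behavior described above can be sketched as follows. This is a minimal illustration, not the package's implementation: cancelOnSignals and simulate are hypothetical names, the signal channel is fed by hand so the example runs deterministically, and the hard exit is injected as a callback instead of terminating the process.

```go
package main

import (
	"context"
	"fmt"
	"os"
)

// cancelOnSignals is a hypothetical helper (not part of package conduit).
// The first value received on sigs cancels the returned context; the second
// one calls hardExit without waiting for any graceful shutdown to finish.
func cancelOnSignals(ctx context.Context, sigs <-chan os.Signal, hardExit func()) context.Context {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		<-sigs // first signal: request a graceful shutdown
		cancel()
		<-sigs // second signal: stop waiting
		hardExit()
	}()
	return ctx
}

// simulate feeds two interrupts by hand and reports what happened after each.
// A real program would register the channel with signal.Notify(sigs, os.Interrupt)
// and pass something like func() { os.Exit(1) } as hardExit.
func simulate() (canceledAfterFirst, exitedAfterSecond bool) {
	sigs := make(chan os.Signal, 2)
	exited := make(chan struct{})
	ctx := cancelOnSignals(context.Background(), sigs, func() { close(exited) })

	sigs <- os.Interrupt
	<-ctx.Done()
	canceledAfterFirst = ctx.Err() != nil

	sigs <- os.Interrupt
	<-exited
	return canceledAfterFirst, true
}

func main() {
	canceled, exited := simulate()
	fmt.Println("context canceled after first signal:", canceled)
	fmt.Println("hard exit requested after second signal:", exited)
}
```

Injecting the exit function keeps the sketch testable; the design point it shows is that the listener stays alive after canceling the context so a second interrupt can cut a slow shutdown short.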
func (*Entrypoint) Serve ¶ added in v0.7.2
func (e *Entrypoint) Serve(cfg Config)
Serve is the entrypoint for Conduit. It is a convenience function if you want to tweak the Conduit CLI and inject different default values or built-in plugins while retaining the same flags and exit behavior.
You can adjust the default values by setting the corresponding field in Config. The default config can be retrieved with DefaultConfig.
The config will be populated with values parsed from:
- command line flags (highest priority)
- environment variables
- config file (lowest priority)
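The priority order in the list above can be illustrated with a small lookup. This is only a sketch of the precedence rule: resolve is a hypothetical helper, treating an empty string as "not set" is an assumption, and the sample values are made up. It does not show how Serve actually parses flags, environment variables, or the config file.

```go
package main

import "fmt"

// resolve is a hypothetical helper (not part of package conduit). It returns
// the first non-empty value in priority order: command line flag, then
// environment variable, then config file. If none is set, it falls back to
// def, which stands in for a default set on Config before calling Serve.
func resolve(flagVal, envVal, fileVal, def string) string {
	for _, v := range []string{flagVal, envVal, fileVal} {
		if v != "" {
			return v
		}
	}
	return def
}

func main() {
	// No flag given, so the environment variable wins over the config file.
	fmt.Println(resolve("", "postgres", "sqlite", "inmemory")) // prints "postgres"

	// Nothing set anywhere, so the default applies.
	fmt.Println(resolve("", "", "", "inmemory")) // prints "inmemory"
}
```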
func (*Entrypoint) Splash ¶ added in v0.7.2
func (*Entrypoint) Splash() string
type Runtime ¶
type Runtime struct { Config Config DB database.DB Orchestrator *orchestrator.Orchestrator ProvisionService *provisioning.Service SchemaRegistry schemaregistry.Registry // Ready will be closed when Runtime has successfully started Ready chan struct{} // contains filtered or unexported fields }
Runtime sets up all services for serving and monitoring a Conduit instance.
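Because Ready is closed once the Runtime has started successfully, callers can block on it, optionally with a timeout. The sketch below shows that pattern with a stand-in type; service, run, and waitReady are hypothetical names used only for illustration, and the real Runtime start-up sequence is not shown in this section.

```go
package main

import (
	"fmt"
	"time"
)

// service is a hypothetical stand-in for a type that, like Runtime,
// exposes a Ready channel which is closed after a successful start.
type service struct {
	Ready chan struct{}
}

// run performs the (omitted) startup work and then signals readiness.
func (s *service) run() {
	close(s.Ready)
}

// waitReady blocks until ready is closed or the timeout elapses.
// It reports whether readiness was observed in time.
func waitReady(ready <-chan struct{}, timeout time.Duration) bool {
	select {
	case <-ready:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	s := &service{Ready: make(chan struct{})}
	go s.run()

	if waitReady(s.Ready, time.Second) {
		fmt.Println("service is ready")
	} else {
		fmt.Println("timed out waiting for readiness")
	}
}
```

Closing the channel, as opposed to sending a value on it, lets any number of waiters observe readiness, including ones that start waiting after the close has already happened.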
func NewRuntime ¶
NewRuntime sets up a Runtime instance and primes it for start.