conduit

package
v0.13.0-nightly.20241221
Published: Dec 20, 2024 License: Apache-2.0 Imports: 60 Imported by: 1

Documentation

Overview

Package conduit wires up everything under the hood of a Conduit instance including metrics, telemetry, logging, and server construction. It should only ever interact with the Orchestrator, never individual services. All of that responsibility should be left to the Orchestrator.

Constants

const (
	DBTypeBadger   = "badger"
	DBTypePostgres = "postgres"
	DBTypeInMemory = "inmemory"
	DBTypeSQLite   = "sqlite"

	SchemaRegistryTypeConfluent = "confluent"
	SchemaRegistryTypeBuiltin   = "builtin"
)

Variables

This section is empty.

Functions

func Version

func Version(appendOSArch bool) string

Types

type Config

type Config struct {
	ConduitCfgPath string `long:"config.path" usage:"global conduit configuration file" default:"./conduit.yaml"`
	DB             ConfigDB
	API            ConfigAPI
	Log            ConfigLog

	Connectors struct {
		Path string `long:"connectors.path" usage:"path to standalone connectors' directory"`
	}

	Processors struct {
		Path string `long:"processors.path" usage:"path to standalone processors' directory"`
	}

	Pipelines struct {
		Path           string `long:"pipelines.path" usage:"path to pipelines' directory"`
		ExitOnDegraded bool   `long:"pipelines.exit-on-degraded" usage:"exit Conduit if a pipeline is degraded"`
		ErrorRecovery  struct {
			// MinDelay is the minimum delay before restart: Default: 1 second
			MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" usage:"minimum delay before restart"`
			// MaxDelay is the maximum delay before restart: Default: 10 minutes
			MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" usage:"maximum delay before restart"`
			// BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2
			BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" usage:"backoff factor applied to the last delay"`
			// MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite)
			MaxRetries int64 `long:"pipelines.error-recovery.max-retries" usage:"maximum number of retries"`
			// MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes
			MaxRetriesWindow time.Duration `` /* 145-byte string literal not displayed */
		}
	}

	ConnectorPlugins map[string]sdk.Connector

	SchemaRegistry struct {
		Type string `long:"schema-registry.type" usage:"schema registry type; accepts builtin,confluent"`

		Confluent struct {
			ConnectionString string `long:"schema-registry.confluent.connection-string" usage:"confluent schema registry connection string"`
		}
	}

	Preview struct {
		// PipelineArchV2 enables the new pipeline architecture.
		PipelineArchV2 bool `` /* 182-byte string literal not displayed */
	}

	Dev struct {
		CPUProfile   string `long:"dev.cpuprofile" usage:"write CPU profile to file"`
		MemProfile   string `long:"dev.memprofile" usage:"write memory profile to file"`
		BlockProfile string `long:"dev.blockprofile" usage:"write block profile to file"`
	}
}

Config holds all configurable values for Conduit.

func DefaultConfig added in v0.7.2

func DefaultConfig() Config

func DefaultConfigWithBasePath added in v0.12.2

func DefaultConfigWithBasePath(basePath string) Config

func (Config) Validate

func (c Config) Validate() error

type ConfigAPI

type ConfigAPI struct {
	Enabled bool `long:"api.enabled" usage:"enable HTTP and gRPC API"`
	HTTP    struct {
		Address string `long:"http.address" usage:"address for serving the HTTP API"`
	}
	GRPC struct {
		Address string `long:"grpc.address" usage:"address for serving the gRPC API"`
	}
}

type ConfigDB

type ConfigDB struct {
	// When Driver is specified it takes precedence over other DB related
	// fields.
	Driver database.DB

	Type   string `long:"db.type" usage:"database type; accepts badger,postgres,inmemory,sqlite"`
	Badger struct {
		Path string `long:"db.badger.path" usage:"path to badger DB"`
	}
	Postgres struct {
		ConnectionString string `` /* 132-byte string literal not displayed */
		Table            string `long:"db.postgres.table" usage:"postgres table in which to store data (will be created if it does not exist)"`
	}
	SQLite struct {
		Path  string `long:"db.sqlite.path" usage:"path to sqlite3 DB"`
		Table string `long:"db.sqlite.table" usage:"sqlite3 table in which to store data (will be created if it does not exist)"`
	}
}
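The comment on Driver and the accepted values in the db.type usage string suggest a simple selection rule. The sketch below is one way such a check could look; dbConfig and checkDB are hypothetical stand-ins written for this example and are not the logic of Config.Validate.

```go
package main

import (
	"errors"
	"fmt"
)

// Values copied from the package constants.
const (
	DBTypeBadger   = "badger"
	DBTypePostgres = "postgres"
	DBTypeInMemory = "inmemory"
	DBTypeSQLite   = "sqlite"
)

// dbConfig is a hypothetical, trimmed-down stand-in for ConfigDB.
type dbConfig struct {
	Driver any    // non-nil means a ready-made driver was injected
	Type   string // one of the DBType* constants
}

// checkDB reports whether the configuration selects a usable database.
func checkDB(c dbConfig) error {
	if c.Driver != nil {
		return nil // an injected driver takes precedence; Type is ignored
	}
	switch c.Type {
	case DBTypeBadger, DBTypePostgres, DBTypeInMemory, DBTypeSQLite:
		return nil
	case "":
		return errors.New("db.type is required when no driver is set")
	default:
		return fmt.Errorf("db.type %q is not one of badger, postgres, inmemory, sqlite", c.Type)
	}
}

func main() {
	fmt.Println(checkDB(dbConfig{Type: DBTypeSQLite}))
	fmt.Println(checkDB(dbConfig{Type: "mysql"}))
	fmt.Println(checkDB(dbConfig{Driver: struct{}{}, Type: "mysql"}))
}
```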

type ConfigLog

type ConfigLog struct {
	NewLogger func(level, format string) log.CtxLogger
	Level     string `long:"log.level" usage:"sets logging level; accepts debug, info, warn, error, trace"`
	Format    string `long:"log.format" usage:"sets the format of the logging; accepts json, cli"`
}

type Entrypoint added in v0.7.2

type Entrypoint struct{}

Entrypoint provides methods related to the Conduit entrypoint (parsing config, managing interrupt signals etc.).

func (*Entrypoint) CancelOnInterrupt added in v0.7.2

func (*Entrypoint) CancelOnInterrupt(ctx context.Context) context.Context

CancelOnInterrupt returns a context that is canceled when the interrupt signal is received.

  • After the first signal, the function continues to listen.
  • On the second signal, it executes a hard exit without waiting for a graceful shutdown.

func (*Entrypoint) Serve added in v0.7.2

func (e *Entrypoint) Serve(cfg Config)

Serve is the entrypoint for Conduit. It is a convenience function for tweaking the Conduit CLI: you can inject different default values or built-in plugins while retaining the same flags and exit behavior. To adjust a default value, set the corresponding field in Config; the default config can be retrieved with DefaultConfig. The config is then populated with values parsed from:

  • command line flags (highest priority)
  • environment variables
  • config file (lowest priority)
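The priority order above can be pictured as a layered lookup. The sketch below is a simplified model in which each source is a plain map, and the value passed as def stands in for a default set on Config. The function resolve is hypothetical and only illustrates the ordering, not how Conduit parses its configuration.

```go
package main

import "fmt"

// resolve returns the value for key from the highest-priority source that
// defines it: flags first, then environment variables, then the config file.
// If no source defines the key, the programmatic default def is returned.
func resolve(key, def string, flags, env, file map[string]string) string {
	for _, src := range []map[string]string{flags, env, file} {
		if v, ok := src[key]; ok {
			return v
		}
	}
	return def
}

func main() {
	file := map[string]string{"log.level": "warn", "db.type": "postgres"}
	env := map[string]string{"log.level": "debug"}
	flags := map[string]string{"db.type": "sqlite"}

	fmt.Println(resolve("log.level", "info", flags, env, file))  // env overrides file
	fmt.Println(resolve("db.type", "badger", flags, env, file))  // flag overrides file
	fmt.Println(resolve("log.format", "cli", flags, env, file))  // falls back to default
}
```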

func (*Entrypoint) Splash added in v0.7.2

func (*Entrypoint) Splash() string

type Runtime

type Runtime struct {
	Config Config

	DB               database.DB
	Orchestrator     *orchestrator.Orchestrator
	ProvisionService *provisioning.Service
	SchemaRegistry   schemaregistry.Registry

	// Ready will be closed when Runtime has successfully started
	Ready chan struct{}
	// contains filtered or unexported fields
}

Runtime sets up all services for serving and monitoring a Conduit instance.

func NewRuntime

func NewRuntime(cfg Config) (*Runtime, error)

NewRuntime sets up a Runtime instance and primes it for start.

func (*Runtime) Run

func (r *Runtime) Run(ctx context.Context) (err error)

Run initializes all of Conduit's underlying services and starts the gRPC and HTTP APIs. This function blocks until the supplied context is canceled or one of the services experiences a fatal error.
