conduit

package
v0.13.0-nightly.20250117 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: Apache-2.0 Imports: 60 Imported by: 1

Documentation

Overview

Package conduit wires up everything under the hood of a Conduit instance including metrics, telemetry, logging, and server construction. It should only ever interact with the Orchestrator, never individual services. All of that responsibility should be left to the Orchestrator.

Index

Constants

View Source
const (
	DBTypeBadger   = "badger"
	DBTypePostgres = "postgres"
	DBTypeInMemory = "inmemory"
	DBTypeSQLite   = "sqlite"

	SchemaRegistryTypeConfluent = "confluent"
	SchemaRegistryTypeBuiltin   = "builtin"
)

Variables

This section is empty.

Functions

func Version

func Version(appendOSArch bool) string

Types

type Config

type Config struct {
	ConduitCfg struct {
		Path string `long:"config.path" usage:"global conduit configuration file" default:"./conduit.yaml"`
	} `mapstructure:"config"`

	DB struct {
		// When Driver is specified it takes precedence over other DB related
		// fields.
		Driver database.DB

		Type   string `long:"db.type" usage:"database type; accepts badger,postgres,inmemory,sqlite"`
		Badger struct {
			Path string `long:"db.badger.path" usage:"path to badger DB"`
		}
		Postgres struct {
			ConnectionString string `` /* 132-byte string literal not displayed */
			Table            string `long:"db.postgres.table" usage:"postgres table in which to store data (will be created if it does not exist)"`
		}
		SQLite struct {
			Path  string `long:"db.sqlite.path" usage:"path to sqlite3 DB"`
			Table string `long:"db.sqlite.table" usage:"sqlite3 table in which to store data (will be created if it does not exist)"`
		}
	}

	API struct {
		Enabled bool `long:"api.enabled" usage:"enable HTTP and gRPC API"`
		HTTP    struct {
			Address string `long:"api.http.address" usage:"address for serving the HTTP API"`
		}
		GRPC struct {
			// This is the address where the gRPC API will be served which is shared as a global flag
			// for the Conduit CLI.
			Address string `long:"api.grpc.address" usage:"address for serving the gRPC API"`
		}
	}

	Log struct {
		NewLogger func(level, format string) log.CtxLogger
		Level     string `long:"log.level" usage:"sets logging level; accepts debug, info, warn, error, trace"`
		Format    string `long:"log.format" usage:"sets the format of the logging; accepts json, cli"`
	}

	Connectors struct {
		Path string `long:"connectors.path" usage:"path to standalone connectors' directory"`
	}

	Processors struct {
		Path string `long:"processors.path" usage:"path to standalone processors' directory"`
	}

	Pipelines struct {
		Path           string `long:"pipelines.path" usage:"path to pipelines' directory"`
		ExitOnDegraded bool   `long:"pipelines.exit-on-degraded" mapstructure:"exit-on-degraded" usage:"exit Conduit if a pipeline is degraded"`
		ErrorRecovery  struct {
			// MinDelay is the minimum delay before restart: Default: 1 second
			MinDelay time.Duration `long:"pipelines.error-recovery.min-delay" mapstructure:"min-delay" usage:"minimum delay before restart"`
			// MaxDelay is the maximum delay before restart: Default: 10 minutes
			MaxDelay time.Duration `long:"pipelines.error-recovery.max-delay" mapstructure:"max-delay" usage:"maximum delay before restart"`
			// BackoffFactor is the factor by which the delay is multiplied after each restart: Default: 2
			BackoffFactor int `long:"pipelines.error-recovery.backoff-factor" mapstructure:"backoff-factor" usage:"backoff factor applied to the last delay"`
			// MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite)
			MaxRetries int64 `long:"pipelines.error-recovery.max-retries" mapstructure:"max-retries" usage:"maximum number of retries"`
			// MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes
			MaxRetriesWindow time.Duration `` /* 179-byte string literal not displayed */
		} `mapstructure:"error-recovery"`
	}

	ConnectorPlugins map[string]sdk.Connector

	SchemaRegistry struct {
		Type string `long:"schema-registry.type" usage:"schema registry type; accepts builtin,confluent"`

		Confluent struct {
			ConnectionString string `` /* 135-byte string literal not displayed */
		}
	} `mapstructure:"schema-registry"`

	Preview struct {
		// PipelineArchV2 enables the new pipeline architecture.
		PipelineArchV2 bool `` /* 182-byte string literal not displayed */
	}

	Dev struct {
		CPUProfile   string `long:"dev.cpuprofile" usage:"write CPU profile to file"`
		MemProfile   string `long:"dev.memprofile" usage:"write memory profile to file"`
		BlockProfile string `long:"dev.blockprofile" usage:"write block profile to file"`
	}
}

Config holds all configurable values for Conduit.

func DefaultConfig added in v0.7.2

func DefaultConfig() Config

func DefaultConfigWithBasePath added in v0.12.2

func DefaultConfigWithBasePath(basePath string) Config

func (Config) Validate

func (c Config) Validate() error

type Entrypoint added in v0.7.2

type Entrypoint struct{}

Entrypoint provides methods related to the Conduit entrypoint (parsing config, managing interrupt signals etc.).

func (*Entrypoint) CancelOnInterrupt added in v0.7.2

func (*Entrypoint) CancelOnInterrupt(ctx context.Context) context.Context

CancelOnInterrupt returns a context that is canceled when the interrupt signal is received. * After the first signal the function will continue to listen * On the second signal executes a hard exit, without waiting for a graceful shutdown.

func (*Entrypoint) Serve added in v0.7.2

func (e *Entrypoint) Serve(cfg Config)

Serve is the entrypoint for Conduit. It is a convenience function if you want to tweak the Conduit CLI and inject different default values or built-in plugins while retaining the same flags and exit behavior. You can adjust the default values by setting the corresponding field in Config. The default config can be retrieved with DefaultConfig. The config will be populated with values parsed from:

  • command line flags (highest priority)
  • environment variables
  • config file (lowest priority)

func (*Entrypoint) Splash added in v0.7.2

func (*Entrypoint) Splash() string

type Runtime

type Runtime struct {
	Config Config

	DB               database.DB
	Orchestrator     *orchestrator.Orchestrator
	ProvisionService *provisioning.Service
	SchemaRegistry   schemaregistry.Registry

	// Ready will be closed when Runtime has successfully started
	Ready chan struct{}
	// contains filtered or unexported fields
}

Runtime sets up all services for serving and monitoring a Conduit instance.

func NewRuntime

func NewRuntime(cfg Config) (*Runtime, error)

NewRuntime sets up a Runtime instance and primes it for start.

func (*Runtime) Run

func (r *Runtime) Run(ctx context.Context) (err error)

Run initializes all of Conduit's underlying services and starts the GRPC and HTTP APIs. This function blocks until the supplied context is cancelled or one of the services experiences a fatal error.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL