config

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxBufferSize is 1 << 20 (1048576).
	// NOTE(review): presumably the upper bound for TaskConfig.BufferSize — confirm against its users.
	MaxBufferSize = 1 << 20 //1048576

)

Variables

This section is empty.

Functions

This section is empty.

Types

type Assignment

// Assignment maps each instance to the list of task names it is given,
// together with a version and information about who updated it and when.
// NOTE(review): the unit of UpdatedAt (seconds vs. milliseconds) is not
// visible here — confirm at the place where it is set.
type Assignment struct {
	Version   int
	UpdatedAt int64               // timestamp when created
	UpdatedBy string              // leader instance
	Map       map[string][]string // maps an instance to a list of task names
}

type ClickHouseConfig

// ClickHouseConfig holds the ClickHouse configuration parameters: cluster and
// database names, hosts and port, credentials, extra DSN parameters, TLS
// options, and retry / connection-limit settings.
// NOTE(review): Hosts is a slice of slices; presumably the outer level is
// shards and the inner level is replicas — confirm against the code that
// consumes it.
type ClickHouseConfig struct {
	Cluster   string
	DB        string
	Hosts     [][]string
	Port      int
	Username  string
	Password  string
	DsnParams string

	// Secure enables TLS encryption with clickhouse-server.
	Secure bool
	// InsecureSkipVerify skips verification of the clickhouse-server certificate.
	InsecureSkipVerify bool

	RetryTimes   int // <=0 means retry infinitely
	MaxOpenConns int
}

ClickHouseConfig configuration parameters

type Config

// Config groups the configuration sections: Kafka, schema registry,
// ClickHouse, task definitions, task assignment and log level.
// NOTE(review): both Task (single) and Tasks (list) are present; how they are
// reconciled is not visible in this declaration — check Normallize.
type Config struct {
	Kafka          KafkaConfig
	SchemaRegistry SchemaRegistryConfig
	Clickhouse     ClickHouseConfig
	Task           *TaskConfig
	Tasks          []*TaskConfig
	Assignment     Assignment
	LogLevel       string
}

Config is the top-level struct that groups the individual configuration sections.

func ParseLocalCfgFile

func ParseLocalCfgFile(cfgPath string) (cfg *Config, err error)

func (*Config) IsAssigned

func (cfg *Config) IsAssigned(instance, task string) (assigned bool)

func (*Config) Normallize

func (cfg *Config) Normallize() (err error)

Normallize normalizes and validates the configuration.

type KafkaConfig

// KafkaConfig holds the Kafka configuration parameters: broker list, protocol
// version, free-form security settings, and the TLS and SASL options.
// NOTE(review): the format of Brokers (presumably a comma-separated list) and
// the keys accepted in Security are not visible here — confirm with the code
// that reads them.
type KafkaConfig struct {
	Brokers  string
	Version  string
	Security map[string]string
	TLS      struct {
		Enable         bool
		CaCertFiles    string // Required. It's the CA cert.pem with which the Kafka brokers' certs are signed.
		ClientCertFile string // Required for client authentication. It's client cert.pem.
		ClientKeyFile  string // Required if and only if ClientCertFile is present. It's client key.pem.

		TrustStoreLocation string // JKS format of CA certificate, used to extract CA cert.pem.
		TrustStorePassword string
		KeystoreLocation   string // JKS format of client certificate and key, used to extract client cert.pem and key.pem.
		KeystorePassword   string
		EndpIdentAlgo      string
	}
	// Sasl is a simplified sarama.Config.Net.SASL that only supports SASL/PLAIN and SASL/GSSAPI (Kerberos).
	// NOTE(review): the Mechanism comment below also lists SCRAM-SHA-256 and
	// SCRAM-SHA-512, which disagrees with the sentence above — confirm which
	// mechanisms are actually supported.
	Sasl struct {
		// Whether or not to use SASL authentication when connecting to the broker
		// (defaults to false).
		Enable bool
		// Mechanism is the name of the enabled SASL mechanism.
		// Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI (defaults to PLAIN)
		Mechanism string
		// Username is the authentication identity (authcid) to present for
		// SASL/PLAIN or SASL/SCRAM authentication
		Username string
		// Password for SASL/PLAIN or SASL/SCRAM authentication
		Password string
		GSSAPI   struct {
			AuthType           int // 1. KRB5_USER_AUTH, 2. KRB5_KEYTAB_AUTH
			KeyTabPath         string
			KerberosConfigPath string
			ServiceName        string
			Username           string
			Password           string
			Realm              string
			DisablePAFXFAST    bool
		}
	}
}

KafkaConfig configuration parameters

type SchemaRegistryConfig

// SchemaRegistryConfig holds the schema registry configuration parameters;
// currently that is only its URL.
type SchemaRegistryConfig struct {
	URL string
}

SchemaRegistryConfig configuration parameters

type TaskConfig

// TaskConfig holds the configuration parameters of a single task: where to
// consume from (topic, consumer group), how to parse messages, which table to
// write to, how the schema is obtained, and sharding / flushing options.
type TaskConfig struct {
	Name string

	KafkaClient   string
	Topic         string
	ConsumerGroup string

	// Earliest, when set to true, consumes messages from the oldest position.
	Earliest bool
	Parser   string
	// CsvFormat is the list of CSV column titles, used if Parser is csv.
	CsvFormat []string
	Delimiter string

	TableName string

	// AutoSchema will automatically fetch the schema from clickhouse.
	AutoSchema     bool
	ExcludeColumns []string
	Dims           []struct {
		Name       string
		Type       string
		SourceName string
	} `json:"dims"`
	// DynamicSchema will add columns present in message to clickhouse. Requires AutoSchema to be true.
	DynamicSchema struct {
		Enable  bool
		MaxDims int // the upper limit of the number of dynamic columns, <=0 means math.MaxInt16. Protects against dirty-data attacks.
		// A column is added for new key K if all following conditions are true:
		// - K isn't in ExcludeColumns
		// - number of existing columns doesn't reach MaxDims-1
		// - WhiteList is empty, or K matches WhiteList
		// - BlackList is empty, or K doesn't match BlackList
		WhiteList string // the regexp of white list
		BlackList string // the regexp of black list
	}
	// PrometheusSchema expects each message to be a Prometheus metric (timestamp, value, metric name and a list of labels).
	PrometheusSchema bool
	// Fields matching PromLabelsBlackList are not considered as labels. Requires PrometheusSchema to be true.
	PromLabelsBlackList string // the regexp of black list
	// LoadSeriesAtStartup tells whether to load series at startup.
	LoadSeriesAtStartup bool

	// ShardingKey is the name of the column to shard against.
	ShardingKey string `json:"shardingKey,omitempty"`
	// ShardingStripe takes effect iff the sharding key is numerical.
	ShardingStripe uint64 `json:"shardingStripe,omitempty"`

	// NOTE(review): the units of FlushInterval and TimeUnit, and the meaning
	// of BufferSize (messages vs. bytes), are not visible here — confirm
	// against the code that uses them.
	FlushInterval int     `json:"flushInterval,omitempty"`
	BufferSize    int     `json:"bufferSize,omitempty"`
	TimeZone      string  `json:"timeZone"`
	TimeUnit      float64 `json:"timeUnit"`
}

TaskConfig configuration parameters

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL