Documentation ¶
Index ¶
- Constants
- func MaskDiff(topics []string, maskFile string, desiredVersion string, currentVersion string, ...) ([]string, []string, []string, error)
- func NewRedshiftConn(client client.Client, secretName, secretNamespace string, database *string) (*redshift.Redshift, error)
- func NewRedshiftConnection(secret map[string]string, schema string) (*redshift.Redshift, error)
- type Batcher
- func (b Batcher) Config() *corev1.ConfigMap
- func (b Batcher) Deployment() *appsv1.Deployment
- func (b Batcher) Name() string
- func (b Batcher) Namespace() string
- func (b Batcher) Topics() []string
- func (b Batcher) UpdateConfig(current *corev1.ConfigMap) bool
- func (b Batcher) UpdateDeployment(current *appsv1.Deployment) bool
- type ConfigMapCreatedEvent
- type ConfigMapDeletedEvent
- type Deployment
- type DeploymentCreatedEvent
- type DeploymentDeletedEvent
- type DeploymentUpdatedEvent
- type Loader
- func (l Loader) Config() *corev1.ConfigMap
- func (l Loader) Deployment() *appsv1.Deployment
- func (l Loader) Name() string
- func (l Loader) Namespace() string
- func (l Loader) Topics() []string
- func (l Loader) UpdateConfig(current *corev1.ConfigMap) bool
- func (l Loader) UpdateDeployment(current *appsv1.Deployment) bool
- type ReconcilerEvent
- type RedshiftSinkReconciler
Constants ¶
View Source
const ( BatcherTag = "batcher" BatcherLabelInstance = "redshiftbatcher" )
View Source
const ( LoaderTag = "loader" LoaderLabelInstance = "redshiftloader" )
View Source
const ( K8sEventTypeNormal = "Normal" K8sEventTypeWarning = "Warning" )
View Source
const ( AllSinkGroup = "all" MainSinkGroup = "main" ReloadSinkGroup = "reload" ReloadDupeSinkGroup = "reload-dupe" DefaultMaxBatcherLag = int64(100) DefautMaxLoaderLag = int64(10) ReloadTableSuffix = "_ts_adx_reload" )
View Source
const ( InstanceLabel = "app.kubernetes.io/instance" InstanceName = "practo.dev/name" SinkGroupLabel = "practo.dev/sinkgroup" RskResource = "practo.dev/resource" )
View Source
const (
MaxTopicRelease = 5
)
Variables ¶
This section is empty.
Functions ¶
func MaskDiff ¶
func MaskDiff( topics []string, maskFile string, desiredVersion string, currentVersion string, gitToken string, kafkaTopicsCache *sync.Map, includeTablesCache *sync.Map, ) ( []string, []string, []string, error, )
MaskDiff reads two database mask configurations and returns the list of topics whose mask values has changed. returns the updated list of kafka topics return the list of include tables based on desired mask config
func NewRedshiftConn ¶
Types ¶
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
func (Batcher) Deployment ¶
func (b Batcher) Deployment() *appsv1.Deployment
func (Batcher) UpdateDeployment ¶
func (b Batcher) UpdateDeployment(current *appsv1.Deployment) bool
type ConfigMapCreatedEvent ¶
func (ConfigMapCreatedEvent) Record ¶
func (d ConfigMapCreatedEvent) Record(recorder record.EventRecorder)
type ConfigMapDeletedEvent ¶
func (ConfigMapDeletedEvent) Record ¶
func (d ConfigMapDeletedEvent) Record(recorder record.EventRecorder)
type Deployment ¶
type Deployment interface { Name() string Namespace() string Config() *corev1.ConfigMap Deployment() *appsv1.Deployment UpdateConfig(current *corev1.ConfigMap) bool UpdateDeployment(current *appsv1.Deployment) bool Topics() []string }
func NewBatcher ¶
func NewBatcher( name string, rsk *tipocav1.RedshiftSink, maskFileVersion string, secret map[string]string, sinkGroup string, sinkGroupSpec *tipocav1.SinkGroupSpec, consumerGroups map[string]consumerGroup, defaultImage string, defaultKafkaVersion string, tlsConfig *kafka.TLSConfig, ) ( Deployment, error, )
func NewLoader ¶
func NewLoader( name string, rsk *tipocav1.RedshiftSink, tableSuffix string, secret map[string]string, sinkGroup string, sinkGroupSpec *tipocav1.SinkGroupSpec, consumerGroups map[string]consumerGroup, defaultImage string, defaultKafkaVersion string, tlsConfig *kafka.TLSConfig, defaultMaxOpenConns int, defaultMaxIdleConns int, prometheusURL string, redshiftMetrics bool, ) ( Deployment, error, )
type DeploymentCreatedEvent ¶
func (DeploymentCreatedEvent) Record ¶
func (d DeploymentCreatedEvent) Record(recorder record.EventRecorder)
type DeploymentDeletedEvent ¶
func (DeploymentDeletedEvent) Record ¶
func (d DeploymentDeletedEvent) Record(recorder record.EventRecorder)
type DeploymentUpdatedEvent ¶
func (DeploymentUpdatedEvent) Record ¶
func (d DeploymentUpdatedEvent) Record(recorder record.EventRecorder)
type Loader ¶
type Loader struct {
// contains filtered or unexported fields
}
func (Loader) Deployment ¶
func (l Loader) Deployment() *appsv1.Deployment
func (Loader) UpdateDeployment ¶
func (l Loader) UpdateDeployment(current *appsv1.Deployment) bool
type ReconcilerEvent ¶
type ReconcilerEvent interface { // Record this into an event recorder as a Kubernetes API event Record(recorder record.EventRecorder) }
ReconcilerEvent represents the action of the operator having actually done anything. Any meaningful change should result in one of these.
type RedshiftSinkReconciler ¶
type RedshiftSinkReconciler struct { client.Client Log logr.Logger Scheme *runtime.Scheme Recorder record.EventRecorder KafkaTopicRegexes *sync.Map KafkaClients *sync.Map KafkaTopicsCache *sync.Map KafkaRealtimeCache *sync.Map ReleaseCache *sync.Map GitCache *sync.Map IncludeTablesCache *sync.Map DefaultBatcherImage string DefaultLoaderImage string DefaultSecretRefName string DefaultSecretRefNamespace string DefaultKafkaVersion string DefaultRedshiftMaxIdleConns int DefaultRedshiftMaxOpenConns int AllowedResources []string PrometheusClient prometheus.Client RedshiftMetrics bool }
RedshiftSinkReconciler reconciles a RedshiftSink object
func (*RedshiftSinkReconciler) SetupWithManager ¶
func (r *RedshiftSinkReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up the controller and applies all controller configs
Click to show internal directories.
Click to hide internal directories.