Documentation
Constants
const (
	SourceKafkaConsumerConfigAutoCommitEnable = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE"
	SourceKafkaConsumerConfigAutoOffsetReset  = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
	SourceKafkaConsumerConfigBootstrapServers = "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"
)
Kafka-related constants
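These constants are the environment-variable keys for the source Kafka consumer settings, so they can be used to read or override values wherever dagger configuration travels as a string map (for example Config.EnvVariables). A minimal sketch, assuming the package imports as github.com/goto/entropy/modules/dagger; the sample values are illustrative:

package main

import (
	"fmt"

	"github.com/goto/entropy/modules/dagger" // import path assumed
)

func main() {
	// Illustrative consumer settings keyed by the exported constants.
	envs := map[string]string{
		dagger.SourceKafkaConsumerConfigAutoCommitEnable: "false",
		dagger.SourceKafkaConsumerConfigAutoOffsetReset:  "latest",
		dagger.SourceKafkaConsumerConfigBootstrapServers: "localhost:9092",
	}

	// SourceKafkaConsumerAutoOffsetReset (declared below) carries the same
	// key string, so it resolves to the same entry.
	fmt.Println("offset reset:", envs[dagger.SourceKafkaConsumerAutoOffsetReset])
}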
const (
	SinkTypeInflux   = "INFLUX"
	SinkTypeKafka    = "KAFKA"
	SinkTypeBigquery = "BIGQUERY"
)
Sink types
const (
	JobStateRunning   = "running"
	JobStateSuspended = "suspended"

	StateDeployed      = "DEPLOYED"
	StateUserStopped   = "USER_STOPPED"
	StateSystemStopped = "SYSTEM_STOPPED"
)
const (
	StopAction  = "stop"
	StartAction = "start"
	ResetAction = "reset"
)
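Per the action descriptions in Module below, stop suspends a running dagger and start resumes a suspended one. A hedged sketch mapping each action to the job state it should leave behind (nextJobState is hypothetical, not part of this package, and reset is assumed to leave the job running):

package main

import (
	"fmt"

	"github.com/goto/entropy/modules/dagger" // import path assumed
)

// nextJobState is a hypothetical helper that maps a lifecycle action to the
// job state it should leave the dagger in, per the action descriptions.
func nextJobState(action string) (string, bool) {
	switch action {
	case dagger.StopAction:
		return dagger.JobStateSuspended, true
	case dagger.StartAction, dagger.ResetAction: // reset assumed to resume the job
		return dagger.JobStateRunning, true
	default:
		return "", false
	}
}

func main() {
	state, ok := nextJobState(dagger.StopAction)
	fmt.Println(state, ok) // suspended true
}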
const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
Variables
var Module = module.Descriptor{
	Kind: "dagger",
	Dependencies: map[string]string{
		// contains filtered or unexported fields
	},
	Actions: []module.ActionDesc{
		{
			Name:        module.CreateAction,
			Description: "Creates a new dagger",
		},
		{
			Name:        module.UpdateAction,
			Description: "Updates an existing dagger",
		},
		{
			Name:        StopAction,
			Description: "Suspends a running dagger",
		},
		{
			Name:        StartAction,
			Description: "Starts a suspended dagger",
		},
		{
			Name:        ResetAction,
			Description: "Resets the offset of a dagger",
		},
	},
	DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) {
		conf := defaultDriverConf
		if err := json.Unmarshal(confJSON, &conf); err != nil {
			return nil, err
		} else if err := validator.TaggedStruct(conf); err != nil {
			return nil, err
		}

		return &daggerDriver{
			conf:    conf,
			timeNow: time.Now,
			kubeDeploy: func(_ context.Context, isCreate bool, kubeConf kube.Config, hc helm.ReleaseConfig) error {
				canUpdate := func(rel *release.Release) bool {
					curLabels, ok := rel.Config[labelsConfKey].(map[string]any)
					if !ok {
						return false
					}
					newLabels, ok := hc.Values[labelsConfKey].(map[string]string)
					if !ok {
						return false
					}

					isManagedByEntropy := curLabels[labelOrchestrator] == orchestratorLabelValue
					isSameDeployment := curLabels[labelDeployment] == newLabels[labelDeployment]

					return isManagedByEntropy && isSameDeployment
				}

				helmCl := helm.NewClient(&helm.Config{Kubernetes: kubeConf})
				_, errHelm := helmCl.Upsert(&hc, canUpdate)
				return errHelm
			},
			kubeGetPod: func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get pod").WithCausef(err.Error())
				}
				return kubeCl.GetPodDetails(ctx, ns, labels, func(pod v1.Pod) bool {
					return pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil
				})
			},
			kubeGetCRD: func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error) {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					return kube.FlinkDeploymentStatus{}, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get pod").WithCausef(err.Error())
				}
				crd, err := kubeCl.GetCRDDetails(ctx, ns, name)
				if err != nil {
					return kube.FlinkDeploymentStatus{}, err
				}
				return parseFlinkCRDStatus(crd.Object)
			},
			consumerReset: consumerReset,
		}, nil
	},
}
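Module is the descriptor that wires the dagger driver into entropy's module system. Its DriverFactory can also be exercised directly to validate a driver configuration. A minimal sketch, assuming the import path; the empty payload may be rejected, since validation runs against the unexported defaultDriverConf:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/goto/entropy/modules/dagger" // import path assumed
)

func main() {
	// An empty driver config; real deployments supply fields defined by the
	// unexported defaultDriverConf, some of which validation may require.
	confJSON := json.RawMessage(`{}`)

	driver, err := dagger.Module.DriverFactory(confJSON)
	if err != nil {
		fmt.Println("driver config rejected:", err)
		return
	}
	fmt.Printf("driver ready: %T\n", driver)
}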
Functions
This section is empty.
Types
type ChartValues
type Config
type Config struct {
	Resources     Resources         `json:"resources,omitempty"`
	Source        []Source          `json:"source,omitempty"`
	Sink          Sink              `json:"sink,omitempty"`
	EnvVariables  map[string]string `json:"env_variables,omitempty"`
	Replicas      int               `json:"replicas"`
	SinkType      string            `json:"sink_type"`
	Team          string            `json:"team"`
	FlinkName     string            `json:"flink_name,omitempty"`
	DeploymentID  string            `json:"deployment_id,omitempty"`
	Savepoint     any               `json:"savepoint,omitempty"`
	ChartValues   *ChartValues      `json:"chart_values,omitempty"`
	Deleted       bool              `json:"deleted,omitempty"`
	Namespace     string            `json:"namespace,omitempty"`
	PrometheusURL string            `json:"prometheus_url,omitempty"`
	JarURI        string            `json:"jar_uri,omitempty"`
	State         string            `json:"state"`
	JobState      string            `json:"job_state"`
	ResetOffset   string            `json:"reset_offset"`
	StopTime      *time.Time        `json:"stop_time,omitempty"`
}
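Config is the resource specification for a dagger, and its JSON tags define the wire format. A minimal sketch building and serializing one, assuming the import path; the team, replica, and consumer values are illustrative:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/goto/entropy/modules/dagger" // import path assumed
)

func main() {
	cfg := dagger.Config{
		Replicas: 1,
		SinkType: dagger.SinkTypeKafka,
		Team:     "data-eng", // illustrative
		EnvVariables: map[string]string{
			dagger.SourceKafkaConsumerConfigAutoOffsetReset: "latest",
		},
	}

	out, err := json.Marshal(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // keys follow the struct tags: replicas, sink_type, team, ...
}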
type FlinkCRDStatus
type Output
type Output struct {
	JMDeployStatus string     `json:"jm_deploy_status,omitempty"`
	JobStatus      string     `json:"job_status,omitempty"`
	Reconcilation  string     `json:"reconcilation,omitempty"`
	Pods           []kube.Pod `json:"pods,omitempty"`
	Namespace      string     `json:"namespace,omitempty"`
	JobID          string     `json:"job_id,omitempty"`
}
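Output is the state the driver reports back for a dagger resource, keyed by the tags above. A sketch decoding it from raw JSON, assuming the import path; the payload is illustrative:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/goto/entropy/modules/dagger" // import path assumed
)

func main() {
	// Illustrative payload shaped by Output's JSON tags.
	raw := []byte(`{"jm_deploy_status":"READY","job_status":"RUNNING","namespace":"dagger-ns"}`)

	var out dagger.Output
	if err := json.Unmarshal(raw, &out); err != nil {
		panic(err)
	}
	fmt.Println(out.JobStatus, "in", out.Namespace) // RUNNING in dagger-ns
}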
type Sink
type Sink struct {
	SinkKafka
	SinkInflux
	SinkBigquery
}
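Because Sink embeds SinkKafka, SinkInflux, and SinkBigquery, it marshals to a single flat JSON object carrying all three sinks' upper-case keys. A sketch with only the Kafka fields set, assuming the import path; broker and topic values are illustrative:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/goto/entropy/modules/dagger" // import path assumed
)

func main() {
	sink := dagger.Sink{
		SinkKafka: dagger.SinkKafka{
			SinkKafkaBrokers: "localhost:9092",  // illustrative
			SinkKafkaTopic:   "enriched-events", // illustrative
		},
	}

	out, err := json.Marshal(sink)
	if err != nil {
		panic(err)
	}
	// One flat object: SINK_KAFKA_* keys alongside the other sinks' keys.
	fmt.Println(string(out))
}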
type SinkBigquery
type SinkBigquery struct {
	SinkBigqueryGoogleCloudProjectID    string `json:"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"`
	SinkBigqueryTableName               string `json:"SINK_BIGQUERY_TABLE_NAME"`
	SinkBigqueryDatasetLabels           string `json:"SINK_BIGQUERY_DATASET_LABELS"`
	SinkBigqueryTableLabels             string `json:"SINK_BIGQUERY_TABLE_LABELS"`
	SinkBigqueryDatasetName             string `json:"SINK_BIGQUERY_DATASET_NAME"`
	SinkBigqueryTablePartitioningEnable string `json:"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"`
	SinkBigqueryTablePartitionKey       string `json:"SINK_BIGQUERY_TABLE_PARTITION_KEY"`
	SinkBigqueryRowInsertIDEnable       string `json:"SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"`
	SinkBigqueryClientReadTimeoutMs     string `json:"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"`
	SinkBigqueryClientConnectTimeoutMs  string `json:"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"`
	SinkBigqueryTablePartitionExpiryMs  string `json:"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"`
	SinkBigqueryDatasetLocation         string `json:"SINK_BIGQUERY_DATASET_LOCATION"`
	SinkBigqueryBatchSize               string `json:"SINK_BIGQUERY_BATCH_SIZE"`
	SinkBigqueryTableClusteringEnable   string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE"`
	SinkBigqueryTableClusteringKeys     string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_KEYS"`
	SinkErrorTypesForFailure            string `json:"SINK_ERROR_TYPES_FOR_FAILURE"`
}
type SinkInflux
type SinkKafka
type SinkKafka struct {
	SinkKafkaBrokers  string `json:"SINK_KAFKA_BROKERS"`
	SinkKafkaStream   string `json:"SINK_KAFKA_STREAM"`
	SinkKafkaTopic    string `json:"SINK_KAFKA_TOPIC"`
	SinkKafkaProtoMsg string `json:"SINK_KAFKA_PROTO_MESSAGE"`
	SinkKafkaLingerMs string `json:"SINK_KAFKA_LINGER_MS"`
	SinkKafkaProtoKey string `json:"SINK_KAFKA_PROTO_KEY"`
}
type Source
type Source struct {
	InputSchemaProtoClass               string         `json:"INPUT_SCHEMA_PROTO_CLASS"`
	InputSchemaEventTimestampFieldIndex string         `json:"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX"`
	SourceDetails                       []SourceDetail `json:"SOURCE_DETAILS"`
	InputSchemaTable                    string         `json:"INPUT_SCHEMA_TABLE"`
	SourceKafka
	SourceParquet
}
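Source likewise embeds SourceKafka and SourceParquet, so its JSON form is one flat object combining the schema fields with whichever source's keys are populated. A sketch for a Kafka-backed source, assuming the import path; topic, group, and table names are illustrative:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/goto/entropy/modules/dagger" // import path assumed
)

func main() {
	src := dagger.Source{
		InputSchemaTable: "data_stream", // illustrative
		SourceKafka: dagger.SourceKafka{
			SourceKafkaTopicNames:            "booking-events", // illustrative
			SourceKafkaConsumerConfigGroupID: "dagger-0001",    // illustrative
		},
	}

	out, err := json.Marshal(src)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}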
type SourceDetail
type SourceKafka
type SourceKafka struct {
	SourceKafkaConsumerConfigAutoCommitEnable string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE"`
	SourceKafkaConsumerConfigAutoOffsetReset  string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"`
	SourceKafkaTopicNames                     string `json:"SOURCE_KAFKA_TOPIC_NAMES"`
	SourceKafkaName                           string `json:"SOURCE_KAFKA_NAME"`
	SourceKafkaConsumerConfigGroupID          string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID"`
	SourceKafkaConsumerConfigBootstrapServers string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"`
}
type SourceParquet
type SourceParquet struct {
	SourceParquetFileDateRange interface{} `json:"SOURCE_PARQUET_FILE_DATE_RANGE"`
	SourceParquetFilePaths     []string    `json:"SOURCE_PARQUET_FILE_PATHS"`
}
type StartParams