dagger

package
Version: v0.2.3-rc
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 8, 2024 | License: Apache-2.0 | Imports: 22 | Imported by: 0

Documentation

Index

Constants

View Source
// Kafka-related constants. Each value is identical to the JSON key of the
// same-named field on SourceKafka.
const (
	SourceKafkaConsumerConfigAutoCommitEnable = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE"
	SourceKafkaConsumerConfigAutoOffsetReset  = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
	SourceKafkaConsumerConfigBootstrapServers = "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"
)

Kafka-related constants

View Source
// Sink types.
//
// NOTE(review): presumably the accepted values of Config.SinkType — confirm
// against the driver code.
const (
	SinkTypeInflux   = "INFLUX"
	SinkTypeKafka    = "KAFKA"
	SinkTypeBigquery = "BIGQUERY"
)

Sink types

View Source
// Job-state and state values. The JobState* values are lower-case strings and
// the State* values are upper-case strings.
//
// NOTE(review): presumably JobState* populate Config.JobState and State*
// populate Config.State — confirm against the driver code.
const (
	JobStateRunning    = "running"
	JobStateSuspended  = "suspended"
	StateDeployed      = "DEPLOYED"
	StateUserStopped   = "USER_STOPPED"
	StateSystemStopped = "SYSTEM_STOPPED"
)
View Source
// Names of the dagger-specific actions registered in Module.Actions, where
// they are described as: stop suspends a running dagger, start starts a
// suspended dagger, and reset resets the offset of a dagger.
const (
	StopAction  = "stop"
	StartAction = "start"
	ResetAction = "reset"
)
View Source
// SourceKafkaConsumerAutoOffsetReset is the Kafka consumer auto-offset-reset
// configuration key.
//
// NOTE(review): this has exactly the same value as
// SourceKafkaConsumerConfigAutoOffsetReset; consider consolidating on one name.
const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"

Variables

View Source
// Module is the descriptor for the "dagger" module kind. It declares the
// actions the module supports and supplies the factory that builds a driver
// from a JSON driver configuration.
var Module = module.Descriptor{
	Kind: "dagger",
	Dependencies: map[string]string{
		// contains filtered or unexported fields
	},
	Actions: []module.ActionDesc{
		{
			Name:        module.CreateAction,
			Description: "Creates a new dagger",
		},
		{
			Name:        module.UpdateAction,
			Description: "Updates an existing dagger",
		},
		{
			Name:        StopAction,
			Description: "Suspends a running dagger",
		},
		{
			Name:        StartAction,
			Description: "Starts a suspended dagger",
		},
		{
			Name:        ResetAction,
			Description: "Resets the offset of a dagger",
		},
	},
	// DriverFactory decodes confJSON on top of defaultDriverConf, validates the
	// result, and returns a daggerDriver wired with the Helm and Kubernetes
	// helpers defined below. Decode and validation errors are returned as-is.
	DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) {
		conf := defaultDriverConf
		if err := json.Unmarshal(confJSON, &conf); err != nil {
			return nil, err
		}
		if err := validator.TaggedStruct(conf); err != nil {
			return nil, err
		}

		return &daggerDriver{
			conf:    conf,
			timeNow: time.Now,
			// kubeDeploy upserts the Helm release described by hc. The context
			// and the isCreate flag are accepted but not used here.
			kubeDeploy: func(_ context.Context, isCreate bool, kubeConf kube.Config, hc helm.ReleaseConfig) error {
				// canUpdate allows an existing release to be updated only when
				// its stored labels carry the expected orchestrator value and
				// the same deployment label as the incoming values. A missing
				// or differently-typed label set on either side denies the update.
				canUpdate := func(rel *release.Release) bool {
					curLabels, ok := rel.Config[labelsConfKey].(map[string]any)
					if !ok {
						return false
					}
					newLabels, ok := hc.Values[labelsConfKey].(map[string]string)
					if !ok {
						return false
					}

					isManagedByEntropy := curLabels[labelOrchestrator] == orchestratorLabelValue
					isSameDeployment := curLabels[labelDeployment] == newLabels[labelDeployment]

					return isManagedByEntropy && isSameDeployment
				}

				helmCl := helm.NewClient(&helm.Config{Kubernetes: kubeConf})
				_, errHelm := helmCl.Upsert(&hc, canUpdate)
				return errHelm
			},
			// kubeGetPod lists pods in ns matching labels, keeping only those
			// that are running and not marked for deletion.
			kubeGetPod: func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					// Pass the cause as an argument, not as the format string,
					// so '%' characters in the error text are not interpreted.
					return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on dagger driver kube get pod").WithCausef("%s", err.Error())
				}
				return kubeCl.GetPodDetails(ctx, ns, labels, func(pod v1.Pod) bool {
					return pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil
				})
			},
			// kubeGetCRD fetches the custom resource called name in ns and
			// parses its object into a kube.FlinkDeploymentStatus.
			kubeGetCRD: func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error) {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					return kube.FlinkDeploymentStatus{}, errors.ErrInternal.WithMsgf("failed to create new kube client on dagger driver kube get crd").WithCausef("%s", err.Error())
				}
				crd, err := kubeCl.GetCRDDetails(ctx, ns, name)
				if err != nil {
					return kube.FlinkDeploymentStatus{}, err
				}
				return parseFlinkCRDStatus(crd.Object)
			},
			consumerReset: consumerReset,
		}, nil
	},
}

Functions

This section is empty.

Types

type ChartValues

// ChartValues groups the image repository, image tag, chart version and image
// pull policy settings. ImageRepository, ImageTag and ChartVersion carry a
// `validate:"required"` tag; ImagePullPolicy has no validation tag.
type ChartValues struct {
	ImageRepository string `json:"image_repository" validate:"required"`
	ImageTag        string `json:"image_tag" validate:"required"`
	ChartVersion    string `json:"chart_version" validate:"required"`
	ImagePullPolicy string `json:"image_pull_policy"`
}

type Config

// Config is the JSON-tagged configuration structure of this package.
//
// Serialization notes:
//   - Resources and Sink are non-pointer struct values, so their `omitempty`
//     option has no effect with encoding/json; they are always emitted.
//   - ChartValues and StopTime are pointers and are omitted when nil.
//   - Replicas, SinkType, Team, State, JobState and ResetOffset have no
//     `omitempty` and are always emitted, even when zero-valued.
//   - Savepoint is untyped (any); its shape is not defined here.
//
// NOTE(review): presumably SinkType takes one of the SinkType* constants,
// State one of the State* constants and JobState one of the JobState*
// constants — confirm against the driver code.
type Config struct {
	Resources     Resources         `json:"resources,omitempty"`
	Source        []Source          `json:"source,omitempty"`
	Sink          Sink              `json:"sink,omitempty"`
	EnvVariables  map[string]string `json:"env_variables,omitempty"`
	Replicas      int               `json:"replicas"`
	SinkType      string            `json:"sink_type"`
	Team          string            `json:"team"`
	FlinkName     string            `json:"flink_name,omitempty"`
	DeploymentID  string            `json:"deployment_id,omitempty"`
	Savepoint     any               `json:"savepoint,omitempty"`
	ChartValues   *ChartValues      `json:"chart_values,omitempty"`
	Deleted       bool              `json:"deleted,omitempty"`
	Namespace     string            `json:"namespace,omitempty"`
	PrometheusURL string            `json:"prometheus_url,omitempty"`
	JarURI        string            `json:"jar_uri,omitempty"`
	State         string            `json:"state"`
	JobState      string            `json:"job_state"`
	ResetOffset   string            `json:"reset_offset"`
	StopTime      *time.Time        `json:"stop_time,omitempty"`
}

type FlinkCRDStatus

// FlinkCRDStatus holds three status strings, mapped to camelCase JSON keys:
// the job manager deployment status, the job status and the reconciliation
// status.
//
// NOTE(review): presumably mirrors the status section of a Flink deployment
// custom resource — confirm where this type is populated.
type FlinkCRDStatus struct {
	JobManagerDeploymentStatus string `json:"jobManagerDeploymentStatus"`
	JobStatus                  string `json:"jobStatus"`
	ReconciliationStatus       string `json:"reconciliationStatus"`
}

type Output

// Output is the JSON-tagged output structure of this package. Every field is
// tagged `omitempty`, so zero-valued fields are left out of the encoded JSON.
//
// NOTE(review): "Reconcilation" (both the field name and its JSON key) is a
// misspelling of "Reconciliation". It is left unchanged because renaming it
// would alter the exported API and the serialized key.
type Output struct {
	JMDeployStatus string     `json:"jm_deploy_status,omitempty"`
	JobStatus      string     `json:"job_status,omitempty"`
	Reconcilation  string     `json:"reconcilation,omitempty"`
	Pods           []kube.Pod `json:"pods,omitempty"`
	Namespace      string     `json:"namespace,omitempty"`
	JobID          string     `json:"job_id,omitempty"`
}

type Resources

// Resources pairs one UsageSpec for the task manager with one for the job
// manager. Both fields are non-pointer structs, so their `omitempty` option
// has no effect with encoding/json.
type Resources struct {
	TaskManager UsageSpec `json:"taskmanager,omitempty"`
	JobManager  UsageSpec `json:"jobmanager,omitempty"`
}

type Sink

// Sink combines the Kafka, Influx and BigQuery sink settings by embedding
// SinkKafka, SinkInflux and SinkBigquery. Because the embedded structs carry
// no JSON tag of their own, encoding/json treats their fields as if they were
// declared directly on Sink, so all keys appear in one flat JSON object.
type Sink struct {
	SinkKafka
	SinkInflux
	SinkBigquery
}

type SinkBigquery

// SinkBigquery holds the BigQuery sink settings. Every field is a string,
// including the ones whose names suggest booleans or numbers (for example the
// *Enable and *Ms fields), and each maps to an upper snake case JSON key.
// SinkErrorTypesForFailure is the only field whose key does not start with
// "SINK_BIGQUERY_".
type SinkBigquery struct {
	SinkBigqueryGoogleCloudProjectID    string `json:"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"`
	SinkBigqueryTableName               string `json:"SINK_BIGQUERY_TABLE_NAME"`
	SinkBigqueryDatasetLabels           string `json:"SINK_BIGQUERY_DATASET_LABELS"`
	SinkBigqueryTableLabels             string `json:"SINK_BIGQUERY_TABLE_LABELS"`
	SinkBigqueryDatasetName             string `json:"SINK_BIGQUERY_DATASET_NAME"`
	SinkBigqueryTablePartitioningEnable string `json:"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"`
	SinkBigqueryTablePartitionKey       string `json:"SINK_BIGQUERY_TABLE_PARTITION_KEY"`
	SinkBigqueryRowInsertIDEnable       string `json:"SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"`
	SinkBigqueryClientReadTimeoutMs     string `json:"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"`
	SinkBigqueryClientConnectTimeoutMs  string `json:"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"`
	SinkBigqueryTablePartitionExpiryMs  string `json:"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"`
	SinkBigqueryDatasetLocation         string `json:"SINK_BIGQUERY_DATASET_LOCATION"`
	SinkBigqueryBatchSize               string `json:"SINK_BIGQUERY_BATCH_SIZE"`
	SinkBigqueryTableClusteringEnable   string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE"`
	SinkBigqueryTableClusteringKeys     string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_KEYS"`
	SinkErrorTypesForFailure            string `json:"SINK_ERROR_TYPES_FOR_FAILURE"`
}

type SinkInflux

// SinkInflux holds the Influx sink settings: a database name and a
// measurement name, both strings mapped to upper snake case JSON keys.
type SinkInflux struct {
	SinkInfluxDBName          string `json:"SINK_INFLUX_DB_NAME"`
	SinkInfluxMeasurementName string `json:"SINK_INFLUX_MEASUREMENT_NAME"`
}

type SinkKafka

// SinkKafka holds the Kafka sink settings. Every field is a string mapped to
// an upper snake case JSON key. Note that SinkKafkaProtoMsg maps to the key
// "SINK_KAFKA_PROTO_MESSAGE", so the field name abbreviates the key.
type SinkKafka struct {
	SinkKafkaBrokers  string `json:"SINK_KAFKA_BROKERS"`
	SinkKafkaStream   string `json:"SINK_KAFKA_STREAM"`
	SinkKafkaTopic    string `json:"SINK_KAFKA_TOPIC"`
	SinkKafkaProtoMsg string `json:"SINK_KAFKA_PROTO_MESSAGE"`
	SinkKafkaLingerMs string `json:"SINK_KAFKA_LINGER_MS"`
	SinkKafkaProtoKey string `json:"SINK_KAFKA_PROTO_KEY"`
}

type Source

// Source describes one input source: its input schema settings, a list of
// SourceDetail entries, and the embedded SourceKafka and SourceParquet
// settings. Because the embedded structs carry no JSON tag of their own,
// encoding/json treats their fields as if they were declared directly on
// Source, so all keys appear in one flat JSON object.
type Source struct {
	InputSchemaProtoClass               string         `json:"INPUT_SCHEMA_PROTO_CLASS"`
	InputSchemaEventTimestampFieldIndex string         `json:"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX"`
	SourceDetails                       []SourceDetail `json:"SOURCE_DETAILS"`
	InputSchemaTable                    string         `json:"INPUT_SCHEMA_TABLE"`
	SourceKafka
	SourceParquet
}

type SourceDetail

// SourceDetail is a name/type pair describing a source, mapped to the JSON
// keys "SOURCE_NAME" and "SOURCE_TYPE". The set of valid values is not
// defined here.
type SourceDetail struct {
	SourceName string `json:"SOURCE_NAME"`
	SourceType string `json:"SOURCE_TYPE"`
}

type SourceKafka

// SourceKafka holds the Kafka source settings. Every field is a string mapped
// to an upper snake case JSON key. The keys of the auto-commit-enable,
// auto-offset-reset and bootstrap-servers fields are also available as the
// package constants of the same names.
type SourceKafka struct {
	SourceKafkaConsumerConfigAutoCommitEnable string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE"`
	SourceKafkaConsumerConfigAutoOffsetReset  string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"`
	SourceKafkaTopicNames                     string `json:"SOURCE_KAFKA_TOPIC_NAMES"`
	SourceKafkaName                           string `json:"SOURCE_KAFKA_NAME"`
	SourceKafkaConsumerConfigGroupID          string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID"`
	SourceKafkaConsumerConfigBootstrapServers string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"`
}

type SourceParquet

// SourceParquet holds the Parquet source settings: a file date range and a
// list of file paths.
//
// SourceParquetFileDateRange is untyped, so any JSON value decodes into it;
// its expected shape is not defined here.
type SourceParquet struct {
	SourceParquetFileDateRange any      `json:"SOURCE_PARQUET_FILE_DATE_RANGE"`
	SourceParquetFilePaths     []string `json:"SOURCE_PARQUET_FILE_PATHS"`
}

type StartParams

// StartParams carries an optional stop time. StopTime is a pointer without
// `omitempty`, so a nil value is encoded as JSON null rather than omitted.
//
// NOTE(review): presumably the parameters accepted by the "start" action —
// confirm against the driver code.
type StartParams struct {
	StopTime *time.Time `json:"stop_time"`
}

type UsageSpec

// UsageSpec holds CPU and memory amounts as strings. Both fields are tagged
// `validate:"required"` for validation and `omitempty` for JSON encoding.
// The accepted quantity format is not defined here.
type UsageSpec struct {
	CPU    string `json:"cpu,omitempty" validate:"required"`
	Memory string `json:"memory,omitempty" validate:"required"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL