Documentation ¶
Index ¶
- Constants
- Variables
- func ComputeDeploymentHash(deployment appsv1.Deployment) ([]byte, error)
- func DeploymentIsJobmanager(deployment *v1.Deployment) bool
- func DeploymentIsTaskmanager(deployment *v1.Deployment) bool
- func FetchJobManagerContainerObj(application *v1beta1.FlinkApplication) *coreV1.Container
- func FetchJobManagerIngressCreateObj(app *flinkapp.FlinkApplication) *v1beta1.Ingress
- func FetchJobManagerServiceCreateObj(app *v1beta1.FlinkApplication, hash string) *coreV1.Service
- func FetchJobMangerDeploymentCreateObj(app *v1beta1.FlinkApplication, hash string) *v1.Deployment
- func FetchTaskManagerContainerObj(application *v1beta1.FlinkApplication) *coreV1.Container
- func FetchTaskMangerDeploymentCreateObj(app *v1beta1.FlinkApplication, hash string) *v1.Deployment
- func GetAWSServiceEnv() []v1.EnvVar
- func GetActiveFlinkJobs(jobs []client.FlinkJob) []client.FlinkJob
- func GetDeploySpecificEnv(app *v1beta1.FlinkApplication) []v1.EnvVar
- func GetFlinkContainerEnv(app *v1beta1.FlinkApplication) []v1.EnvVar
- func GetFlinkUIIngressURL(jobName string) string
- func GetTaskManagerPorts(app *v1beta1.FlinkApplication) []coreV1.ContainerPort
- func HashForApplication(app *v1beta1.FlinkApplication) string
- func ImagePullPolicy(app *v1beta1.FlinkApplication) v1.PullPolicy
- func InjectOperatorCustomizedConfig(deployment *appsv1.Deployment, app *v1beta1.FlinkApplication, hash string, ...)
- func JobManagerDeploymentMatches(deployment *v1.Deployment, application *v1beta1.FlinkApplication, hash string) bool
- func Min(x, y int32) int32
- func ReplaceJobURL(value string, input string) string
- func TaskManagerDeploymentMatches(deployment *v1.Deployment, application *v1beta1.FlinkApplication, hash string) bool
- func VersionedJobManagerServiceName(app *v1beta1.FlinkApplication, hash string) string
- type Controller
- func (f *Controller) CompareAndUpdateClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error)
- func (f *Controller) CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (bool, error)
- func (f *Controller) CreateCluster(ctx context.Context, application *v1beta1.FlinkApplication) error
- func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1beta1.FlinkApplication) error
- func (f *Controller) DeleteResourcesForAppWithHash(ctx context.Context, app *v1beta1.FlinkApplication, hash string) error
- func (f *Controller) DeleteStatusPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication, hash string)
- func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error)
- func (f *Controller) ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string, ...) error
- func (f *Controller) GetCurrentDeploymentsForApp(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error)
- func (f *Controller) GetJobForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error)
- func (f *Controller) GetJobToDeleteForApplication(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error)
- func (f *Controller) GetJobsForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error)
- func (f *Controller) GetLatestClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus
- func (f *Controller) GetLatestJobID(ctx context.Context, application *v1beta1.FlinkApplication) string
- func (f *Controller) GetLatestJobStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkJobStatus
- func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string, ...) (*client.SavepointResponse, error)
- func (f *Controller) GetVersionAndHashPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication) (v1beta1.FlinkApplicationVersion, string)
- func (f *Controller) GetVersionAndJobIDForHash(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (string, string, error)
- func (f *Controller) IsClusterReady(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)
- func (f *Controller) IsServiceReady(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error)
- func (f *Controller) LogEvent(ctx context.Context, app *v1beta1.FlinkApplication, eventType string, ...)
- func (f *Controller) Savepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string, ...) (string, error)
- func (f *Controller) StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string, ...) (string, error)
- func (f *Controller) UpdateLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication, ...)
- func (f *Controller) UpdateLatestJobID(ctx context.Context, app *v1beta1.FlinkApplication, jobID string)
- func (f *Controller) UpdateLatestJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, ...)
- func (f *Controller) UpdateLatestVersionAndHash(application *v1beta1.FlinkApplication, version v1beta1.FlinkApplicationVersion, ...)
- type ControllerInterface
- type JobManagerController
- type JobManagerControllerInterface
- type TaskManagerController
- type TaskManagerControllerInterface
Constants ¶
View Source
const ( JobManagerDefaultReplicaCount = 1 TaskManagerDefaultSlots = 16 RPCDefaultPort = 6123 QueryDefaultPort = 6124 BlobDefaultPort = 6125 UIDefaultPort = 8081 MetricsQueryDefaultPort = 50101 OffHeapMemoryDefaultFraction = 0.5 HighAvailabilityKey = "high-availability" MaxCheckpointRestoreAgeSeconds = 3600 )
View Source
const ( AppName = "APP_NAME" AwsMetadataServiceTimeoutKey = "AWS_METADATA_SERVICE_TIMEOUT" AwsMetadataServiceNumAttemptsKey = "AWS_METADATA_SERVICE_NUM_ATTEMPTS" AwsMetadataServiceTimeout = "5" AwsMetadataServiceNumAttempts = "20" OperatorFlinkConfig = "FLINK_PROPERTIES" HostName = "HOST_NAME" HostIP = "HOST_IP" FlinkDeploymentTypeEnv = "FLINK_DEPLOYMENT_TYPE" FlinkDeploymentType = "flink-deployment-type" FlinkDeploymentTypeJobmanager = "jobmanager" FlinkDeploymentTypeTaskmanager = "taskmanager" FlinkAppHash = "flink-app-hash" FlinkJobProperties = "flink-job-properties" RestartNonce = "restart-nonce" FlinkApplicationVersionEnv = "FLINK_APPLICATION_VERSION" FlinkApplicationVersion = "flink-application-version" )
View Source
const ( JobManagerNameFormat = "%s-%s-jm" JobManagerVersionNameFormat = "%s-%s-%s-jm" JobManagerPodNameFormat = "%s-%s-jm-pod" JobManagerServiceName = "%s" JobManagerVersionServiceName = "%s-%s" JobManagerContainerName = "jobmanager" JobManagerArg = "jobmanager" JobManagerReadinessPath = "/overview" JobManagerReadinessInitialDelaySec = 10 JobManagerReadinessTimeoutSec = 1 JobManagerReadinessSuccessThreshold = 1 JobManagerReadinessFailureThreshold = 2 JobManagerReadinessPeriodSec = 5 )
View Source
const ( FlinkRPCPortName = "rpc" FlinkQueryPortName = "query" FlinkBlobPortName = "blob" FlinkUIPortName = "ui" FlinkInternalMetricPortName = "metrics" )
View Source
const ( TaskManagerNameFormat = "%s-%s-tm" TaskManagerVersionNameFormat = "%s-%s-%s-tm" TaskManagerPodNameFormat = "%s-%s-tm-pod" TaskManagerContainerName = "taskmanager" TaskManagerArg = "taskmanager" TaskManagerHostnameEnvVar = "TASKMANAGER_HOSTNAME" )
View Source
const AppIngressName = "%s-%s"
Variables ¶
View Source
var JobManagerDefaultResources = coreV1.ResourceRequirements{ Requests: coreV1.ResourceList{ coreV1.ResourceCPU: resource.MustParse("4"), coreV1.ResourceMemory: resource.MustParse("3072Mi"), }, Limits: coreV1.ResourceList{ coreV1.ResourceCPU: resource.MustParse("4"), coreV1.ResourceMemory: resource.MustParse("3072Mi"), }, }
View Source
var TaskManagerDefaultResources = coreV1.ResourceRequirements{ Requests: coreV1.ResourceList{ coreV1.ResourceCPU: resource.MustParse("2"), coreV1.ResourceMemory: resource.MustParse("1024Mi"), }, Limits: coreV1.ResourceList{ coreV1.ResourceCPU: resource.MustParse("2"), coreV1.ResourceMemory: resource.MustParse("1024Mi"), }, }
Functions ¶
func ComputeDeploymentHash ¶ added in v0.2.0
func ComputeDeploymentHash(deployment appsv1.Deployment) ([]byte, error)
Generate a deterministic hash in bytes for the pb object
func DeploymentIsJobmanager ¶
func DeploymentIsJobmanager(deployment *v1.Deployment) bool
func DeploymentIsTaskmanager ¶
func DeploymentIsTaskmanager(deployment *v1.Deployment) bool
func FetchJobManagerContainerObj ¶
func FetchJobManagerContainerObj(application *v1beta1.FlinkApplication) *coreV1.Container
func FetchJobManagerIngressCreateObj ¶
func FetchJobManagerIngressCreateObj(app *flinkapp.FlinkApplication) *v1beta1.Ingress
func FetchJobManagerServiceCreateObj ¶
func FetchJobManagerServiceCreateObj(app *v1beta1.FlinkApplication, hash string) *coreV1.Service
func FetchJobMangerDeploymentCreateObj ¶
func FetchJobMangerDeploymentCreateObj(app *v1beta1.FlinkApplication, hash string) *v1.Deployment
func FetchTaskManagerContainerObj ¶
func FetchTaskManagerContainerObj(application *v1beta1.FlinkApplication) *coreV1.Container
func FetchTaskMangerDeploymentCreateObj ¶
func FetchTaskMangerDeploymentCreateObj(app *v1beta1.FlinkApplication, hash string) *v1.Deployment
func GetAWSServiceEnv ¶
func GetActiveFlinkJobs ¶ added in v0.3.0
func GetDeploySpecificEnv ¶ added in v0.5.0
func GetDeploySpecificEnv(app *v1beta1.FlinkApplication) []v1.EnvVar
Injects labels and environment variables required for blue green deploys
func GetFlinkContainerEnv ¶
func GetFlinkContainerEnv(app *v1beta1.FlinkApplication) []v1.EnvVar
func GetFlinkUIIngressURL ¶
func GetTaskManagerPorts ¶
func GetTaskManagerPorts(app *v1beta1.FlinkApplication) []coreV1.ContainerPort
func HashForApplication ¶
func HashForApplication(app *v1beta1.FlinkApplication) string
Returns an 8 character hash sensitive to the application name, labels, annotations, and spec. TODO: we may need to add collision-avoidance to this
func ImagePullPolicy ¶
func ImagePullPolicy(app *v1beta1.FlinkApplication) v1.PullPolicy
func InjectOperatorCustomizedConfig ¶ added in v0.1.3
func InjectOperatorCustomizedConfig(deployment *appsv1.Deployment, app *v1beta1.FlinkApplication, hash string, deploymentType string)
func JobManagerDeploymentMatches ¶
func JobManagerDeploymentMatches(deployment *v1.Deployment, application *v1beta1.FlinkApplication, hash string) bool
func ReplaceJobURL ¶
func TaskManagerDeploymentMatches ¶
func TaskManagerDeploymentMatches(deployment *v1.Deployment, application *v1beta1.FlinkApplication, hash string) bool
func VersionedJobManagerServiceName ¶ added in v0.1.2
func VersionedJobManagerServiceName(app *v1beta1.FlinkApplication, hash string) string
Types ¶
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
func (*Controller) CompareAndUpdateClusterStatus ¶
func (f *Controller) CompareAndUpdateClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error)
Gets and updates the cluster status
func (*Controller) CompareAndUpdateJobStatus ¶
func (f *Controller) CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (bool, error)
func (*Controller) CreateCluster ¶
func (f *Controller) CreateCluster(ctx context.Context, application *v1beta1.FlinkApplication) error
func (*Controller) DeleteOldResourcesForApp ¶ added in v0.1.2
func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1beta1.FlinkApplication) error
func (*Controller) DeleteResourcesForAppWithHash ¶ added in v0.5.0
func (f *Controller) DeleteResourcesForAppWithHash(ctx context.Context, app *v1beta1.FlinkApplication, hash string) error
func (*Controller) DeleteStatusPostTeardown ¶ added in v0.5.0
func (f *Controller) DeleteStatusPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication, hash string)
func (*Controller) FindExternalizedCheckpoint ¶
func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error)
func (*Controller) ForceCancel ¶
func (f *Controller) ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error
func (*Controller) GetCurrentDeploymentsForApp ¶ added in v0.1.2
func (f *Controller) GetCurrentDeploymentsForApp(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error)
Gets the current deployment and any other deployments for the application. The current deployment will be the one that matches the FlinkApplication, unless the FailedDeployHash is set, in which case it will be the one with that hash.
func (*Controller) GetJobForApplication ¶ added in v0.3.0
func (f *Controller) GetJobForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error)
func (*Controller) GetJobToDeleteForApplication ¶ added in v0.5.0
func (f *Controller) GetJobToDeleteForApplication(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error)
func (*Controller) GetJobsForApplication ¶
func (f *Controller) GetJobsForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error)
func (*Controller) GetLatestClusterStatus ¶ added in v0.5.0
func (f *Controller) GetLatestClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus
func (*Controller) GetLatestJobID ¶ added in v0.5.0
func (f *Controller) GetLatestJobID(ctx context.Context, application *v1beta1.FlinkApplication) string
func (*Controller) GetLatestJobStatus ¶ added in v0.5.0
func (f *Controller) GetLatestJobStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkJobStatus
func (*Controller) GetSavepointStatus ¶
func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error)
func (*Controller) GetVersionAndHashPostTeardown ¶ added in v0.5.0
func (f *Controller) GetVersionAndHashPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication) (v1beta1.FlinkApplicationVersion, string)
func (*Controller) GetVersionAndJobIDForHash ¶ added in v0.5.0
func (f *Controller) GetVersionAndJobIDForHash(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (string, string, error)
func (*Controller) IsClusterReady ¶
func (f *Controller) IsClusterReady(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)
func (*Controller) IsServiceReady ¶
func (f *Controller) IsServiceReady(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error)
func (*Controller) LogEvent ¶
func (f *Controller) LogEvent(ctx context.Context, app *v1beta1.FlinkApplication, eventType string, reason string, message string)
func (*Controller) Savepoint ¶ added in v0.5.0
func (f *Controller) Savepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (string, error)
func (*Controller) StartFlinkJob ¶
func (*Controller) UpdateLatestClusterStatus ¶ added in v0.5.0
func (f *Controller) UpdateLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication, clusterStatus v1beta1.FlinkClusterStatus)
func (*Controller) UpdateLatestJobID ¶ added in v0.5.0
func (f *Controller) UpdateLatestJobID(ctx context.Context, app *v1beta1.FlinkApplication, jobID string)
func (*Controller) UpdateLatestJobStatus ¶ added in v0.5.0
func (f *Controller) UpdateLatestJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, jobStatus v1beta1.FlinkJobStatus)
func (*Controller) UpdateLatestVersionAndHash ¶ added in v0.5.0
func (f *Controller) UpdateLatestVersionAndHash(application *v1beta1.FlinkApplication, version v1beta1.FlinkApplicationVersion, hash string)
type ControllerInterface ¶
type ControllerInterface interface { // Creates a Flink cluster with necessary Job Manager, Task Managers and services for UI CreateCluster(ctx context.Context, application *v1beta1.FlinkApplication) error // Cancels the running/active jobs in the Cluster for the Application after savepoint is created Savepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (string, error) // Force cancels the running/active job without taking a savepoint ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error // Starts the Job in the Flink Cluster StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) // Savepoint creation is asynchronous. // Polls the status of the Savepoint, using the triggerID GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error) // Check if the Flink Kubernetes Cluster is Ready. // Checks if all the pods of task and job managers are ready. IsClusterReady(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) // Checks to see if the Flink Cluster is ready to handle API requests IsServiceReady(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) // Returns the list of Jobs running on the Flink Cluster for the Application GetJobsForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error) // Returns the current job for the application, if one exists in the cluster GetJobForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) // Returns the pair of deployments (tm/jm) for the current version of the application GetCurrentDeploymentsForApp(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error) // Deletes all old resources (deployments and services) for the app DeleteOldResourcesForApp(ctx context.Context, app *v1beta1.FlinkApplication) error // Attempts to find an externalized checkpoint for the job. This can be used to recover an application that is not // able to savepoint for some reason. FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) // Logs an event to the FlinkApplication resource and to the operator log LogEvent(ctx context.Context, app *v1beta1.FlinkApplication, eventType string, reason string, message string) // Compares and updates new cluster status with current cluster status // Returns true if there is a change in ClusterStatus CompareAndUpdateClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) // Compares and updates new job status with current job status // Returns true if there is a change in JobStatus CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (bool, error) // Gets the last updated cluster status GetLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus // Gets the last updated job status GetLatestJobStatus(ctx context.Context, app *v1beta1.FlinkApplication) v1beta1.FlinkJobStatus // Gets the last updated job ID GetLatestJobID(ctx context.Context, app *v1beta1.FlinkApplication) string // Updates the jobID on the latest jobStatus UpdateLatestJobID(ctx context.Context, app *v1beta1.FlinkApplication, jobID string) // Update jobStatus on the latest VersionStatuses UpdateLatestJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, jobStatus v1beta1.FlinkJobStatus) // Update clusterStatus on the latest VersionStatuses UpdateLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication, jobStatus v1beta1.FlinkClusterStatus) // Update Version and Hash for application UpdateLatestVersionAndHash(application *v1beta1.FlinkApplication, version v1beta1.FlinkApplicationVersion, hash string) // Delete Resources with Hash DeleteResourcesForAppWithHash(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error // Delete status for torn down cluster/job DeleteStatusPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication, hash string) // Get job given hash GetJobToDeleteForApplication(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) // Get hash given the version GetVersionAndJobIDForHash(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, string, error) // Get version and hash after teardown is complete GetVersionAndHashPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication) (v1beta1.FlinkApplicationVersion, string) }
Interface to manage Flink Application in Kubernetes
func NewController ¶
func NewController(k8sCluster k8.ClusterInterface, eventRecorder record.EventRecorder, config controllerConfig.RuntimeConfig) ControllerInterface
type JobManagerController ¶
type JobManagerController struct {
// contains filtered or unexported fields
}
func (*JobManagerController) CreateIfNotExist ¶
func (j *JobManagerController) CreateIfNotExist(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)
type JobManagerControllerInterface ¶
type JobManagerControllerInterface interface {
CreateIfNotExist(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)
}
func NewJobManagerController ¶
func NewJobManagerController(k8sCluster k8.ClusterInterface, config config.RuntimeConfig) JobManagerControllerInterface
type TaskManagerController ¶
type TaskManagerController struct {
// contains filtered or unexported fields
}
func (*TaskManagerController) CreateIfNotExist ¶
func (t *TaskManagerController) CreateIfNotExist(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)
type TaskManagerControllerInterface ¶
type TaskManagerControllerInterface interface {
CreateIfNotExist(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)
}
func NewTaskManagerController ¶
func NewTaskManagerController(k8sCluster k8.ClusterInterface, config config.RuntimeConfig) TaskManagerControllerInterface
Source Files ¶
Click to show internal directories.
Click to hide internal directories.