flink

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2020 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// Package-level numeric defaults plus one configuration key.
//
// All numeric values are untyped constants; OffHeapMemoryDefaultFraction is an
// untyped floating-point constant and HighAvailabilityKey is a string.
// MaxCheckpointRestoreAgeSeconds is 3600, i.e. one hour given the unit stated
// in its name.
//
// NOTE(review): judging by the names, the *DefaultPort values are the ports
// used when an application does not override them, and HighAvailabilityKey is
// a Flink configuration key -- confirm against the code that reads them, which
// is not visible in this listing.
const (
	JobManagerDefaultReplicaCount  = 1
	TaskManagerDefaultSlots        = 16
	RPCDefaultPort                 = 6123
	QueryDefaultPort               = 6124
	BlobDefaultPort                = 6125
	UIDefaultPort                  = 8081
	MetricsQueryDefaultPort        = 50101
	OffHeapMemoryDefaultFraction   = 0.5
	HighAvailabilityKey            = "high-availability"
	MaxCheckpointRestoreAgeSeconds = 3600
)
View Source
// String constants shared across the package.
//
// AwsMetadataServiceTimeout and AwsMetadataServiceNumAttempts hold their
// numbers as strings ("5" and "20"), not as integers.
//
// NOTE(review): the UPPER_SNAKE_CASE values look like container environment
// variable names and the lower-case dashed values look like Kubernetes
// label/annotation keys; the two Aws* values are presumably paired with the
// corresponding *Key names -- confirm at the call sites, which are not visible
// in this listing.
const (
	AppName                          = "APP_NAME"
	AwsMetadataServiceTimeoutKey     = "AWS_METADATA_SERVICE_TIMEOUT"
	AwsMetadataServiceNumAttemptsKey = "AWS_METADATA_SERVICE_NUM_ATTEMPTS"
	AwsMetadataServiceTimeout        = "5"
	AwsMetadataServiceNumAttempts    = "20"
	OperatorFlinkConfig              = "FLINK_PROPERTIES"
	HostName                         = "HOST_NAME"
	HostIP                           = "HOST_IP"
	FlinkDeploymentTypeEnv           = "FLINK_DEPLOYMENT_TYPE"
	FlinkDeploymentType              = "flink-deployment-type"
	FlinkDeploymentTypeJobmanager    = "jobmanager"
	FlinkDeploymentTypeTaskmanager   = "taskmanager"
	FlinkAppHash                     = "flink-app-hash"
	FlinkJobProperties               = "flink-job-properties"
	RestartNonce                     = "restart-nonce"
	FlinkApplicationVersionEnv       = "FLINK_APPLICATION_VERSION"
	FlinkApplicationVersion          = "flink-application-version"
)
View Source
// Job manager naming templates, container settings and readiness values.
//
// The name templates contain %s placeholders: JobManagerNameFormat and
// JobManagerPodNameFormat take two strings, JobManagerVersionNameFormat takes
// three, JobManagerServiceName takes one and JobManagerVersionServiceName
// takes two. JobManagerContainerName and JobManagerArg share the same value,
// "jobmanager".
//
// NOTE(review): which values fill the placeholders (application name, hash,
// version, ...) is not visible in this listing. The JobManagerReadiness*
// values presumably configure a Kubernetes readiness probe against
// JobManagerReadinessPath, with the *Sec values in seconds -- confirm against
// the code that builds the container.
const (
	JobManagerNameFormat                = "%s-%s-jm"
	JobManagerVersionNameFormat         = "%s-%s-%s-jm"
	JobManagerPodNameFormat             = "%s-%s-jm-pod"
	JobManagerServiceName               = "%s"
	JobManagerVersionServiceName        = "%s-%s"
	JobManagerContainerName             = "jobmanager"
	JobManagerArg                       = "jobmanager"
	JobManagerReadinessPath             = "/overview"
	JobManagerReadinessInitialDelaySec  = 10
	JobManagerReadinessTimeoutSec       = 1
	JobManagerReadinessSuccessThreshold = 1
	JobManagerReadinessFailureThreshold = 2
	JobManagerReadinessPeriodSec        = 5
)
View Source
// Short string names for the Flink ports: "rpc", "query", "blob", "ui" and
// "metrics".
//
// NOTE(review): presumably used as the Name of container/service ports that
// carry the matching *DefaultPort numbers -- confirm at the call sites, which
// are not visible in this listing.
const (
	FlinkRPCPortName            = "rpc"
	FlinkQueryPortName          = "query"
	FlinkBlobPortName           = "blob"
	FlinkUIPortName             = "ui"
	FlinkInternalMetricPortName = "metrics"
)
View Source
// Task manager naming templates and container settings.
//
// The name templates contain %s placeholders: TaskManagerNameFormat and
// TaskManagerPodNameFormat take two strings and TaskManagerVersionNameFormat
// takes three. TaskManagerContainerName and TaskManagerArg share the same
// value, "taskmanager".
//
// NOTE(review): which values fill the placeholders is not visible in this
// listing; TaskManagerHostnameEnvVar looks like an environment variable name
// set on the task manager container -- confirm at the call sites.
const (
	TaskManagerNameFormat        = "%s-%s-tm"
	TaskManagerVersionNameFormat = "%s-%s-%s-tm"
	TaskManagerPodNameFormat     = "%s-%s-tm-pod"
	TaskManagerContainerName     = "taskmanager"
	TaskManagerArg               = "taskmanager"
	TaskManagerHostnameEnvVar    = "TASKMANAGER_HOSTNAME"
)
View Source
// AppIngressName is a name template with two %s placeholders joined by a dash.
// NOTE(review): presumably the name of the application's Ingress resource;
// the values that fill the placeholders are not visible in this listing.
const AppIngressName = "%s-%s"

Variables

Functions

func ComputeDeploymentHash added in v0.2.0

func ComputeDeploymentHash(deployment appsv1.Deployment) ([]byte, error)

ComputeDeploymentHash generates a deterministic hash, returned as bytes, of the given Deployment object.

func DeploymentIsJobmanager

func DeploymentIsJobmanager(deployment *v1.Deployment) bool

func DeploymentIsTaskmanager

func DeploymentIsTaskmanager(deployment *v1.Deployment) bool

func FetchJobManagerContainerObj

func FetchJobManagerContainerObj(application *v1beta1.FlinkApplication) *coreV1.Container

func FetchJobManagerIngressCreateObj

func FetchJobManagerIngressCreateObj(app *flinkapp.FlinkApplication) *v1beta1.Ingress

func FetchJobManagerServiceCreateObj

func FetchJobManagerServiceCreateObj(app *v1beta1.FlinkApplication, hash string) *coreV1.Service

func FetchJobMangerDeploymentCreateObj

func FetchJobMangerDeploymentCreateObj(app *v1beta1.FlinkApplication, hash string) *v1.Deployment

func FetchTaskManagerContainerObj

func FetchTaskManagerContainerObj(application *v1beta1.FlinkApplication) *coreV1.Container

func FetchTaskMangerDeploymentCreateObj

func FetchTaskMangerDeploymentCreateObj(app *v1beta1.FlinkApplication, hash string) *v1.Deployment

func GetAWSServiceEnv

func GetAWSServiceEnv() []v1.EnvVar

func GetActiveFlinkJobs added in v0.3.0

func GetActiveFlinkJobs(jobs []client.FlinkJob) []client.FlinkJob

func GetDeploySpecificEnv added in v0.5.0

func GetDeploySpecificEnv(app *v1beta1.FlinkApplication) []v1.EnvVar

Injects labels and environment variables required for blue green deploys

func GetFlinkContainerEnv

func GetFlinkContainerEnv(app *v1beta1.FlinkApplication) []v1.EnvVar

func GetFlinkUIIngressURL

func GetFlinkUIIngressURL(jobName string) string

func GetTaskManagerPorts

func GetTaskManagerPorts(app *v1beta1.FlinkApplication) []coreV1.ContainerPort

func HashForApplication

func HashForApplication(app *v1beta1.FlinkApplication) string

HashForApplication returns an 8-character hash that is sensitive to the application's name, labels, annotations, and spec. TODO: collision avoidance may need to be added to this hash.

func ImagePullPolicy

func ImagePullPolicy(app *v1beta1.FlinkApplication) v1.PullPolicy

func InjectOperatorCustomizedConfig added in v0.1.3

func InjectOperatorCustomizedConfig(deployment *appsv1.Deployment, app *v1beta1.FlinkApplication, hash string, deploymentType string)

func JobManagerDeploymentMatches

func JobManagerDeploymentMatches(deployment *v1.Deployment, application *v1beta1.FlinkApplication, hash string) bool

func Min added in v0.5.0

func Min(x, y int32) int32

func ReplaceJobURL

func ReplaceJobURL(value string, input string) string

func TaskManagerDeploymentMatches

func TaskManagerDeploymentMatches(deployment *v1.Deployment, application *v1beta1.FlinkApplication, hash string) bool

func VersionedJobManagerServiceName added in v0.1.2

func VersionedJobManagerServiceName(app *v1beta1.FlinkApplication, hash string) string

Types

type Controller

// Controller's fields are not shown in this listing. The methods declared on
// *Controller below cover every method of ControllerInterface.
type Controller struct {
	// contains filtered or unexported fields
}

func (*Controller) CompareAndUpdateClusterStatus

func (f *Controller) CompareAndUpdateClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error)

Gets and updates the cluster status

func (*Controller) CompareAndUpdateJobStatus

func (f *Controller) CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (bool, error)

func (*Controller) CreateCluster

func (f *Controller) CreateCluster(ctx context.Context, application *v1beta1.FlinkApplication) error

func (*Controller) DeleteOldResourcesForApp added in v0.1.2

func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1beta1.FlinkApplication) error

func (*Controller) DeleteResourcesForAppWithHash added in v0.5.0

func (f *Controller) DeleteResourcesForAppWithHash(ctx context.Context, app *v1beta1.FlinkApplication, hash string) error

func (*Controller) DeleteStatusPostTeardown added in v0.5.0

func (f *Controller) DeleteStatusPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication, hash string)

func (*Controller) FindExternalizedCheckpoint

func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error)

func (*Controller) ForceCancel

func (f *Controller) ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error

func (*Controller) GetCurrentDeploymentsForApp added in v0.1.2

func (f *Controller) GetCurrentDeploymentsForApp(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error)

Gets the current deployment and any other deployments for the application. The current deployment will be the one that matches the FlinkApplication, unless the FailedDeployHash is set, in which case it will be the one with that hash.

func (*Controller) GetJobForApplication added in v0.3.0

func (f *Controller) GetJobForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error)

func (*Controller) GetJobToDeleteForApplication added in v0.5.0

func (f *Controller) GetJobToDeleteForApplication(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error)

func (*Controller) GetJobsForApplication

func (f *Controller) GetJobsForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error)

func (*Controller) GetLatestClusterStatus added in v0.5.0

func (f *Controller) GetLatestClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus

func (*Controller) GetLatestJobID added in v0.5.0

func (f *Controller) GetLatestJobID(ctx context.Context, application *v1beta1.FlinkApplication) string

func (*Controller) GetLatestJobStatus added in v0.5.0

func (f *Controller) GetLatestJobStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkJobStatus

func (*Controller) GetSavepointStatus

func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error)

func (*Controller) GetVersionAndHashPostTeardown added in v0.5.0

func (f *Controller) GetVersionAndHashPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication) (v1beta1.FlinkApplicationVersion, string)

func (*Controller) GetVersionAndJobIDForHash added in v0.5.0

func (f *Controller) GetVersionAndJobIDForHash(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (string, string, error)

func (*Controller) IsClusterReady

func (f *Controller) IsClusterReady(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)

func (*Controller) IsServiceReady

func (f *Controller) IsServiceReady(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error)

func (*Controller) LogEvent

func (f *Controller) LogEvent(ctx context.Context, app *v1beta1.FlinkApplication, eventType string, reason string, message string)

func (*Controller) Savepoint added in v0.5.0

func (f *Controller) Savepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (string, error)

func (*Controller) StartFlinkJob

func (f *Controller) StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string,
	jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool,
	savepointPath string) (string, error)

func (*Controller) UpdateLatestClusterStatus added in v0.5.0

func (f *Controller) UpdateLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication, clusterStatus v1beta1.FlinkClusterStatus)

func (*Controller) UpdateLatestJobID added in v0.5.0

func (f *Controller) UpdateLatestJobID(ctx context.Context, app *v1beta1.FlinkApplication, jobID string)

func (*Controller) UpdateLatestJobStatus added in v0.5.0

func (f *Controller) UpdateLatestJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, jobStatus v1beta1.FlinkJobStatus)

func (*Controller) UpdateLatestVersionAndHash added in v0.5.0

func (f *Controller) UpdateLatestVersionAndHash(application *v1beta1.FlinkApplication, version v1beta1.FlinkApplicationVersion, hash string)

type ControllerInterface

// ControllerInterface is the set of operations used to manage a Flink
// application in Kubernetes. NewController returns a value of this type.
type ControllerInterface interface {
	// Creates a Flink cluster with necessary Job Manager, Task Managers and services for UI
	CreateCluster(ctx context.Context, application *v1beta1.FlinkApplication) error

	// Takes a savepoint of the job identified by jobID and returns a string and an error.
	// NOTE(review): earlier documentation described this as cancelling the running/active
	// jobs after the savepoint is created; with the isCancel flag that is presumably only
	// the case when isCancel is true. The returned string is presumably the savepoint
	// trigger ID -- confirm both against the implementation.
	Savepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string, isCancel bool, jobID string) (string, error)

	// Force cancels the running/active job without taking a savepoint
	ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) error

	// Starts the Job in the Flink Cluster
	StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string,
		jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool,
		savepointPath string) (string, error)

	// Savepoint creation is asynchronous.
	// Polls the status of the savepoint for the job identified by jobID.
	// NOTE(review): the trigger ID is not a parameter of this method; presumably it is
	// read from the application's status -- confirm against the implementation.
	GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string, jobID string) (*client.SavepointResponse, error)

	// Check if the Flink Kubernetes Cluster is Ready.
	// Checks if all the pods of task and job managers are ready.
	IsClusterReady(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)

	// Checks to see if the Flink Cluster is ready to handle API requests
	IsServiceReady(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error)

	// Returns the list of Jobs running on the Flink Cluster for the Application
	GetJobsForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error)

	// Returns the current job for the application, if one exists in the cluster
	GetJobForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error)

	// Returns the pair of deployments (tm/jm) for the current version of the application
	GetCurrentDeploymentsForApp(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error)

	// Deletes all old resources (deployments and services) for the app
	DeleteOldResourcesForApp(ctx context.Context, app *v1beta1.FlinkApplication) error

	// Attempts to find an externalized checkpoint for the job. This can be used to recover an application that is not
	// able to savepoint for some reason.
	FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error)

	// Logs an event to the FlinkApplication resource and to the operator log
	LogEvent(ctx context.Context, app *v1beta1.FlinkApplication, eventType string, reason string, message string)

	// Compares and updates new cluster status with current cluster status
	// Returns true if there is a change in ClusterStatus
	CompareAndUpdateClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error)

	// Compares and updates new job status with current job status
	// Returns true if there is a change in JobStatus
	CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (bool, error)

	// Gets the last updated cluster status
	GetLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus

	// Gets the last updated job status
	GetLatestJobStatus(ctx context.Context, app *v1beta1.FlinkApplication) v1beta1.FlinkJobStatus

	// Gets the last updated job ID
	GetLatestJobID(ctx context.Context, app *v1beta1.FlinkApplication) string

	// Updates the jobID on the latest jobStatus
	UpdateLatestJobID(ctx context.Context, app *v1beta1.FlinkApplication, jobID string)

	// Update jobStatus on the latest VersionStatuses
	UpdateLatestJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, jobStatus v1beta1.FlinkJobStatus)

	// Update clusterStatus on the latest VersionStatuses
	UpdateLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication, clusterStatus v1beta1.FlinkClusterStatus)

	// Update Version and Hash for application
	UpdateLatestVersionAndHash(application *v1beta1.FlinkApplication, version v1beta1.FlinkApplicationVersion, hash string)

	// Delete Resources with Hash
	DeleteResourcesForAppWithHash(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error

	// Delete status for torn down cluster/job
	DeleteStatusPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication, hash string)

	// Get job given hash
	GetJobToDeleteForApplication(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error)

	// Get the version and job ID for the given hash. Both are returned as strings;
	// going by the method name, the version comes first and the job ID second.
	GetVersionAndJobIDForHash(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, string, error)

	// Get version and hash after teardown is complete
	GetVersionAndHashPostTeardown(ctx context.Context, application *v1beta1.FlinkApplication) (v1beta1.FlinkApplicationVersion, string)
}

ControllerInterface is the interface for managing a Flink application in Kubernetes.

func NewController

func NewController(k8sCluster k8.ClusterInterface, eventRecorder record.EventRecorder, config controllerConfig.RuntimeConfig) ControllerInterface

type JobManagerController

// JobManagerController's fields are not shown in this listing. *JobManagerController
// declares CreateIfNotExist, the single method of JobManagerControllerInterface.
type JobManagerController struct {
	// contains filtered or unexported fields
}

func (*JobManagerController) CreateIfNotExist

func (j *JobManagerController) CreateIfNotExist(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)

type JobManagerControllerInterface

// JobManagerControllerInterface is the single-method interface returned by
// NewJobManagerController.
type JobManagerControllerInterface interface {
	// CreateIfNotExist takes a context and a FlinkApplication and returns a bool and an error.
	// NOTE(review): presumably the bool reports whether job manager resources were newly
	// created by this call -- confirm against the implementation.
	CreateIfNotExist(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)
}

func NewJobManagerController

func NewJobManagerController(k8sCluster k8.ClusterInterface, config config.RuntimeConfig) JobManagerControllerInterface

type TaskManagerController

// TaskManagerController's fields are not shown in this listing. *TaskManagerController
// declares CreateIfNotExist, the single method of TaskManagerControllerInterface.
type TaskManagerController struct {
	// contains filtered or unexported fields
}

func (*TaskManagerController) CreateIfNotExist

func (t *TaskManagerController) CreateIfNotExist(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)

type TaskManagerControllerInterface

// TaskManagerControllerInterface is the single-method interface returned by
// NewTaskManagerController.
type TaskManagerControllerInterface interface {
	// CreateIfNotExist takes a context and a FlinkApplication and returns a bool and an error.
	// NOTE(review): presumably the bool reports whether task manager resources were newly
	// created by this call -- confirm against the implementation.
	CreateIfNotExist(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)
}

func NewTaskManagerController

func NewTaskManagerController(k8sCluster k8.ClusterInterface, config config.RuntimeConfig) TaskManagerControllerInterface

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL