worker

package
v0.2.8
Published: Jan 1, 2024 License: BSD-3-Clause Imports: 25 Imported by: 0

Documentation

Overview

Package worker provides a set of tools designed to facilitate interaction with Kubernetes resources from within a cluster. It offers a convenient abstraction for managing Kubernetes operations, focusing on pod health checks, pod labeling, structured logging, scaling deployments, updating deployment images, creating PersistentVolumeClaims (PVCs), updating network policies, and task configuration through YAML or JSON files.

The package is intended for applications running as pods within Kubernetes clusters and leverages in-cluster configuration to establish a clientset for API interactions.

Enhancements in the latest version:

  • Retry policies have been introduced to provide a robust mechanism for handling transient failures when performing Kubernetes operations. The RetryPolicy struct allows for configuring the maximum number of retries and the delay between attempts, ensuring that temporary issues can be overcome without manual intervention.

  • Structured logging has been integrated throughout the package, providing clear and consistent logging messages that are easier to read and debug. Logging now includes emojis for quick visual parsing and additional context such as task names and worker indices.

  • The dynamic task execution model allows for registering and retrieving TaskRunner implementations based on task types. This extensibility makes it possible to easily add new task handling logic without modifying the core package code.

  • Pod Labeling Logic has been optimized to check existing labels and only update when necessary, reducing API calls and improving performance. It also includes retry logic to handle intermittent API errors.

  • Configuration loading from YAML files has been added, enhancing the flexibility and configurability of task management within the worker processes.

  • Scaling deployments is now supported with functions that allow for adjusting the number of replicas with retry logic to handle conflicts.

  • Updating deployment images has been introduced, enabling the change of container images within deployments. This includes handling retries on update conflicts and reporting the outcome of the operation.

  • Creation of PersistentVolumeClaims (PVCs) is now supported, allowing for dynamic provisioning of storage resources within the cluster. The process is logged with emojis to indicate success or failure, improving the visibility of the operation's outcome.

  • Network Policy updates can now be handled, allowing for the modification of network policies within the cluster to manage traffic flow between pods/services.

Functions

  • NewKubernetesClient: Creates a new Kubernetes clientset configured for in-cluster communication with the Kubernetes API server.

  • CrewWorker: Orchestrates a worker process to perform tasks such as health checks, labeling of pods, scaling deployments, updating deployment images, creating PVCs, updating network policies, and other configurable tasks within a specified namespace. It includes retry logic to handle transient errors and respects cancellation and timeout contexts. Structured logging is used to provide detailed contextual information, now with emojis for better visual cues.

  • LoadTasksFromYAML: Loads task configurations from a YAML file, allowing for dynamic task management based on external configuration.

  • CrewGetPods: Retrieves all pods within a given namespace, logging the attempt and outcome of the operation, now with emojis for quick status recognition.

  • CrewProcessPods: Iterates over a collection of pods, assessing their health, updating labels, and reporting the status to a results channel. It also handles context cancellation.

  • CrewCheckingisPodHealthy: Evaluates the health of a pod based on its phase and container readiness statuses.

  • CrewLabelPods: Updates the labels on a pod, if necessary, based on the provided labeling rules and specifications.

  • CrewScaleDeployment: Scales a Kubernetes deployment to a specified number of replicas, with retries on conflicts. It provides detailed logs and returns success or failure messages through a results channel.

  • UpdateDeploymentImage: Updates the image of a specified container within a deployment, handling retries on conflicts and reporting the outcome through a results channel.

  • CrewCreatePVCStorage: Creates a PersistentVolumeClaim in the specified namespace, allowing for storage provisioning according to the parameters provided. The creation process is logged with success and error emojis for immediate feedback.

  • CrewUpdateNetworkPolicy: Updates a Kubernetes NetworkPolicy based on the provided parameters, handling retries on conflicts and reporting the outcome through a results channel. It logs the update process with structured logging, including emojis for visual cues.

Usage:

Initialize the Kubernetes client using NewKubernetesClient, then leverage the client to perform operations such as retrieving and processing pods within a namespace, scaling deployments, updating deployment images, creating PVCs, updating network policies, and more as required. Contexts are used to manage the lifecycle of the worker processes, including graceful shutdowns and cancellation. Task configurations can be loaded from a YAML file for enhanced flexibility.

Example:

clientset, err := worker.NewKubernetesClient()
if err != nil {
    // Handle error
}
namespace := "default" // Replace with your namespace
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Ensure cancellation is called to free resources

tasks, err := worker.LoadTasksFromYAML("tasks.yaml")
if err != nil {
    // Handle error
}

// Create the dependencies required by CrewWorker.
logger, _ := zap.NewProduction()
defer logger.Sync()
taskStatus := worker.NewTaskStatusMap()

resultsChan := make(chan string)
go func() {
    defer close(resultsChan) // Close the channel once the worker returns
    worker.CrewWorker(ctx, clientset, namespace, tasks, resultsChan, logger, taskStatus, 0)
}()

// Process results as they come in
for result := range resultsChan {
    fmt.Println(result)
}

Enhancements

  • The package now includes structured logging capabilities, enhanced with emojis, improving traceability and aiding in debugging efforts by providing detailed contextual information.

  • Logging functionality is customizable, allowing different workers to provide unique contextual information, such as worker indices or specific namespaces, with visual cues.

  • The dynamic task execution model supports adding new tasks and task runners without changing existing code, facilitating scalability and extensibility.

  • Pod Labeling Logic has been enhanced to perform more efficiently by minimizing unnecessary API calls, and it now includes robust error handling and retry mechanisms.

  • The scaling functionality has been introduced to adjust deployment sizes with conflict resolution strategies, catering to dynamic workload requirements.

  • Image update functionality has been added to modify the image of a container within a deployment, with built-in retry logic for handling update conflicts.

  • The introduction of PVC creation allows for dynamic storage provisioning, complete with emoji-based logging for immediate operation feedback.

  • Network Policy update functionality has been introduced, allowing for the management of network traffic policies within the cluster, with structured logging and retry mechanisms.

TODO

  • Extend the functionality of the CrewWorker function to support a wider range of tasks and allow for greater configurability.

  • Expand the package's support for additional Kubernetes resources and operations, catering to more complex orchestration needs.

  • Introduce metrics collection to monitor the health and performance of worker processes, facilitating proactive maintenance and scaling decisions.

Copyright (c) 2023 H0llyW00dzZ

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CaptainTellWorkers

func CaptainTellWorkers(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, tasks []configuration.Task, workerCount int) (<-chan string, func())

CaptainTellWorkers launches worker goroutines to execute tasks within a Kubernetes namespace. It returns a channel to receive task results and a function to initiate a graceful shutdown. The shutdown function ensures all workers are stopped and the results channel is closed.

Parameters:

ctx context.Context: Parent context to control the lifecycle of the workers.
clientset *kubernetes.Clientset: Kubernetes API client for task operations.
shipsNamespace string: Namespace in Kubernetes to perform tasks.
tasks []configuration.Task: Slice of Task structs to be executed by the workers.
workerCount int: Number of worker goroutines to start.

Returns:

<-chan string: A read-only channel to receive task results.
func(): A function to call for initiating a graceful shutdown of the workers.
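
Example (a minimal sketch; clientset and tasks are assumed to be obtained as shown in the package overview):

results, shutdown := worker.CaptainTellWorkers(ctx, clientset, "default", tasks, 5)

// Initiate a graceful shutdown once the surrounding context is done,
// which stops all workers and closes the results channel.
go func() {
    <-ctx.Done()
    shutdown()
}()

for result := range results {
    fmt.Println(result)
}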

func CrewCheckingisPodHealthy

func CrewCheckingisPodHealthy(pod *corev1.Pod) bool

CrewCheckingisPodHealthy assesses a pod's health by its phase and container readiness. It returns true if the pod is in the running phase and all its containers are ready.

Parameters:

pod *corev1.Pod: The pod to check for health status.

Returns:

bool: True if the pod is considered healthy, false otherwise.
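
The check amounts to logic along the following lines (an illustrative sketch of the described behavior, not necessarily the exact implementation):

// A pod is healthy when it is in the running phase and every
// container reports ready.
func isPodHealthy(pod *corev1.Pod) bool {
    if pod.Status.Phase != corev1.PodRunning {
        return false
    }
    for _, status := range pod.Status.ContainerStatuses {
        if !status.Ready {
            return false
        }
    }
    return true
}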

func CrewProcessPods

func CrewProcessPods(ctx context.Context, pods []corev1.Pod, results chan<- string)

CrewProcessPods iterates over a list of pods to evaluate their health. It sends a health status message for each pod to the results channel. If the context is cancelled during the process, it logs the cancellation and sends a corresponding message through the results channel.

Note: This function is currently unused; it is retained for future use.

func CrewWorker

func CrewWorker(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, tasks []configuration.Task, results chan<- string, logger *zap.Logger, taskStatus *TaskStatusMap, workerIndex int)

CrewWorker orchestrates the execution of tasks within a Kubernetes namespace by utilizing performTaskWithRetries to attempt each task with built-in retry logic. If a task fails after the maximum number of retries, it logs the error and sends a failure message through the results channel. Tasks are claimed to prevent duplicate executions, and they can be released if necessary for subsequent retries.

Parameters:

ctx context.Context: Context for cancellation and timeout of the worker process.
clientset *kubernetes.Clientset: Kubernetes API client for cluster interactions.
shipsNamespace string: Namespace in Kubernetes for task operations.
tasks []configuration.Task: List of Task structs, each representing an executable task.
results chan<- string: Channel to return execution results to the caller.
logger *zap.Logger: Logger for structured logging within the worker.
taskStatus *TaskStatusMap: Map to track and control the status of tasks.
workerIndex int: Identifier for the worker instance for logging.

func InitializeTasks added in v0.2.3

func InitializeTasks(filePath string) ([]configuration.Task, error)

InitializeTasks loads tasks from the specified configuration file. filePath is the path to the configuration file that contains the task definitions. It returns a slice of Task structs loaded from the configuration file and any error encountered.
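
Example (assuming a task definition file named tasks.yaml in the working directory):

tasks, err := worker.InitializeTasks("tasks.yaml")
if err != nil {
    // Handle error: the file is missing or malformed
}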

func LabelPods

func LabelPods(ctx context.Context, clientset *kubernetes.Clientset, namespace, labelKey, labelValue string) error

LabelPods sets a specific label on all pods within the specified namespace that do not already have it. This function iterates over all pods in the namespace and delegates the labeling of each individual pod to the labelSinglePod function.

Parameters:

ctx context.Context: A context.Context for managing cancellation and deadlines.
clientset *kubernetes.Clientset: A *kubernetes.Clientset instance used to interact with the Kubernetes API.
namespace string: The namespace in which the pods are located.
labelKey string: The key of the label to be added or updated.
labelValue string: The value of the label to be applied.

Returns:

error: An error if listing pods or updating any pod's labels fails.
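
Example (the label key and value are illustrative):

err := worker.LabelPods(ctx, clientset, "default", "app.kubernetes.io/managed-by", "k8sblackpearl")
if err != nil {
    // Handle error
}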

func NewKubernetesClient

func NewKubernetesClient() (*kubernetes.Clientset, error)

NewKubernetesClient creates a new Kubernetes client using the in-cluster configuration or the kubeconfig file, depending on the environment.

Returns:

*kubernetes.Clientset: A pointer to a Kubernetes Clientset ready for API interactions.
error: An error if the configuration fails or the client cannot be created.

func RegisterTaskRunner added in v0.1.0

func RegisterTaskRunner(taskType string, constructor func() TaskRunner)

RegisterTaskRunner associates a task type with a TaskRunner constructor in the registry. This function is used to extend the system with new types of tasks.
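
For example, a new task type could be registered as follows (CrewCustomTask and the "CustomTask" type string are hypothetical):

// CrewCustomTask is a hypothetical TaskRunner implementation.
type CrewCustomTask struct{}

func (c *CrewCustomTask) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, parameters map[string]interface{}, workerIndex int) error {
    // Custom task logic goes here.
    return nil
}

func init() {
    worker.RegisterTaskRunner("CustomTask", func() worker.TaskRunner { return &CrewCustomTask{} })
}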

func ScaleDeployment added in v0.1.8

func ScaleDeployment(ctx context.Context, clientset *kubernetes.Clientset, namespace string, deploymentName string, scale int, maxRetries int, retryDelay time.Duration, results chan<- string, logger *zap.Logger) error

ScaleDeployment attempts to scale a Kubernetes deployment to the desired number of replicas. It retries the scaling operation up to a maximum number of retries upon encountering conflicts. Non-conflict errors are reported immediately without retries. Success or failure messages are sent through the results channel, and logs are produced accordingly.

Parameters:

ctx context.Context: Context for cancellation and timeout of the scaling process.
clientset *kubernetes.Clientset: Kubernetes API client for interacting with the cluster.
namespace string: The namespace of the deployment.
deploymentName string: The name of the deployment to scale.
scale int: The desired number of replicas to scale to.
maxRetries int: The maximum number of retries for the scaling operation.
retryDelay time.Duration: The duration to wait before retrying the scaling operation.
results chan<- string: A channel for sending the results of the scaling operation.
logger *zap.Logger: A structured logger for logging information and errors.

Returns:

error: An error if scaling fails after all retries, or nil on success.
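
Example (the deployment name is illustrative; a goroutine drains the results channel so sends do not block):

results := make(chan string)
go func() {
    for msg := range results {
        fmt.Println(msg)
    }
}()

err := worker.ScaleDeployment(ctx, clientset, "default", "my-deployment", 3, 5, 2*time.Second, results, logger)
if err != nil {
    // Handle error: scaling failed after all retries
}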

func UpdateDeploymentImage added in v0.1.9

func UpdateDeploymentImage(ctx context.Context, clientset *kubernetes.Clientset, namespace, deploymentName, containerName, newImage string, maxRetries int, retryDelay time.Duration, results chan<- string, logger *zap.Logger) error

UpdateDeploymentImage attempts to update the image of a specified container within a deployment in Kubernetes. It performs retries on conflicts and reports the outcome through a results channel. If the image update is successful, a success message is sent to the results channel. In case of errors other than conflicts or after exceeding the maximum number of retries, it reports the failure.

Parameters:

ctx context.Context: Context for cancellation and timeout.
clientset *kubernetes.Clientset: A Kubernetes clientset to interact with the Kubernetes API.
namespace string: The Kubernetes namespace containing the deployment.
deploymentName string: The name of the deployment to update.
containerName string: The name of the container within the deployment to update.
newImage string: The new image to apply to the container.
maxRetries int: The maximum number of retries for the update operation.
retryDelay time.Duration: The duration to wait between retry attempts.
results chan<- string: A channel to send operation results for logging.
logger *zap.Logger: A logger for structured logging.

Returns an error if the operation fails after the maximum number of retries or if a non-conflict error is encountered.
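
Example (deployment, container, and image names are illustrative):

results := make(chan string)
go func() {
    for msg := range results {
        fmt.Println(msg)
    }
}()

err := worker.UpdateDeploymentImage(ctx, clientset, "default", "my-deployment", "app", "nginx:1.25.3", 5, 2*time.Second, results, logger)
if err != nil {
    // Handle error: the update failed after all retries
}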

func UpdateNetworkPolicy added in v0.2.2

func UpdateNetworkPolicy(ctx context.Context, clientset *kubernetes.Clientset, namespace, policyName string, policySpec networkingv1.NetworkPolicySpec, results chan<- string, logger *zap.Logger) error

UpdateNetworkPolicy updates a Kubernetes NetworkPolicy with the provided specification. It performs the update operation with retries on conflict errors and reports the outcome through a results channel. On success, a success message is sent to the results channel. In case of errors other than conflicts or after exceeding the maximum number of retries, a failure is reported.

Parameters:

ctx context.Context: Context for cancellation and timeout.
clientset *kubernetes.Clientset: A Kubernetes clientset for interacting with the Kubernetes API.
namespace string: The Kubernetes namespace containing the NetworkPolicy.
policyName string: The name of the NetworkPolicy to update.
policySpec networkingv1.NetworkPolicySpec: The new specification for the NetworkPolicy.
results chan<- string: A channel to send operation results for logging.
logger *zap.Logger: A logger for structured logging.

Returns an error if the operation fails after retries or if a non-conflict error is encountered.

Types

type CrewCreatePVCStorage added in v0.2.1

type CrewCreatePVCStorage struct {
	// contains filtered or unexported fields
}

CrewCreatePVCStorage is an implementation of TaskRunner that creates a PersistentVolumeClaim.

This struct is responsible for creating PVCs (PersistentVolumeClaims) in a Kubernetes cluster. It extracts the necessary parameters from the task parameters, calls the createPVC function to create the PVC, and handles logging and errors throughout the process.

func (*CrewCreatePVCStorage) Run added in v0.2.1

func (c *CrewCreatePVCStorage) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, parameters map[string]interface{}, workerIndex int) error

Run creates a PersistentVolumeClaim in the specified namespace using the provided parameters.

This method orchestrates the task execution by extracting the required parameters, invoking the createPVC function to create the PVC, and handling any errors or logging messages.

type CrewGetPods

type CrewGetPods struct {
	// contains filtered or unexported fields
}

CrewGetPods is an example TaskRunner which currently only prints the task's parameters. This struct is intended to be a placeholder and should be extended to implement the backup logic for the task it represents.

func (*CrewGetPods) Run added in v0.1.0

func (b *CrewGetPods) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task configuration.Task, parameters map[string]interface{}, workerIndex int) error

Run prints the task parameters to stdout. This method should be replaced with actual backup logic to fulfill the TaskRunner interface.

type CrewGetPodsTaskRunner added in v0.1.0

type CrewGetPodsTaskRunner struct {
	// contains filtered or unexported fields
}

CrewGetPodsTaskRunner is an implementation of TaskRunner that lists and logs all pods in a given Kubernetes namespace.

func (*CrewGetPodsTaskRunner) Run added in v0.1.0

func (c *CrewGetPodsTaskRunner) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, parameters map[string]interface{}, workerIndex int) error

Run lists all pods in the specified namespace and logs each pod's name and status. It uses the provided Kubernetes clientset and context to interact with the Kubernetes cluster.

type CrewLabelPodsTaskRunner added in v0.1.3

type CrewLabelPodsTaskRunner struct {
	// contains filtered or unexported fields
}

CrewLabelPodsTaskRunner is an implementation of TaskRunner that labels all pods in a given Kubernetes namespace with a specific label.

func (*CrewLabelPodsTaskRunner) Run added in v0.1.3

func (c *CrewLabelPodsTaskRunner) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, parameters map[string]interface{}, workerIndex int) error

Run applies a set of labels to all pods within the given Kubernetes namespace. It parses the label parameters, invokes the labeling operation, and logs the process, handling any errors that occur during execution so that the task's intent is fulfilled effectively.

type CrewManageDeployments added in v0.1.5

type CrewManageDeployments struct {
	// contains filtered or unexported fields
}

TODO: Add the new TaskRunner for managing deployments.

func (*CrewManageDeployments) Run added in v0.1.5

func (c *CrewManageDeployments) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, parameters map[string]interface{}, workerIndex int) error

TODO: Add the new TaskRunner for managing deployments.

type CrewProcessCheckHealthTask added in v0.1.0

type CrewProcessCheckHealthTask struct {
	// contains filtered or unexported fields
}

CrewProcessCheckHealthTask is an implementation of TaskRunner that checks the health of each pod in a given Kubernetes namespace and sends the results to a channel.

func (*CrewProcessCheckHealthTask) Run added in v0.1.0

func (c *CrewProcessCheckHealthTask) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, parameters map[string]interface{}, workerIndex int) error

Run iterates over the pods in the specified namespace, checks their health status, and sends a formatted status message to the provided results channel. It respects the context's cancellation signal and stops processing if the context is cancelled.

type CrewScaleDeployments added in v0.1.8

type CrewScaleDeployments struct {
	// contains filtered or unexported fields
}

CrewScaleDeployments is an implementation of the TaskRunner interface that scales deployments within a given Kubernetes namespace. It is responsible for parsing the scaling parameters, performing the scaling operation, and logging the activity. The Run method orchestrates these steps, handling any errors that occur during the execution and ensuring that the scaling task is carried out effectively.

func (*CrewScaleDeployments) Run added in v0.1.8

func (c *CrewScaleDeployments) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, parameters map[string]interface{}, workerIndex int) error

Run executes the scaling operation for a Kubernetes deployment. It reads the 'deploymentName' and 'replicas' from the task parameters, validates them, and then calls the ScaleDeployment function to adjust the number of replicas for the deployment. The method logs the initiation and completion of the scaling operation and reports any errors encountered during the process.

type CrewUpdateImageDeployments added in v0.1.9

type CrewUpdateImageDeployments struct {
	// contains filtered or unexported fields
}

CrewUpdateImageDeployments contains information required to update the image of a Kubernetes deployment.

func (*CrewUpdateImageDeployments) Run added in v0.1.9

func (c *CrewUpdateImageDeployments) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, parameters map[string]interface{}, workerIndex int) error

Run performs the update operation for a Kubernetes deployment's container image. It extracts the deployment name, container name, and new image from the task parameters, and then proceeds with the update using the UpdateDeploymentImage function. The method logs the start and end of the update operation and handles any errors encountered.

type CrewUpdateNetworkPolicy added in v0.2.2

type CrewUpdateNetworkPolicy struct {
	// contains filtered or unexported fields
}

CrewUpdateNetworkPolicy is a TaskRunner that updates a Kubernetes NetworkPolicy according to the provided parameters.

func (*CrewUpdateNetworkPolicy) Run added in v0.2.2

func (c *CrewUpdateNetworkPolicy) Run(ctx context.Context, clientset *kubernetes.Clientset, shipsNamespace string, task configuration.Task, parameters map[string]interface{}, workerIndex int) error

Run executes the update operation for a Kubernetes NetworkPolicy. It extracts the policy name and specification from the task parameters, updates the policy using the UpdateNetworkPolicy function, and logs the process. The method handles parameter extraction, the update operation, and error reporting. It uses a results channel to report the outcome of the update operation.

type RetryPolicy added in v0.2.7

type RetryPolicy struct {
	MaxRetries int           // The maximum number of times to retry the operation.
	RetryDelay time.Duration // The delay between consecutive retry attempts.
}

RetryPolicy encapsulates the configuration for how an operation should be retried in the event of a failure. It specifies the maximum number of retries ('MaxRetries') that should be attempted and the delay ('RetryDelay') between each retry attempt.

Fields:

MaxRetries int: The maximum number of retry attempts to make before giving up.
RetryDelay time.Duration: The duration to wait between successive retry attempts.

func (*RetryPolicy) Execute added in v0.2.7

func (r *RetryPolicy) Execute(ctx context.Context, operation func() (string, error), logFunc func(string, ...zap.Field)) error

Execute runs the given operation according to the retry policy defined by the RetryPolicy struct. It attempts to execute the operation within the context's deadline and retries upon failure according to the MaxRetries and RetryDelay settings.

This method takes a context for cancellation, a function representing the operation to be executed, and a logging function to log retries. The operation function is expected to return a string, which usually represents a task name or identifier, and an error indicating the success or failure of the operation. If the operation is successful (no error returned), Execute will return nil. If the operation fails after the maximum number of retries, the last error is returned.

The logFunc parameter is a function that adheres to the signature of the zap logging library's logging methods (e.g., Info, Error) and is used to log retry attempts with structured logging fields.

ctx context.Context: The context that controls the cancellation of the operation and retries.
operation func() (string, error): The operation to be executed, which returns a result string and error.
logFunc func(string, ...zap.Field): The logging function to log retry attempts.

Returns an error if the operation does not succeed within the maximum number of retries or if the context is cancelled, otherwise returns nil.
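
Example (doWork is a hypothetical operation that may fail transiently):

policy := &worker.RetryPolicy{MaxRetries: 3, RetryDelay: 2 * time.Second}

err := policy.Execute(ctx, func() (string, error) {
    // doWork is a hypothetical operation; the string identifies the task for logging.
    return "my-task", doWork(ctx)
}, logger.Info)
if err != nil {
    // The operation did not succeed within the retry budget, or the context was cancelled
}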

type TaskRunner added in v0.1.0

type TaskRunner interface {
	Run(ctx context.Context, clientset *kubernetes.Clientset, shipsnamespace string, task configuration.Task, parameters map[string]interface{}, workerIndex int) error
}

TaskRunner defines the interface for running tasks. Implementations of TaskRunner should execute tasks based on the provided context, Kubernetes clientset, namespace, and task parameters.

func GetTaskRunner added in v0.1.0

func GetTaskRunner(taskType string) (TaskRunner, error)

GetTaskRunner retrieves a TaskRunner from the registry based on the provided task type. It returns an error if the task type is not recognized.
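
Example (the Type and Parameters fields on configuration.Task are assumed here for illustration):

runner, err := worker.GetTaskRunner(task.Type) // task.Type is an assumed field name
if err != nil {
    // Unrecognized task type
}

if err := runner.Run(ctx, clientset, "default", task, task.Parameters, 0); err != nil {
    // Handle task failure
}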

type TaskStatusMap added in v0.1.0

type TaskStatusMap struct {
	// contains filtered or unexported fields
}

TaskStatusMap is a thread-safe data structure that maintains the status and claim state of tasks. It provides synchronized access to tasks and their claim status using a read/write mutex, which allows multiple readers or one writer at a time. This structure is particularly useful for coordinating task claims among multiple worker routines in a concurrent environment.

The struct contains two maps:

tasks   map[string]configuration.Task: A map that stores tasks by their names, allowing quick retrieval and updates.
claimed map[string]bool: A map that tracks whether tasks have been claimed, with a boolean indicating the claim status.

The methods of TaskStatusMap provide safe manipulation of tasks and their claim status, ensuring that all operations are atomic and no data races occur.

func NewTaskStatusMap added in v0.1.0

func NewTaskStatusMap() *TaskStatusMap

NewTaskStatusMap initializes a new TaskStatusMap with empty maps for tasks and claimed status. It is intended to be called when a new task manager is required, providing a ready-to-use structure for task tracking.

Returns:

*TaskStatusMap: A pointer to the newly created TaskStatusMap instance.

func (*TaskStatusMap) AddTask added in v0.2.0

func (s *TaskStatusMap) AddTask(task configuration.Task)

AddTask adds a new task to the tasks map. If a task with the same name already exists, it updates the existing task. This method ensures that the addition or update of a task is thread-safe and does not interfere with other concurrent operations on the TaskStatusMap.

Parameters:

task configuration.Task: The task to add or update in the map.

Note: This method is currently unused; it is retained for future use.

func (*TaskStatusMap) Claim added in v0.1.0

func (s *TaskStatusMap) Claim(taskName string) bool

Claim attempts to mark a task as claimed if it is not already claimed by another worker. It locks the map for writing to ensure the claim operation is atomic. If the task is already claimed, it returns false. Otherwise, it marks the task as claimed and returns true. This method is critical for coordinating task claims among concurrent workers.

Parameters:

taskName string: The name of the task to claim.

Returns:

bool: A boolean indicating whether the task was successfully claimed.

Note: This method is currently unused; it is retained for future use.

func (*TaskStatusMap) DeleteTask added in v0.2.0

func (s *TaskStatusMap) DeleteTask(name string)

DeleteTask removes a task from the tasks map and also unclaims it if it was previously claimed. This method ensures that the removal of a task is thread-safe and consistent, preventing access to a task that is no longer relevant.

Parameters:

name string: The name of the task to remove.

Note: This method is currently unused; it is retained for future use.

func (*TaskStatusMap) GetAllTasks added in v0.2.0

func (s *TaskStatusMap) GetAllTasks() []configuration.Task

GetAllTasks compiles a list of all tasks currently stored in the tasks map. It locks the map for reading to provide safe concurrent access. The returned slice contains copies of the tasks, ensuring that further manipulations of the slice do not affect the original tasks in the map.

Returns:

[]configuration.Task: A slice containing all tasks from the tasks map.

Note: This method is currently unused; it is retained for future use.

func (*TaskStatusMap) GetTask added in v0.2.0

func (s *TaskStatusMap) GetTask(name string) (configuration.Task, bool)

GetTask retrieves a task by its name, providing safe and concurrent read access. It returns the task and a boolean indicating whether the task was found. This method allows workers to check the existence and details of a task without risking a data race.

Parameters:

name string: The name of the task to retrieve.

Returns:

configuration.Task: The retrieved task.
bool: A boolean indicating whether the task was found in the map.

Note: This method is currently unused; it is retained for future use.

func (*TaskStatusMap) IsClaimed added in v0.2.0

func (s *TaskStatusMap) IsClaimed(taskName string) bool

IsClaimed checks if a task is currently marked as claimed in the claimed map. It provides safe concurrent read access to determine the claim status of a task. This method is useful for workers to verify if a task is already being processed by another worker.

Parameters:

taskName string: The name of the task to check the claim status for.

Returns:

bool: A boolean indicating whether the task is currently claimed.

Note: This method is currently unused; it is retained for future use.

func (*TaskStatusMap) Release added in v0.1.0

func (s *TaskStatusMap) Release(taskName string)

Release marks a task as unclaimed, making it available for other workers to claim. It locks the map for writing to ensure that the release operation is exclusive and atomic. This method is used when a worker has finished processing a task or when the task needs to be retried and thus made available again.

Parameters:

taskName string: The name of the task to unclaim.
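
Claim and Release together form the typical coordination pattern in a worker loop (processTask and the Name field are illustrative):

if taskStatus.Claim(task.Name) {
    if err := processTask(ctx, task); err != nil {
        // Make the task available for another attempt.
        taskStatus.Release(task.Name)
    }
}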

func (*TaskStatusMap) UpdateTask added in v0.2.0

func (s *TaskStatusMap) UpdateTask(task configuration.Task)

UpdateTask modifies the details of an existing task in the tasks map. It locks the map for writing, ensuring that the update operation is exclusive and no other write operations interfere. This method is useful when a task's properties need to be changed during its lifecycle.

Parameters:

task configuration.Task: The task with updated information to be stored in the map.

Note: This method is currently unused; it is retained for future use.

Directories

Path             Synopsis
configuration    Package configuration manages the loading and parsing of task configurations for the K8sBlackPearl project, supporting both JSON and YAML formats.
