karta

Version: v0.2.4

Note: this package is not in the latest version of its module.

Published: Aug 1, 2024 | License: MIT | Imports: 8 | Imported by: 8

README

Introduction

Karta is a lightweight module for batch and asynchronous task processing, similar to ThreadPoolExecutor in Python. It provides a simple interface to submit tasks and retrieve results.

Why Karta? In my work, I often need to process a large number of jobs, and I wanted to handle them with code similar to Python's ThreadPoolExecutor. However, no such component was available in Go, so I decided to create one.

Karta is designed to be simple and consists of two main components: Group and Pipeline.

  • Group: Processes a batch of tasks with a fixed number of workers.
  • Pipeline: Processes tasks one by one as they are submitted. Each task can specify its own handle function.

Advantages

  • Simple and user-friendly
  • Lightweight with no external dependencies
  • Supports callback functions for custom actions

Design

The UML diagram below shows the architecture of Karta:

[Figure: Karta architecture UML diagram]

Installation

go get github.com/shengyanli1982/karta

Quick Start

Karta is incredibly easy to use. With just a few lines of code, you can efficiently batch process tasks.

Config

Karta provides a config object for customizing the behavior of task processing. The config object offers the following methods:

  • WithWorkerNumber: Sets the number of workers. The default value is 2, with a maximum of 524280.
  • WithCallback: Sets the callback object. The default value is &emptyCallback{}.
  • WithHandleFunc: Sets the handle function. The default value is DefaultMsgHandleFunc.
  • WithResult: Enables recording of the results of all tasks. Recording is disabled by default, and the option only applies to Group.
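
The chained calls used with the config object follow the common Go builder pattern, in which every With* method returns its receiver. The sketch below illustrates that pattern with a hypothetical sketchConfig type; it is not Karta's Config. How Karta treats a worker number outside the documented range is not stated here, so falling back to the default is an assumption of this sketch.

```go
package main

import "fmt"

const (
	defaultWorkers = 2      // default stated in the list above
	maxWorkers     = 524280 // maximum stated in the list above
)

// sketchConfig is a hypothetical stand-in for a builder-style config.
type sketchConfig struct {
	workers int
	result  bool
}

// newSketchConfig returns a config populated with the defaults.
func newSketchConfig() *sketchConfig {
	return &sketchConfig{workers: defaultWorkers}
}

// WithWorkerNumber stores num and returns the receiver so calls can be chained.
// Out-of-range values fall back to the default (assumed behaviour).
func (c *sketchConfig) WithWorkerNumber(num int) *sketchConfig {
	if num <= 0 || num > maxWorkers {
		num = defaultWorkers
	}
	c.workers = num
	return c
}

// WithResult enables result recording and returns the receiver.
func (c *sketchConfig) WithResult() *sketchConfig {
	c.result = true
	return c
}

func main() {
	c := newSketchConfig().WithWorkerNumber(8).WithResult()
	fmt.Println(c.workers, c.result) // 8 true
}
```

Returning the receiver from each method is what makes a one-line call such as c.WithHandleFunc(...).WithWorkerNumber(2).WithResult() possible.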

Components

1. Group

Group is a batch processing component that allows you to process tasks in batches. It uses a fixed number of workers to handle the tasks.

Methods

  • Map: Processes tasks in batches by providing a slice of objects, with each object serving as a parameter for the handle function. The method returns a slice of results when WithResult is set to true.
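
To make the idea behind Map concrete, here is a minimal sketch of an order-preserving map over a fixed number of workers, written with the standard library only. The helper mapWithWorkers is hypothetical and is not Karta's implementation; in particular, leaving a nil slot when the handler returns an error is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

type handleFunc func(msg any) (any, error)

// mapWithWorkers applies fn to every element using a fixed number of
// goroutines and returns the results in input order.
func mapWithWorkers(fn handleFunc, workers int, elements []any) []any {
	if workers < 1 {
		workers = 1
	}
	results := make([]any, len(elements))
	indexes := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indexes {
				// Each index is handled by exactly one worker,
				// so writing results[i] needs no lock.
				if out, err := fn(elements[i]); err == nil {
					results[i] = out
				}
			}
		}()
	}

	for i := range elements {
		indexes <- i
	}
	close(indexes)
	wg.Wait()
	return results
}

func main() {
	double := func(msg any) (any, error) { return msg.(int) * 2, nil }
	fmt.Println(mapWithWorkers(double, 2, []any{3, 5, 2})) // [6 10 4]
}
```

Writing each result at the index of its input keeps the output aligned with the input slice regardless of which task finishes first.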

Callback

  • OnBefore: Callback function executed before task processing.
  • OnAfter: Callback function executed after task processing.
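
As an illustration of the two hooks, the sketch below defines a local copy of the two-method callback interface and a hypothetical countingCallback that counts started and failed tasks. The runOne helper only shows the assumed call order around a single task; it is not part of Karta.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// callback is a local copy of the two-method Callback interface.
type callback interface {
	OnBefore(msg any)
	OnAfter(msg, result any, err error)
}

// errBoom is a hypothetical error used by the failing handler below.
var errBoom = errors.New("boom")

// countingCallback is a hypothetical callback that counts started and
// failed tasks. Atomic counters keep it safe in case several workers
// invoke it at the same time.
type countingCallback struct {
	started atomic.Int64
	failed  atomic.Int64
}

func (c *countingCallback) OnBefore(msg any) { c.started.Add(1) }

func (c *countingCallback) OnAfter(msg, result any, err error) {
	if err != nil {
		c.failed.Add(1)
	}
}

// runOne wraps one handler call with the two hooks (assumed call order).
func runOne(cb callback, fn func(any) (any, error), msg any) (any, error) {
	cb.OnBefore(msg)
	result, err := fn(msg)
	cb.OnAfter(msg, result, err)
	return result, err
}

func main() {
	cb := &countingCallback{}
	ok := func(msg any) (any, error) { return msg, nil }
	bad := func(msg any) (any, error) { return nil, errBoom }

	runOne(cb, ok, 1)
	runOne(cb, bad, 2)
	fmt.Println(cb.started.Load(), cb.failed.Load()) // 2 1
}
```

A value such as &countingCallback{} has both methods of the interface, so it has the shape that WithCallback expects.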

Example

package main

import (
	"time"

	k "github.com/shengyanli1982/karta"
)

// handleFunc takes a message of any type, sleeps for msg * 100 milliseconds, and then returns the message and a nil error.
func handleFunc(msg any) (any, error) {
	// Convert the message to an int and sleep for that many hundreds of milliseconds.
	time.Sleep(time.Duration(msg.(int)) * time.Millisecond * 100)

	// Return the received message and a nil error.
	return msg, nil
}

func main() {
	// Create a new configuration object.
	c := k.NewConfig()
	// Set the handle function and the number of workers, and enable result recording.
	c.WithHandleFunc(handleFunc).WithWorkerNumber(2).WithResult()

	// Create a new group using the configuration.
	g := k.NewGroup(c)

	// Ensure the group is stopped when main returns.
	defer g.Stop()

	// Run the handle function over a set of input values.
	r0 := g.Map([]any{3, 5, 2})

	// Print the integer value of the first result.
	println(r0[0].(int))
}

Result

$ go run demo.go
3

2. Pipeline

Pipeline is a task processing component that processes tasks one by one. It dynamically adjusts the number of workers based on the availability of tasks.

Idle workers are closed automatically after defaultWorkerIdleTimeout (10 seconds). If no tasks arrive for a long time, the number of workers drops to defaultMinWorkerNum (1).

A task submitted with Submit or SubmitWithFunc is processed by an idle worker. If no worker is idle, a new worker is created, until the number of running workers reaches the value set by WithWorkerNumber.
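
The scaling behaviour described above can be sketched with the standard library alone. The elasticPool type below is hypothetical and is not Karta's Pipeline: it hands a task to an idle worker when one is waiting, starts a new worker up to a maximum otherwise, and retires workers that stay idle longer than a timeout, keeping at least a minimum. The sketch assumes min is at least 1 and max is at least min.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// elasticPool is a hypothetical illustration, not Karta's Pipeline.
type elasticPool struct {
	tasks    chan func()
	min, max int
	idle     time.Duration

	mu      sync.Mutex
	running int
	wg      sync.WaitGroup
}

func newElasticPool(min, max int, idle time.Duration) *elasticPool {
	return &elasticPool{tasks: make(chan func()), min: min, max: max, idle: idle}
}

// Submit hands the task to an idle worker, starts a new worker if none is
// idle and the limit allows it, and otherwise waits for a worker to free up.
func (p *elasticPool) Submit(task func()) {
	select {
	case p.tasks <- task: // an idle worker was waiting
		return
	default:
	}
	p.mu.Lock()
	if p.running < p.max {
		p.running++
		p.mu.Unlock()
		p.wg.Add(1)
		go p.worker(task)
		return
	}
	p.mu.Unlock()
	p.tasks <- task // all workers are busy: block until one is free
}

func (p *elasticPool) worker(first func()) {
	defer p.wg.Done()
	first()
	for {
		select {
		case task, ok := <-p.tasks:
			if !ok {
				return // pool stopped
			}
			task()
		case <-time.After(p.idle):
			p.mu.Lock()
			if p.running > p.min {
				p.running-- // idle for too long: retire this worker
				p.mu.Unlock()
				return
			}
			p.mu.Unlock()
		}
	}
}

// Running reports the current number of workers.
func (p *elasticPool) Running() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.running
}

// Stop closes the task channel and waits for all workers to exit.
func (p *elasticPool) Stop() {
	close(p.tasks)
	p.wg.Wait()
}

// demo returns the worker count under load and after the idle period.
func demo() (peak, settled int) {
	p := newElasticPool(1, 3, 50*time.Millisecond)
	defer p.Stop()

	release := make(chan struct{})
	for i := 0; i < 3; i++ {
		p.Submit(func() { <-release }) // each task blocks until released
	}
	peak = p.Running()

	close(release)
	time.Sleep(300 * time.Millisecond) // well past the idle timeout
	return peak, p.Running()
}

func main() {
	peak, settled := demo()
	fmt.Println(peak, settled) // 3 1
}
```

The unbuffered channel is what lets Submit detect an idle worker: a non-blocking send succeeds only when some worker is already waiting to receive.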

Pipeline requires a queue object that implements the DelayingQueue interface to store tasks.

// The Queue interface defines the basic operations that a queue should have.
type Queue = interface {
	// Put adds an element to the queue.
	Put(value interface{}) error

	// Get retrieves an element from the queue.
	Get() (value interface{}, err error)

	// Done marks the element as processed.
	Done(value interface{})

	// Shutdown shuts down the queue.
	Shutdown()

	// IsClosed reports whether the queue is closed.
	IsClosed() bool
}

// The DelayingQueue interface embeds the Queue interface and adds a PutWithDelay method, which adds an element to the queue after a delay.
type DelayingQueue = interface {
	Queue

	// PutWithDelay adds an element to the queue after a delay.
	PutWithDelay(value interface{}, delay int64) error
}
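
To show what satisfying these interfaces involves, the sketch below declares local copies of the two interfaces, a hypothetical in-memory sliceQueue, and a small wrapper that turns any Queue into a DelayingQueue by ignoring the delay. All type names and error values are invented for this sketch. The non-blocking Get that returns an error on an empty queue is also a choice made here, since the interface does not say how an empty queue should behave.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local copies of the two interfaces shown above.
type Queue = interface {
	Put(value interface{}) error
	Get() (value interface{}, err error)
	Done(value interface{})
	Shutdown()
	IsClosed() bool
}

type DelayingQueue = interface {
	Queue
	PutWithDelay(value interface{}, delay int64) error
}

// Hypothetical error values for this sketch.
var (
	errClosed = errors.New("queue is closed")
	errEmpty  = errors.New("queue is empty")
)

// sliceQueue is a hypothetical FIFO queue guarded by a mutex.
type sliceQueue struct {
	mu     sync.Mutex
	items  []interface{}
	closed bool
}

func newSliceQueue() *sliceQueue { return &sliceQueue{} }

func (q *sliceQueue) Put(value interface{}) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return errClosed
	}
	q.items = append(q.items, value)
	return nil
}

// Get returns the oldest element without blocking.
func (q *sliceQueue) Get() (interface{}, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil, errEmpty
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v, nil
}

// Done has nothing to track in this sketch.
func (q *sliceQueue) Done(value interface{}) {}

func (q *sliceQueue) Shutdown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
}

func (q *sliceQueue) IsClosed() bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.closed
}

// immediateQueue adapts any Queue to DelayingQueue by ignoring the delay.
type immediateQueue struct{ Queue }

func (q immediateQueue) PutWithDelay(value interface{}, delay int64) error {
	return q.Put(value)
}

func main() {
	var q DelayingQueue = immediateQueue{newSliceQueue()}
	_ = q.Put("a")
	_ = q.PutWithDelay("b", 100)
	first, _ := q.Get()
	second, _ := q.Get()
	q.Shutdown()
	fmt.Println(first, second, q.IsClosed(), q.Put("c")) // a b true queue is closed
}
```

Embedding the Queue in the wrapper promotes its five methods, so only PutWithDelay has to be written by hand.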

Methods

  • SubmitWithFunc: Submits a task with its own handle function fn. msg is the argument passed to the handle function. If fn is nil, the handle function set with WithHandleFunc is used.
  • Submit: Submits a task without its own handle function. msg is the argument passed to the handle function set with WithHandleFunc.
  • SubmitAfterWithFunc: Submits a task with its own handle function fn after a delay. msg is the argument passed to the handle function. If fn is nil, the handle function set with WithHandleFunc is used. delay is the delay time (time.Duration).
  • SubmitAfter: Submits a task without its own handle function after a delay. msg is the argument passed to the handle function set with WithHandleFunc. delay is the delay time (time.Duration).
  • Stop: Stops the pipeline.
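
The fallback rule shared by these methods (use the task's own function if one was given, otherwise the configured one) can be sketched as follows. The task type and the resolve and run helpers are hypothetical names for illustration and do not come from Karta.

```go
package main

import "fmt"

type handleFunc func(msg any) (any, error)

// task pairs a message with an optional per-task handler (hypothetical type).
type task struct {
	fn  handleFunc
	msg any
}

// resolve returns the task's own handler, or def when the task has none.
func resolve(t task, def handleFunc) handleFunc {
	if t.fn != nil {
		return t.fn
	}
	return def
}

// run executes the task with whichever handler resolve selects.
func run(t task, def handleFunc) (any, error) {
	return resolve(t, def)(t.msg)
}

func main() {
	def := func(msg any) (any, error) { return fmt.Sprint("default:", msg), nil }
	spec := func(msg any) (any, error) { return fmt.Sprint("spec:", msg), nil }

	a, _ := run(task{msg: "foo"}, def)           // no own handler: default is used
	b, _ := run(task{fn: spec, msg: "bar"}, def) // own handler wins
	fmt.Println(a, b)                            // default:foo spec:bar
}
```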

Callback

  • OnBefore: Callback function executed before task processing.
  • OnAfter: Callback function executed after task processing.

Example

package main

import (
	"fmt"
	"time"

	k "github.com/shengyanli1982/karta"
	wkq "github.com/shengyanli1982/workqueue/v2"
)

// handleFunc takes a message of any type, prints it, and then returns the message and a nil error.
func handleFunc(msg any) (any, error) {
	// Print the received message.
	fmt.Println("default:", msg)

	// Return the received message and a nil error.
	return msg, nil
}

func main() {
	// Create a new configuration object.
	c := k.NewConfig()

	// Set the handle function and the number of workers.
	c.WithHandleFunc(handleFunc).WithWorkerNumber(2)

	// Create a new fake delaying queue.
	queue := k.NewFakeDelayingQueue(wkq.NewQueue(nil))

	// Create a new pipeline using the queue and the configuration.
	pl := k.NewPipeline(queue, c)

	// Ensure the pipeline is stopped when main returns.
	defer pl.Stop()

	// Submit a message to the pipeline.
	_ = pl.Submit("foo")

	// Submit a message to the pipeline with its own handle function.
	_ = pl.SubmitWithFunc(func(msg any) (any, error) {
		// Print the received message.
		fmt.Println("SpecFunc:", msg)

		// Return the received message and a nil error.
		return msg, nil
	}, "bar")

	// Wait one second so the workers can process the tasks.
	time.Sleep(time.Second)
}

Result

$ go run demo.go
default: foo
SpecFunc: bar

Documentation

Overview

The package contains the definition of the queue interface.

Index

Constants

This section is empty.

Variables

var (
	// DefaultMsgHandleFunc is the default message handle function; it returns the received message and a nil error.
	DefaultMsgHandleFunc = func(msg any) (any, error) { return msg, nil }
)

var ErrorQueueClosed = errors.New("pipeline is closed")

ErrorQueueClosed is an error value indicating that the pipeline is closed.
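
A sentinel error like this one is normally tested with errors.Is. The sketch below uses a local stand-in, errPipelineClosed, and a hypothetical submit function; whether and when the Pipeline methods actually return ErrorQueueClosed is an assumption that this page does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

// errPipelineClosed is a local stand-in for a sentinel such as ErrorQueueClosed.
var errPipelineClosed = errors.New("pipeline is closed")

// submit is a hypothetical function that fails once the pipeline is closed.
func submit(closed bool, msg any) error {
	if closed {
		// Wrapping with %w keeps the sentinel reachable through errors.Is.
		return fmt.Errorf("submit %v: %w", msg, errPipelineClosed)
	}
	return nil
}

// isClosed reports whether err is, or wraps, the sentinel.
func isClosed(err error) bool {
	return errors.Is(err, errPipelineClosed)
}

func main() {
	fmt.Println(isClosed(submit(false, "foo")), isClosed(submit(true, "bar"))) // false true
}
```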

Functions

This section is empty.

Types

type Callback

type Callback = interface {
	// OnBefore is called before message processing; it receives the message, msg, which can be of any type.
	OnBefore(msg any)

	// OnAfter is called after message processing; it receives the message, the result, and the error returned by the handle function.
	OnAfter(msg, result any, err error)
}

Callback is an interface that defines the methods called before and after message processing.

func NewEmptyCallback added in v0.1.4

func NewEmptyCallback() Callback

NewEmptyCallback creates and returns a new emptyCallback.

type Config

type Config struct {
	// contains filtered or unexported fields
}

Config is a struct used to configure parameters for message processing.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig creates a default configuration.

func NewConfig

func NewConfig() *Config

NewConfig creates and returns a pointer to a new Config struct.

func (*Config) WithCallback

func (c *Config) WithCallback(callback Callback) *Config

WithCallback sets the callback variable in the Config struct.

func (*Config) WithHandleFunc

func (c *Config) WithHandleFunc(fn MessageHandleFunc) *Config

WithHandleFunc sets the handleFunc variable in the Config struct.

func (*Config) WithResult

func (c *Config) WithResult() *Config

WithResult sets the result variable in the Config struct.

func (*Config) WithWorkerNumber

func (c *Config) WithWorkerNumber(num int) *Config

WithWorkerNumber sets the num variable in the Config struct.

type DelayingQueue added in v0.2.0

type DelayingQueue = interface {
	Queue

	// PutWithDelay adds an element to the queue after a delay.
	PutWithDelay(value interface{}, delay int64) error
}

The DelayingQueue interface embeds the Queue interface and adds a PutWithDelay method, which adds an element to the queue after a delay.

type FakeDelayingQueue added in v0.1.4

type FakeDelayingQueue struct{ Queue }

FakeDelayingQueue is a struct that implements the DelayingQueue interface, but its PutWithDelay method does not actually delay adding values.

func NewFakeDelayingQueue added in v0.1.4

func NewFakeDelayingQueue(queue Queue) *FakeDelayingQueue

NewFakeDelayingQueue creates and returns a new FakeDelayingQueue.

func (*FakeDelayingQueue) PutWithDelay added in v0.2.0

func (q *FakeDelayingQueue) PutWithDelay(value any, delay int64) error

PutWithDelay adds a value to the queue immediately, without any delay.

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group is a struct for batch processing tasks.

func NewGroup

func NewGroup(conf *Config) *Group

NewGroup creates a new Group for batch processing tasks.

func (*Group) Map

func (gr *Group) Map(elements []any) []any

Map processes the given elements as a batch.

func (*Group) Stop

func (gr *Group) Stop()

Stop stops the batch processing group.

type MessageHandleFunc

type MessageHandleFunc = func(msg any) (any, error)

MessageHandleFunc defines the message handle function type.
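
Because the message arrives as any, a handler with this signature has to recover the concrete type itself. A minimal sketch of a handler that uses the comma-ok form of the type assertion, so that an unexpected message type becomes an error rather than a panic; the function name double is hypothetical.

```go
package main

import "fmt"

// double is a hypothetical handler matching func(msg any) (any, error).
func double(msg any) (any, error) {
	n, ok := msg.(int)
	if !ok {
		// Report the unexpected type instead of panicking.
		return nil, fmt.Errorf("unexpected message type %T", msg)
	}
	return n * 2, nil
}

func main() {
	fmt.Println(double(21))   // 42 <nil>
	fmt.Println(double("21")) // <nil> unexpected message type string
}
```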

type Pipeline added in v0.1.3

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline is a struct that represents a pipeline, used for storing and processing data.

func NewPipeline added in v0.1.3

func NewPipeline(queue DelayingQueue, conf *Config) *Pipeline

NewPipeline creates and returns a new Pipeline.

func (*Pipeline) GetWorkerNumber added in v0.1.8

func (pl *Pipeline) GetWorkerNumber() int64

GetWorkerNumber returns the number of workers currently running.

func (*Pipeline) Stop added in v0.1.3

func (pl *Pipeline) Stop()

Stop stops the pipeline.

func (*Pipeline) Submit added in v0.1.3

func (pl *Pipeline) Submit(msg any) error

Submit submits a task.

func (*Pipeline) SubmitAfter added in v0.1.4

func (pl *Pipeline) SubmitAfter(msg any, delay time.Duration) error

SubmitAfter submits a task after the specified delay.

func (*Pipeline) SubmitAfterWithFunc added in v0.1.4

func (pl *Pipeline) SubmitAfterWithFunc(fn MessageHandleFunc, msg any, delay time.Duration) error

SubmitAfterWithFunc submits a task with a custom handle function after the specified delay.

func (*Pipeline) SubmitWithFunc added in v0.1.3

func (pl *Pipeline) SubmitWithFunc(fn MessageHandleFunc, msg any) error

SubmitWithFunc submits a task with a custom handle function.

type Queue

type Queue = interface {
	// Put adds an element to the queue.
	Put(value interface{}) error

	// Get retrieves an element from the queue.
	Get() (value interface{}, err error)

	// Done marks the element as processed.
	Done(value interface{})

	// Shutdown shuts down the queue.
	Shutdown()

	// IsClosed reports whether the queue is closed.
	IsClosed() bool
}

The Queue interface defines the basic operations that a queue should have.

Directories

Path               Synopsis
examples/group     (module)
examples/pipeline  (module)
