workers

package
v0.8.8
Warning

This package is not in the latest version of its module.

Published: Dec 3, 2024 License: MIT Imports: 19 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var DefaultConfig = WorkerConfig{
	WorkerTimeout:         55 * time.Second,
	WorkerResponseTimeout: 45 * time.Second,
	ConnectionTimeout:     75 * time.Millisecond,
	FindPeerTimeout:       50 * time.Millisecond,
	MaxRetries:            1,
	MaxSpawnAttempts:      1,
	WorkerBufferSize:      100,
	MaxRemoteWorkers:      10,
}

var EnableDiscordScraperWorker = func(o *WorkerOption) {
	o.isDiscordScraperWorker = true
}

var EnableLLMServerWorker = func(o *WorkerOption) {
	o.isLLMServerWorker = true
}

var EnableTwitterWorker = func(o *WorkerOption) {
	o.isTwitterWorker = true
}

var EnableWebScraperWorker = func(o *WorkerOption) {
	o.isWebScraperWorker = true
}

var ErrHandlerNotFound = errors.New("work handler not found")

ErrHandlerNotFound is an error returned when a work handler cannot be found.
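A sentinel error like this is usually checked with errors.Is, which still matches after the error has been wrapped. The sketch below is a minimal illustration under assumptions: it redeclares the sentinel locally, and lookup and its registry map are hypothetical helpers that are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrHandlerNotFound is redeclared here so the sketch compiles on its own.
var ErrHandlerNotFound = errors.New("work handler not found")

// lookup is a hypothetical helper (not part of the package). It returns the
// handler name registered for workType, or wraps ErrHandlerNotFound with %w
// when nothing is registered.
func lookup(registry map[string]string, workType string) (string, error) {
	h, ok := registry[workType]
	if !ok {
		return "", fmt.Errorf("work type %q: %w", workType, ErrHandlerNotFound)
	}
	return h, nil
}

func main() {
	registry := map[string]string{"web": "webHandler"}

	// errors.Is unwraps the error chain, so the sentinel is still detected.
	if _, err := lookup(registry, "twitter"); errors.Is(err, ErrHandlerNotFound) {
		fmt.Println("no handler registered:", err)
	}
}
```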

Functions

func GetEligibleWorkers added in v0.6.0

func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory, limit int) ([]data_types.Worker, *data_types.Worker)

GetEligibleWorkers returns the eligible workers for the given worker category. For Twitter workers, it balances selection between high-performing workers and fair distribution. For other worker types, it returns all eligible workers without modification.

Types

type ResponseChannelMap added in v0.6.0

type ResponseChannelMap struct {
	// contains filtered or unexported fields
}

func GetResponseChannelMap added in v0.6.0

func GetResponseChannelMap() *ResponseChannelMap

GetResponseChannelMap returns the singleton instance of ResponseChannelMap.

func (*ResponseChannelMap) CreateChannel added in v0.6.0

func (drm *ResponseChannelMap) CreateChannel(key string) chan data_types.WorkResponse

func (*ResponseChannelMap) Delete added in v0.6.0

func (drm *ResponseChannelMap) Delete(key string)

Delete removes the item with the specified key from the ResponseChannelMap. It acquires a write lock to ensure thread-safety while deleting the item.

func (*ResponseChannelMap) Get added in v0.6.0

func (drm *ResponseChannelMap) Get(key string) (chan data_types.WorkResponse, bool)

Get retrieves the value associated with the specified key from the ResponseChannelMap. It acquires a read lock to ensure thread-safety while reading the value. If the key exists in the ResponseChannelMap, it returns the corresponding value and true. If the key does not exist, it returns nil and false.

func (*ResponseChannelMap) Len added in v0.6.0

func (drm *ResponseChannelMap) Len() int

Len returns the number of items in the ResponseChannelMap. It acquires a read lock to ensure thread-safety while reading the length.

func (*ResponseChannelMap) Set added in v0.6.0

func (drm *ResponseChannelMap) Set(key string, value chan data_types.WorkResponse)

Set associates the specified value with the specified key in the ResponseChannelMap. It acquires a write lock to ensure thread-safety while setting the value.
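The locking behavior described for Get, Set, Delete, and Len can be sketched with a sync.RWMutex guarding a plain map. This is an illustration under assumptions, not the package's source: channelMap, newChannelMap, and the WorkResponse stand-in are hypothetical names, and the real unexported fields are not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// WorkResponse is a stand-in for data_types.WorkResponse (fields assumed).
type WorkResponse struct {
	Data string
}

// channelMap is a hypothetical RWMutex-guarded map of response channels.
type channelMap struct {
	mu    sync.RWMutex
	items map[string]chan WorkResponse
}

func newChannelMap() *channelMap {
	return &channelMap{items: make(map[string]chan WorkResponse)}
}

// Set takes the write lock because it mutates the map.
func (m *channelMap) Set(key string, ch chan WorkResponse) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.items[key] = ch
}

// Get takes the read lock; a missing key yields (nil, false).
func (m *channelMap) Get(key string) (chan WorkResponse, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	ch, ok := m.items[key]
	return ch, ok
}

// Delete takes the write lock because it mutates the map.
func (m *channelMap) Delete(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.items, key)
}

// Len takes the read lock while reading the size.
func (m *channelMap) Len() int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.items)
}

func main() {
	m := newChannelMap()
	ch := make(chan WorkResponse, 1) // buffered so the send below does not block
	m.Set("req-1", ch)
	ch <- WorkResponse{Data: "done"}

	if got, ok := m.Get("req-1"); ok {
		fmt.Println((<-got).Data)
	}
	m.Delete("req-1")
	fmt.Println(m.Len())
}
```

Many readers can hold the read lock at once, while writers get exclusive access, which suits a map that is read more often than it is modified.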

type WorkHandler added in v0.6.0

type WorkHandler interface {
	HandleWork(data []byte) data_types.WorkResponse
}

WorkHandler defines the interface for handling different types of work.
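A handler only needs a HandleWork method with the signature above. The following sketch shows one way to satisfy the interface; it is self-contained, so WorkResponse here is a local stand-in for data_types.WorkResponse with assumed fields, and echoHandler is a hypothetical type.

```go
package main

import "fmt"

// WorkResponse is a stand-in for data_types.WorkResponse; the Data and Error
// fields are assumptions made for this sketch.
type WorkResponse struct {
	Data  string
	Error string
}

// WorkHandler mirrors the interface documented above, using the stand-in type.
type WorkHandler interface {
	HandleWork(data []byte) WorkResponse
}

// echoHandler is a hypothetical handler that returns its input unchanged.
type echoHandler struct{}

// HandleWork rejects empty payloads and echoes everything else.
func (echoHandler) HandleWork(data []byte) WorkResponse {
	if len(data) == 0 {
		return WorkResponse{Error: "empty payload"}
	}
	return WorkResponse{Data: string(data)}
}

// Compile-time check that echoHandler satisfies WorkHandler.
var _ WorkHandler = echoHandler{}

func main() {
	var h WorkHandler = echoHandler{}
	fmt.Printf("%+v\n", h.HandleWork([]byte("ping")))
}
```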

type WorkHandlerInfo added in v0.6.0

type WorkHandlerInfo struct {
	Handler      WorkHandler
	CallCount    int64
	TotalRuntime time.Duration
}

WorkHandlerInfo contains information about a work handler, including metrics.

type WorkHandlerManager added in v0.6.0

type WorkHandlerManager struct {
	// contains filtered or unexported fields
}

WorkHandlerManager manages work handlers and tracks their execution metrics.

func NewWorkHandlerManager added in v0.8.2

func NewWorkHandlerManager(opts ...WorkerOptionFunc) *WorkHandlerManager

func (*WorkHandlerManager) DistributeWork added in v0.6.0

func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest data_types.WorkRequest) (response data_types.WorkResponse)

func (*WorkHandlerManager) ExecuteWork added in v0.6.0

func (whm *WorkHandlerManager) ExecuteWork(workRequest data_types.WorkRequest) (response data_types.WorkResponse)

ExecuteWork finds the work handler that matches the given work request and executes it. It tracks the call count and execution duration for the handler.
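The bookkeeping described here (look up a handler, run it, then update its call count and total runtime) might look roughly like the sketch below. All names in it (manager, handlerInfo, execute) are hypothetical, the handler is simplified to a plain function, and the real manager's internals are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errHandlerNotFound = errors.New("work handler not found")

// handlerInfo is a simplified, hypothetical analogue of WorkHandlerInfo.
type handlerInfo struct {
	handle       func(data []byte) string
	callCount    int64
	totalRuntime time.Duration
}

// manager is a hypothetical registry of handlers keyed by name.
type manager struct {
	mu       sync.Mutex
	handlers map[string]*handlerInfo
}

// execute runs the named handler and records how often and how long it ran.
func (m *manager) execute(name string, data []byte) (string, error) {
	m.mu.Lock()
	info, ok := m.handlers[name]
	m.mu.Unlock()
	if !ok {
		return "", errHandlerNotFound
	}

	// Time the handler outside the lock so slow handlers do not block others.
	start := time.Now()
	out := info.handle(data)
	elapsed := time.Since(start)

	m.mu.Lock()
	info.callCount++
	info.totalRuntime += elapsed
	m.mu.Unlock()
	return out, nil
}

func main() {
	m := &manager{handlers: map[string]*handlerInfo{
		"echo": {handle: func(d []byte) string { return "handled " + string(d) }},
	}}

	out, err := m.execute("echo", []byte("job"))
	fmt.Println(out, err, m.handlers["echo"].callCount)
}
```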

func (*WorkHandlerManager) HandleWorkerStream added in v0.6.0

func (whm *WorkHandlerManager) HandleWorkerStream(stream network.Stream)

type WorkerConfig added in v0.6.0

type WorkerConfig struct {
	WorkerTimeout         time.Duration
	WorkerResponseTimeout time.Duration
	ConnectionTimeout     time.Duration
	FindPeerTimeout       time.Duration
	MaxRetries            int
	MaxSpawnAttempts      int
	WorkerBufferSize      int
	MaxRemoteWorkers      int
}

func LoadConfig added in v0.6.0

func LoadConfig() (*WorkerConfig, error)
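Because WorkerConfig is a plain struct, a caller can copy the defaults and override individual fields without mutating the shared value. The sketch below assumes that pattern and also shows a duration field feeding context.WithTimeout. The struct and DefaultConfig are redeclared locally so the example compiles on its own, and customConfig is a hypothetical helper. How LoadConfig itself builds its result is not documented here.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// WorkerConfig is redeclared from the listing above for a standalone sketch.
type WorkerConfig struct {
	WorkerTimeout         time.Duration
	WorkerResponseTimeout time.Duration
	ConnectionTimeout     time.Duration
	FindPeerTimeout       time.Duration
	MaxRetries            int
	MaxSpawnAttempts      int
	WorkerBufferSize      int
	MaxRemoteWorkers      int
}

// DefaultConfig repeats the documented default values.
var DefaultConfig = WorkerConfig{
	WorkerTimeout:         55 * time.Second,
	WorkerResponseTimeout: 45 * time.Second,
	ConnectionTimeout:     75 * time.Millisecond,
	FindPeerTimeout:       50 * time.Millisecond,
	MaxRetries:            1,
	MaxSpawnAttempts:      1,
	WorkerBufferSize:      100,
	MaxRemoteWorkers:      10,
}

// customConfig is a hypothetical helper: assigning a struct copies it, so the
// override below leaves DefaultConfig untouched.
func customConfig() WorkerConfig {
	cfg := DefaultConfig
	cfg.MaxRetries = 3
	return cfg
}

func main() {
	cfg := customConfig()

	// Bound a connection attempt by the configured timeout.
	ctx, cancel := context.WithTimeout(context.Background(), cfg.ConnectionTimeout)
	defer cancel()

	_, hasDeadline := ctx.Deadline()
	fmt.Println(cfg.MaxRetries, DefaultConfig.MaxRetries, hasDeadline)
}
```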

type WorkerOption added in v0.8.2

type WorkerOption struct {
	// contains filtered or unexported fields
}

func (*WorkerOption) Apply added in v0.8.2

func (a *WorkerOption) Apply(opts ...WorkerOptionFunc)

type WorkerOptionFunc added in v0.8.2

type WorkerOptionFunc func(*WorkerOption)
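WorkerOptionFunc, WorkerOption.Apply, and the Enable... variables together follow the functional options pattern. The sketch below illustrates how such options compose. The body of Apply is an assumption (the page shows only its signature), and only two of the unexported flags are reproduced.

```go
package main

import "fmt"

// WorkerOption is redeclared with two of the unexported flags for illustration.
type WorkerOption struct {
	isTwitterWorker    bool
	isWebScraperWorker bool
}

// WorkerOptionFunc mutates a WorkerOption in place.
type WorkerOptionFunc func(*WorkerOption)

// Apply runs each option in order. This body is an assumed implementation.
func (a *WorkerOption) Apply(opts ...WorkerOptionFunc) {
	for _, opt := range opts {
		opt(a)
	}
}

// The two options below repeat the documented variable definitions.
var EnableTwitterWorker = func(o *WorkerOption) {
	o.isTwitterWorker = true
}

var EnableWebScraperWorker = func(o *WorkerOption) {
	o.isWebScraperWorker = true
}

func main() {
	var o WorkerOption
	o.Apply(EnableTwitterWorker) // only the Twitter flag is switched on
	fmt.Println(o.isTwitterWorker, o.isWebScraperWorker)
}
```

With the real package, the same options would presumably be passed to NewWorkHandlerManager, which accepts a variadic list of WorkerOptionFunc values.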
