Documentation ¶
Index ¶
- Variables
- func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory, limit int) ([]data_types.Worker, *data_types.Worker)
- type ResponseChannelMap
- func (drm *ResponseChannelMap) CreateChannel(key string) chan data_types.WorkResponse
- func (drm *ResponseChannelMap) Delete(key string)
- func (drm *ResponseChannelMap) Get(key string) (chan data_types.WorkResponse, bool)
- func (drm *ResponseChannelMap) Len() int
- func (drm *ResponseChannelMap) Set(key string, value chan data_types.WorkResponse)
- type WorkHandler
- type WorkHandlerInfo
- type WorkHandlerManager
- func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest data_types.WorkRequest) (response data_types.WorkResponse)
- func (whm *WorkHandlerManager) ExecuteWork(workRequest data_types.WorkRequest) (response data_types.WorkResponse)
- func (whm *WorkHandlerManager) HandleWorkerStream(stream network.Stream)
- type WorkerConfig
- type WorkerOption
- type WorkerOptionFunc
Constants ¶
This section is empty.
Variables ¶
var DefaultConfig = WorkerConfig{ WorkerTimeout: 55 * time.Second, WorkerResponseTimeout: 45 * time.Second, ConnectionTimeout: 75 * time.Millisecond, FindPeerTimeout: 50 * time.Millisecond, MaxRetries: 1, MaxSpawnAttempts: 1, WorkerBufferSize: 100, MaxRemoteWorkers: 10, }
var EnableDiscordScraperWorker = func(o *WorkerOption) { o.isDiscordScraperWorker = true }
var EnableTwitterWorker = func(o *WorkerOption) { o.isTwitterWorker = true }
var EnableWebScraperWorker = func(o *WorkerOption) { o.isWebScraperWorker = true }
var ErrHandlerNotFound = errors.New("work handler not found")
ErrHandlerNotFound is an error returned when a work handler cannot be found.
Functions ¶
func GetEligibleWorkers ¶ added in v0.6.0
func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory, limit int) ([]data_types.Worker, *data_types.Worker)
GetEligibleWorkers returns eligible workers for a given message type. For Twitter workers, it uses a balanced approach between high-performing workers and fair distribution. For other worker types, it returns all eligible workers without modification.
Types ¶
type ResponseChannelMap ¶ added in v0.6.0
type ResponseChannelMap struct {
// contains filtered or unexported fields
}
func GetResponseChannelMap ¶ added in v0.6.0
func GetResponseChannelMap() *ResponseChannelMap
GetResponseChannelMap returns the singleton rcmInstance of ResponseChannelMap.
func (*ResponseChannelMap) CreateChannel ¶ added in v0.6.0
func (drm *ResponseChannelMap) CreateChannel(key string) chan data_types.WorkResponse
func (*ResponseChannelMap) Delete ¶ added in v0.6.0
func (drm *ResponseChannelMap) Delete(key string)
Delete removes the item with the specified key from the ResponseChannelMap. It acquires a write lock to ensure thread-safety while deleting the item.
func (*ResponseChannelMap) Get ¶ added in v0.6.0
func (drm *ResponseChannelMap) Get(key string) (chan data_types.WorkResponse, bool)
Get retrieves the value associated with the specified key from the ResponseChannelMap. It acquires a read lock to ensure thread-safety while reading the value. If the key exists in the ResponseChannelMap, it returns the corresponding value and true. If the key does not exist, it returns nil and false.
func (*ResponseChannelMap) Len ¶ added in v0.6.0
func (drm *ResponseChannelMap) Len() int
Len returns the number of items in the ResponseChannelMap. It acquires a read lock to ensure thread-safety while reading the length.
func (*ResponseChannelMap) Set ¶ added in v0.6.0
func (drm *ResponseChannelMap) Set(key string, value chan data_types.WorkResponse)
Set associates the specified value with the specified key in the ResponseChannelMap. It acquires a write lock to ensure thread-safety while setting the value.
type WorkHandler ¶ added in v0.6.0
type WorkHandler interface {
HandleWork(data []byte) data_types.WorkResponse
}
WorkHandler defines the interface for handling different types of work.
type WorkHandlerInfo ¶ added in v0.6.0
type WorkHandlerInfo struct { Handler WorkHandler CallCount int64 TotalRuntime time.Duration }
WorkHandlerInfo contains information about a work handler, including metrics.
type WorkHandlerManager ¶ added in v0.6.0
type WorkHandlerManager struct {
// contains filtered or unexported fields
}
WorkHandlerManager manages work handlers and tracks their execution metrics.
func NewWorkHandlerManager ¶ added in v0.8.2
func NewWorkHandlerManager(opts ...WorkerOptionFunc) *WorkHandlerManager
func (*WorkHandlerManager) DistributeWork ¶ added in v0.6.0
func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest data_types.WorkRequest) (response data_types.WorkResponse)
func (*WorkHandlerManager) ExecuteWork ¶ added in v0.6.0
func (whm *WorkHandlerManager) ExecuteWork(workRequest data_types.WorkRequest) (response data_types.WorkResponse)
ExecuteWork finds and executes the work handler associated with the given name. It tracks the call count and execution duration for the handler.
func (*WorkHandlerManager) HandleWorkerStream ¶ added in v0.6.0
func (whm *WorkHandlerManager) HandleWorkerStream(stream network.Stream)
type WorkerConfig ¶ added in v0.6.0
type WorkerConfig struct { WorkerTimeout time.Duration WorkerResponseTimeout time.Duration ConnectionTimeout time.Duration FindPeerTimeout time.Duration MaxRetries int MaxSpawnAttempts int WorkerBufferSize int MaxRemoteWorkers int }
func LoadConfig ¶ added in v0.6.0
func LoadConfig() (*WorkerConfig, error)
type WorkerOption ¶ added in v0.8.2
type WorkerOption struct {
// contains filtered or unexported fields
}
func (*WorkerOption) Apply ¶ added in v0.8.2
func (a *WorkerOption) Apply(opts ...WorkerOptionFunc)
type WorkerOptionFunc ¶ added in v0.8.2
type WorkerOptionFunc func(*WorkerOption)
func WithMasaDir ¶ added in v0.8.7
func WithMasaDir(dir string) WorkerOptionFunc