Documentation
Index
- Variables
- func GetEligibleWorkers(node *masa.OracleNode, category pubsub.WorkerCategory, config *WorkerConfig) ([]data_types.Worker, *data_types.Worker)
- type ResponseChannelMap
- func (drm *ResponseChannelMap) CreateChannel(key string) chan data_types.WorkResponse
- func (drm *ResponseChannelMap) Delete(key string)
- func (drm *ResponseChannelMap) Get(key string) (chan data_types.WorkResponse, bool)
- func (drm *ResponseChannelMap) Len() int
- func (drm *ResponseChannelMap) Set(key string, value chan data_types.WorkResponse)
- type WorkHandler
- type WorkHandlerInfo
- type WorkHandlerManager
- func (whm *WorkHandlerManager) DistributeWork(node *masa.OracleNode, workRequest data_types.WorkRequest) (response data_types.WorkResponse)
- func (whm *WorkHandlerManager) ExecuteWork(workRequest data_types.WorkRequest) (response data_types.WorkResponse)
- func (whm *WorkHandlerManager) HandleWorkerStream(stream network.Stream)
- type WorkerConfig
Constants
This section is empty.
Variables
var DefaultConfig = WorkerConfig{
	WorkerTimeout:         55 * time.Second,
	WorkerResponseTimeout: 30 * time.Second,
	ConnectionTimeout:     1 * time.Second,
	MaxRetries:            1,
	MaxSpawnAttempts:      1,
	WorkerBufferSize:      100,
	MaxRemoteWorkers:      1,
}
var ErrHandlerNotFound = errors.New("work handler not found")
ErrHandlerNotFound is an error returned when a work handler cannot be found.
Functions
func GetEligibleWorkers (added in v0.6.0)
func GetEligibleWorkers(node *masa.OracleNode, category pubsub.WorkerCategory, config *WorkerConfig) ([]data_types.Worker, *data_types.Worker)
GetEligibleWorkers uses the new NodeTracker method to get the workers eligible for a given message type. It returns a slice so that the number of workers can easily be increased in the future.
Types
type ResponseChannelMap (added in v0.6.0)
type ResponseChannelMap struct {
	// contains filtered or unexported fields
}
func GetResponseChannelMap (added in v0.6.0)
func GetResponseChannelMap() *ResponseChannelMap
GetResponseChannelMap returns the singleton instance of ResponseChannelMap.
func (*ResponseChannelMap) CreateChannel (added in v0.6.0)
func (drm *ResponseChannelMap) CreateChannel(key string) chan data_types.WorkResponse
func (*ResponseChannelMap) Delete (added in v0.6.0)
func (drm *ResponseChannelMap) Delete(key string)
Delete removes the item with the specified key from the ResponseChannelMap. It acquires a write lock to ensure thread-safety while deleting the item.
func (*ResponseChannelMap) Get (added in v0.6.0)
func (drm *ResponseChannelMap) Get(key string) (chan data_types.WorkResponse, bool)
Get retrieves the value associated with the specified key from the ResponseChannelMap. It acquires a read lock to ensure thread-safety while reading the value. If the key exists in the ResponseChannelMap, it returns the corresponding value and true. If the key does not exist, it returns nil and false.
func (*ResponseChannelMap) Len (added in v0.6.0)
func (drm *ResponseChannelMap) Len() int
Len returns the number of items in the ResponseChannelMap. It acquires a read lock to ensure thread-safety while reading the length.
func (*ResponseChannelMap) Set (added in v0.6.0)
func (drm *ResponseChannelMap) Set(key string, value chan data_types.WorkResponse)
Set associates the specified value with the specified key in the ResponseChannelMap. It acquires a write lock to ensure thread-safety while setting the value.
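The locking behaviour described for Get, Set, Delete and Len can be illustrated with a minimal sketch. This is not the package's implementation: the fields of ResponseChannelMap are unexported, so the `channelMap` type, its `mu` and `items` fields, and the local `WorkResponse` struct below are all hypothetical stand-ins, assuming a `sync.RWMutex` guarding a plain map.

```go
package main

import (
	"fmt"
	"sync"
)

// WorkResponse is a hypothetical stand-in for data_types.WorkResponse.
type WorkResponse struct {
	Data  string
	Error string
}

// channelMap is an illustrative mutex-guarded map of response channels.
// It is an assumption about the general shape, not the real type.
type channelMap struct {
	mu    sync.RWMutex
	items map[string]chan WorkResponse
}

func newChannelMap() *channelMap {
	return &channelMap{items: make(map[string]chan WorkResponse)}
}

// Set stores ch under key while holding the write lock.
func (m *channelMap) Set(key string, ch chan WorkResponse) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.items[key] = ch
}

// Get returns the channel stored under key and true, or nil and false
// if the key is absent. It holds the read lock while reading.
func (m *channelMap) Get(key string) (chan WorkResponse, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	ch, ok := m.items[key]
	return ch, ok
}

// Delete removes key while holding the write lock.
func (m *channelMap) Delete(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.items, key)
}

// Len returns the number of entries while holding the read lock.
func (m *channelMap) Len() int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.items)
}

func main() {
	m := newChannelMap()
	ch := make(chan WorkResponse, 1)
	m.Set("request-1", ch)

	// A worker goroutine looks the channel up by key and delivers its result.
	go func() {
		if c, ok := m.Get("request-1"); ok {
			c <- WorkResponse{Data: "done"}
		}
	}()

	resp := <-ch
	m.Delete("request-1")
	fmt.Println(resp.Data, m.Len()) // prints "done 0"
}
```

Keying channels by request lets a caller block on its own channel while any goroutine that knows the key can deliver the response. A read-write mutex allows concurrent lookups and serializes writes.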
type WorkHandler (added in v0.6.0)
type WorkHandler interface {
	HandleWork(data []byte) data_types.WorkResponse
}
WorkHandler defines the interface for handling different types of work.
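As a rough illustration of satisfying this interface, the sketch below defines a toy handler. The fields of data_types.WorkResponse are not shown on this page, so the local `WorkResponse` struct (with `Data` and `Error` fields) and the `upperHandler` type are hypothetical and exist only to make the example self-contained.

```go
package main

import (
	"fmt"
	"strings"
)

// WorkResponse is a hypothetical stand-in for data_types.WorkResponse.
type WorkResponse struct {
	Data  string
	Error string
}

// WorkHandler mirrors the documented interface, using the stand-in type.
type WorkHandler interface {
	HandleWork(data []byte) WorkResponse
}

// upperHandler is a hypothetical handler that upper-cases its payload.
type upperHandler struct{}

// HandleWork reports an error for an empty payload and otherwise
// returns the payload converted to upper case.
func (upperHandler) HandleWork(data []byte) WorkResponse {
	if len(data) == 0 {
		return WorkResponse{Error: "empty payload"}
	}
	return WorkResponse{Data: strings.ToUpper(string(data))}
}

func main() {
	var h WorkHandler = upperHandler{}
	fmt.Println(h.HandleWork([]byte("ping")).Data) // prints "PING"
}
```

Because the interface takes raw bytes, each handler is responsible for decoding its own payload format.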
type WorkHandlerInfo (added in v0.6.0)
type WorkHandlerInfo struct {
	Handler      WorkHandler
	CallCount    int64
	TotalRuntime time.Duration
}
WorkHandlerInfo contains information about a work handler, including metrics.
type WorkHandlerManager (added in v0.6.0)
type WorkHandlerManager struct {
	// contains filtered or unexported fields
}
WorkHandlerManager manages work handlers and tracks their execution metrics.
func GetWorkHandlerManager (added in v0.6.0)
func GetWorkHandlerManager() *WorkHandlerManager
func (*WorkHandlerManager) DistributeWork (added in v0.6.0)
func (whm *WorkHandlerManager) DistributeWork(node *masa.OracleNode, workRequest data_types.WorkRequest) (response data_types.WorkResponse)
func (*WorkHandlerManager) ExecuteWork (added in v0.6.0)
func (whm *WorkHandlerManager) ExecuteWork(workRequest data_types.WorkRequest) (response data_types.WorkResponse)
ExecuteWork finds and executes the work handler associated with the given name. It tracks the call count and execution duration for the handler.
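The lookup-then-measure pattern described here might look roughly like the following sketch. It is an assumption about the general approach, not the package's code: the `manager` type, its `add` and `execute` methods, the string-keyed registry, and the local `WorkResponse` struct are hypothetical. Only the metric names CallCount and TotalRuntime and the "work handler not found" message come from this page.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errHandlerNotFound mirrors the message of the documented ErrHandlerNotFound.
var errHandlerNotFound = errors.New("work handler not found")

// WorkResponse is a hypothetical stand-in for data_types.WorkResponse.
type WorkResponse struct {
	Data  string
	Error string
}

// WorkHandler mirrors the documented interface, using the stand-in type.
type WorkHandler interface {
	HandleWork(data []byte) WorkResponse
}

// handlerInfo mirrors the documented WorkHandlerInfo fields.
type handlerInfo struct {
	Handler      WorkHandler
	CallCount    int64
	TotalRuntime time.Duration
}

// manager is a hypothetical registry of handlers keyed by name.
type manager struct {
	mu       sync.Mutex
	handlers map[string]*handlerInfo
}

func newManager() *manager {
	return &manager{handlers: make(map[string]*handlerInfo)}
}

// add registers h under name, replacing any previous handler.
func (m *manager) add(name string, h WorkHandler) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.handlers[name] = &handlerInfo{Handler: h}
}

// execute looks up the handler, runs it outside the lock, and then
// records one call and the elapsed time.
func (m *manager) execute(name string, data []byte) WorkResponse {
	m.mu.Lock()
	info, ok := m.handlers[name]
	m.mu.Unlock()
	if !ok {
		return WorkResponse{Error: errHandlerNotFound.Error()}
	}

	start := time.Now()
	resp := info.Handler.HandleWork(data)
	elapsed := time.Since(start)

	m.mu.Lock()
	info.CallCount++
	info.TotalRuntime += elapsed
	m.mu.Unlock()
	return resp
}

// echoHandler is a hypothetical handler that returns its payload unchanged.
type echoHandler struct{}

func (echoHandler) HandleWork(data []byte) WorkResponse {
	return WorkResponse{Data: string(data)}
}

func main() {
	m := newManager()
	m.add("echo", echoHandler{})
	fmt.Println(m.execute("echo", []byte("hello")).Data) // prints "hello"
	fmt.Println(m.execute("missing", nil).Error)          // prints "work handler not found"
	fmt.Println(m.handlers["echo"].CallCount)             // prints "1"
}
```

Running the handler outside the lock keeps a slow handler from blocking lookups for other work. The metrics are updated under the lock so concurrent calls do not race on the counters.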
func (*WorkHandlerManager) HandleWorkerStream (added in v0.6.0)
func (whm *WorkHandlerManager) HandleWorkerStream(stream network.Stream)
type WorkerConfig (added in v0.6.0)
type WorkerConfig struct {
	WorkerTimeout         time.Duration
	WorkerResponseTimeout time.Duration
	ConnectionTimeout     time.Duration
	MaxRetries            int
	MaxSpawnAttempts      int
	WorkerBufferSize      int
	MaxRemoteWorkers      int
}
func LoadConfig (added in v0.6.0)
func LoadConfig() (*WorkerConfig, error)