workers

package
v0.0.2-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MonitorWorkers

func MonitorWorkers(ctx context.Context, node *masa.OracleNode)

MonitorWorkers monitors worker data by subscribing to the completed work topic, computing a CID for each received data, and writing the data to the database.

Parameters:

  • ctx: The context for the monitoring operation.
  • node: A pointer to the OracleNode instance.

The function uses a ticker to periodically log a debug message every 60 seconds. It subscribes to the completed work topic using the PubSubManager and handles the received data. For each received data, it computes a CID using the computeCid function, logs the CID, marshals the data to JSON, and writes it to the database using the WriteData function. The monitoring continues until the context is done.

func NewWorker

func NewWorker() actor.Producer

NewWorker creates a new instance of the Worker actor. It implements the actor.Receiver interface, allowing it to receive and handle messages.

Returns:

  • An instance of the Worker struct that implements the actor.Receiver interface.

func SendWork

func SendWork(node *masa.OracleNode, data []byte)

SendWork is responsible for handling work messages and processing them based on the request type. It supports the following request types: - "web": Scrapes web data from the specified URL with the given depth. - "twitter": Scrapes tweets based on the provided query and count.

The Worker actor receives messages through its Receive method, which is called by the actor system when a message is sent to the actor. It handles the following message types:

  • *messages.Connect: Indicates that a client has connected to the worker. The client's sender information is added to the clients set.
  • *actor.Started: Indicates that the actor has started. It prints a debug message.
  • *messages.Work: Contains the work data to be processed. The work data is parsed based on the request type, and the corresponding scraping function is called. The scraped data is then sent to the workerStatusCh channel for further processing.

The Worker actor is responsible for the NewWorker function, which returns an actor.Producer that can be used to spawn new instances of the Worker actor. @note we can use the WorkerTopic to SendWork anywhere in the service Usage with the Worker Gossip Topic

if err := node.PubSubManager.Publish(config.TopicWithVersion(config.WorkerTopic), data); err != nil {
    logrus.Errorf("%v", err)
}

Types

type CID

type CID struct {
	RecordId  string    `json:"recordid"`
	Timestamp time.Time `json:"timestamp"`
}

type OracleData

type OracleData struct {
	Id        string `json:"id"`
	PeerId    string `json:"peer_id"`
	Request   string `json:"request"`
	Domain    string `json:"domain"`
	ModelType string `json:"model_type"`
	ModelName string `json:"model_name"`
	Steps     []struct {
		Idx               int    `json:"idx"`
		StructuredPrompt  string `json:"structured_prompt,omitempty"`
		Timestamp         string `json:"timestamp"`
		UserPrompt        string `json:"user_prompt,omitempty"`
		RawContent        string `json:"raw_content,omitempty"`
		StructuredContent string `json:"structured_content,omitempty"`
	} `json:"steps"`
}

type Record

type Record struct {
	PeerId string `json:"peerid"`
	CIDs   []CID  `json:"cids"`
}

type Worker

type Worker struct{}

func (*Worker) Receive

func (a *Worker) Receive(ctx actor.Context)

Receive is the message handling method for the Worker actor. It receives messages through the actor context and processes them based on their type.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL