errandboi

command module
v0.0.0-...-765649b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2022 License: MIT Imports: 1 Imported by: 0

README

Errandboi

Errandboi is a scheduler that publishes given events to Nats and EMQX. It receives events via http request and caches them in Redis until their release time, then they get published to either EMQX or Nats or both. After events get published, their status changes to done.

Instructions

Lint and Run
  • run docker containers, lint and run app: make all
Build
  • build app locally: make run build
  • build docker image locally: make run docker-build
Run
  • run services: make up
  • run docker containers: make run

Scheduling and publish structure

scheduler
   ticker := time.NewTicker(d)  
  
   go func() {  
      for {  
         select {  
         case <-ticker.C:  
            sch.Publisher.GetEvents()  
            sch.Publisher.Work()  
         case <-sch.Stop:  
            sch.Publisher.Cancel()  
            sch.Publisher.Wp = workerpool.New(sch.Publisher.WorkerSize)  
  
            ticker.Stop()  
  
            return  
  }  
      }  
   }()
Nats

*creating stream

func (n *Nats) CreateStream() error {  
   stream, _ := n.JSCtx.StreamInfo(ChannelName)  
  
   if stream == nil {  
      in, err2 := n.JSCtx.AddStream(&nats.StreamConfig{  
         Name:     ChannelName,  
  Subjects: []string{SubjectName},  
  MaxAge:   0,  
  Storage:  nats.FileStorage,  
  })  
      if err2 != nil {  
         return fmt.Errorf("cannot create stream %w", err2)  
      }  
  
      stream = in  
   }  
  
   n.Logger.Info("events stream", zap.Any("stream", stream))  
  
   return nil  
}
  • publish
func (pb *Publisher) publishEventNats(event Event) {  
   t := natsp.ChannelName + "." + event.Topic  
   if _, err := pb.Nats.JSCtx.Publish(t, []byte(event.Payload)); err != nil {  
      pb.logger.Error("failed to publish event", zap.String("payload", event.Payload),  
  zap.String("topic", event.Topic), zap.Error(err))  
   }  
  
   pb.logger.Info("message published to nats", zap.String("payload", event.Payload))  
}
EMQX
  • publish
func (pb *Publisher) publishEventEMQ(event Event) {  
   if token := pb.Mqtt.Client.Subscribe(event.Topic, 0, nil); token.Wait() && token.Error() != nil {  
      fmt.Println(token.Error())  
   }  
  
   token := pb.Mqtt.Client.Publish(event.Topic, 0, false, event.Payload)  
   pb.logger.Info("message published to emq: ", zap.String("payload", event.Payload))  
   token.Wait()  
}

Example

Posting events
  • request
curl --request POST '{{baseURL}}/events' \
--data-raw '{
	"type": [ "nats","emqx"],
	"events": [
		{
		"description": "This is an event description",
		"delay": "10s",
		"topic": "topic1",
		"payload": "hello"
		}
	]
}'
  • response
{
	"id": "62495362f8358c7b9d83ff4e"
}
Check event status
  • request
curl --request GET '{{baseURL}}/events/{{eventId}}/status'
  • response
{
	"status": "done",
	"events": [
		{
		"description": "This is an event description",
		"publish_date":"2022-04-11T13:31:28+04:30",
		"status": "Done"
		}
	]
}

Documentation

The Go Gopher

There is no documentation for this package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL