Documentation ¶
Overview ¶
Package worker provides a way to manage concurrent processing. It guarantees that all start/restart/stop operations on a worker are thread-safe by using Go channels.
Example ¶
package main import ( "github.com/iwataka/mybot/backend/worker" "context" "fmt" ) type MyWorker struct { name string } func NewMyWorker(name string) *MyWorker { return &MyWorker{name} } func (w *MyWorker) Start(ctx context.Context, outChan chan<- interface{}) error { <-ctx.Done() return nil } func (w *MyWorker) Name() string { return w.name } func main() { w := NewMyWorker("foo") wm := worker.NewWorkerManager(w, 0) defer wm.Close() // Start worker wm.Send(worker.StartSignal) fmt.Printf("Worker Status: %s\n", wm.Receive()) // Stop worker wm.Send(worker.StopSignal) fmt.Printf("Worker Status: %s\n", wm.Receive()) }
Output: Worker Status: Started Worker Status: Stopped
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type StrategicRestarter ¶
type StrategicRestarter struct {
// contains filtered or unexported fields
}
StrategicRestarter is a channel layer that restarts a worker automatically when an error occurs. If errors occur more than `count` times within the `interval` duration, it stops.
func NewStrategicRestarter ¶
func NewStrategicRestarter(interval time.Duration, count int, suppressError bool) StrategicRestarter
NewStrategicRestarter creates a new StrategicRestarter.
func (StrategicRestarter) Apply ¶
func (sr StrategicRestarter) Apply(ctx context.Context, inChan chan<- WorkerSignal, outChan <-chan interface{}, bufSize int, wg *sync.WaitGroup) (chan<- WorkerSignal, <-chan interface{})
type WorkerChannelLayer ¶
type WorkerChannelLayer interface { // Apply applies this layer to inChan and outChan asynchronously. Apply(ctx context.Context, inChan chan<- WorkerSignal, outChan <-chan interface{}, bufSize int, wg *sync.WaitGroup) (chan<- WorkerSignal, <-chan interface{}) }
WorkerChannelLayer represents a layer over worker channels that catches inChan/outChan messages, applies some filter or conversion, and then forwards them.
type WorkerManager ¶
type WorkerManager struct {
// contains filtered or unexported fields
}
func NewWorkerManager ¶
func NewWorkerManager(worker models.Worker, bufSize int, layers ...WorkerChannelLayer) *WorkerManager
func (*WorkerManager) Close ¶
func (wm *WorkerManager) Close()
func (*WorkerManager) HandleOutput ¶
func (wm *WorkerManager) HandleOutput(h WorkerManagerOutHandler)
func (*WorkerManager) Receive ¶
func (wm *WorkerManager) Receive() interface{}
func (*WorkerManager) Send ¶
func (wm *WorkerManager) Send(s WorkerSignal)
func (*WorkerManager) Status ¶
func (wm *WorkerManager) Status() WorkerStatus
type WorkerManagerOutHandler ¶
type WorkerManagerOutHandler interface {
Handle(out interface{})
}
type WorkerSignal ¶
type WorkerSignal int
WorkerSignal is a signal sent to a Worker. The Worker should behave according to the signal's content and respond.
const ( StartSignal WorkerSignal = iota StopSignal RestartSignal )
These constants indicate the type of signal sent to a worker.
func (WorkerSignal) String ¶
func (s WorkerSignal) String() string
String returns a text indicating the type of this WorkerSignal.
Example ¶
package main import ( "github.com/iwataka/mybot/backend/worker" "fmt" ) func main() { fmt.Println(worker.StartSignal) fmt.Println(worker.RestartSignal) fmt.Println(worker.StopSignal) fmt.Println(worker.WorkerSignal(-1)) }
Output: Start Restart Stop
type WorkerStatus ¶
type WorkerStatus int
WorkerStatus is a type indicating the status of a Worker.
const ( StatusStarted WorkerStatus = iota StatusStopped // StatusFinished means worker was finished successfully. StatusFinished )
These constants indicate the status of a Worker.
func (WorkerStatus) String ¶
func (s WorkerStatus) String() string
String returns a text indicating the type of this WorkerStatus.
Example ¶
package main import ( "github.com/iwataka/mybot/backend/worker" "fmt" ) func main() { fmt.Println(worker.StatusFinished) fmt.Println(worker.StatusStarted) fmt.Println(worker.StatusStopped) fmt.Println(worker.WorkerStatus(-1)) }
Output: Finished Started Stopped