wpool

package module
v2.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2022 License: MIT Imports: 5 Imported by: 0

README

go-wpool

Go Reference Go Report Card Coverage Pipeline

Golang package for making a pool of workers.

Pool example:
package main

import (
	"fmt"
	"sync"

	"github.com/egnd/go-wpool/v2"
	"github.com/egnd/go-wpool/v2/interfaces"
	"github.com/rs/zerolog"
)

// main demonstrates a pipeline pool: it creates the pool, registers two
// pipeline workers, submits ten tasks and waits until all of them are done.
//
// NOTE(review): SomeTask is not defined in this snippet; it is presumably a
// user-defined type implementing interfaces.Task that calls wg.Done() when it
// finishes — confirm against the real task implementation.
func main() {
	// Create the pipeline channel and the pool that is built around it.
	pipeline := make(chan interfaces.Worker)
	pool := wpool.NewPipelinePool(pipeline,
		// zerolog.New requires an io.Writer argument; Nop gives a valid
		// logger that discards output.
		wpool.NewZerologAdapter(zerolog.Nop()),
	)
	defer pool.Close()

	// Register a few workers created with the same pipeline channel.
	workers := []interfaces.Worker{
		wpool.NewPipelineWorker(pipeline),
		wpool.NewPipelineWorker(pipeline),
	}
	for _, worker := range workers {
		// AddWorker returns an error; do not drop it silently.
		if err := pool.AddWorker(worker); err != nil {
			panic(err)
		}
	}

	// Put some tasks into the pool.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		// Count the task before submitting it so wg.Wait cannot return early.
		wg.Add(1)
		if err := pool.AddTask(&SomeTask{&wg, "task" + fmt.Sprint(i)}); err != nil {
			panic(err)
		}
	}

	// Wait for all tasks to be completed.
	wg.Wait()
}
Sticky pool example (tasks with the same ID will be processed by the same worker):
package main

import (
	"fmt"
	"sync"

	"github.com/egnd/go-wpool/v2"
	"github.com/egnd/go-wpool/v2/interfaces"
	"github.com/rs/zerolog"
)

func main() {
	// create pool
	pool := wpool.NewStickyPool(
		wpool.NewZerologAdapter(zerolog.Nop())
	)
	defer pool.Close()

	// add few workers
	buffSize := 100
	pool.AddWorker(wpool.NewWorker(buffSize))
	pool.AddWorker(wpool.NewWorker(buffSize))

	// put some tasks to pool
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		if err := pool.AddTask(&SomeTask{&wg, "task"+fmt.Sprint(i)}); err != nil {
			panic(err)
		}
	}

	// wait for tasks to be completed
	wg.Wait()
}

Documentation

Overview

Package wpool contains structs and functions for making a pool of workers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type PipelinePool

type PipelinePool struct {
	// contains filtered or unexported fields
}

PipelinePool is a pool of workers with a pipeline.

func NewPipelinePool

func NewPipelinePool(pipeline chan interfaces.Worker, logger interfaces.Logger) *PipelinePool

NewPipelinePool creates a pool of workers with a pipeline.

func (*PipelinePool) AddTask

func (p *PipelinePool) AddTask(task interfaces.Task) (err error)

AddTask puts a task into the pool.

func (*PipelinePool) AddWorker

func (p *PipelinePool) AddWorker(worker interfaces.Worker) (err error)

AddWorker registers a worker in the pool.

func (*PipelinePool) Close

func (p *PipelinePool) Close() (err error)

Close stops the pool and its workers.

type PipelineWorker

type PipelineWorker struct {
	// contains filtered or unexported fields
}

PipelineWorker is a struct for handling tasks.

func NewPipelineWorker

func NewPipelineWorker(pipeline chan<- interfaces.Worker) *PipelineWorker

NewPipelineWorker creates a worker for the pool pipeline.

func (*PipelineWorker) Close

func (w *PipelineWorker) Close() (err error)

Close stops the worker.

func (*PipelineWorker) Do

func (w *PipelineWorker) Do(task interfaces.Task) (err error)

Do puts a task into the worker's queue.

type StickyPool

type StickyPool struct {
	// contains filtered or unexported fields
}

StickyPool is a pool of "sticky" workers.

func NewStickyPool

func NewStickyPool(logger interfaces.Logger) *StickyPool

NewStickyPool creates a pool of "sticky" workers.

func (*StickyPool) AddTask

func (p *StickyPool) AddTask(task interfaces.Task) (err error)

AddTask puts a task into the pool.

func (*StickyPool) AddWorker

func (p *StickyPool) AddWorker(worker interfaces.Worker) (err error)

AddWorker registers a worker in the pool.

func (*StickyPool) Close

func (p *StickyPool) Close() (err error)

Close stops the pool and its workers.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is a struct for handling tasks.

func NewWorker

func NewWorker(buffSize int) *Worker

NewWorker creates a worker with a task queue.

func (*Worker) Close

func (w *Worker) Close() error

Close stops the worker.

func (*Worker) Do

func (w *Worker) Do(task interfaces.Task) (err error)

Do puts a task into the worker's queue.

type ZerologAdapter

type ZerologAdapter struct {
	// contains filtered or unexported fields
}

ZerologAdapter is an adapter for the zerolog logger.

func NewZerologAdapter

func NewZerologAdapter(logger zerolog.Logger) *ZerologAdapter

NewZerologAdapter creates an adapter for the zerolog logger.

func (ZerologAdapter) Errorf

func (l ZerologAdapter) Errorf(err error, msg string, args ...interface{})

Errorf logs an error.

func (ZerologAdapter) Infof

func (l ZerologAdapter) Infof(msg string, args ...interface{})

Infof logs an informational message.

Directories

Path Synopsis
interfaces: Package interfaces contains interfaces for the wpool package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL