pool

package
v0.13.1-beta1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2022 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package pool is a generic solution for async job dispatching from web server. While Go natively supports async jobs by using the keyword "go", but this may lead to several unwanted consequences. Suppose we have a typical http handler:

func Handle(req *http.Request, resp http.ResponseWriter) {}

If we dispatch async jobs using "go" like this:

  func Handle(req *http.Request, resp http.ResponseWriter) {
    go AsyncWork()
	   resp.Write([]byte("ok"))
  }

Let's go through all the disadvantages. First, the backpressure is lost. There is no way to limit the maximum goroutine the handler can create. clients can easily flood the server. Secondly, the graceful shutdown process is ruined. The http server can shutdown itself without losing any request, but the async jobs created with "go" are not protected by the server. You will lose all unfinished jobs once the server shuts down and program exits. lastly, the async job may want to access the original request context, maybe for tracing purposes. The request context terminates at the end of the request, so if you are not careful, the async jobs may be relying on a dead context.

Package pool creates a goroutine worker pool at beginning of the program, limits the maximum concurrency for you, shuts it down at the end of the request without losing any async jobs, and manages the context conversion for you.

Add the dependency to core:

var c *core.C = core.New()
c.Provide(pool.Providers())

Then you can inject the pool into your http handler:

type Handler struct {
    pool *pool.Pool
}

func (h *Handler) ServeHTTP(req *http.Request, resp http.ResponseWriter) {
   pool.Go(request.Context(), AsyncWork(asyncContext))
   resp.Write([]byte("ok"))
}
Example
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/contract/lifecycle"
	"github.com/DoNewsCode/core/control/pool"

	"github.com/gorilla/mux"
)

func main() {
	c := core.Default(
		core.WithInline("http.addr", ":9777"),
		core.WithInline("log.level", "none"),
	)
	c.Provide(pool.Providers(pool.WithConcurrency(1)))

	c.Invoke(func(p *pool.Pool, dispatcher lifecycle.HTTPServerStart) {
		dispatcher.On(func(ctx context.Context, payload lifecycle.HTTPServerStartPayload) error {
			go func() {
				if _, err := http.Get("http://localhost:9777/"); err != nil {
					panic(err)
				}
			}()
			return nil
		})
		c.AddModule(core.HttpFunc(func(router *mux.Router) {
			router.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
				p.Go(request.Context(), func(asyncContext context.Context) {
					select {
					case <-asyncContext.Done():
						fmt.Println("async context cancelled")
					case <-time.After(time.Second):
						fmt.Println("async context will not be cancelled")
					}
				})
				writer.Write(nil)
			})
		}))
	})

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	c.Serve(ctx)

}
Output:

async context will not be cancelled

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Providers

func Providers(options ...ProviderOptionFunc) di.Deps

Providers provide a *pool.Pool to the core.

Types

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

Counter is a collection of metrics in pool.Pool.

func NewCounter

func NewCounter(syncCounter, asyncCounter metrics.Counter) *Counter

func (*Counter) IncAsyncJob

func (c *Counter) IncAsyncJob()

IncAsyncJob records the async jobs count.

func (*Counter) IncSyncJob

func (c *Counter) IncSyncJob()

IncSyncJob records the sync jobs count.

func (*Counter) PoolName

func (c *Counter) PoolName(name string) *Counter

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is an async worker pool. It can be used to dispatch the async jobs from web servers. See the package documentation about its advantage over creating a goroutine directly.

func NewPool

func NewPool(options ...ProviderOptionFunc) *Pool

NewPool returned func(contract.Dispatcher) *Pool

func (*Pool) Go

func (p *Pool) Go(requestContext context.Context, function func(asyncContext context.Context))

Go dispatchers a job to the async worker pool. requestContext is the context from http/grpc handler, and asyncContext is the context for async job handling. The asyncContext contains all values from requestContext, but its cancellation has nothing to do with the request. If the pool has reached max concurrency, the job will be executed in the current goroutine. In other word, the job will be executed synchronously.

func (*Pool) Module

func (p *Pool) Module() interface{}

Module implements di.Modular

func (*Pool) ProvideRunGroup

func (p *Pool) ProvideRunGroup(group *run.Group)

ProvideRunGroup implements core.RunProvider

func (*Pool) Run

func (p *Pool) Run(ctx context.Context) error

Run starts the async worker pool and block until it finishes.

type ProviderOptionFunc

type ProviderOptionFunc func(pool *Pool)

ProviderOptionFunc is the functional option to Providers.

func WithConcurrency

func WithConcurrency(concurrency int) ProviderOptionFunc

WithConcurrency sets the maximum concurrency for the pool.

func WithCounter

func WithCounter(counter *Counter) ProviderOptionFunc

WithCounter sets the counter for the pool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL