Documentation ¶
Overview ¶
Package pool is a generic solution for async job dispatching from web server. While Go natively supports async jobs by using the keyword "go", but this may lead to several unwanted consequences. Suppose we have a typical http handler:
func Handle(req *http.Request, resp http.ResponseWriter) {}
If we dispatch async jobs using "go" like this:
func Handle(req *http.Request, resp http.ResponseWriter) { go AsyncWork() resp.Write([]byte("ok")) }
Let's go through all the disadvantages. First, the backpressure is lost. There is no way to limit the maximum goroutine the handler can create. clients can easily flood the server. Secondly, the graceful shutdown process is ruined. The http server can shutdown itself without losing any request, but the async jobs created with "go" are not protected by the server. You will lose all unfinished jobs once the server shuts down and program exits. lastly, the async job may want to access the original request context, maybe for tracing purposes. The request context terminates at the end of the request, so if you are not careful, the async jobs may be relying on a dead context.
Package pool creates a goroutine worker pool at beginning of the program, limits the maximum concurrency for you, shuts it down at the end of the request without losing any async jobs, and manages the context conversion for you.
Add the dependency to core:
var c *core.C = core.New() c.Provide(pool.Providers())
Then you can inject the pool into your http handler:
type Handler struct { pool *pool.Pool } func (h *Handler) ServeHTTP(req *http.Request, resp http.ResponseWriter) { pool.Go(request.Context(), AsyncWork(asyncContext)) resp.Write([]byte("ok")) }
Example ¶
package main import ( "context" "fmt" "net/http" "time" "github.com/DoNewsCode/core" "github.com/DoNewsCode/core/contract/lifecycle" "github.com/DoNewsCode/core/control/pool" "github.com/gorilla/mux" ) func main() { c := core.Default( core.WithInline("http.addr", ":9777"), core.WithInline("log.level", "none"), ) c.Provide(pool.Providers(pool.WithConcurrency(1))) c.Invoke(func(p *pool.Pool, dispatcher lifecycle.HTTPServerStart) { dispatcher.On(func(ctx context.Context, payload lifecycle.HTTPServerStartPayload) error { go func() { if _, err := http.Get("http://localhost:9777/"); err != nil { panic(err) } }() return nil }) c.AddModule(core.HttpFunc(func(router *mux.Router) { router.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) { p.Go(request.Context(), func(asyncContext context.Context) { select { case <-asyncContext.Done(): fmt.Println("async context cancelled") case <-time.After(time.Second): fmt.Println("async context will not be cancelled") } }) writer.Write(nil) }) })) }) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() c.Serve(ctx) }
Output: async context will not be cancelled
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Providers ¶
func Providers(options ...ProviderOptionFunc) di.Deps
Providers provide a *pool.Pool to the core.
Types ¶
type Counter ¶
type Counter struct {
// contains filtered or unexported fields
}
Counter is a collection of metrics in pool.Pool.
func NewCounter ¶
func (*Counter) IncAsyncJob ¶
func (c *Counter) IncAsyncJob()
IncAsyncJob records the async jobs count.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is an async worker pool. It can be used to dispatch the async jobs from web servers. See the package documentation about its advantage over creating a goroutine directly.
func NewPool ¶
func NewPool(options ...ProviderOptionFunc) *Pool
NewPool returned func(contract.Dispatcher) *Pool
func (*Pool) Go ¶
Go dispatchers a job to the async worker pool. requestContext is the context from http/grpc handler, and asyncContext is the context for async job handling. The asyncContext contains all values from requestContext, but its cancellation has nothing to do with the request. If the pool has reached max concurrency, the job will be executed in the current goroutine. In other word, the job will be executed synchronously.
func (*Pool) ProvideRunGroup ¶
ProvideRunGroup implements core.RunProvider
type ProviderOptionFunc ¶
type ProviderOptionFunc func(pool *Pool)
ProviderOptionFunc is the functional option to Providers.
func WithConcurrency ¶
func WithConcurrency(concurrency int) ProviderOptionFunc
WithConcurrency sets the maximum concurrency for the pool.
func WithCounter ¶
func WithCounter(counter *Counter) ProviderOptionFunc
WithCounter sets the counter for the pool.