multi

package
v0.0.22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2024 License: MIT Imports: 10 Imported by: 0

README

slog: Handler chaining, fanout, routing, failover, load balancing...

tag Go Version GoDoc Build Status Go report Coverage Contributors License

Design workflows of slog handlers:

  • fanout: distribute slog.Record to multiple slog.Handler in parallel
  • pipeline: rewrite slog.Record on the fly (e.g. for privacy reasons)
  • routing: forward slog.Record to all matching slog.Handler
  • failover: forward slog.Record to the first available slog.Handler
  • load balancing: increase log bandwidth by sending slog.Record to a pool of slog.Handler

Here is a simple workflow with both pipeline and fanout:

workflow example with pipeline and fanout

See also:

HTTP middlewares:

Loggers:

Log sinks:

🚀 Install

go get github.com/samber/slog-multi

Compatibility: go >= 1.21

No breaking changes will be made to exported APIs before v2.0.0.

⚠️ Use this library carefully, log processing can be very costly (!)

💡 Usage

GoDoc: https://pkg.go.dev/github.com/samber/slog-multi

Broadcast: slogmulti.Fanout()

Distribute logs to multiple slog.Handler in parallel.

import (
    slogmulti "github.com/samber/slog-multi"
    "log/slog"
)

// main builds a logger that broadcasts every record to two sinks (JSON over
// a logstash TCP connection, and text on stderr) and then emits a single
// error record carrying a "user" group, an environment and an error.
func main() {
	// use github.com/netbrain/goautosocket for auto-reconnect
	logstash, _ := net.Dial("tcp", "logstash.acme:4242")
	stderr := os.Stderr

	// first handler: JSON to logstash over tcp
	toLogstash := slog.NewJSONHandler(logstash, &slog.HandlerOptions{})
	// second handler: text to stderr
	toStderr := slog.NewTextHandler(stderr, &slog.HandlerOptions{})

	logger := slog.New(slogmulti.Fanout(
		toLogstash,
		toStderr,
		// ...
	))

	user := slog.Group("user",
		slog.String("id", "user-123"),
		slog.Time("created_at", time.Now()),
	)

	logger.
		With(user).
		With("environment", "dev").
		With("error", fmt.Errorf("an error")).
		Error("A message")
}

Stderr output:

time=2023-04-10T14:00:0.000000+00:00 level=ERROR msg="A message" user.id=user-123 user.created_at=2023-04-10T14:00:0.000000+00:00 environment=dev error="an error"

Netcat output:

{
	"time":"2023-04-10T14:00:0.000000+00:00",
	"level":"ERROR",
	"msg":"A message",
	"user":{
		"id":"user-123",
		"created_at":"2023-04-10T14:00:0.000000+00:00"
	},
	"environment":"dev",
	"error":"an error"
}
Routing: slogmulti.Router()

Distribute logs to all matching slog.Handler in parallel.

import (
    slogmulti "github.com/samber/slog-multi"
	slogslack "github.com/samber/slog-slack"
    "log/slog"
)

// main builds a logger that forwards each record to every Slack handler
// whose region matcher accepts it, then emits one error record tagged with
// the "us" region.
func main() {
	// One Slack handler per regional supervision channel.
	slackChannelUS := slogslack.Option{
		Level:      slog.LevelError,
		WebhookURL: "xxx",
		Channel:    "supervision-us",
	}.NewSlackHandler()
	slackChannelEU := slogslack.Option{
		Level:      slog.LevelError,
		WebhookURL: "xxx",
		Channel:    "supervision-eu",
	}.NewSlackHandler()
	slackChannelAPAC := slogslack.Option{
		Level:      slog.LevelError,
		WebhookURL: "xxx",
		Channel:    "supervision-apac",
	}.NewSlackHandler()

	// Pair each handler with the predicate that selects its records.
	routes := slogmulti.Router().
		Add(slackChannelUS, recordMatchRegion("us")).
		Add(slackChannelEU, recordMatchRegion("eu")).
		Add(slackChannelAPAC, recordMatchRegion("apac"))

	logger := slog.New(routes.Handler())

	regional := logger.With("region", "us").With("pool", "us-east-1")
	regional.Error("Server desynchronized")
}

// recordMatchRegion returns a matcher that reports whether a record carries
// a string attribute with key "region" whose value equals region.
// Attributes with that key but a non-string kind, or a different value, are
// skipped and the scan continues; it stops at the first match.
func recordMatchRegion(region string) func(ctx context.Context, r slog.Record) bool {
	return func(_ context.Context, r slog.Record) bool {
		matched := false

		r.Attrs(func(a slog.Attr) bool {
			matched = a.Key == "region" &&
				a.Value.Kind() == slog.KindString &&
				a.Value.String() == region

			// Returning false stops the iteration, so keep going until a match.
			return !matched
		})

		return matched
	}
}
Failover: slogmulti.Failover()

List multiple targets for a slog.Record instead of retrying on the same unavailable log management system.

import (
	"net"
    slogmulti "github.com/samber/slog-multi"
    "log/slog"
)


// main builds a logger over three logstash connections. Failover forwards
// each record to the first available handler, so the handlers are listed in
// order of preference. It then emits a single error record.
func main() {
	// ncat -l 1000 -k
	// ncat -l 1001 -k
	// ncat -l 1002 -k

	// list AZs
	// use github.com/netbrain/goautosocket for auto-reconnect
	logstash1, _ := net.Dial("tcp", "logstash.eu-west-3a.internal:1000")
	logstash2, _ := net.Dial("tcp", "logstash.eu-west-3b.internal:1000")
	logstash3, _ := net.Dial("tcp", "logstash.eu-west-3c.internal:1000")

	// log/slog (Go >= 1.21) exposes handler constructors as package-level
	// functions: slog.NewJSONHandler(w, opts). There is no NewJSONHandler
	// method on slog.HandlerOptions.
	logger := slog.New(
		slogmulti.Failover()(
			slog.NewJSONHandler(logstash1, &slog.HandlerOptions{}), // send to this instance first
			slog.NewJSONHandler(logstash2, &slog.HandlerOptions{}), // then this instance in case of failure
			slog.NewJSONHandler(logstash3, &slog.HandlerOptions{}), // and finally this instance in case of double failure
		),
	)

	logger.
		With(
			slog.Group("user",
				slog.String("id", "user-123"),
				slog.Time("created_at", time.Now()),
			),
		).
		With("environment", "dev").
		With("error", fmt.Errorf("an error")).
		Error("A message")
}
Load balancing: slogmulti.Pool()

Increase log bandwidth by sending slog.Record to a pool of slog.Handler.

import (
	"net"
    slogmulti "github.com/samber/slog-multi"
    "log/slog"
)

// main builds a logger that balances records across a pool of three
// logstash connections in order to increase log bandwidth, then emits a
// single error record.
func main() {
	// ncat -l 1000 -k
	// ncat -l 1001 -k
	// ncat -l 1002 -k

	// list AZs
	// use github.com/netbrain/goautosocket for auto-reconnect
	logstash1, _ := net.Dial("tcp", "logstash.eu-west-3a.internal:1000")
	logstash2, _ := net.Dial("tcp", "logstash.eu-west-3b.internal:1000")
	logstash3, _ := net.Dial("tcp", "logstash.eu-west-3c.internal:1000")

	// log/slog (Go >= 1.21) exposes handler constructors as package-level
	// functions: slog.NewJSONHandler(w, opts). There is no NewJSONHandler
	// method on slog.HandlerOptions.
	logger := slog.New(
		slogmulti.Pool()(
			// a random handler will be picked
			slog.NewJSONHandler(logstash1, &slog.HandlerOptions{}),
			slog.NewJSONHandler(logstash2, &slog.HandlerOptions{}),
			slog.NewJSONHandler(logstash3, &slog.HandlerOptions{}),
		),
	)

	logger.
		With(
			slog.Group("user",
				slog.String("id", "user-123"),
				slog.Time("created_at", time.Now()),
			),
		).
		With("environment", "dev").
		With("error", fmt.Errorf("an error")).
		Error("A message")
}
Chaining: slogmulti.Pipe()

Rewrite slog.Record on the fly (e.g. for privacy reasons).

// main chains two middlewares in front of a JSON handler on stderr and then
// emits one error record, so the record passes through both middlewares
// before reaching the sink.
func main() {
	// first middleware: format go `error` type into an object {error: "*myCustomErrorType", message: "could not reach https://a.b/c"}
	formatErrors := slogmulti.NewHandleInlineMiddleware(errorFormattingMiddleware)

	// second middleware: remove PII
	scrubPII := NewGDPRMiddleware()

	// final handler
	sink := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{})

	pipeline := slogmulti.
		Pipe(formatErrors).
		Pipe(scrubPII)
	// ...

	logger := slog.New(pipeline.Handler(sink))

	user := slog.Group("user",
		slog.String("id", "user-123"),
		slog.String("email", "user-123"),
		slog.Time("created_at", time.Now()),
	)

	scoped := logger.With(user).With("environment", "dev")
	scoped.Error("A message",
		slog.String("foo", "bar"),
		slog.Any("error", fmt.Errorf("an error")),
	)
}

Stderr output:

{
    "time":"2023-04-10T14:00:0.000000+00:00",
    "level":"ERROR",
    "msg":"A message",
    "user":{
        "id":"*******",
        "email":"*******",
        "created_at":"*******"
    },
    "environment":"dev",
    "foo":"bar",
    "error":{
        "type":"*myCustomErrorType",
        "message":"an error"
    }
}
Custom middleware

Middleware must match the following prototype:

type Middleware func(slog.Handler) slog.Handler

The example above uses:

Note: the WithAttrs and WithGroup methods of a custom middleware must return a new instance instead of the receiver itself.

Inline middleware

An "inline middleware" (a.k.a. lambda) is a shortcut for implementing a middleware: it hooks a single method and proxies the others.

// Each snippet below is an independent example: it builds a middleware that
// hooks one slog.Handler method and proxies the others.

// Hook the handler's `Enabled` method.
mdw := slogmulti.NewEnabledInlineMiddleware(func(ctx context.Context, level slog.Level, next func(context.Context, slog.Level) bool) bool{
    // [...]
    return next(ctx, level)
})
// Hook the handler's `Handle` method.
mdw := slogmulti.NewHandleInlineMiddleware(func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error {
    // [...]
    return next(ctx, record)
})
// Hook the handler's `WithAttrs` method.
mdw := slogmulti.NewWithAttrsInlineMiddleware(func(attrs []slog.Attr, next func([]slog.Attr) slog.Handler) slog.Handler{
    // [...]
    return next(attrs)
})
// Hook the handler's `WithGroup` method.
mdw := slogmulti.NewWithGroupInlineMiddleware(func(name string, next func(string) slog.Handler) slog.Handler{
    // [...]
    return next(name)
})

A super inline middleware that hooks all methods.

Warning: in most cases you should implement your own middleware type instead.

// Build a middleware that hooks all four slog.Handler methods. The hooks
// are positional: Enabled, Handle, WithAttrs, WithGroup.
mdw := slogmulti.NewInlineMiddleware(
    // Enabled hook
    func(ctx context.Context, level slog.Level, next func(context.Context, slog.Level) bool) bool{
        // [...]
        return next(ctx, level)
    },
    // Handle hook
    func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error{
        // [...]
        return next(ctx, record)
    },
    // WithAttrs hook
    func(attrs []slog.Attr, next func([]slog.Attr) slog.Handler) slog.Handler{
        // [...]
        return next(attrs)
    },
    // WithGroup hook
    func(name string, next func(string) slog.Handler) slog.Handler{
        // [...]
        return next(name)
    },
)

🤝 Contributing

Don't hesitate ;)

# Install some dev dependencies
make tools

# Run tests
make test
# or
make watch-test

👤 Contributors

Contributors

💫 Show your support

Give a ⭐️ if this project helped you!

GitHub Sponsors

📝 License

Copyright © 2023 Samuel Berthe.

This project is MIT licensed.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Failover

func Failover() func(...slog.Handler) slog.Handler

Failover forwards records to the first available slog.Handler.

func Fanout

func Fanout(handlers ...slog.Handler) slog.Handler

Fanout distributes records to multiple slog.Handler in parallel

func Pool

func Pool() func(...slog.Handler) slog.Handler

Pool balances records between multiple slog.Handler in order to increase bandwidth. Uses a round robin strategy.

func Router

func Router() *router

Router forwards records to all matching slog.Handler.

Types

type EnabledInlineMiddleware

type EnabledInlineMiddleware struct {
	// contains filtered or unexported fields
}

func (*EnabledInlineMiddleware) Enabled

func (h *EnabledInlineMiddleware) Enabled(ctx context.Context, level slog.Level) bool

Implements slog.Handler

func (*EnabledInlineMiddleware) Handle

func (h *EnabledInlineMiddleware) Handle(ctx context.Context, record slog.Record) error

Implements slog.Handler

func (*EnabledInlineMiddleware) WithAttrs

func (h *EnabledInlineMiddleware) WithAttrs(attrs []slog.Attr) slog.Handler

Implements slog.Handler

func (*EnabledInlineMiddleware) WithGroup

func (h *EnabledInlineMiddleware) WithGroup(name string) slog.Handler

Implements slog.Handler

type FailoverHandler

type FailoverHandler struct {
	// contains filtered or unexported fields
}

@TODO: implement round robin strategy ?

func (*FailoverHandler) Enabled

func (h *FailoverHandler) Enabled(ctx context.Context, l slog.Level) bool

Implements slog.Handler

func (*FailoverHandler) Handle

func (h *FailoverHandler) Handle(ctx context.Context, r slog.Record) error

Implements slog.Handler

func (*FailoverHandler) WithAttrs

func (h *FailoverHandler) WithAttrs(attrs []slog.Attr) slog.Handler

Implements slog.Handler

func (*FailoverHandler) WithGroup

func (h *FailoverHandler) WithGroup(name string) slog.Handler

Implements slog.Handler

type FanoutHandler

type FanoutHandler struct {
	// contains filtered or unexported fields
}

func (*FanoutHandler) Enabled

func (h *FanoutHandler) Enabled(ctx context.Context, l slog.Level) bool

Implements slog.Handler

func (*FanoutHandler) Handle

func (h *FanoutHandler) Handle(ctx context.Context, r slog.Record) error

Implements slog.Handler

func (*FanoutHandler) WithAttrs

func (h *FanoutHandler) WithAttrs(attrs []slog.Attr) slog.Handler

Implements slog.Handler

func (*FanoutHandler) WithGroup

func (h *FanoutHandler) WithGroup(name string) slog.Handler

Implements slog.Handler

type HandleInlineMiddleware

type HandleInlineMiddleware struct {
	// contains filtered or unexported fields
}

func (*HandleInlineMiddleware) Enabled

func (h *HandleInlineMiddleware) Enabled(ctx context.Context, level slog.Level) bool

Implements slog.Handler

func (*HandleInlineMiddleware) Handle

func (h *HandleInlineMiddleware) Handle(ctx context.Context, record slog.Record) error

Implements slog.Handler

func (*HandleInlineMiddleware) WithAttrs

func (h *HandleInlineMiddleware) WithAttrs(attrs []slog.Attr) slog.Handler

Implements slog.Handler

func (*HandleInlineMiddleware) WithGroup

func (h *HandleInlineMiddleware) WithGroup(name string) slog.Handler

Implements slog.Handler

type InlineMiddleware

type InlineMiddleware struct {
	// contains filtered or unexported fields
}

func (*InlineMiddleware) Enabled

func (h *InlineMiddleware) Enabled(ctx context.Context, level slog.Level) bool

Implements slog.Handler

func (*InlineMiddleware) Handle

func (h *InlineMiddleware) Handle(ctx context.Context, record slog.Record) error

Implements slog.Handler

func (*InlineMiddleware) WithAttrs

func (h *InlineMiddleware) WithAttrs(attrs []slog.Attr) slog.Handler

Implements slog.Handler

func (*InlineMiddleware) WithGroup

func (h *InlineMiddleware) WithGroup(name string) slog.Handler

Implements slog.Handler

type Middleware

type Middleware func(slog.Handler) slog.Handler

Middleware is a function that takes a slog.Handler and returns a slog.Handler wrapping it.

func NewEnabledInlineMiddleware

func NewEnabledInlineMiddleware(enabledFunc func(ctx context.Context, level slog.Level, next func(context.Context, slog.Level) bool) bool) Middleware

NewEnabledInlineMiddleware is a shortcut to a middleware that implements only the `Enabled` method.

func NewHandleInlineMiddleware

func NewHandleInlineMiddleware(handleFunc func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error) Middleware

NewHandleInlineMiddleware is a shortcut to a middleware that implements only the `Handle` method.

func NewInlineMiddleware

func NewInlineMiddleware(
	enabledFunc func(ctx context.Context, level slog.Level, next func(context.Context, slog.Level) bool) bool,
	handleFunc func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error,
	withAttrsFunc func(attrs []slog.Attr, next func([]slog.Attr) slog.Handler) slog.Handler,
	withGroupFunc func(name string, next func(string) slog.Handler) slog.Handler,
) Middleware

NewInlineMiddleware is a shortcut to a middleware that implements all methods.

func NewWithAttrsInlineMiddleware

func NewWithAttrsInlineMiddleware(withAttrsFunc func(attrs []slog.Attr, next func([]slog.Attr) slog.Handler) slog.Handler) Middleware

NewWithAttrsInlineMiddleware is a shortcut to a middleware that implements only the `WithAttrs` method.

func NewWithGroupInlineMiddleware

func NewWithGroupInlineMiddleware(withGroupFunc func(name string, next func(string) slog.Handler) slog.Handler) Middleware

NewWithGroupInlineMiddleware is a shortcut to a middleware that implements only the `WithGroup` method.

type PipeBuilder

type PipeBuilder struct {
	// contains filtered or unexported fields
}

PipeBuilder defines a chain of Middleware.

func Pipe

func Pipe(middlewares ...Middleware) *PipeBuilder

Pipe builds a chain of Middleware, e.g. to rewrite slog.Record on the fly for privacy reasons.

func (*PipeBuilder) Handler

func (h *PipeBuilder) Handler(handler slog.Handler) slog.Handler

Handler applies the chained middleware to the given handler and returns the resulting slog.Handler.

func (*PipeBuilder) Pipe

func (h *PipeBuilder) Pipe(middleware Middleware) *PipeBuilder

Pipe adds a middleware to the chain and returns a *PipeBuilder for further chaining.

type PoolHandler

type PoolHandler struct {
	// contains filtered or unexported fields
}

func (*PoolHandler) Enabled

func (h *PoolHandler) Enabled(ctx context.Context, l slog.Level) bool

Implements slog.Handler

func (*PoolHandler) Handle

func (h *PoolHandler) Handle(ctx context.Context, r slog.Record) error

Implements slog.Handler

func (*PoolHandler) WithAttrs

func (h *PoolHandler) WithAttrs(attrs []slog.Attr) slog.Handler

Implements slog.Handler

func (*PoolHandler) WithGroup

func (h *PoolHandler) WithGroup(name string) slog.Handler

Implements slog.Handler

type RoutableHandler

type RoutableHandler struct {
	// contains filtered or unexported fields
}

@TODO: implement round robin strategy ?

func (*RoutableHandler) Enabled

func (h *RoutableHandler) Enabled(ctx context.Context, l slog.Level) bool

Implements slog.Handler

func (*RoutableHandler) Handle

func (h *RoutableHandler) Handle(ctx context.Context, r slog.Record) error

Implements slog.Handler

func (*RoutableHandler) WithAttrs

func (h *RoutableHandler) WithAttrs(attrs []slog.Attr) slog.Handler

Implements slog.Handler

func (*RoutableHandler) WithGroup

func (h *RoutableHandler) WithGroup(name string) slog.Handler

Implements slog.Handler

type TCPClient added in v0.0.4

type TCPClient struct {
	*net.TCPConn
	// contains filtered or unexported fields
}

func Dial added in v0.0.4

func Dial(network, addr string) (*TCPClient, error)

Dial returns a new *TCPClient.

The new client connects to the remote address `addr` on the network `network`, which must be "tcp", "tcp4", or "tcp6".

func DialTCP added in v0.0.4

func DialTCP(network string, laddr, raddr *net.TCPAddr) (*TCPClient, error)

func (*TCPClient) SetMaxRetries added in v0.0.4

func (c *TCPClient) SetMaxRetries(maxRetries int)

func (*TCPClient) SetRetryInterval added in v0.0.4

func (c *TCPClient) SetRetryInterval(retryInterval time.Duration)

func (*TCPClient) Write added in v0.0.4

func (c *TCPClient) Write(b []byte) (int, error)

type WithAttrsInlineMiddleware

type WithAttrsInlineMiddleware struct {
	// contains filtered or unexported fields
}

func (*WithAttrsInlineMiddleware) Enabled

func (h *WithAttrsInlineMiddleware) Enabled(ctx context.Context, level slog.Level) bool

Implements slog.Handler

func (*WithAttrsInlineMiddleware) Handle

func (h *WithAttrsInlineMiddleware) Handle(ctx context.Context, record slog.Record) error

Implements slog.Handler

func (*WithAttrsInlineMiddleware) WithAttrs

func (h *WithAttrsInlineMiddleware) WithAttrs(attrs []slog.Attr) slog.Handler

Implements slog.Handler

func (*WithAttrsInlineMiddleware) WithGroup

func (h *WithAttrsInlineMiddleware) WithGroup(name string) slog.Handler

Implements slog.Handler

type WithGroupInlineMiddleware

type WithGroupInlineMiddleware struct {
	// contains filtered or unexported fields
}

func (*WithGroupInlineMiddleware) Enabled

func (h *WithGroupInlineMiddleware) Enabled(ctx context.Context, level slog.Level) bool

Implements slog.Handler

func (*WithGroupInlineMiddleware) Handle

func (h *WithGroupInlineMiddleware) Handle(ctx context.Context, record slog.Record) error

Implements slog.Handler

func (*WithGroupInlineMiddleware) WithAttrs

func (h *WithGroupInlineMiddleware) WithAttrs(attrs []slog.Attr) slog.Handler

Implements slog.Handler

func (*WithGroupInlineMiddleware) WithGroup

func (h *WithGroupInlineMiddleware) WithGroup(name string) slog.Handler

Implements slog.Handler

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL