watermill

package module
v1.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2025 License: MIT Imports: 16 Imported by: 576

README

Watermill

CI Status Go Reference Go Report Card codecov

Watermill is a Go library for working efficiently with message streams. It is intended for building event driven applications, enabling event sourcing, RPC over messages, sagas and basically whatever else comes to your mind. You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog if that fits your use case.

Goals

  • Easy to understand.
  • Universal - event-driven architecture, messaging, stream processing, CQRS - use it for whatever you need.
  • Fast (see Benchmarks).
  • Flexible with middlewares, plugins and Pub/Sub configurations.
  • Resilient - using proven technologies and passing stress tests (see Stability).

Getting Started

Pick what you like the best or see in order:

  1. Follow the Getting Started guide.
  2. See examples below.
  3. Read the full documentation: https://watermill.io/

Our online hands-on training

Examples

Background

Building distributed and scalable services is rarely as easy as some may suggest. There is a lot of hidden knowledge that comes with writing such systems. Just like you don't need to know the whole TCP stack to create a HTTP REST server, you shouldn't need to study all of this knowledge to start with building message-driven applications.

Watermill's goal is to make communication with messages as easy to use as HTTP routers. It provides the tools needed to begin working with event-driven architecture and allows you to learn the details on the go.

At the heart of Watermill there is one simple interface:

func(*Message) ([]*Message, error)

Your handler receives a message and decides whether to publish new message(s) or return an error. What happens next is up to the middlewares you've chosen.

You can find more about our motivations in our Introducing Watermill blog post.

Pub/Subs

All publishers and subscribers have to implement an interface:

type Publisher interface {
	Publish(topic string, messages ...*Message) error
	Close() error
}

type Subscriber interface {
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}

Supported Pub/Subs:

All Pub/Subs implementation documentation can be found in the documentation.

Unofficial libraries

Can't find your favorite Pub/Sub or library integration? Check Awesome Watermill.

If you know another library or are an author of one, please add it to the list.

Contributing

Please check our contributing guide.

Stability

Watermill v1.0.0 has been released and is production-ready. The public API is stable and will not change without changing the major version.

To ensure that all Pub/Subs are stable and safe to use in production, we created a set of tests that need to pass for each of the implementations before merging to master. All tests are also executed in stress mode - that means that we are running all the tests 20x in parallel.

All tests are run with the race condition detector enabled (-race flag in tests).

For more information about debugging tests, you should check tests troubleshooting guide.

Benchmarks

Initial tools for benchmarking Pub/Subs can be found in watermill-benchmark.

All benchmarks are being done on a single 16 CPU VM instance, running one binary and dependencies in Docker Compose.

These numbers are meant to serve as a rough estimate of how fast messages can be processed by different Pub/Subs. Keep in mind that the results can be vastly different, depending on the setup and configuration (both much lower and higher).

Here's the short version for message size of 16 bytes.

Pub/Sub Publish (messages / s) Subscribe (messages / s)
GoChannel 315,776 138,743
Redis Streams 59,158 12,134
NATS Jetstream (16 Subscribers) 50,668 34,713
Kafka (one node) 41,492 101,669
SQL (MySQL, batch size=100) 6,371 2,794
SQL (PostgreSQL, batch size=1) 2,831 9,460
Google Cloud Pub/Sub 3,027 28,589
AMQP (RabbitMQ) 2,770 14,604

Support

If you didn't find the answer to your question in the documentation, feel free to ask us directly!

Please join us on the #watermill channel on the Three Dots Labs Discord.

Why the name?

It processes streams!

License

MIT License

Documentation

Overview

Watermill is a Golang library for working efficiently with message streams.

It is intended for building event driven applications, enabling event sourcing, RPC over messages, sagas and basically whatever else comes to your mind.

You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog if that fits your use case.

Website with detailed documentation: https://watermill.io/

Getting started guide: https://watermill.io/docs/getting-started/

Index

Constants

View Source
const LevelTrace = slog.LevelDebug - 4

LevelTrace must be added, because slog package does not have one by default. Generate it by subtracting 4 levels from slog.Debug following the example of slog.LevelWarn and slog.LevelError which are set to 4 and 8.

Variables

This section is empty.

Functions

func NewShortUUID added in v0.3.0

func NewShortUUID() string

NewShortUUID returns a new short UUID.

func NewULID added in v0.3.0

func NewULID() string

NewULID returns a new ULID.

func NewUUID added in v0.3.0

func NewUUID() string

NewUUID returns a new UUID Version 4.

Types

type CaptureLoggerAdapter added in v0.3.0

type CaptureLoggerAdapter struct {
	// contains filtered or unexported fields
}

CaptureLoggerAdapter is a logger which captures all logs. This logger is mostly useful for testing logging.

func NewCaptureLogger added in v0.3.0

func NewCaptureLogger() *CaptureLoggerAdapter

func (*CaptureLoggerAdapter) Captured added in v0.3.0

func (c *CaptureLoggerAdapter) Captured() map[LogLevel][]CapturedMessage

func (*CaptureLoggerAdapter) Debug added in v0.3.0

func (c *CaptureLoggerAdapter) Debug(msg string, fields LogFields)

func (*CaptureLoggerAdapter) Error added in v0.3.0

func (c *CaptureLoggerAdapter) Error(msg string, err error, fields LogFields)

func (*CaptureLoggerAdapter) Has added in v0.3.0

func (*CaptureLoggerAdapter) HasError added in v0.3.0

func (c *CaptureLoggerAdapter) HasError(err error) bool

func (*CaptureLoggerAdapter) Info added in v0.3.0

func (c *CaptureLoggerAdapter) Info(msg string, fields LogFields)

func (*CaptureLoggerAdapter) PrintCaptured added in v1.4.3

func (c *CaptureLoggerAdapter) PrintCaptured(t Logfer)

func (*CaptureLoggerAdapter) Trace added in v0.3.0

func (c *CaptureLoggerAdapter) Trace(msg string, fields LogFields)

func (*CaptureLoggerAdapter) With added in v0.3.0

type CapturedMessage added in v0.3.0

type CapturedMessage struct {
	Level  LogLevel
	Time   time.Time
	Fields LogFields
	Msg    string
	Err    error
}

func (CapturedMessage) ContentEquals added in v1.4.3

func (c CapturedMessage) ContentEquals(other CapturedMessage) bool

type LogFields

type LogFields map[string]interface{}

LogFields is the logger's key-value list of fields.

func (LogFields) Add

func (l LogFields) Add(newFields LogFields) LogFields

Add adds new fields to the list of LogFields.

func (LogFields) Copy added in v0.3.0

func (l LogFields) Copy() LogFields

Copy copies the LogFields.

type LogLevel added in v0.3.0

type LogLevel uint
const (
	TraceLogLevel LogLevel = iota + 1
	DebugLogLevel
	InfoLogLevel
	ErrorLogLevel
)

type Logfer added in v1.4.3

type Logfer interface {
	Logf(format string, a ...interface{})
}

type LoggerAdapter

type LoggerAdapter interface {
	Error(msg string, err error, fields LogFields)
	Info(msg string, fields LogFields)
	Debug(msg string, fields LogFields)
	Trace(msg string, fields LogFields)
	With(fields LogFields) LoggerAdapter
}

LoggerAdapter is an interface, that you need to implement to support Watermill logging. You can use StdLoggerAdapter as a reference implementation.

func NewSlogLogger added in v1.3.5

func NewSlogLogger(logger *slog.Logger) LoggerAdapter

NewSlogLogger creates an adapter to the standard library's structured logging package. A `nil` logger is substituted for the result of slog.Default.

func NewSlogLoggerWithLevelMapping added in v1.4.0

func NewSlogLoggerWithLevelMapping(logger *slog.Logger, watermillLevelToSlog map[slog.Level]slog.Level) LoggerAdapter

NewSlogLoggerWithLevelMapping creates an adapter to the standard library's structured logging package. A `nil` logger is substituted for the result of slog.Default. The `watermillLevelToSlog` parameter is a map that maps Watermill's log levels to the levels of the structured logger. It's helpful, when want to for example log Watermill's info logs as debug in slog.

func NewStdLogger

func NewStdLogger(debug, trace bool) LoggerAdapter

NewStdLogger creates StdLoggerAdapter which sends all logs to stderr.

func NewStdLoggerWithOut added in v0.3.0

func NewStdLoggerWithOut(out io.Writer, debug bool, trace bool) LoggerAdapter

NewStdLoggerWithOut creates StdLoggerAdapter which sends all logs to provided io.Writer.

type NopLogger

type NopLogger struct{}

NopLogger is a logger which discards all logs.

func (NopLogger) Debug

func (NopLogger) Debug(msg string, fields LogFields)

func (NopLogger) Error

func (NopLogger) Error(msg string, err error, fields LogFields)

func (NopLogger) Info

func (NopLogger) Info(msg string, fields LogFields)

func (NopLogger) Trace

func (NopLogger) Trace(msg string, fields LogFields)

func (NopLogger) With added in v0.3.0

func (l NopLogger) With(fields LogFields) LoggerAdapter

type SlogLoggerAdapter added in v1.3.5

type SlogLoggerAdapter struct {
	// contains filtered or unexported fields
}

SlogLoggerAdapter wraps slog.Logger.

func (*SlogLoggerAdapter) Debug added in v1.3.5

func (s *SlogLoggerAdapter) Debug(msg string, fields LogFields)

Debug logs a message to slog.LevelDebug.

func (*SlogLoggerAdapter) Error added in v1.3.5

func (s *SlogLoggerAdapter) Error(msg string, err error, fields LogFields)

Error logs a message to slog.LevelError.

func (*SlogLoggerAdapter) Info added in v1.3.5

func (s *SlogLoggerAdapter) Info(msg string, fields LogFields)

Info logs a message to slog.LevelInfo.

func (*SlogLoggerAdapter) Trace added in v1.3.5

func (s *SlogLoggerAdapter) Trace(msg string, fields LogFields)

Trace logs a message to LevelTrace.

func (*SlogLoggerAdapter) With added in v1.3.5

func (s *SlogLoggerAdapter) With(fields LogFields) LoggerAdapter

With return a SlogLoggerAdapter with a set of fields injected into all consequent logging messages.

type StdLoggerAdapter

type StdLoggerAdapter struct {
	ErrorLogger *log.Logger
	InfoLogger  *log.Logger
	DebugLogger *log.Logger
	TraceLogger *log.Logger
	// contains filtered or unexported fields
}

StdLoggerAdapter is a logger implementation, which sends all logs to provided standard output.

func (*StdLoggerAdapter) Debug

func (l *StdLoggerAdapter) Debug(msg string, fields LogFields)

func (*StdLoggerAdapter) Error

func (l *StdLoggerAdapter) Error(msg string, err error, fields LogFields)

func (*StdLoggerAdapter) Info

func (l *StdLoggerAdapter) Info(msg string, fields LogFields)

func (*StdLoggerAdapter) Trace

func (l *StdLoggerAdapter) Trace(msg string, fields LogFields)

func (*StdLoggerAdapter) With added in v0.3.0

func (l *StdLoggerAdapter) With(fields LogFields) LoggerAdapter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL