io

package
v1.1.1 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 9, 2023 License: Apache-2.0, MIT Imports: 13 Imported by: 1

Documentation

Overview

Package io is a fork of github.com/ThreeDotsLabs/watermill-io@c24ff9d.

Functions

func PayloadMarshalFunc

func PayloadMarshalFunc(topic string, msg *message.Message) ([]byte, error)

PayloadMarshalFunc dumps the message's payload, discarding the remaining fields of the message. The output is always terminated with an EOL byte.

This basic marshaler function may be used e.g. to write just the message payloads to stdout or to a file, without cluttering the output with metadata and UUIDs.
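The payload-only behaviour described above can be sketched with the standard library alone. This is an illustration, not the package's implementation: payloadOnly is a hypothetical stand-in that takes a raw payload instead of a *message.Message, and it assumes the EOL byte is '\n'.

```go
package main

import (
	"fmt"
)

// payloadOnly is a hypothetical stand-in for a payload-only marshal function.
// It ignores the topic, copies the payload, and appends a newline
// (assumed here to be the EOL byte).
func payloadOnly(topic string, payload []byte) []byte {
	out := make([]byte, 0, len(payload)+1)
	out = append(out, payload...)
	return append(out, '\n')
}

func main() {
	// %q makes the trailing newline visible in the printed output.
	fmt.Printf("%q\n", payloadOnly("orders", []byte(`{"id":1}`)))
}
```

Copying into a fresh slice keeps the caller's payload untouched, which matters if the same message is marshaled more than once.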

func PayloadUnmarshalFunc

func PayloadUnmarshalFunc(topic string, b []byte) (*message.Message, error)

PayloadUnmarshalFunc puts the whole byte slice into the message's Payload. The UUID is derived from the byte slice using the SHA-1 hash function.
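Deriving an identifier from the content means that identical byte slices always yield the same identifier. The sketch below shows the idea with crypto/sha1. The function name contentID and the hex encoding of the digest are assumptions made for illustration; the package may format its UUID differently.

```go
package main

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
)

// contentID is a hypothetical helper: it hashes the raw bytes with SHA-1
// and returns the digest as a 40-character hex string.
// The real package may format the identifier differently.
func contentID(b []byte) string {
	sum := sha1.Sum(b)
	return hex.EncodeToString(sum[:])
}

func main() {
	a := contentID([]byte("hello"))
	b := contentID([]byte("hello"))
	c := contentID([]byte("world"))
	fmt.Println("same input, same id:", a == b)
	fmt.Println("different input, different id:", a != c)
}
```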

func TimestampTopicPayloadMarshalFunc

func TimestampTopicPayloadMarshalFunc(topic string, msg *message.Message) ([]byte, error)

TimestampTopicPayloadMarshalFunc dumps the message's payload. Each message is prefixed with the current timestamp and the topic. The output is always terminated with an EOL byte.

This basic marshaler function may be used e.g. to write the message payloads, along with the timestamp and topic, to stdout or to a file, without cluttering the output with metadata and UUIDs.
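A log-style line of this kind might look like the sketch below. The exact layout is an assumption: the documentation does not state the timestamp format or the separators, so the bracketed RFC 3339 timestamp and the "topic: payload" shape are chosen for illustration only, and timestampTopicPayload is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

// timestampTopicPayload is a hypothetical stand-in for a marshal function
// that prefixes the payload with the current time and the topic.
// The "[time] topic: payload\n" layout is assumed, not taken from the package.
func timestampTopicPayload(topic string, payload []byte) []byte {
	line := fmt.Sprintf("[%s] %s: %s\n", time.Now().Format(time.RFC3339), topic, payload)
	return []byte(line)
}

func main() {
	fmt.Print(string(timestampTopicPayload("orders", []byte("hello"))))
}
```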

Types

type LosslessMarshaler

type LosslessMarshaler struct{}

LosslessMarshaler marshals/unmarshals messages using gob. As opposed to other (un)marshalers in this package, all the attributes of the message (UUID, metadata, ...) are preserved. However, the result is not easily readable by humans or other marshalers.
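To illustrate why a gob encoding preserves every attribute, here is a round trip with encoding/gob. The envelope struct is a hypothetical stand-in for a Watermill message, and encode and decode are illustrative helpers; none of these names come from the package.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// envelope is a hypothetical stand-in for a Watermill message.
type envelope struct {
	UUID     string
	Metadata map[string]string
	Payload  []byte
}

// encode serializes every exported field of the envelope with gob.
func encode(e envelope) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(e); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode restores an envelope from bytes produced by encode.
func decode(b []byte) (envelope, error) {
	var e envelope
	err := gob.NewDecoder(bytes.NewReader(b)).Decode(&e)
	return e, err
}

func main() {
	raw, err := encode(envelope{
		UUID:     "id-1",
		Metadata: map[string]string{"source": "demo"},
		Payload:  []byte("hello"),
	})
	if err != nil {
		panic(err)
	}
	got, err := decode(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(got.UUID, got.Metadata["source"], string(got.Payload))
}
```

The encoded bytes are a binary stream that includes gob type information, which is why the result is hard to read by eye or to parse with a non-gob decoder.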

func (LosslessMarshaler) Marshal

func (m LosslessMarshaler) Marshal(topic string, msg *message.Message) ([]byte, error)

func (LosslessMarshaler) Unmarshal

func (m LosslessMarshaler) Unmarshal(topic string, b []byte) (*message.Message, error)

type MarshalMessageFunc

type MarshalMessageFunc func(topic string, msg *message.Message) ([]byte, error)

MarshalMessageFunc packages the message into a byte slice. The topic argument is there because some writers (e.g. loggers) might want to present the topic as part of their output.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher writes the messages to the underlying io.Writer. Its behaviour is highly customizable through the choice of the marshal function in config.

func NewPublisher

func NewPublisher(wc io.WriteCloser, config PublisherConfig, logger watermill.LoggerAdapter) (*Publisher, error)

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the underlying Writer. Trying to publish messages with a closed publisher will return an error. Close is idempotent.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, topic string, messages ...*message.Message) error

Publish writes the messages to the underlying io.Writer.
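The contract described for Publish and Close (write through a marshal function, return an error after Close, tolerate repeated Close calls) can be sketched as follows. This is a toy model under stated assumptions, not the package's code: toyPublisher, marshalFunc, memSink, and errClosed are hypothetical names, payloads are raw byte slices rather than *message.Message, and the context argument is omitted.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"sync"
)

// marshalFunc is a hypothetical, simplified analogue of MarshalMessageFunc.
type marshalFunc func(topic string, payload []byte) ([]byte, error)

var errClosed = errors.New("publisher is closed")

// toyPublisher writes marshaled payloads to an io.WriteCloser.
type toyPublisher struct {
	mu      sync.Mutex
	wc      io.WriteCloser
	marshal marshalFunc
	closed  bool
}

// Publish marshals each payload and writes it; it fails once closed.
func (p *toyPublisher) Publish(topic string, payloads ...[]byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return errClosed
	}
	for _, payload := range payloads {
		b, err := p.marshal(topic, payload)
		if err != nil {
			return err
		}
		if _, err := p.wc.Write(b); err != nil {
			return err
		}
	}
	return nil
}

// Close closes the writer once; later calls are no-ops (idempotent).
func (p *toyPublisher) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return nil
	}
	p.closed = true
	return p.wc.Close()
}

// memSink is an in-memory io.WriteCloser that counts Close calls.
type memSink struct {
	bytes.Buffer
	closes int
}

func (m *memSink) Close() error { m.closes++; return nil }

func main() {
	sink := &memSink{}
	pub := &toyPublisher{wc: sink, marshal: func(_ string, b []byte) ([]byte, error) {
		return append(append([]byte{}, b...), '\n'), nil
	}}
	_ = pub.Publish("orders", []byte("a"), []byte("b"))
	_ = pub.Close()
	_ = pub.Close() // second Close is a no-op
	fmt.Printf("%q closes=%d err=%v\n", sink.String(), sink.closes, pub.Publish("orders", []byte("c")))
}
```

The mutex guards the closed flag so that a concurrent Publish cannot write to a writer that Close is shutting down.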

type PublisherConfig

type PublisherConfig struct {
	// MarshalFunc transforms the Watermill messages into raw bytes for transport.
	// Its behavior may be dependent on the topic.
	MarshalFunc MarshalMessageFunc
}

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber reads bytes from its underlying io.Reader and interprets them as Watermill messages. It delivers the messages on the channel returned by Subscribe(). There are several ways in which Subscriber may interpret messages from the Reader, configurable by the unmarshal function in the config.

func NewSubscriber

func NewSubscriber(rc io.ReadCloser, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error)

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

type SubscriberConfig

type SubscriberConfig struct {
	// BufferSize configures how many bytes will be read at a time from the Subscriber's Reader.
	// Each message will be treated as having at most BufferSize bytes.
	// If 0, Subscriber works in delimiter mode - it scans for messages delimited by the MessageDelimiter byte.
	BufferSize int
	// MessageDelimiter is the byte that is expected to separate messages if BufferSize is equal to 0.
	MessageDelimiter byte

	// PollInterval is the time between polling for new messages if the last read was empty. Defaults to time.Second.
	PollInterval time.Duration

	// UnmarshalFunc transforms the raw bytes into a Watermill message. Its behavior may be dependent on the topic.
	UnmarshalFunc UnmarshalMessageFunc
}

type UnmarshalMessageFunc

type UnmarshalMessageFunc func(topic string, b []byte) (*message.Message, error)

UnmarshalMessageFunc restores the message from a byte slice. The topic argument is there to keep symmetry with MarshalMessageFunc, as some unmarshalers might restore the topic as well.
