freezer

Version: v0.0.0-...-9fca6f8
Warning

This package is not in the latest version of its module.

Published: Apr 14, 2022 License: MIT Imports: 14 Imported by: 1

README

freezer

Freezer is a simple interface for storing and replaying batches of messages. It is intended to provide message-queue-like semantics on top of a blob storage backend.

On write, messages are batched, optionally compressed, and stored. On read, each batch is decompressed and split back into individual messages.

Freezer uses the straw package as its blob storage abstraction.
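
The batch, compress, store and read-back cycle described above can be illustrated with a minimal, self-contained sketch. It does not use freezer or straw and makes no claim about their on-disk format: the 4-byte length prefix, the helper names `encodeBatch` and `decodeBatch`, and the use of gzip from the standard library (swapped in because Snappy and Zstd are not in the standard library) are all assumptions made for illustration.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"io"
)

// encodeBatch (hypothetical helper) writes each message with a 4-byte
// big-endian length prefix and gzip-compresses the whole batch.
func encodeBatch(msgs [][]byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	var lenBuf [4]byte
	for _, m := range msgs {
		binary.BigEndian.PutUint32(lenBuf[:], uint32(len(m)))
		if _, err := zw.Write(lenBuf[:]); err != nil {
			return nil, err
		}
		if _, err := zw.Write(m); err != nil {
			return nil, err
		}
	}
	// Close flushes the remaining compressed data and the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeBatch (hypothetical helper) reverses encodeBatch: it decompresses
// the blob and splits it back into the original messages.
func decodeBatch(blob []byte) ([][]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(blob))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	var msgs [][]byte
	var lenBuf [4]byte
	for {
		_, err := io.ReadFull(zr, lenBuf[:])
		if err == io.EOF {
			return msgs, nil // clean end of batch
		}
		if err != nil {
			return nil, err
		}
		m := make([]byte, binary.BigEndian.Uint32(lenBuf[:]))
		if _, err := io.ReadFull(zr, m); err != nil {
			return nil, err
		}
		msgs = append(msgs, m)
	}
}

func main() {
	blob, err := encodeBatch([][]byte{[]byte("first"), []byte("second")})
	if err != nil {
		panic(err)
	}
	msgs, err := decodeBatch(blob)
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Println(string(m)) // prints "first", then "second"
	}
}
```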

Documentation

Constants

const (
	DefaultMaxUnflushedTime = time.Second * 10
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CompressionType

type CompressionType int
const (
	CompressionTypeNone   CompressionType = 0
	CompressionTypeSnappy CompressionType = 1
	CompressionTypeZstd   CompressionType = 2
)
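
When the compression type comes from a flag or configuration file, a small lookup helper may be convenient. The sketch below redeclares `CompressionType` and its constants locally so that it compiles on its own; `parseCompressionType` and the string names it accepts are hypothetical and not part of the package.

```go
package main

import "fmt"

// Local copy of the type and constants listed above, so the sketch is
// self-contained. Real code would use the package's own declarations.
type CompressionType int

const (
	CompressionTypeNone   CompressionType = 0
	CompressionTypeSnappy CompressionType = 1
	CompressionTypeZstd   CompressionType = 2
)

// parseCompressionType (hypothetical helper) maps a configuration string
// to a CompressionType. The accepted names are an assumption.
func parseCompressionType(s string) (CompressionType, error) {
	switch s {
	case "", "none":
		return CompressionTypeNone, nil
	case "snappy":
		return CompressionTypeSnappy, nil
	case "zstd":
		return CompressionTypeZstd, nil
	default:
		return CompressionTypeNone, fmt.Errorf("unknown compression type %q", s)
	}
}

func main() {
	ct, err := parseCompressionType("zstd")
	if err != nil {
		panic(err)
	}
	fmt.Println(int(ct)) // prints 2
}
```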

type ConsumerMessageHandler

type ConsumerMessageHandler func([]byte) error
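
A handler is any function with the signature `func([]byte) error`. The following sketch redeclares the type locally and builds a handler that collects messages into a slice. The `collect` helper is hypothetical, and what the library does when a handler returns an error is not documented here, so the loop in `main` simply stops on the first error as one plausible behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the handler type listed above.
type ConsumerMessageHandler func([]byte) error

// collect (hypothetical helper) returns a handler that appends each
// message to dst as a string and rejects empty messages.
func collect(dst *[]string) ConsumerMessageHandler {
	return func(m []byte) error {
		if len(m) == 0 {
			return errors.New("empty message")
		}
		// string(m) copies the bytes, so the caller may reuse m afterwards.
		*dst = append(*dst, string(m))
		return nil
	}
}

func main() {
	var seen []string
	h := collect(&seen)
	for _, m := range [][]byte{[]byte("one"), []byte("two"), nil} {
		if err := h(m); err != nil {
			fmt.Println("handler error:", err) // prints "handler error: empty message"
			break
		}
	}
	fmt.Println(seen) // prints [one two]
}
```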

type MessageSink

type MessageSink struct {
	// contains filtered or unexported fields
}

func NewMessageSink

func NewMessageSink(streamstore straw.StreamStore, config MessageSinkConfig) (*MessageSink, error)

func (*MessageSink) Close

func (mq *MessageSink) Close() error

func (*MessageSink) Flush

func (mq *MessageSink) Flush() error

func (*MessageSink) PutMessage

func (mq *MessageSink) PutMessage(m []byte) error
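
The three methods above (`PutMessage`, `Flush`, `Close`) suggest a put-then-flush write pattern. The sketch below expresses that pattern against a small interface with the same method signatures, so it can run without a blob store. The `messageSink` interface, the in-memory `memSink` stand-in, and the `writeAll` helper are all hypothetical; the stand-in's behavior says nothing about how `*MessageSink` itself buffers or persists data.

```go
package main

import "fmt"

// messageSink (hypothetical) lists the method signatures documented on
// *MessageSink, so code can be written and tested against a stand-in.
type messageSink interface {
	PutMessage(m []byte) error
	Flush() error
	Close() error
}

// memSink is an in-memory stand-in used only for this sketch.
type memSink struct {
	pending [][]byte // written but not yet flushed
	flushed [][]byte // moved here by Flush
}

func (s *memSink) PutMessage(m []byte) error {
	// Copy the message so later changes by the caller do not affect it.
	s.pending = append(s.pending, append([]byte(nil), m...))
	return nil
}

func (s *memSink) Flush() error {
	s.flushed = append(s.flushed, s.pending...)
	s.pending = nil
	return nil
}

func (s *memSink) Close() error { return nil }

// writeAll (hypothetical helper) puts every message and then flushes once.
func writeAll(sink messageSink, msgs [][]byte) error {
	for _, m := range msgs {
		if err := sink.PutMessage(m); err != nil {
			return fmt.Errorf("put message: %w", err)
		}
	}
	if err := sink.Flush(); err != nil {
		return fmt.Errorf("flush: %w", err)
	}
	return nil
}

func main() {
	sink := &memSink{}
	if err := writeAll(sink, [][]byte{[]byte("a"), []byte("b")}); err != nil {
		panic(err)
	}
	if err := sink.Close(); err != nil {
		panic(err)
	}
	fmt.Println(len(sink.flushed)) // prints 2
}
```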

type MessageSinkAutoFlush

type MessageSinkAutoFlush struct {
	// contains filtered or unexported fields
}

func NewMessageAutoFlushSink

func NewMessageAutoFlushSink(streamstore straw.StreamStore, config MessageSinkAutoFlushConfig) (*MessageSinkAutoFlush, error)

func (*MessageSinkAutoFlush) Close

func (mq *MessageSinkAutoFlush) Close() error

func (*MessageSinkAutoFlush) PutMessage

func (mq *MessageSinkAutoFlush) PutMessage(m []byte) error

type MessageSinkAutoFlushConfig

type MessageSinkAutoFlushConfig struct {
	Path                 string
	MaxUnflushedTime     time.Duration
	MaxUnflushedMessages int
	CompressionType      CompressionType
}
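
The field names `MaxUnflushedTime` and `MaxUnflushedMessages` suggest that a batch is flushed once it is old enough or large enough. The sketch below shows one way such a policy could be evaluated. The `flushPolicy` type and `shouldFlush` method are hypothetical, and treating a zero value as "no limit" is a choice made for this sketch; how the library treats zero values, and where it applies `DefaultMaxUnflushedTime`, is not stated in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// flushPolicy (hypothetical) carries the two limits named in
// MessageSinkAutoFlushConfig.
type flushPolicy struct {
	MaxUnflushedTime     time.Duration
	MaxUnflushedMessages int
}

// shouldFlush reports whether a batch holding `pending` messages, whose
// oldest message has been waiting for `age`, is due for a flush.
// In this sketch a zero limit disables that check (an assumption).
func (p flushPolicy) shouldFlush(pending int, age time.Duration) bool {
	if pending == 0 {
		return false // nothing to write
	}
	if p.MaxUnflushedMessages > 0 && pending >= p.MaxUnflushedMessages {
		return true
	}
	return p.MaxUnflushedTime > 0 && age >= p.MaxUnflushedTime
}

func main() {
	p := flushPolicy{MaxUnflushedTime: 10 * time.Second, MaxUnflushedMessages: 100}
	fmt.Println(p.shouldFlush(100, time.Second))  // prints true (message limit reached)
	fmt.Println(p.shouldFlush(5, 11*time.Second)) // prints true (time limit reached)
	fmt.Println(p.shouldFlush(5, time.Second))    // prints false
}
```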

type MessageSinkConfig

type MessageSinkConfig struct {
	Path            string
	CompressionType CompressionType
}

type MessageSource

type MessageSource struct {
	// contains filtered or unexported fields
}

func NewMessageSource

func NewMessageSource(streamstore straw.StreamStore, config MessageSourceConfig) *MessageSource

func (*MessageSource) ConsumeMessages

func (mq *MessageSource) ConsumeMessages(ctx context.Context, handler ConsumerMessageHandler) error

type MessageSourceConfig

type MessageSourceConfig struct {
	Path            string
	PollPeriod      time.Duration
	CompressionType CompressionType
}
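
`ConsumeMessages` takes a `context.Context`, and `MessageSourceConfig` has a `PollPeriod`, which suggests a loop that checks for new data periodically until the context is cancelled. The sketch below shows that general pattern only. The `poll` function, the `fetch` callback, and `runDemo` are hypothetical and do not describe the library's internals.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// poll (hypothetical) calls fetch once per period, passes every returned
// message to handler, and stops when ctx is cancelled or an error occurs.
func poll(ctx context.Context, period time.Duration,
	fetch func() ([][]byte, error), handler func([]byte) error) error {
	if period <= 0 {
		return errors.New("poll period must be positive")
	}
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		msgs, err := fetch()
		if err != nil {
			return err
		}
		for _, m := range msgs {
			if err := handler(m); err != nil {
				return err
			}
		}
		// Wait for the next tick, or stop if the context is done.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// runDemo feeds three fixed batches through poll and cancels the context
// once the third message has been handled.
func runDemo() ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	batches := [][][]byte{{[]byte("a"), []byte("b")}, {}, {[]byte("c")}}
	next := 0
	fetch := func() ([][]byte, error) {
		if next >= len(batches) {
			return nil, nil // no new data yet
		}
		b := batches[next]
		next++
		return b, nil
	}
	var got []string
	handler := func(m []byte) error {
		got = append(got, string(m))
		if len(got) == 3 {
			cancel()
		}
		return nil
	}
	err := poll(ctx, time.Millisecond, fetch, handler)
	return got, err
}

func main() {
	got, err := runDemo()
	fmt.Println(got, err) // prints [a b c] context canceled
}
```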

type MyMessage

type MyMessage struct {
	CustomerID string
	Message    string
}

func (MyMessage) Marshal

func (m MyMessage) Marshal() ([]byte, error)
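
The documentation does not say which encoding `Marshal` uses. As an illustration only, the sketch below redeclares `MyMessage` locally and implements `Marshal` with `encoding/json`; the choice of JSON is an assumption, and the real method may use a different format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the struct listed above.
type MyMessage struct {
	CustomerID string
	Message    string
}

// Marshal encodes the message as JSON. JSON is an assumption made for
// this sketch; the package's actual encoding is not documented here.
func (m MyMessage) Marshal() ([]byte, error) {
	return json.Marshal(m)
}

func main() {
	b, err := MyMessage{CustomerID: "c-1", Message: "hello"}.Marshal()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // prints {"CustomerID":"c-1","Message":"hello"}
}
```

The resulting byte slice has the `[]byte` type that `PutMessage` accepts.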
