instructions

package
v0.3.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2022 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Convert

func Convert(ctx context.Context, msgs chan *model.ConsumerMessage, bufferSize int, compressor Compressor) chan *model.InstructionSet

Convert takes a channel containing incoming pulsar messages and returns a channel with the corresponding InstructionSets. Each pulsar message will generate exactly one InstructionSet.

func ConvertMsg

func ConvertMsg(ctx context.Context, msg *model.ConsumerMessage, compressor Compressor) *model.InstructionSet

ConvertMsg converts a pulsar message into an InstructionSet. An instructionSet will always be produced even if errors are encountered via parsing. In this case of errors, the resulting InstructionSet will contain all events that could be parsed, along with the mesageId of the original message. In the case that no events can be parsed (e.g. the message is not valid protobuf), an empty InstructionSet containing only the messageId will be returned.

Types

type Compressor

type Compressor interface {
	// Compress compresses the byte array
	Compress(b []byte) ([]byte, error)
}

Compressor is a fast, single threaded compressor. This type allows us to reuse buffers etc for performance

type NoOpCompressor

type NoOpCompressor struct {
}

NoOpCompressor is a Compressor that does nothing. Useful for tests.

func (*NoOpCompressor) Compress

func (c *NoOpCompressor) Compress(b []byte) ([]byte, error)

type ZlibCompressor

type ZlibCompressor struct {
	// contains filtered or unexported fields
}

ZlibCompressor compresses to Zlib, which for KB size payloads seems more (cpu) efficient than the newer formats such as zstd. The compressor will only compress if the msg is greater than minCompressSize

func NewZlibCompressor

func NewZlibCompressor(minCompressSize int) (*ZlibCompressor, error)

func (*ZlibCompressor) Compress

func (c *ZlibCompressor) Compress(b []byte) ([]byte, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL