Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Convert ¶
func Convert(ctx context.Context, msgs chan *model.ConsumerMessage, bufferSize int, compressor Compressor) chan *model.InstructionSet
Convert takes a channel of incoming Pulsar messages and returns a channel of the corresponding InstructionSets. Each Pulsar message produces exactly one InstructionSet.
func ConvertMsg ¶
func ConvertMsg(ctx context.Context, msg *model.ConsumerMessage, compressor Compressor) *model.InstructionSet
ConvertMsg converts a Pulsar message into an InstructionSet. An InstructionSet is always produced, even if errors are encountered during parsing. When errors occur, the resulting InstructionSet contains every event that could be parsed, along with the messageId of the original message. If no events can be parsed (e.g. the message is not valid protobuf), an empty InstructionSet containing only the messageId is returned.
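The "always return something" contract can be illustrated with a small sketch. To stay self-contained it swaps protobuf for a hypothetical comma-separated payload of integers; convertMsgSketch, ConsumerMessage and InstructionSet are illustrative names, not the package's types.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Hypothetical stand-ins for the model types.
type ConsumerMessage struct {
	ID      string
	Payload []byte
}

type InstructionSet struct {
	MessageID string
	Events    []int
}

// convertMsgSketch never returns nil. Events that fail to parse are skipped,
// and the message ID is always carried over so the caller can still track
// the original message.
func convertMsgSketch(msg *ConsumerMessage) *InstructionSet {
	set := &InstructionSet{MessageID: msg.ID}
	for _, field := range strings.Split(string(msg.Payload), ",") {
		n, err := strconv.Atoi(strings.TrimSpace(field))
		if err != nil {
			continue // skip the unparseable event, keep the rest
		}
		set.Events = append(set.Events, n)
	}
	return set
}

func main() {
	partial := convertMsgSketch(&ConsumerMessage{ID: "m1", Payload: []byte("1,x,3")})
	fmt.Println(partial.MessageID, len(partial.Events))

	empty := convertMsgSketch(&ConsumerMessage{ID: "m2", Payload: []byte("garbage")})
	fmt.Println(empty.MessageID, len(empty.Events))
}
```

Returning an InstructionSet that holds only the ID, rather than an error, lets a caller account for every consumed message even when its contents are unusable.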
Types ¶
type Compressor ¶
type Compressor interface {
	// Compress compresses the byte array
	Compress(b []byte) ([]byte, error)
}
Compressor is a fast, single-threaded compressor. This type allows us to reuse buffers and other state between calls for performance.
type NoOpCompressor ¶
type NoOpCompressor struct { }
NoOpCompressor is a Compressor that does nothing. Useful for tests.
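As a rough sketch of how a pass-through implementation can stand in for a real compressor in tests: the Compressor interface below is copied from the declaration above, while passThrough and encode are hypothetical names, and returning the input unchanged is an assumption about what "does nothing" means here.

```go
package main

import "fmt"

// Compressor is copied from the interface documented above.
type Compressor interface {
	Compress(b []byte) ([]byte, error)
}

// passThrough is a hypothetical stand-in for NoOpCompressor.
// Assumption: "does nothing" means the input is returned unchanged.
type passThrough struct{}

func (passThrough) Compress(b []byte) ([]byte, error) {
	return b, nil
}

// encode is a hypothetical helper that depends only on the interface,
// so a test can pass passThrough{} and assert on the raw bytes.
func encode(c Compressor, payload string) ([]byte, error) {
	return c.Compress([]byte(payload))
}

func main() {
	out, err := encode(passThrough{}, "hello")
	if err != nil {
		fmt.Println("compress failed:", err)
		return
	}
	fmt.Printf("%s\n", out)
}
```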
type ZlibCompressor ¶
type ZlibCompressor struct {
// contains filtered or unexported fields
}
ZlibCompressor compresses to zlib, which for KB-sized payloads seems more CPU-efficient than newer formats such as zstd. The compressor only compresses a message if it is larger than minCompressSize.
func NewZlibCompressor ¶
func NewZlibCompressor(minCompressSize int) (*ZlibCompressor, error)