minikafka

Version: v0.0.0-...-3ea8f42
Warning

This package is not in the latest version of its module.

Published: Jul 13, 2022 | License: MIT | Imports: 12 | Imported by: 1

README

minikafka

minikafka is a durable queue implemented in under 500 lines of code, with performance within an order of magnitude of Kafka's.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

A Broker process is meant to be started on each node of a deployment.

func NewBroker

func NewBroker(opts ...BrokerConfig) *Broker

func (*Broker) Debugf

func (b *Broker) Debugf(format string, args ...interface{})

func (*Broker) Infof

func (b *Broker) Infof(format string, args ...interface{})

func (*Broker) Run

func (b *Broker) Run(ctx context.Context)

type BrokerConfig

type BrokerConfig func(b *Broker)

func BrokerPollingTimeout

func BrokerPollingTimeout(timeout time.Duration) BrokerConfig

func BrokerPublishPort

func BrokerPublishPort(port int) BrokerConfig

func BrokerStoreaDir

func BrokerStoreaDir(dir string) BrokerConfig

func BrokerSubscribePort

func BrokerSubscribePort(port int) BrokerConfig

type Connection

type Connection interface {
	SetDeadline(t time.Time) error
	Read(p []byte) (n int, err error)
	Write(b []byte) (int, error)
	Close() error
}
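
The method set of Connection is a subset of the standard library's net.Conn, so any net.Conn (a TCP connection, or an in-memory pipe in tests) can be used wherever a Connection is expected. The sketch below re-declares the interface locally so that it compiles on its own; `echoOnce` is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"time"
)

// Connection is copied from the documentation above so this example is
// self-contained.
type Connection interface {
	SetDeadline(t time.Time) error
	Read(p []byte) (n int, err error)
	Write(b []byte) (int, error)
	Close() error
}

// Compile-time check: every net.Conn satisfies Connection.
var _ Connection = (net.Conn)(nil)

// echoOnce (hypothetical helper) writes msg on one end of an in-memory pipe
// and reads it back from the other end, using only Connection's methods.
func echoOnce(msg string) (string, error) {
	client, server := net.Pipe()
	var a, b Connection = client, server
	defer a.Close()
	defer b.Close()

	// Bound both ends so a mistake cannot hang the program.
	deadline := time.Now().Add(time.Second)
	if err := a.SetDeadline(deadline); err != nil {
		return "", err
	}
	if err := b.SetDeadline(deadline); err != nil {
		return "", err
	}

	// net.Pipe is unbuffered, so the write has to run concurrently with the read.
	errc := make(chan error, 1)
	go func() {
		_, err := a.Write([]byte(msg))
		errc <- err
	}()

	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(b, buf); err != nil {
		return "", err
	}
	return string(buf), <-errc
}

func main() {
	got, err := echoOnce("ping")
	if err != nil {
		panic(err)
	}
	fmt.Println(got)
}
```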

type Message

type Message struct {
	Topic   string
	Payload []byte
}

TODO: check whether gob can be avoided altogether by carrying just bytes as the payload.
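
The note above implies that messages are serialized with the standard library's encoding/gob. As a minimal sketch of what that involves, the example below round-trips a Message through a gob encoder and decoder. Message is re-declared locally, `roundTrip` is a hypothetical helper, and the package's actual wire framing is not documented here, so this illustrates gob itself rather than minikafka's protocol.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Message is copied from the documentation above so this example is
// self-contained.
type Message struct {
	Topic   string
	Payload []byte
}

// roundTrip (hypothetical helper) gob-encodes a Message into a buffer and
// decodes it back into a fresh value.
func roundTrip(in Message) (Message, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return Message{}, err
	}
	var out Message
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		return Message{}, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(Message{Topic: "events", Payload: []byte("hello")})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %s\n", out.Topic, out.Payload)
}
```

gob writes a type description ahead of the first value on a stream, which is part of the overhead a bytes-only payload would avoid.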

type MessageReader

type MessageReader[T any] struct {
	// contains filtered or unexported fields
}

MessageReader wraps a TCP stream to read and write valid Message values.

TODO: either support copying to a writer, or just implement Reader while always returning correct buckets.

func NewMessageReader

func NewMessageReader[T any](conn Connection) *MessageReader[T]

func (*MessageReader[_]) Close

func (r *MessageReader[_]) Close() error

func (*MessageReader[T]) Read

func (r *MessageReader[T]) Read() (*T, error)

func (*MessageReader[_]) ReadByte

func (r *MessageReader[_]) ReadByte() (byte, error)

func (*MessageReader[_]) ReadPayload

func (r *MessageReader[_]) ReadPayload() ([]byte, error)

If your message is just bytes, ReadPayload returns only the payload, with no gob decoding involved.

func (*MessageReader[_]) ReadRaw

func (r *MessageReader[_]) ReadRaw() ([]byte, error)

func (*MessageReader[_]) SetDeadline

func (r *MessageReader[_]) SetDeadline(t time.Time) error

func (*MessageReader[T]) Write

func (r *MessageReader[T]) Write(msg *T) error

func (*MessageReader[T]) WriteBytes

func (r *MessageReader[T]) WriteBytes(data *[]byte) error
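
To make the shape of the typed Read and Write methods concrete, here is a minimal sketch of a generic stream wrapper built on encoding/gob. It is not the package's implementation: `typedStream`, `newTypedStream`, and `sendAndReceive` are hypothetical names, the use of gob for framing is an assumption, and the raw-byte methods (ReadRaw, ReadPayload, WriteBytes) are not modelled.

```go
package main

import (
	"encoding/gob"
	"fmt"
	"io"
	"net"
	"time"
)

// Message is copied from the documentation above so this example is
// self-contained.
type Message struct {
	Topic   string
	Payload []byte
}

// typedStream is a hypothetical analogue of MessageReader[T]: it reads and
// writes values of type T over a byte stream using gob.
type typedStream[T any] struct {
	enc *gob.Encoder
	dec *gob.Decoder
}

func newTypedStream[T any](rw io.ReadWriter) *typedStream[T] {
	return &typedStream[T]{enc: gob.NewEncoder(rw), dec: gob.NewDecoder(rw)}
}

// Write encodes one value onto the stream.
func (s *typedStream[T]) Write(msg *T) error {
	return s.enc.Encode(msg)
}

// Read decodes the next value from the stream.
func (s *typedStream[T]) Read() (*T, error) {
	var msg T
	if err := s.dec.Decode(&msg); err != nil {
		return nil, err
	}
	return &msg, nil
}

// sendAndReceive (hypothetical helper) sends one Message across an in-memory
// pipe and returns what the other end decoded.
func sendAndReceive(in Message) (*Message, error) {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	// Bound both ends so a mistake cannot hang the program.
	deadline := time.Now().Add(time.Second)
	if err := client.SetDeadline(deadline); err != nil {
		return nil, err
	}
	if err := server.SetDeadline(deadline); err != nil {
		return nil, err
	}

	// net.Pipe is unbuffered, so the write runs concurrently with the read.
	errc := make(chan error, 1)
	go func() { errc <- newTypedStream[Message](client).Write(&in) }()

	out, err := newTypedStream[Message](server).Read()
	if err != nil {
		return nil, err
	}
	return out, <-errc
}

func main() {
	out, err := sendAndReceive(Message{Topic: "events", Payload: []byte("hello")})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %s\n", out.Topic, out.Payload)
}
```

Parameterizing the wrapper on T lets the same stream code carry any gob-encodable type, which matches the `NewMessageReader[T any](conn Connection)` signature above.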

Directories

Path Synopsis
client module
