sordini

Version: v0.0.0-...-3c545b3
Published: Apr 11, 2023 License: MIT Imports: 13 Imported by: 0

README

sordini

This project started as a fork of Jocko.

The changes remove functionality that was present in Jocko and extend it in very specific ways to fit the needs of an embeddable Kafka protocol message sink.

It allows receiving messages from a Kafka producer, and exposes them to the embedding program via a callback.

All message persistence and all attempts to make it work as a distributed log have been removed.

What's in a name

Sordini is a character in Franz Kafka's novel "The Castle," who serves as an intermediary between various parties. Like Sordini, this project facilitates the exchange of messages and information, acting as an intermediary by receiving messages from a Kafka producer and making them available through a callback function. Though Sordini is part of a complex bureaucratic system, his role in the novel highlights the importance of communication and the flow of information, which is central to the functionality of the project.

Additionally, in Italian, "sordini" is the plural form of "sordino," which translates to "mute" or "damper" in English. The term refers to devices that muffle or dampen the sound of musical instruments. This is a nod to the fact that the project receives messages over the Kafka protocol but does not send information back to Kafka consumers, thereby acting as a "mute" or "one-way" communication system.

License

sordini is released under the MIT license, see the LICENSE file for details.

Documentation

Constants

This section is empty.

Variables

var (
	ErrTopicExists            = errors.New("topic exists already")
	ErrInvalidArgument        = errors.New("no logger set")
	OffsetsTopicName          = "__consumer_offsets"
	OffsetsTopicNumPartitions = 50
)

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker represents a broker in a Jocko cluster, like a broker in a Kafka cluster.

func NewBroker

func NewBroker(addr string) (*Broker, error)

NewBroker is used to instantiate a new broker.

func (*Broker) CurrentOffset

func (b *Broker) CurrentOffset(topic string, partition int32) int64

func (*Broker) Leave

func (b *Broker) Leave() error

Leave is used to prepare for a graceful shutdown.

func (*Broker) NewestOffset

func (b *Broker) NewestOffset(topic string, partition int32) int64

func (*Broker) RegisterCallback

func (b *Broker) RegisterCallback(cb Callback)

func (*Broker) Run

func (b *Broker) Run(ctx context.Context, requests <-chan *Context, responses chan<- *Context)

Run starts a loop to handle requests and send back responses.

func (*Broker) Shutdown

func (b *Broker) Shutdown() error

type Callback

type Callback func(topic string, offset int64, partition int32, key []byte, msg []byte)

Callback is the function that is called when a message is received.
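
As an illustration of what a callback might look like, the sketch below collects received messages in memory. It redeclares the Callback signature locally so that it compiles without the module; Record, Collector, Handle and Snapshot are hypothetical names, not part of sordini. The listing does not say whether callbacks are invoked concurrently or whether the key and msg buffers are reused after the call, so the mutex and the defensive copies are assumptions made for safety.

```go
package main

import (
	"fmt"
	"sync"
)

// Callback mirrors the signature documented for sordini's Callback type.
// It is redeclared here only so the sketch compiles on its own.
type Callback func(topic string, offset int64, partition int32, key []byte, msg []byte)

// Record is a hypothetical value type holding one received message.
type Record struct {
	Topic     string
	Offset    int64
	Partition int32
	Key       []byte
	Value     []byte
}

// Collector is a hypothetical helper that accumulates records in memory.
type Collector struct {
	mu      sync.Mutex
	records []Record
}

// Handle has the Callback signature, so the method value c.Handle can be
// used wherever a Callback is expected.
func (c *Collector) Handle(topic string, offset int64, partition int32, key []byte, msg []byte) {
	rec := Record{
		Topic:     topic,
		Offset:    offset,
		Partition: partition,
		// Copy the slices: buffer ownership is not documented (assumption).
		Key:   append([]byte(nil), key...),
		Value: append([]byte(nil), msg...),
	}
	// Lock in case callbacks arrive from several goroutines (assumption).
	c.mu.Lock()
	defer c.mu.Unlock()
	c.records = append(c.records, rec)
}

// Snapshot returns a copy of the record list collected so far.
func (c *Collector) Snapshot() []Record {
	c.mu.Lock()
	defer c.mu.Unlock()
	return append([]Record(nil), c.records...)
}

func main() {
	c := &Collector{}
	var cb Callback = c.Handle
	// Simulate one delivery; a real broker would make this call.
	cb("events", 0, 0, []byte("k"), []byte("hello"))
	for _, r := range c.Snapshot() {
		fmt.Printf("%s[%d]@%d %s=%s\n", r.Topic, r.Partition, r.Offset, r.Key, r.Value)
	}
}
```

With the real package, a method value such as c.Handle would presumably be passed to RegisterCallback on a Server or Broker, since its signature matches the documented Callback type.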

type Context

type Context struct {
	// contains filtered or unexported fields
}

func (*Context) Deadline

func (ctx *Context) Deadline() (deadline time.Time, ok bool)

func (*Context) Done

func (ctx *Context) Done() <-chan struct{}

func (*Context) Err

func (ctx *Context) Err() error

func (*Context) Request

func (ctx *Context) Request() interface{}

func (*Context) Response

func (ctx *Context) Response() interface{}

func (*Context) ResponseToBytes

func (ctx *Context) ResponseToBytes() ([]byte, error)

func (*Context) Value

func (ctx *Context) Value(key interface{}) interface{}

type Handler

type Handler interface {
	Run(context.Context, <-chan *Context, chan<- *Context)
	RegisterCallback(Callback)
	Leave() error
	Shutdown() error
}

Handler is the interface that wraps the Broker's methods.
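
To make the shape of this interface concrete, here is a minimal sketch of a type that satisfies it, for instance as a stand-in in tests. The interface and the Callback type are redeclared locally, and the local Context struct is only a placeholder, because the real Context has unexported fields. echoHandler and roundTrip are hypothetical names. The listing does not show whether Server accepts a custom Handler, so this only illustrates the request and response loop that Run's signature suggests.

```go
package main

import (
	"context"
	"fmt"
)

// Callback and Handler mirror the declarations in the listing above.
type Callback func(topic string, offset int64, partition int32, key []byte, msg []byte)

// Context is a placeholder for sordini's Context type (assumption: the real
// type carries a decoded request and its response).
type Context struct{ id int }

type Handler interface {
	Run(context.Context, <-chan *Context, chan<- *Context)
	RegisterCallback(Callback)
	Leave() error
	Shutdown() error
}

// echoHandler is a hypothetical Handler that returns every request
// unchanged as its response.
type echoHandler struct{ cb Callback }

// Compile-time check that echoHandler implements Handler.
var _ Handler = (*echoHandler)(nil)

// Run loops until ctx is cancelled or the requests channel is closed.
func (h *echoHandler) Run(ctx context.Context, requests <-chan *Context, responses chan<- *Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case req, ok := <-requests:
			if !ok {
				return
			}
			// Do not block forever on the send if ctx is cancelled.
			select {
			case responses <- req:
			case <-ctx.Done():
				return
			}
		}
	}
}

func (h *echoHandler) RegisterCallback(cb Callback) { h.cb = cb }
func (h *echoHandler) Leave() error                 { return nil }
func (h *echoHandler) Shutdown() error              { return nil }

// roundTrip pushes n requests through h and returns the ids seen on the
// response channel, then stops the loop by cancelling the context.
func roundTrip(h Handler, n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	requests := make(chan *Context)
	responses := make(chan *Context)
	done := make(chan struct{})
	go func() {
		defer close(done)
		h.Run(ctx, requests, responses)
	}()
	ids := make([]int, 0, n)
	for i := 0; i < n; i++ {
		requests <- &Context{id: i}
		ids = append(ids, (<-responses).id)
	}
	cancel()
	<-done // wait for Run to return
	return ids
}

func main() {
	fmt.Println(roundTrip(&echoHandler{}, 3)) // prints [0 1 2]
}
```

The nested select on the response send is a design choice: it keeps Run from hanging when the consumer of responses has already gone away.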

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is used to handle the TCP connections, decode requests, defer to the broker, and encode the responses.

func NewServer

func NewServer(addr string) (*Server, error)

func (*Server) Addr

func (s *Server) Addr() net.Addr

Addr returns the address on which the Server is listening.

func (*Server) Leave

func (s *Server) Leave() error

func (*Server) RegisterCallback

func (s *Server) RegisterCallback(cb Callback)

func (*Server) Shutdown

func (s *Server) Shutdown() error

Shutdown closes the service.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start starts the service.
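
The methods listed for Server suggest a lifecycle of registering a callback, starting, and shutting down. The sketch below shows one way this could be sequenced. It is written against a small local interface (sink) and exercised with a fake, because the module's import path is not shown here; sink, runSink, fakeSink, errFake and demo are hypothetical names. It also assumes that Start returns once the service is running rather than blocking until shutdown, which the listing does not confirm.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Callback mirrors the signature documented for sordini's Callback type.
type Callback func(topic string, offset int64, partition int32, key []byte, msg []byte)

// sink lists the subset of the documented Server methods this sketch uses.
type sink interface {
	RegisterCallback(cb Callback)
	Start(ctx context.Context) error
	Shutdown() error
}

// runSink registers cb, starts s, waits for ctx to be cancelled, and then
// shuts s down. Assumption: Start does not block until shutdown.
func runSink(ctx context.Context, s sink, cb Callback) error {
	s.RegisterCallback(cb)
	if err := s.Start(ctx); err != nil {
		return fmt.Errorf("start: %w", err)
	}
	<-ctx.Done()
	if err := s.Shutdown(); err != nil {
		return fmt.Errorf("shutdown: %w", err)
	}
	return nil
}

// errFake is a hypothetical error used to simulate a failing Start.
var errFake = errors.New("fake start failure")

// fakeSink records the order of method calls; it stands in for a real server.
type fakeSink struct {
	calls    []string
	startErr error
}

func (f *fakeSink) RegisterCallback(Callback) { f.calls = append(f.calls, "RegisterCallback") }

func (f *fakeSink) Start(context.Context) error {
	f.calls = append(f.calls, "Start")
	return f.startErr
}

func (f *fakeSink) Shutdown() error {
	f.calls = append(f.calls, "Shutdown")
	return nil
}

// demo runs the lifecycle against a fakeSink with an already-cancelled
// context, so that runSink returns immediately.
func demo(startErr error) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	f := &fakeSink{startErr: startErr}
	err := runSink(ctx, f, func(string, int64, int32, []byte, []byte) {})
	return f.calls, err
}

func main() {
	calls, err := demo(nil)
	fmt.Println(calls, err)
}
```

In a real program the sink would presumably be the *Server returned by NewServer, with the interface declared in terms of the package's own Callback type, and the context would typically be cancelled on an OS signal rather than up front.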
