bus

module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2024 License: MIT

README

BUS: The Kafka event bus framework

coverage

Bus gopher logo

This framework gives you a convenient mechanism for using Kafka. You do not need more cycles and care for commits in your code, just connect and use.

Some libraries are used under the hood, but they are not exported directly, their own abstractions are used instead.

To date, libraries are used:

Examples

go get github.com/korableg/bus

This repository has a predefined codec wire. You may use this or create your own.

To use the wire codec you have to create event contracts proto file which should use grpc options with a topic and a key fields. See examples in the test package.

Producer
p, err := producer.New(wireCodec.Encoder(), brokers)
if err != nil {
    return err
}

// Async sending
err = p.Send(ctx, &events.YourEvent{})
if err != nil {
    return err
}

// Sync sending
err = p.SendSync(ctx, &events.YourEvent{})
if err != nil {
    return err
}

// You have to close producer before closing your application to prevent lose of the messages in the buffer
err = p.Close()
if err != nil {
    return err
}
Consumer
c, err := consumer.NewGroup(brokers, "test")
if err != nil {
    return err
}

err = cons.AddHandler(consumer.NewHandler(wireCodec.Decoder[*events.YourEvent](),
    func(ctx context.Context, msg *events.YourEvent, raw consumer.Message) error {
        // handle your message
        return nil
    }))
if err != nil {
    return err
}

err = c.AddHandler(consumer.NewHandlerBatch(wireCodec.Decoder[*events.YourEvent](),
    func(ctx context.Context, msgs []*events.YourEvent, raws []consumer.Message) error {
        // handle your messages
        return nil
    }))
if err != nil {
    return err
}
Options

Producer, consumer and handler have an options to manipulate them behavior.

Commit policy

The message is considered processed after exiting the handler, regardless of whether it was processed with an error or not.
The single message handler resets the offset to commit immediately after exiting the function. The batch handler does not transmit offsets until it flushes the buffer of accumulated messages.
If the group has several handlers on the same topic, then the minimum possible offset from all handlers is committed.

Retry policy

By default, all handlers have endlessly trying to process the message until err!= nil. You may change this behavior by setting the handler options or mark error as a permanent.

Contributing

You are welcome! ♥️

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL