bun

package module
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2024 License: MIT Imports: 5 Imported by: 0

README

bun.go

GoDoc

A no-nonsense ("bring your own toppings") framework for building services on top of NATS. Inspired by popular HTTP frameworks echo and gin.

Installation

go get github.com/dnbsd/bun.go

Usage

Request/Reply handling
import "github.com/dnbsd/bun.go"

b := bun.New(bun.Arguments{
    Servers: []string{
        "localhost",
    }
})

b.Subscribe("rpc.users.create", func(c *bun.Context){
    type request struct {
        Username string `json:"username"`
        Password string `json:"password"`
    }
    type response struct {
        ID uint64 `json:"id"`
    }

    var r request
    if err := c.BindJSON(&r); err != nil {
        return c.Error(err)
    }

    //
    // Do things with request
    //

    return c.JSON(response{
        ID: 1,
    })
})
Connection event handling
import "github.com/dnbsd/bun.go"

b := bun.New(bun.Arguments{
    Servers: []string{
        "localhost",
    }
})

b.ConnectedHandler = func(nc *nats.Conn) {
    println("connected")
}
b.ReconnectedHandler = func(nc *nats.Conn) {
    println("reconnected")
}
b.DisconnectedHandler = func(nc *nats.Conn, err error) {
    println("disconnected")
}
Stream handling
JetStream support is currently a work in progress.

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Arguments

type Arguments struct {
	// Name is an optional name label which will be sent to the server on CONNECT to identify the client.
	Name string
	// Servers is a configured set of servers which this client will use when attempting to connect.
	Servers []string
	// NoRandomize configures whether we will randomize the server pool.
	NoRandomize bool
	// MaxReconnect sets the number of reconnect attempts that will be tried before giving up. If negative,
	// then it will never give up trying to reconnect.
	// Defaults to 60.
	MaxReconnect int
	// ReconnectWait sets the time to backoff after attempting a reconnect to a server that we were already
	// connected to previously.
	// Defaults to 2s.
	ReconnectWait time.Duration
	// Timeout sets the timeout for a Dial operation on a connection.
	// Defaults to 2s.
	Timeout time.Duration
	// DrainTimeout sets the timeout for a Drain Operation to complete.
	// Defaults to 30s.
	DrainTimeout time.Duration
	// FlusherTimeout is the maximum time to wait for write operations to the underlying connection to complete
	// (including the flusher loop).
	// Defaults to 1m.
	FlusherTimeout time.Duration
	// PingInterval is the period at which the client will be sending ping commands to the server disabled if 0 or
	// negative.
	// Defaults to 2m.
	PingInterval time.Duration
	// MaxPingsOut is the maximum number of pending ping commands that can be awaiting a response before raising
	// an ErrStaleConnection error.
	// Defaults to 2.
	MaxPingsOut int
	// User sets the username to be used when connecting to the server.
	User string
	// Password sets the password to be used when connecting to a server.
	Password string
	// Token sets the token to be used when connecting to a server.
	Token string
	// For websocket connections, indicates to the server that the connection
	// supports compression. If the server does too, then data will be compressed.
	Compression bool
	// IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting
	// subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy).
	IgnoreAuthErrorAbort bool
	// SkipHostLookup skips the DNS lookup for the server hostname.
	SkipHostLookup bool
	// MsgChanCapacity is the size of the buffered channel used for ChanQueueSubscribe.
	// Defaults to 65536.
	MsgChanCapacity int
	// UserJWT sets the callback handler that will fetch a user's JWT.
	UserJWTHandler UserJWTHandlerFunc
	// SignatureCB designates the function used to sign the nonce presented from the server.
	SignatureHandler SignatureHandlerFunc
}

func (Arguments) GetDrainTimeout added in v0.2.0

func (a Arguments) GetDrainTimeout() time.Duration

func (Arguments) GetFlusherTimeout added in v0.2.0

func (a Arguments) GetFlusherTimeout() time.Duration

func (Arguments) GetMaxPingsOut added in v0.2.0

func (a Arguments) GetMaxPingsOut() int

func (Arguments) GetMaxReconnect added in v0.2.0

func (a Arguments) GetMaxReconnect() int

func (Arguments) GetMsgChanCapacity added in v0.2.0

func (a Arguments) GetMsgChanCapacity() int

func (Arguments) GetPingInterval added in v0.2.0

func (a Arguments) GetPingInterval() time.Duration

func (Arguments) GetReconnectWait added in v0.2.0

func (a Arguments) GetReconnectWait() time.Duration

func (Arguments) GetTimeout added in v0.2.0

func (a Arguments) GetTimeout() time.Duration

type Bun

type Bun struct {
	ConnectedHandler    ConnHandlerFunc
	ReconnectedHandler  ConnHandlerFunc
	DisconnectedHandler ConnErrHandlerFunc
	ErrorHandler        ErrorHandlerFunc
	// contains filtered or unexported fields
}

func New

func New(args Arguments) *Bun

func (*Bun) Start

func (s *Bun) Start(ctx context.Context) error

Start creates a connection to configured NATS servers, registers subscriptions, and starts the workers. The call blocks until there is a fatal error or the context is closed. Connection is drained before returning, which guarantees delivery of all received message to workers before exit.

func (*Bun) Subscribe

func (s *Bun) Subscribe(subject string, handlers ...HandlerFunc)

Subscribe subscribes a worker to a subject, passing received messages to registered handlers. Handlers are executed sequentially. Handler execution flow is interrupted if a handler returns non-nil error. It's safe to call this method after Start.

func (*Bun) SubscribeGroup

func (s *Bun) SubscribeGroup(subject, group string, handlers ...HandlerFunc)

SubscribeGroup subscribes a worker to a subject, passing received messages to registered handlers. Workers with the same group name will form a queue group and only one member of the group will be selected to receive a message. Handlers are executed sequentially. Handler execution flow is interrupted if a handler returns non-nil error. It's safe to call this method after Start.

type ConnErrHandlerFunc

type ConnErrHandlerFunc = nats.ConnErrHandler

type ConnHandlerFunc

type ConnHandlerFunc = nats.ConnHandler

type Context

type Context struct {
	// contains filtered or unexported fields
}

func (*Context) BindJSON

func (c *Context) BindJSON(v any) error

BindJSON unmarshals data from NATS message into v.

func (*Context) Blob

func (c *Context) Blob(b []byte) error

Blob sends bytes to a Reply topic.

func (*Context) Context added in v0.3.0

func (c *Context) Context() context.Context

Context returns the service context.

func (*Context) Error

func (c *Context) Error(err error) error

Error sends string value of an error to a Reply topic. If error is nil, the method has no effect.

func (*Context) JSON

func (c *Context) JSON(v any) error

JSON marshals data in v and sends the marshalled data to a Reply topic.

func (*Context) Message

func (c *Context) Message(msg *nats.Msg) error

Message sends message msg to a Reply topic.

func (*Context) String

func (c *Context) String(s string) error

String sends string to a Reply topic.

type ErrorHandlerFunc

type ErrorHandlerFunc func(error, *Context)

type HandlerFunc

type HandlerFunc func(*Context) error

type SignatureHandlerFunc added in v0.1.0

type SignatureHandlerFunc = nats.SignatureHandler

type UserJWTHandlerFunc added in v0.1.0

type UserJWTHandlerFunc = nats.UserJWTHandler

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL