bus

package
v0.19.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2024 License: MIT Imports: 18 Imported by: 2

Documentation

Overview

Package bus implements a thin wrapper around nsq so that you can create publishers for topics and consumers for channels.

Publisher/Consumer

`Consumer` and `Publisher` are thin wrappers around the basic concept of nsq with small additions.

Functions

A `Function` abstracts an asynchronous function which will be called by marshalling the parameters as JSON and invoke the corresponding function with nsq. You can start many services which implement the same function (identified by its name), so you will have something like a balancing. Note: As the functions are asynchronuous, you cannot return a result. Only errors are used to signal if the function was successful.

When you create a function with `Unique` the consumer will be connected to a unique, ephemeral topic and channel. Create unique functions if you want your services to respond with values. You create a unique function, transport this function name to the wellknown service and this service will call the unique function with the result.

If the process with the unique function ends, the topic and the channels will be removed from nsq because they are ephemeral.

Creating a named function

ep := NewEndpoints(...)
f, err := ep.Function("hello-service", func (s string) error {
   fmt.Printf("Hello %s\n", s)
   return nil
})
f("world"); // prints "Hello world"

creates a client and server for the function, depending on the endpoint. If there is a consumer in the endpoint, a server is registered; if there is a publisher a client is also created. So using an endpoint with consumer and publisher creates a function which will be hosted by the same process which creates the function.

When you only need a client for a wellknown function, use the `Client` function:

ep := NewEndpoints(...)
f, err := ep.Client("hello-service")
f("world")

In this case there should be another process which registered a consuming function.

Last but not least you can use `Unique` to create a unique consumer which only exists as long as the process exists which did the registration. Unique consumers return their name and they can be used to transport responses from well known services back to clients. The client has to register a unique consumer and pass the name of this function to the service which will post back the response back to the client.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConfigureTLS

func ConfigureTLS(nsqCfg *nsq.Config, tlsCfg *TLSConfig)

ConfigureTLS configures the given NSQ configuration for TLS connections.

func CreateNSQConfig

func CreateNSQConfig(tlsCfg *TLSConfig) *nsq.Config

CreateNSQConfig creates and configures a TLS enabled (if given TLS config != nil) NSQ configuration.

func LoadCertificate

func LoadCertificate(path string) (*tls.Certificate, error)

LoadCertificate reads file, divides into key and certificates

func TTL

func TTL(ttl time.Duration) crOption

TTL specifies the maximum age of messages to accept. If a message is received that is older than the given ttl, it will be dropped.

func Timeout

func Timeout(timeout time.Duration, timeoutFunction OnTimeout) crOption

Timeout guards the event handler with a timeout, timeout 0 means no timeout. The optional timeoutFunction is called in the case of timeout while handling the event.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

A Consumer wraps the base configuration for the nsq connection

func NewConsumer

func NewConsumer(log *slog.Logger, tlsCfg *TLSConfig, lookupds ...string) (*Consumer, error)

NewConsumer returns a consumer and stores the addresses of the lookupd's.

func (*Consumer) MustRegister

func (c *Consumer) MustRegister(topic, channel string) *ConsumerRegistration

func (*Consumer) Register

func (c *Consumer) Register(topic, channel string) (*ConsumerRegistration, error)

func (*Consumer) With

func (c *Consumer) With(opts ...Option) *Consumer

type ConsumerRegistration

type ConsumerRegistration struct {
	// contains filtered or unexported fields
}

func (*ConsumerRegistration) Close added in v0.3.5

func (cr *ConsumerRegistration) Close() error

Close disconnects from all nsqd's or all nsq-lookupd's.

func (*ConsumerRegistration) Consume

func (cr *ConsumerRegistration) Consume(paramProto interface{}, recv Receiver, concurrent int, opts ...crOption) error

Consume a message

func (*ConsumerRegistration) Output

func (cr *ConsumerRegistration) Output(num int, msg string) error

FIXME: wtf is this

type Endpoints added in v0.3.5

type Endpoints struct {
	// contains filtered or unexported fields
}

Endpoints couples a consumer and a publisher to a single entity.

func DirectEndpoints added in v0.3.5

func DirectEndpoints() *Endpoints

DirectEndpoints returns endpoints which call the target function directly. You do not need a running nsq for this. This can be used with unit tests. It should not be used in production code because the invocation of functions will not be persistent and will be delegated to the current running process by forking a goroutine to call the receiving function.

func NewEndpoints added in v0.3.5

func NewEndpoints(consumer *Consumer, publisher Publisher) *Endpoints

NewEndpoints creates the Endpoints for the given publisher and consumer. If one of the values is nil, the function created by this endpoint can only be used for invoking or compute but not both. If both values are set, the function will be invoked by the same process which implements it otherwise you have some sort of client and server for the function.

func (*Endpoints) Client added in v0.3.5

func (e *Endpoints) Client(name string) (*Function, Func, error)

Client returns a new function client for the function with the registered name.

func (*Endpoints) Function added in v0.3.5

func (e *Endpoints) Function(name string, fn interface{}) (*Function, Func, error)

Function creates a Function from the the given endpoints. The name of the function will be a distributed selector for the given go function. So every function which is registered with the same name can receive the invocation inside the cluster. The function must be a normal go function with one parameter and one result of type error:

ep := NewEndpoints(...)
fn, f, err := ep.Function("hello", func (s string) error {
   fmt.Printf("Hello %s\n", s)
   return nil
})
f("world"); // prints "Hello world"
...
fn.Close()

The target function can receive structs or pointer to structs. Please notice that when using `DirectEndpoints` the parameters are not marshalled/unmarshalled via JSON, so using addresses can have side effects.

func (*Endpoints) Unique added in v0.3.5

func (e *Endpoints) Unique(name string, fn interface{}) (*Function, Func, string, error)

Unique uses an unique, ephemeral topic so the topic will be deregistered when there is no consumer any more for this function. Use this function to create a unique receiver, so function invocations will not be distributed and the topic only exists as long as the registration process is active. The computed unique name of this function is returned so it can be used with the `Function` function to invoke it. You **must** supply a fn parameter, because a Unique function creates a new unique name which must dispatch to exact one receiver. If `fn` is nil, an error is returned.

type Func added in v0.3.5

type Func func(interface{}) error

type Function added in v0.3.5

type Function struct {
	// contains filtered or unexported fields
}

A Function encapsulates a Func which can be called with an argument. The invocation will be delegated through nsq so multiple instances of the same function can run in different processes. Only one of them will be invoked.

func (*Function) Close added in v0.3.5

func (f *Function) Close() error

type Level

type Level int

Level specifies the severity of a given log message

const (
	Debug Level = iota
	Info
	Warning
	Error
)

Log levels

type OnTimeout

type OnTimeout func(err TimeoutError) error

OnTimeout function that is called in the case of timeout while handling the event

type Option

type Option func(registration *Consumer) *Consumer

func LogLevel

func LogLevel(v Level) Option

LogLevel maps between our loglevel and nsq loglevels

func MaxInFlight added in v0.3.5

func MaxInFlight(num int) Option

func NSQDs added in v0.3.5

func NSQDs(nsqds ...string) Option

type Publisher

type Publisher interface {
	Publish(topic string, data interface{}) error
	CreateTopic(topic string) error
	Stop()
}

A Publisher is used for event publishing to topics. The fields Publish and CreateTopics can be overwritten to mock this publisher.

func NewPublisher

func NewPublisher(zlog *slog.Logger, publisherCfg *PublisherConfig) (Publisher, error)

NewPublisher creates a new publisher to produce events for topics.

type PublisherConfig

type PublisherConfig struct {
	TCPAddress   string
	HTTPEndpoint string
	TLS          *TLSConfig
	NSQ          *nsq.Config
}

A PublisherConfig represents the config of an NSQ publisher.

func (*PublisherConfig) ConfigureNSQ

func (p *PublisherConfig) ConfigureNSQ()

ConfigureTLS configures the publisher regarding NSQ.

type Receiver

type Receiver func(interface{}) error

A Receiver is a callback when you receive messages from the bus.

type TLSConfig

type TLSConfig struct {
	CACertFile     string
	ClientCertFile string
}

A TLSConfig represents the TLS config of an NSQ publisher/consumer.

func (*TLSConfig) Inactive

func (cfg *TLSConfig) Inactive() bool

Inactive a TLSConfig considered inactive if neither a ca-cert nor a client-cert is present

type TimeoutError

type TimeoutError struct {
	// contains filtered or unexported fields
}

func (TimeoutError) Error

func (t TimeoutError) Error() string

func (TimeoutError) Event

func (t TimeoutError) Event() interface{}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL