Documentation ¶
Overview ¶
Package bus implements a thin wrapper around nsq so that you can create publishers for topics and consumers for channels.
Publisher/Consumer
`Consumer` and `Publisher` are thin wrappers around the basic concept of nsq with small additions.
Functions ¶
A `Function` abstracts an asynchronous function which will be called by marshalling the parameters as JSON and invoke the corresponding function with nsq. You can start many services which implement the same function (identified by its name), so you will have something like a balancing. Note: As the functions are asynchronuous, you cannot return a result. Only errors are used to signal if the function was successful.
When you create a function with `Unique` the consumer will be connected to a unique, ephemeral topic and channel. Create unique functions if you want your services to respond with values. You create a unique function, transport this function name to the wellknown service and this service will call the unique function with the result.
If the process with the unique function ends, the topic and the channels will be removed from nsq because they are ephemeral.
Creating a named function
ep := NewEndpoints(...) f, err := ep.Function("hello-service", func (s string) error { fmt.Printf("Hello %s\n", s) return nil }) f("world"); // prints "Hello world"
creates a client and server for the function, depending on the endpoint. If there is a consumer in the endpoint, a server is registered; if there is a publisher a client is also created. So using an endpoint with consumer and publisher creates a function which will be hosted by the same process which creates the function.
When you only need a client for a wellknown function, use the `Client` function:
ep := NewEndpoints(...) f, err := ep.Client("hello-service") f("world")
In this case there should be another process which registered a consuming function.
Last but not least you can use `Unique` to create a unique consumer which only exists as long as the process exists which did the registration. Unique consumers return their name and they can be used to transport responses from well known services back to clients. The client has to register a unique consumer and pass the name of this function to the service which will post back the response back to the client.
Index ¶
- func ConfigureTLS(nsqCfg *nsq.Config, tlsCfg *TLSConfig)
- func CreateNSQConfig(tlsCfg *TLSConfig) *nsq.Config
- func LoadCertificate(path string) (*tls.Certificate, error)
- func TTL(ttl time.Duration) crOption
- func Timeout(timeout time.Duration, timeoutFunction OnTimeout) crOption
- type Consumer
- type ConsumerRegistration
- type Endpoints
- type Func
- type Function
- type Level
- type OnTimeout
- type Option
- type Publisher
- type PublisherConfig
- type Receiver
- type TLSConfig
- type TimeoutError
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConfigureTLS ¶
func ConfigureTLS(nsqCfg *nsq.Config, tlsCfg *TLSConfig)
ConfigureTLS configures the given NSQ configuration for TLS connections.
func CreateNSQConfig ¶
func CreateNSQConfig(tlsCfg *TLSConfig) *nsq.Config
CreateNSQConfig creates and configures a TLS enabled (if given TLS config != nil) NSQ configuration.
func LoadCertificate ¶
func LoadCertificate(path string) (*tls.Certificate, error)
LoadCertificate reads file, divides into key and certificates
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
A Consumer wraps the base configuration for the nsq connection
func NewConsumer ¶
NewConsumer returns a consumer and stores the addresses of the lookupd's.
func (*Consumer) MustRegister ¶
func (c *Consumer) MustRegister(topic, channel string) *ConsumerRegistration
type ConsumerRegistration ¶
type ConsumerRegistration struct {
// contains filtered or unexported fields
}
func (*ConsumerRegistration) Close ¶ added in v0.3.5
func (cr *ConsumerRegistration) Close() error
Close disconnects from all nsqd's or all nsq-lookupd's.
type Endpoints ¶ added in v0.3.5
type Endpoints struct {
// contains filtered or unexported fields
}
Endpoints couples a consumer and a publisher to a single entity.
func DirectEndpoints ¶ added in v0.3.5
func DirectEndpoints() *Endpoints
DirectEndpoints returns endpoints which call the target function directly. You do not need a running nsq for this. This can be used with unit tests. It should not be used in production code because the invocation of functions will not be persistent and will be delegated to the current running process by forking a goroutine to call the receiving function.
func NewEndpoints ¶ added in v0.3.5
NewEndpoints creates the Endpoints for the given publisher and consumer. If one of the values is nil, the function created by this endpoint can only be used for invoking or compute but not both. If both values are set, the function will be invoked by the same process which implements it otherwise you have some sort of client and server for the function.
func (*Endpoints) Client ¶ added in v0.3.5
Client returns a new function client for the function with the registered name.
func (*Endpoints) Function ¶ added in v0.3.5
Function creates a Function from the the given endpoints. The name of the function will be a distributed selector for the given go function. So every function which is registered with the same name can receive the invocation inside the cluster. The function must be a normal go function with one parameter and one result of type error:
ep := NewEndpoints(...) fn, f, err := ep.Function("hello", func (s string) error { fmt.Printf("Hello %s\n", s) return nil }) f("world"); // prints "Hello world" ... fn.Close()
The target function can receive structs or pointer to structs. Please notice that when using `DirectEndpoints` the parameters are not marshalled/unmarshalled via JSON, so using addresses can have side effects.
func (*Endpoints) Unique ¶ added in v0.3.5
Unique uses an unique, ephemeral topic so the topic will be deregistered when there is no consumer any more for this function. Use this function to create a unique receiver, so function invocations will not be distributed and the topic only exists as long as the registration process is active. The computed unique name of this function is returned so it can be used with the `Function` function to invoke it. You **must** supply a fn parameter, because a Unique function creates a new unique name which must dispatch to exact one receiver. If `fn` is nil, an error is returned.
type Function ¶ added in v0.3.5
type Function struct {
// contains filtered or unexported fields
}
A Function encapsulates a Func which can be called with an argument. The invocation will be delegated through nsq so multiple instances of the same function can run in different processes. Only one of them will be invoked.
type OnTimeout ¶
type OnTimeout func(err TimeoutError) error
OnTimeout function that is called in the case of timeout while handling the event
type Option ¶
func MaxInFlight ¶ added in v0.3.5
type Publisher ¶
type Publisher interface { Publish(topic string, data interface{}) error CreateTopic(topic string) error Stop() }
A Publisher is used for event publishing to topics. The fields Publish and CreateTopics can be overwritten to mock this publisher.
func NewPublisher ¶
func NewPublisher(zlog *slog.Logger, publisherCfg *PublisherConfig) (Publisher, error)
NewPublisher creates a new publisher to produce events for topics.
type PublisherConfig ¶
type PublisherConfig struct { TCPAddress string HTTPEndpoint string TLS *TLSConfig NSQ *nsq.Config }
A PublisherConfig represents the config of an NSQ publisher.
func (*PublisherConfig) ConfigureNSQ ¶
func (p *PublisherConfig) ConfigureNSQ()
ConfigureTLS configures the publisher regarding NSQ.
type Receiver ¶
type Receiver func(interface{}) error
A Receiver is a callback when you receive messages from the bus.
type TimeoutError ¶
type TimeoutError struct {
// contains filtered or unexported fields
}
func (TimeoutError) Error ¶
func (t TimeoutError) Error() string
func (TimeoutError) Event ¶
func (t TimeoutError) Event() interface{}