Documentation ¶
Overview ¶
Package wsqueue provides a framework over gorilla/mux and gorilla/websocket to operate kind of AMQP but over websockets. It offers a Server, a Client and two king of messaging protocols : Topics and Queues.
Topics : Publish & Subscribe Pattern
Publish and subscribe semantics are implemented by Topics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
Start a server and handle a topic
//Server side r := mux.NewRouter() s := wsqueue.NewServer(r, "") q := s.CreateTopic("myTopic") http.Handle("/", r) go http.ListenAndServe("0.0.0.0:9000", r) ... //Publish a message q.Publish("This is a message")
Start a client and listen on a topic
//Client slide go func() { c := &wsqueue.Client{ Protocol: "ws", Host: "localhost:9000", Route: "/", } cMessage, cError, err := c.Subscribe("myTopic") if err != nil { panic(err) } for { select { case m := <-cMessage: fmt.Println(m.String()) case e := <-cError: fmt.Println(e.Error()) } } }()
Queues : Work queues Pattern
The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to the queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them. Queues implement load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
Examples ¶
see samples/queue/main.go, samples/topic/main.go
Index ¶
Constants ¶
const ( //ACLSSchemeWorld scheme is a fully open scheme ACLSSchemeWorld ACLScheme = "WORLD" //ACLSSchemeDigest scheme represents a "manually" set group of authenticated users. ACLSSchemeDigest = "DIGEST" //ACLSSchemeIP scheme represents a "manually" set group of user authenticated by their IP address ACLSSchemeIP = "IP" )
Variables ¶
var Logfunc = log.Printf
Logfunc is a function that logs the provided message with optional fmt.Sprintf-style arguments. By default, logs to the default log.Logger. setting it to nil can be used to disable logging for this package. This doesn’t enforce a coupling with any specific external package and is already widely supported by existing loggers.
var Warnfunc = log.Printf
Warnfunc is a function that logs the provided message with optional fmt.Sprintf-style arguments. By default, logs to the default log.Logger. setting it to nil can be used to disable logging for this package. This doesn’t enforce a coupling with any specific external package and is already widely supported by existing loggers.
Functions ¶
This section is empty.
Types ¶
type ACEDigest ¶
type ACEDigest struct { Username string `json:"username,omitempty"` Password string `json:"password,omitempty"` }
ACEDigest aims to authenticate a user with a username and a password
type ACEIP ¶
type ACEIP struct {
IP string `json:"ip,omitempty"`
}
ACEIP aims to authenticate a user with a IP adress
type ACL ¶
type ACL []ACE
ACL stands for Access Control List. It's a slice of permission for a queue or a topic
type Client ¶
type Client struct { Protocol string Host string Route string // contains filtered or unexported fields }
Client is the wqueue entrypoint
type Fibonacci ¶
type Fibonacci struct {
// contains filtered or unexported fields
}
Fibonacci https://en.wikipedia.org/wiki/Fibonacci_number
func (*Fibonacci) NextDuration ¶
NextDuration returns the next Fibonacci number cast in the wanted duration
type Message ¶
Message message
func (*Message) ApplicationType ¶
ApplicationType returns application-type. Empty if content-type is not application/json
func (*Message) ContentType ¶
ContentType returns content-type
type Options ¶
type Options struct { ACL ACL `json:"acl,omitempty"` Storage StorageOptions `json:"storage,omitempty"` }
Options is options on topic or queues
type Queue ¶
type Queue struct { Options *Options `json:"options,omitempty"` Queue string `json:"topic,omitempty"` // contains filtered or unexported fields }
Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
type Server ¶
type Server struct { Router *mux.Router RoutePrefix string QueuesCounter *expvar.Int TopicsCounter *expvar.Int ClientsCounter *expvar.Int MessagesCounter *expvar.Int }
Server is a server
func (*Server) CreateQueue ¶
CreateQueue create queue
func (*Server) CreateTopic ¶
CreateTopic create topic
type Stack ¶
type Stack struct {
// contains filtered or unexported fields
}
Stack is a thread-safe "First In First Out" stack
func (*Stack) Get ¶
Get peeks at the n-th item in the stack. Unlike other operations, this one costs O(n).
func (*Stack) Peek ¶
func (s *Stack) Peek() interface{}
Peek returns but doesn't remove the top of the stack
type StorageDriver ¶
type StorageDriver interface { Open(options *Options) Push(data interface{}) Pop() interface{} }
StorageDriver is in-memory Stack or Redis server
type StorageOptions ¶
type StorageOptions map[string]interface{}
StorageOptions is a collection of options, see storage documentation
type Topic ¶
type Topic struct { Options *Options `json:"options,omitempty"` Topic string `json:"topic,omitempty"` OpenedConnectionHandler func(*Conn) `json:"-"` ClosedConnectionHandler func(*Conn) `json:"-"` OnMessageHandler func(*Conn, *Message) error `json:"-"` // contains filtered or unexported fields }
Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.