Documentation ¶
Index ¶
- Variables
- func CreateServer(port int, mux *http.ServeMux) *http.Server
- func HTTPAuthenticate(rw http.ResponseWriter, r *http.Request, pubsub *PubSub) (*User, IncomingReq, error)
- func HTTPErrorResponse(err error, errType int, rw http.ResponseWriter) error
- func RandomString(n int) string
- func Start(port int)
- type CreateUserResp
- type IncomingReq
- type ListKeysResp
- type Message
- type MessageResp
- type Persist
- type PersistCore
- type PersistMessageStruct
- type PersistSubscriberStruct
- type PersistUnit
- type PubSub
- func (pubsub *PubSub) Close() error
- func (pubsub *PubSub) CreateTopic(topicName string, user *User) (*Topic, error)
- func (pubsub *PubSub) FetchTopic(topicName string, user *User) (*Topic, error)
- func (pubsub *PubSub) GetTopic(topicName string, user *User) (topic *Topic, err error)
- func (pubsub *PubSub) GetUser(username, password string) (*User, error)
- func (pubsub *PubSub) PushWebhooks() error
- func (pubsub *PubSub) Tombstone(consideredStale, resurrectionOpportunity time.Duration) error
- type Responder
- type SSEAddRequester
- type SSEDistro
- type SSEResponse
- type Streamer
- type SubscribeResp
- type Subscriber
- type Subscribers
- type Topic
- type TopicResp
- type Topics
- type Underwriter
- func (uw *Underwriter) DeleteMessage() error
- func (uw *Underwriter) DeleteSubscriber() error
- func (uw *Underwriter) DeleteUser() error
- func (uw *Underwriter) GetMessage(messageID int, topicName string) (Message, error)
- func (uw *Underwriter) GetSubscriber(subscriberID string, messageID int, topicName string) (Subscriber, error)
- func (uw *Underwriter) GetUser(userID string) (User, error)
- func (uw *Underwriter) Launch() error
- func (uw *Underwriter) StreamMessages() (chan Streamer, error)
- func (uw *Underwriter) StreamSubscribers() (chan Streamer, error)
- func (uw *Underwriter) StreamUsers() (chan Streamer, error)
- func (uw *Underwriter) Switchboard() PersistCore
- func (uw *Underwriter) TidyUp() error
- func (uw *Underwriter) WriteMessage() error
- func (uw *Underwriter) WriteSubscriber() error
- func (uw *Underwriter) WriteUser() error
- type User
- func (user *User) AddCreatedDatestring(time.Time) string
- func (user User) GetCreatedDateTime() (time.Time, error)
- func (user *User) PullMessage(topic *Topic, messageID int) (Message, error)
- func (user *User) Subscribe(topic *Topic, pushURL string) error
- func (user *User) Unsubscribe(topic *Topic) error
- func (user *User) WriteToTopic(topic *Topic, message Message) (Message, error)
- type Users
Constants ¶
This section is empty.
Variables ¶
var PersistBase string
PersistBase gives the root directory location to which data should be persisted. Set by envar `STORE`
Functions ¶
func CreateServer ¶
CreateServer provides the standard HTTP server for the application. Intended for use with CreateMux
Note: WriteTimeout has been disabled (was: 5) to allow for correct Server Sent Events functionality. In future a secondary server should be employed in a goroutine for SSE to maintain stricter timeout controls on other requests
func HTTPAuthenticate ¶
func HTTPAuthenticate(rw http.ResponseWriter, r *http.Request, pubsub *PubSub) (*User, IncomingReq, error)
HTTPAuthenticate does the boilerplate check username and password work for incoming service queries
IncomingReq is the rolled up query including fields from body json and url query args (see getHTTPData)
func HTTPErrorResponse ¶
func HTTPErrorResponse(err error, errType int, rw http.ResponseWriter) error
HTTPErrorResponse responds correctly to http request errors in the handler function
func RandomString ¶
Types ¶
type CreateUserResp ¶
type CreateUserResp struct { Error string `json:"error,omitempty"` UUID string `json:"user_id,omitempty"` //SubscriptionCount is len(User.Subscriptions) SubscriptionCount int `json:"subscription_count"` Subscriptions map[string]string `json:"subscriptions,omitempty"` Created string `json:"created,omitempty"` }
CreateUserResp is the response from a create user request
type IncomingReq ¶
type IncomingReq struct { Username string `json:"username"` //Mandatory Password string `json:"password"` //Mandatory Topic string `json:"topic,omitempty"` //WebhookURL for push subscription to topic WebhookURL string `json:"webhook_url,omitempty"` //Message used for writing messages to services Message interface{} `json:"message,omitempty"` //MessageID used for pulling messages from topics MessageID int `json:"message_id,omitempty"` }
IncomingReq is the standard structure for message requests to the service
type ListKeysResp ¶
type ListKeysResp struct { Error string `json:"error,omitempty"` Topics []string `json:"topics,omitempty"` Users []string `json:"users,omitempty"` //not in use yet Subscribers []string `json:"subs,omitempty"` //not in use yet Count int `json:"count"` }
ListKeysResp is the response from a query requesting lists of entries such as /users/fetch, /topics/fetch, etc
type Message ¶
type Message struct { ID int `json:"id"` //sequence number Data interface{} `json:"data"` Created string `json:"created"` // contains filtered or unexported fields }
Message is a single message structure
func (*Message) AddCreatedDatestring ¶
AddCreatedDatestring adds the given time to the message Created field as a formatted string
type MessageResp ¶
type MessageResp struct { Error string `json:"error,omitempty"` Topic string `json:"topic_id,omitempty"` Message Message `json:"message,omitempty"` }
MessageResp is the response from Message orientated requests
type Persist ¶
type Persist interface { //Launch starts up goroutines Launch() error //OutputSwitchboard returns a PersistCore object // within which to send messages to Write... // and Delete... methods Switchboard() PersistCore //TidyUp is where to put the close down work. Usually // to close // files or databases. Usually used as `defer // Persist.TidyUp()` TidyUp() error //WriteUser adds a user to the persistence layer from // a User chan WriteUser() error //WriteSubscriber adds a subscriber to the persistence // layer from persistSubscriberStruct chan WriteSubscriber() error //WriteMessage adds a message to the persistence layer // with from persistMessageStruct WriteMessage() error //GetUseret returns a single user by userID string // (Which is also UsernameHash of the user) GetUser(string) (User, error) //GetSubscriberet returns a single subscriber by // subcriberID (which is also the userID attachted to // the subscriber),messageID and topicName GetSubscriber(string, int, string) (Subscriber, error) //GetMessage returns a single message by messageID and topicName GetMessage(int, string) (Message, error) //StreamUsers returns a chan through which it streams // all Users from the db StreamUsers() (chan Streamer, error) //StreamSubscribers returns a chan through which it streams all // Subscribers from the db StreamSubscribers() (chan Streamer, error) //StreamMessages returns a chan through which it streams all // Messages from the db StreamMessages() (chan Streamer, error) //DeleteUser accepts UserID which is the userhash string DeleteUser() error //DeleteSubscriber accepts subscriberID (the userID of the subscription), // topicName. // //Add messageID as -1 if not available. Func will they cycle through the topic and delete matches to subscriberID DeleteSubscriber() error //DeleteMessage accepts messageID and topicName DeleteMessage() error }
Persist is the interface for adding persistent storage
type PersistCore ¶
type PersistCore struct {
// contains filtered or unexported fields
}
PersistCore is the minimum fields an implementor of Persist should have
type PersistMessageStruct ¶
type PersistMessageStruct struct { Message Message //for saving TopicName string MessageID int //for deletions }
PersistMessageStruct is a channel object for sending // messages to be saved by the persist layer
type PersistSubscriberStruct ¶
type PersistSubscriberStruct struct { Subscriber Subscriber //for saving MessageID int TopicName string SubscriberID string //for deletions }
PersistSubscriberStruct is a channel object for sending // messages to be saved by the persist layer
type PersistUnit ¶
type PersistUnit int
PersistUnit is an Enum type for what needs to be persisted for the defulat Persit implementation for streamers
const ( //PersistUser gives an enum option for User using the PersistUnit type PersistUser PersistUnit = iota //PersistSubscriber gives an enum option for Subscriber using the PersistUnit type PersistSubscriber )
type PubSub ¶
PubSub is the core holder struct for the pubsub service
func CreateMux ¶
CreateMux builds the routing for the application. Intended for use with CreateServer
Verbs ------
Obtain : Get existing or create
Create : Create new or error if already exists
Fetch : Get existing or error if does not exists
Write : Write data to server
Pull : Read information from server by http request (pull) after subscribing to a pull agreement of event data
Subscribe/Unsubscribe : Setup or delete push/pull agreement
func (*PubSub) Close ¶
Close is the tidy-up script that should be used as a defer after calling getReady function
func (*PubSub) CreateTopic ¶
CreateTopic creates a topic or returns an error if already exists
func (*PubSub) FetchTopic ¶
FetchTopic fetches a topic or returns an error if not found
func (*PubSub) GetTopic ¶
GetTopic gets a topic. If it does not exist it creates a new topic using the User as the creator
func (*PubSub) GetUser ¶
GetUser maintains the user list
Returns existing user record if usernameHash and passwordHash match Otherwise creates new user if no match or return login error if password is no match to existing user with same username
func (*PubSub) PushWebhooks ¶
PushWebhooks runs through all topics and pushes messages to the subscribers as a Webhook service. Exponential backoff for non 201/200 unacknoledged pushes up to 1 hour attempt intervals.
Should run through continuously
func (*PubSub) Tombstone ¶
Tombstone cycles through and does tombstoning and deletion activities
ConsideredStale is the time duration after which an item is considered stale and okay to tombstone ¶
resurrectionOpportunity is the time duration after which a tombstoned item can be deleted. This leaves an opportunity between tombstoning and deletion to be saved (by becoming active again)
N.B. This function blocks all PubSub activity with a PubSub Lock - so should be run conservatively and opportunistically
type Responder ¶
type Responder interface {
// contains filtered or unexported methods
}
Responder are handler response objects with encoding methods
type SSEAddRequester ¶
type SSEAddRequester struct { ID string //userIP+randomstring hash Receiver chan SSEResponse }
SSEAddRequester is the struct sent to SSEDistro to add the client to the message updater. Used by mux to register itself for all message streams
type SSEDistro ¶
type SSEDistro struct { //Intake is the incoming message chan that needs to be fanned out to the requesters Intake chan SSEResponse //Intake is the chan messages are sent before fanout to SSE connections Requesters map[string]chan SSEResponse //Requesters has a clientID as key Add chan SSEAddRequester Cancel chan string //Cancel receives ClientID }
SSEDistro is the object that fans out Messages to SSE requesting clients
type SSEResponse ¶
type SSEResponse struct { TopicName string `json:"topic_name,omitempty"` Message Message `json:"message"` }
SSEResponse is the object sent to the client and identifies which topic the message came from
func (SSEResponse) String ¶
func (sse SSEResponse) String() string
String implements stringer to allow for proper formatting for SSE
type Streamer ¶
type Streamer struct { //Unit is User, Subscriber or Message. Topic can be // passed but is implicitly available in the Key Unit tombstoner //Key is the db key which contains id information for // proper restoration to active map // //In the format order (curly bracketed items are optional, square bracketed items are Unit type dependant): // {bucketName/}TopicName/MessageID[/SubscriberID] //or just: // {bucketName/}UserID Key string }
Streamer is the response object from Stream restore methods of the Persist interface
type SubscribeResp ¶
type SubscribeResp struct { Error string `json:"error,omitempty"` User string `json:"user_id"` Topic string `json:"topic_name"` Status string `json:"status"` //CanWrite shows if the requester User can write to the topic (userID // matches topic.Creator.ID) CanWrite bool `json:"writable"` }
SubscribeResp is the response form for Subscription orientated requests
type Subscriber ¶
type Subscriber struct { ID string //ID is the User.UUID UsernameHash string //UsernameHash is the User.UsernameHash to help access the user in Subscription based functions PushURL string //PushURL is the webhook URL to which to push messages Creator bool //Creator is whether or not the subscriber is the creator. Used for `restore` // contains filtered or unexported fields }
Subscriber is the setup of a subscriber to a topic
type Subscribers ¶
type Subscribers map[string]*Subscriber //Subscriber.ID against subscriber
Subscribers is a map of subscribers
type Topic ¶
type Topic struct { //ID int //sequential number--unused: safe to delete-- Creator string //Creater is a User ID for the creator user. Only User that can write to the topic Name string //user given name for the topic (sanitized) Messages map[int]Message //message queue PointerPositions map[int]Subscribers //pointer position against subscribers at that position PointerHead int //latest/highest Messages key/ID. // contains filtered or unexported fields }
Topic is the setup for topics
type TopicResp ¶
type TopicResp struct { Error string `json:"error,omitempty"` Topic string `json:"topic_name"` Status string `json:"status"` Creator string `json:"creator"` PointerHead int `json:"pointer_head"` //CanWrite shows if requester User can write to the topic (userID // matches topic.Creator.ID) CanWrite bool `json:"writable"` }
TopicResp is the response form for Topic orientated requests
type Underwriter ¶
type Underwriter struct { *PersistCore // contains filtered or unexported fields }
Underwriter is the default implementation of Persist
It stores all needed persisted files in the ./store directory
func NewUnderwriter ¶
func NewUnderwriter(pubsub *PubSub) (*Underwriter, error)
NewUnderwriter creates a new Underwriter instance that implements Persist
func (*Underwriter) DeleteMessage ¶
func (uw *Underwriter) DeleteMessage() error
DeleteMessage accepts messageID and topicName
func (*Underwriter) DeleteSubscriber ¶
func (uw *Underwriter) DeleteSubscriber() error
DeleteSubscriber accepts subscriberID (the userID of the subscription), messageID and topicName
Add messageID as -1 if not available. Func will they cycle through the topic and delete matches to subscriberID
func (*Underwriter) DeleteUser ¶
func (uw *Underwriter) DeleteUser() error
DeleteUser accepts UserID which is the userhash string
func (*Underwriter) GetMessage ¶
func (uw *Underwriter) GetMessage(messageID int, topicName string) (Message, error)
GetMessage returns a single message by messageID and topicName
func (*Underwriter) GetSubscriber ¶
func (uw *Underwriter) GetSubscriber(subscriberID string, messageID int, topicName string) (Subscriber, error)
GetSubscriberet returns a single subscriber by subcriberID (which is also the userID attachted to the subscriber),messageID and topicName
func (*Underwriter) GetUser ¶
func (uw *Underwriter) GetUser(userID string) (User, error)
GetUseret returns a single user by userID string (Which is also UsernameHash of the user)
func (*Underwriter) Launch ¶
func (uw *Underwriter) Launch() error
Launch spins up all goroutines required to stream writes and deletes
func (*Underwriter) StreamMessages ¶
func (uw *Underwriter) StreamMessages() (chan Streamer, error)
StreamMessages returns a chan through which it streams all Messages from the db
func (*Underwriter) StreamSubscribers ¶
func (uw *Underwriter) StreamSubscribers() (chan Streamer, error)
StreamSubscribers returns a chan through which it streams all Subscribers from the db
func (*Underwriter) StreamUsers ¶
func (uw *Underwriter) StreamUsers() (chan Streamer, error)
StreamUsers returns a chan through which it streams all Users from the db
func (*Underwriter) Switchboard ¶
func (uw *Underwriter) Switchboard() PersistCore
Switchboard returns the PersistCore field of the underlying Underwriter object to make access to channels to goroutines simpler
func (*Underwriter) TidyUp ¶
func (uw *Underwriter) TidyUp() error
TidyUp cleans up database connections before close. Must run after NewUnderwriter call as defer Persist.TidyUp()
func (*Underwriter) WriteMessage ¶
func (uw *Underwriter) WriteMessage() error
WriteMessage adds a message to the persistence layer
func (*Underwriter) WriteSubscriber ¶
func (uw *Underwriter) WriteSubscriber() error
WriteSubscriber adds a subscriber to the persistence layer
func (*Underwriter) WriteUser ¶
func (uw *Underwriter) WriteUser() error
WriteUser adds a user to the persistence layer
type User ¶
type User struct { UUID string //hash of Username+Password UsernameHash string PasswordHash string Subscriptions map[string]string //Topic Names key against pushURL Created string //Created is date user was created // contains filtered or unexported fields }
User is the struct of a user able to make a subscription
func (*User) AddCreatedDatestring ¶
AddCreatedDatestring adds the given time to the message Created field as a formatted string
func (User) GetCreatedDateTime ¶
GetCreatedDateTime fetches the created datetime string and parses it
func (*User) PullMessage ¶
PullMessage retrieves a message from the Topic message queue if the user is subscibed
func (*User) Subscribe ¶
Subscribe method subscribes the user to the given topic using the given pushURL. If no pushURL, subscription is pull type using the topic ID.
func (*User) Unsubscribe ¶
Unsubscribe helper function to unsubscribe a user from a topic