pubsub

package
v0.0.0-...-a846597 Latest
Published: Mar 11, 2022 License: GPL-3.0 Imports: 16 Imported by: 0

README

pubsub

The components library for building PubSub servers

Code organisation

A part of module github.com/CDennis-CR/PubSub

Implementations for interfaces will be located in their own packages under the pkg directory.

N.B. Formatted with gofmt -s -w pkg/pubsub/*.go, in alignment with the Go Report Card recommendations.

Documentation

Constants

This section is empty.

Variables

var PersistBase string

PersistBase gives the root directory to which data should be persisted. It is set from the environment variable `STORE`.

Functions

func CreateServer

func CreateServer(port int, mux *http.ServeMux) *http.Server

CreateServer provides the standard HTTP server for the application. Intended for use with CreateMux

Note: WriteTimeout has been disabled (was: 5) so that Server-Sent Events (SSE) work correctly. In future, a secondary server should be run in a goroutine for SSE so that stricter timeout controls can be kept on all other requests.

func HTTPAuthenticate

func HTTPAuthenticate(rw http.ResponseWriter, r *http.Request, pubsub *PubSub) (*User, IncomingReq, error)

HTTPAuthenticate performs the boilerplate username and password check for incoming service queries.

IncomingReq is the rolled-up query, including fields from the JSON body and the URL query args (see getHTTPData).

func HTTPErrorResponse

func HTTPErrorResponse(err error, errType int, rw http.ResponseWriter) error

HTTPErrorResponse responds correctly to http request errors in the handler function

func RandomString

func RandomString(n int) string

func Start

func Start(port int)

Start is the super easy API for running the PubSub from code

Types

type CreateUserResp

type CreateUserResp struct {
	Error string `json:"error,omitempty"`
	UUID  string `json:"user_id,omitempty"`
	//SubscriptionCount is len(User.Subscriptions)
	SubscriptionCount int               `json:"subscription_count"`
	Subscriptions     map[string]string `json:"subscriptions,omitempty"`
	Created           string            `json:"created,omitempty"`
}

CreateUserResp is the response from a create user request

type IncomingReq

type IncomingReq struct {
	Username string `json:"username"` //Mandatory
	Password string `json:"password"` //Mandatory
	Topic    string `json:"topic,omitempty"`
	//WebhookURL for push subscription to topic
	WebhookURL string `json:"webhook_url,omitempty"`
	//Message used for writing messages to services
	Message interface{} `json:"message,omitempty"`
	//MessageID used for pulling messages from topics
	MessageID int `json:"message_id,omitempty"`
}

IncomingReq is the standard structure for message requests to the service

type ListKeysResp

type ListKeysResp struct {
	Error       string   `json:"error,omitempty"`
	Topics      []string `json:"topics,omitempty"`
	Users       []string `json:"users,omitempty"` //not in use yet
	Subscribers []string `json:"subs,omitempty"`  //not in use yet
	Count       int      `json:"count"`
}

ListKeysResp is the response from a query requesting lists of entries such as /users/fetch, /topics/fetch, etc

type Message

type Message struct {
	ID      int         `json:"id"` //sequence number
	Data    interface{} `json:"data"`
	Created string      `json:"created"`
	// contains filtered or unexported fields
}

Message is a single message structure

func (*Message) AddCreatedDatestring

func (message *Message) AddCreatedDatestring(time.Time) string

AddCreatedDatestring adds the given time to the message Created field as a formatted string

func (Message) GetCreatedDateTime

func (message Message) GetCreatedDateTime() (time.Time, error)

GetCreatedDateTime fetches the created datetime string and parses it

type MessageResp

type MessageResp struct {
	Error   string  `json:"error,omitempty"`
	Topic   string  `json:"topic_id,omitempty"`
	Message Message `json:"message,omitempty"`
}

MessageResp is the response from Message orientated requests

type Persist

type Persist interface {
	//Launch starts up goroutines
	Launch() error
	//Switchboard returns a PersistCore object
	// through which to send messages to the Write...
	// and Delete... methods
	Switchboard() PersistCore
	//TidyUp is where to put the close down work. Usually
	// to close
	// files or databases. Usually used as `defer
	// Persist.TidyUp()`
	TidyUp() error
	//WriteUser adds a user to the persistence layer from
	// a User chan
	WriteUser() error
	//WriteSubscriber adds a subscriber to the persistence
	// layer from persistSubscriberStruct chan
	WriteSubscriber() error
	//WriteMessage adds a message to the persistence layer
	// from a persistMessageStruct chan
	WriteMessage() error
	//GetUser returns a single user by userID string
	// (which is also the UsernameHash of the user)
	GetUser(string) (User, error)
	//GetSubscriber returns a single subscriber by
	// subscriberID (which is also the userID attached to
	// the subscriber), messageID and topicName
	GetSubscriber(string, int, string) (Subscriber, error)
	//GetMessage returns a single message by messageID and topicName
	GetMessage(int, string) (Message, error)
	//StreamUsers returns a chan through which it streams
	// all Users from the db
	StreamUsers() (chan Streamer, error)
	//StreamSubscribers returns a chan through which it streams all
	// Subscribers from the db
	StreamSubscribers() (chan Streamer, error)
	//StreamMessages returns a chan through which it streams all
	// Messages from the db
	StreamMessages() (chan Streamer, error)
	//DeleteUser accepts UserID which is the userhash string
	DeleteUser() error
	//DeleteSubscriber accepts subscriberID (the userID of the subscription),
	// messageID and topicName.
	//
	//Add messageID as -1 if not available. Func will then cycle through the topic and delete matches to subscriberID
	DeleteSubscriber() error
	//DeleteMessage accepts messageID and topicName
	DeleteMessage() error
}

Persist is the interface for adding persistent storage

type PersistCore

type PersistCore struct {
	// contains filtered or unexported fields
}

PersistCore is the minimum fields an implementor of Persist should have

type PersistMessageStruct

type PersistMessageStruct struct {
	Message   Message //for saving
	TopicName string
	MessageID int //for deletions
}

PersistMessageStruct is a channel object for sending messages to be saved or deleted by the persist layer
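The Persist comments describe writes arriving over channels and being handled by goroutines started in Launch. The sketch below shows that general pattern with an in-memory map. All names (`memStore`, `launch`, `tidyUp`, the `write` channel) are hypothetical, and it is an assumption that the real implementation is shaped this way.

```go
package main

import "fmt"

type message struct {
	ID   int
	Data interface{}
}

// persistMessage mirrors PersistMessageStruct: Message is set for saves,
// MessageID for deletions.
type persistMessage struct {
	Message   message
	TopicName string
	MessageID int
}

// memStore is a hypothetical in-memory persistence layer keyed by "topic/id".
type memStore struct {
	items map[string]message
	write chan persistMessage
	done  chan struct{}
}

func newMemStore() *memStore {
	return &memStore{
		items: make(map[string]message),
		write: make(chan persistMessage),
		done:  make(chan struct{}),
	}
}

// launch starts the writer goroutine, which drains the write channel
// until the channel is closed.
func (s *memStore) launch() {
	go func() {
		defer close(s.done)
		for pm := range s.write {
			key := fmt.Sprintf("%s/%d", pm.TopicName, pm.Message.ID)
			s.items[key] = pm.Message
		}
	}()
}

// tidyUp closes the channel and waits for the writer goroutine to exit.
func (s *memStore) tidyUp() {
	close(s.write)
	<-s.done
}

func main() {
	s := newMemStore()
	s.launch()
	s.write <- persistMessage{Message: message{ID: 1, Data: "hello"}, TopicName: "news"}
	s.tidyUp()
	fmt.Println(len(s.items))
}
```

Because only the writer goroutine touches the map while it runs, callers need no lock. They may read `items` safely once `tidyUp` has returned.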

type PersistSubscriberStruct

type PersistSubscriberStruct struct {
	Subscriber   Subscriber //for saving
	MessageID    int
	TopicName    string
	SubscriberID string //for deletions
}

PersistSubscriberStruct is a channel object for sending subscribers to be saved or deleted by the persist layer

type PersistUnit

type PersistUnit int

PersistUnit is an enum type for what needs to be persisted, used by the default Persist implementation for streamers

const (
	//PersistUser gives an enum option for User using the PersistUnit type
	PersistUser PersistUnit = iota
	//PersistSubscriber gives an enum option for Subscriber using the PersistUnit type
	PersistSubscriber
)

type PubSub

type PubSub struct {
	Topics Topics
	Users  Users
	// contains filtered or unexported fields
}

PubSub is the core holder struct for the pubsub service

func CreateMux

func CreateMux() (*http.ServeMux, *PubSub)

CreateMux builds the routing for the application. Intended for use with CreateServer

Verbs:

Obtain : Get existing or create

Create : Create new or error if already exists

Fetch : Get existing or error if it does not exist

Write : Write data to server

Pull : Read information from server by http request (pull) after subscribing to a pull agreement of event data

Subscribe/Unsubscribe : Setup or delete push/pull agreement
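The difference between the Obtain, Create and Fetch verbs can be sketched on a plain map. This is a minimal illustration only: `registry`, `topic` and the error values are hypothetical and not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

type topic struct{ Name string }

// registry is a hypothetical topic store keyed by topic name.
type registry map[string]*topic

var (
	errExists   = errors.New("topic already exists")
	errNotFound = errors.New("topic not found")
)

// create adds a new topic, or fails if the name is already taken.
func (r registry) create(name string) (*topic, error) {
	if _, ok := r[name]; ok {
		return nil, errExists
	}
	t := &topic{Name: name}
	r[name] = t
	return t, nil
}

// fetch returns an existing topic, or fails if it is missing.
func (r registry) fetch(name string) (*topic, error) {
	t, ok := r[name]
	if !ok {
		return nil, errNotFound
	}
	return t, nil
}

// obtain returns the existing topic, creating it first if needed.
func (r registry) obtain(name string) *topic {
	if t, err := r.fetch(name); err == nil {
		return t
	}
	t, _ := r.create(name) // cannot fail: fetch just reported the name as missing
	return t
}

func main() {
	r := registry{}
	_, err := r.fetch("news")
	fmt.Println("fetch before create:", err)
	fmt.Println("obtain:", r.obtain("news").Name)
	_, err = r.create("news")
	fmt.Println("create again:", err)
}
```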

func (*PubSub) Close

func (pubsub *PubSub) Close() error

Close is the tidy-up function; it should be deferred after calling the getReady function

func (*PubSub) CreateTopic

func (pubsub *PubSub) CreateTopic(topicName string, user *User) (*Topic, error)

CreateTopic creates a topic or returns an error if already exists

func (*PubSub) FetchTopic

func (pubsub *PubSub) FetchTopic(topicName string, user *User) (*Topic, error)

FetchTopic fetches a topic or returns an error if not found

func (*PubSub) GetTopic

func (pubsub *PubSub) GetTopic(topicName string, user *User) (topic *Topic, err error)

GetTopic gets a topic. If it does not exist it creates a new topic using the User as the creator

func (*PubSub) GetUser

func (pubsub *PubSub) GetUser(username, password string) (*User, error)

GetUser maintains the user list.

It returns the existing user record if usernameHash and passwordHash both match. If no user with that username exists, it creates a new user. If the username exists but the password does not match, it returns a login error.

func (*PubSub) PushWebhooks

func (pubsub *PubSub) PushWebhooks() error

PushWebhooks runs through all topics and pushes messages to the subscribers as a webhook service. Pushes that are not acknowledged with a 200 or 201 response are retried with exponential backoff, up to a maximum interval of 1 hour between attempts.

Should run through continuously

func (*PubSub) Tombstone

func (pubsub *PubSub) Tombstone(consideredStale, resurrectionOpportunity time.Duration) error

Tombstone cycles through and does tombstoning and deletion activities

consideredStale is the time duration after which an item is considered stale and okay to tombstone

resurrectionOpportunity is the time duration after which a tombstoned item can be deleted. This leaves an opportunity between tombstoning and deletion to be saved (by becoming active again)

N.B. This function blocks all PubSub activity with a PubSub Lock - so should be run conservatively and opportunistically

type Responder

type Responder interface {
	// contains filtered or unexported methods
}

Responders are handler response objects with encoding methods

type SSEAddRequester

type SSEAddRequester struct {
	ID       string //userIP+randomstring hash
	Receiver chan SSEResponse
}

SSEAddRequester is the struct sent to SSEDistro to add the client to the message updater. Used by mux to register itself for all message streams

type SSEDistro

type SSEDistro struct {
	//Intake is the incoming message chan that needs to be fanned out to the requesters
	Intake     chan SSEResponse            //Intake is the chan messages are sent before fanout to SSE connections
	Requesters map[string]chan SSEResponse //Requesters has a clientID as key
	Add        chan SSEAddRequester
	Cancel     chan string //Cancel receives ClientID
}

SSEDistro is the object that fans out Messages to SSE requesting clients

func SSENewDistro

func SSENewDistro() SSEDistro

SSENewDistro creates a new SSEDistro

func (*SSEDistro) Routine

func (distro *SSEDistro) Routine()

Routine is the goroutine that fans out Messages to SSE connections and deals with new fanout recipients and client removals

Fans out ALL messages to SSE clients. Must be filtered at mux.
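A fan-out loop with the same channel layout as SSEDistro might look like the following sketch. The lower-case types are hypothetical stand-ins, and dropping a message when a client's channel is full is an assumption. The real Routine may block or buffer instead.

```go
package main

import "fmt"

type sseResponse struct {
	TopicName string
	Data      string
}

type addRequester struct {
	ID       string
	Receiver chan sseResponse
}

type distro struct {
	Intake     chan sseResponse
	Requesters map[string]chan sseResponse
	Add        chan addRequester
	Cancel     chan string
}

func newDistro() *distro {
	return &distro{
		Intake:     make(chan sseResponse),
		Requesters: make(map[string]chan sseResponse),
		Add:        make(chan addRequester),
		Cancel:     make(chan string),
	}
}

// routine is the only goroutine that touches Requesters, so no mutex is needed.
func (d *distro) routine() {
	for {
		select {
		case req := <-d.Add:
			d.Requesters[req.ID] = req.Receiver
		case id := <-d.Cancel:
			delete(d.Requesters, id)
		case msg := <-d.Intake:
			for _, ch := range d.Requesters {
				select {
				case ch <- msg:
				default: // assumption: drop rather than block on a slow client
				}
			}
		}
	}
}

func main() {
	d := newDistro()
	go d.routine()

	rx := make(chan sseResponse, 1)
	d.Add <- addRequester{ID: "client-1", Receiver: rx}
	d.Intake <- sseResponse{TopicName: "news", Data: "hello"}

	// Every client receives every message, so the handler filters by topic.
	if msg := <-rx; msg.TopicName == "news" {
		fmt.Println(msg.Data)
	}
}
```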

type SSEResponse

type SSEResponse struct {
	TopicName string  `json:"topic_name,omitempty"`
	Message   Message `json:"message"`
}

SSEResponse is the object sent to the client and identifies which topic the message came from

func (SSEResponse) String

func (sse SSEResponse) String() string

String implements stringer to allow for proper formatting for SSE
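For reference, a Server-Sent Event on the wire is a `data:` field terminated by a blank line. The sketch below shows one way a String method could produce that. The JSON payload shape follows the struct tags above, but whether the package emits exactly this framing is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type message struct {
	ID   int         `json:"id"`
	Data interface{} `json:"data"`
}

type sseResponse struct {
	TopicName string  `json:"topic_name,omitempty"`
	Message   message `json:"message"`
}

// String renders the response as one SSE event: a "data:" field
// followed by a blank line that terminates the event.
func (s sseResponse) String() string {
	b, err := json.Marshal(s)
	if err != nil {
		// Data holds an arbitrary value, so marshalling can fail.
		return "data: {}\n\n"
	}
	return "data: " + string(b) + "\n\n"
}

func main() {
	resp := sseResponse{TopicName: "news", Message: message{ID: 1, Data: "hi"}}
	fmt.Print(resp) // fmt uses the String method
}
```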

type Streamer

type Streamer struct {
	//Unit is User, Subscriber or Message. Topic can be
	// passed but is implicitly available in the Key
	Unit tombstoner
	//Key is the db key which contains id information for
	// proper restoration to active map
	//
	//In the format order (curly bracketed items are optional, square bracketed items are Unit type dependent):
	// {bucketName/}TopicName/MessageID[/SubscriberID]
	//or just:
	// {bucketName/}UserID
	Key string
}

Streamer is the response object from Stream restore methods of the Persist interface
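The Key layout documented in the struct comment can be illustrated with two small builders. `messageKey` and `userKey` are hypothetical helpers, and rendering MessageID as a plain decimal number is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// messageKey builds "{bucketName/}TopicName/MessageID[/SubscriberID]".
// An empty bucket or subscriberID is omitted from the key.
func messageKey(bucket, topicName string, messageID int, subscriberID string) string {
	parts := make([]string, 0, 4)
	if bucket != "" {
		parts = append(parts, bucket)
	}
	parts = append(parts, topicName, strconv.Itoa(messageID))
	if subscriberID != "" {
		parts = append(parts, subscriberID)
	}
	return strings.Join(parts, "/")
}

// userKey builds "{bucketName/}UserID".
func userKey(bucket, userID string) string {
	if bucket == "" {
		return userID
	}
	return bucket + "/" + userID
}

func main() {
	fmt.Println(messageKey("", "news", 3, ""))
	fmt.Println(messageKey("subscribers", "news", 3, "abc123"))
	fmt.Println(userKey("users", "abc123"))
}
```

Because the bucket prefix is optional, a key cannot be split back into its parts without knowing which Unit type it holds, which is presumably why Streamer carries both Unit and Key.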

type SubscribeResp

type SubscribeResp struct {
	Error  string `json:"error,omitempty"`
	User   string `json:"user_id"`
	Topic  string `json:"topic_name"`
	Status string `json:"status"`
	//CanWrite shows if the requester User can write to the topic (userID
	// matches topic.Creator.ID)
	CanWrite bool `json:"writable"`
}

SubscribeResp is the response form for Subscription orientated requests

type Subscriber

type Subscriber struct {
	ID           string //ID is the User.UUID
	UsernameHash string //UsernameHash is the User.UsernameHash to help access the user in Subscription based functions
	PushURL      string //PushURL is the webhook URL to which to push messages

	Creator bool //Creator is whether or not the subscriber is the creator. Used for `restore`
	// contains filtered or unexported fields
}

Subscriber is the setup of a subscriber to a topic

type Subscribers

type Subscribers map[string]*Subscriber //Subscriber.ID against subscriber

Subscribers is a map of subscribers

type Topic

type Topic struct {
	//ID               int                 //sequential number--unused: safe to delete--
	Creator          string              //Creator is the User ID of the creating user, the only User that can write to the topic
	Name             string              //user given name for the topic (sanitized)
	Messages         map[int]Message     //message queue
	PointerPositions map[int]Subscribers //pointer position against subscribers at that position
	PointerHead      int                 //latest/highest Messages key/ID.
	// contains filtered or unexported fields
}

Topic is the setup for topics

type TopicResp

type TopicResp struct {
	Error       string `json:"error,omitempty"`
	Topic       string `json:"topic_name"`
	Status      string `json:"status"`
	Creator     string `json:"creator"`
	PointerHead int    `json:"pointer_head"`
	//CanWrite shows if requester User can write to the topic (userID
	// matches topic.Creator.ID)
	CanWrite bool `json:"writable"`
}

TopicResp is the response form for Topic orientated requests

type Topics

type Topics map[string]*Topic

Topics is a map of topics with key as topic name

type Underwriter

type Underwriter struct {
	*PersistCore
	// contains filtered or unexported fields
}

Underwriter is the default implementation of Persist

It stores all needed persisted files in the ./store directory

func NewUnderwriter

func NewUnderwriter(pubsub *PubSub) (*Underwriter, error)

NewUnderwriter creates a new Underwriter instance that implements Persist

func (*Underwriter) DeleteMessage

func (uw *Underwriter) DeleteMessage() error

DeleteMessage accepts messageID and topicName

func (*Underwriter) DeleteSubscriber

func (uw *Underwriter) DeleteSubscriber() error

DeleteSubscriber accepts subscriberID (the userID of the subscription), messageID and topicName

Add messageID as -1 if not available. Func will then cycle through the topic and delete matches to subscriberID

func (*Underwriter) DeleteUser

func (uw *Underwriter) DeleteUser() error

DeleteUser accepts UserID which is the userhash string

func (*Underwriter) GetMessage

func (uw *Underwriter) GetMessage(messageID int, topicName string) (Message, error)

GetMessage returns a single message by messageID and topicName

func (*Underwriter) GetSubscriber

func (uw *Underwriter) GetSubscriber(subscriberID string, messageID int, topicName string) (Subscriber, error)

GetSubscriber returns a single subscriber by subscriberID (which is also the userID attached to the subscriber), messageID and topicName

func (*Underwriter) GetUser

func (uw *Underwriter) GetUser(userID string) (User, error)

GetUser returns a single user by userID string (which is also the UsernameHash of the user)

func (*Underwriter) Launch

func (uw *Underwriter) Launch() error

Launch spins up all goroutines required to stream writes and deletes

func (*Underwriter) StreamMessages

func (uw *Underwriter) StreamMessages() (chan Streamer, error)

StreamMessages returns a chan through which it streams all Messages from the db

func (*Underwriter) StreamSubscribers

func (uw *Underwriter) StreamSubscribers() (chan Streamer, error)

StreamSubscribers returns a chan through which it streams all Subscribers from the db

func (*Underwriter) StreamUsers

func (uw *Underwriter) StreamUsers() (chan Streamer, error)

StreamUsers returns a chan through which it streams all Users from the db

func (*Underwriter) Switchboard

func (uw *Underwriter) Switchboard() PersistCore

Switchboard returns the PersistCore field of the underlying Underwriter object to make access to channels to goroutines simpler

func (*Underwriter) TidyUp

func (uw *Underwriter) TidyUp() error

TidyUp cleans up database connections before close. Must run after NewUnderwriter call as defer Persist.TidyUp()

func (*Underwriter) WriteMessage

func (uw *Underwriter) WriteMessage() error

WriteMessage adds a message to the persistence layer

func (*Underwriter) WriteSubscriber

func (uw *Underwriter) WriteSubscriber() error

WriteSubscriber adds a subscriber to the persistence layer

func (*Underwriter) WriteUser

func (uw *Underwriter) WriteUser() error

WriteUser adds a user to the persistence layer

type User

type User struct {
	UUID          string //hash of Username+Password
	UsernameHash  string
	PasswordHash  string
	Subscriptions map[string]string //Topic Names key against pushURL
	Created       string            //Created is date user was created
	// contains filtered or unexported fields
}

User is the struct of a user able to make a subscription

func (*User) AddCreatedDatestring

func (user *User) AddCreatedDatestring(time.Time) string

AddCreatedDatestring adds the given time to the user Created field as a formatted string

func (User) GetCreatedDateTime

func (user User) GetCreatedDateTime() (time.Time, error)

GetCreatedDateTime fetches the created datetime string and parses it

func (*User) PullMessage

func (user *User) PullMessage(topic *Topic, messageID int) (Message, error)

PullMessage retrieves a message from the Topic message queue if the user is subscribed

func (*User) Subscribe

func (user *User) Subscribe(topic *Topic, pushURL string) error

Subscribe method subscribes the user to the given topic using the given pushURL. If no pushURL, subscription is pull type using the topic ID.

func (*User) Unsubscribe

func (user *User) Unsubscribe(topic *Topic) error

Unsubscribe is a helper function that unsubscribes a user from a topic

func (*User) WriteToTopic

func (user *User) WriteToTopic(topic *Topic, message Message) (Message, error)

WriteToTopic manages the user writing to a topic of which it is the creator

type Users

type Users map[string]*User

Users is a map of User by "UsernameHash" key with value of User
