router

package
v0.0.0-...-83e654d
Published: Oct 31, 2017 License: MIT Imports: 17 Imported by: 16

Documentation

Constants

This section is empty.

Variables

var (
	// ErrServiceNotProvided is returned when the service required is not set.
	ErrServiceNotProvided = errors.New("Service not provided.")

	// ErrInvalidRoute is returned by the `Deliver` method of a `Route` when it has been closed
	// due to slow processing
	ErrInvalidRoute = errors.New("Route is invalid. Channel is closed.")

	// ErrChannelFull is returned when trying to `Deliver` a message with a queue size of zero
	// and the channel is full
	ErrChannelFull = errors.New("Route channel is full. Route is closed.")

	// ErrQueueFull is returned when trying to `Deliver` a message in a full queued route
	ErrQueueFull = errors.New("Route queue is full. Route is closed.")
)
var (
	ErrMissingFetchRequest = errors.New("Missing FetchRequest configuration.")
)

Functions

This section is empty.

Types

type Matcher

type Matcher func(RouteConfig, RouteConfig, ...string) bool

Matcher is a func type that receives two route configurations and an optional list of keys as parameters and returns true if the routes match.
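
As an illustration, a custom matcher could look like the sketch below. `RouteParams` and `RouteConfig` are simplified local stand-ins (with `Path` as a plain string instead of `protocol.Path`), and `sameUser` is a hypothetical matcher; none of this is taken from the package source.

```go
package main

import "fmt"

// Simplified stand-ins for the documented types.
type RouteParams map[string]string

type RouteConfig struct {
	RouteParams
	Path string // protocol.Path in the real package
}

// Matcher has the same shape as the documented func type.
type Matcher func(RouteConfig, RouteConfig, ...string) bool

// sameUser is a hypothetical Matcher: two routes match when their paths are
// equal and they carry the same "user_id" parameter. It ignores the keys.
func sameUser(a, b RouteConfig, _ ...string) bool {
	return a.Path == b.Path && a.RouteParams["user_id"] == b.RouteParams["user_id"]
}

func main() {
	a := RouteConfig{Path: "/topic", RouteParams: RouteParams{"user_id": "user1", "application_id": "app1"}}
	b := RouteConfig{Path: "/topic", RouteParams: RouteParams{"user_id": "user1", "application_id": "app2"}}

	var m Matcher = sameUser
	fmt.Println(m(a, b)) // the routes differ only in application_id
}
```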

type ModuleStoppingError

type ModuleStoppingError struct {
	Name string
}

ModuleStoppingError is returned when the module is stopping.

func (*ModuleStoppingError) Error

func (m *ModuleStoppingError) Error() string

type PermissionDeniedError

type PermissionDeniedError struct {

	// UserID of the user making the request
	UserID string

	// AccessType requested (READ/WRITE)
	AccessType auth.AccessType

	// Path is the requested topic
	Path protocol.Path
}

PermissionDeniedError is returned when AccessManager denies a user request for a topic.

func (*PermissionDeniedError) Error

func (e *PermissionDeniedError) Error() string

type Route

type Route struct {
	RouteConfig
	// contains filtered or unexported fields
}

Route represents a topic for subscription that has a channel to receive messages.

func NewRoute

func NewRoute(config RouteConfig) *Route

NewRoute creates a new Route from the given configuration and returns a pointer to it.

func (*Route) Close

func (r *Route) Close() error

Close closes the route channel.

func (*Route) Deliver

func (r *Route) Deliver(msg *protocol.Message, isFromStore bool) error

Deliver takes a message and adds it to the queue to be delivered into the channel. The isFromStore boolean specifies whether the message was fetched from the store or comes from the router. If the message was fetched from the store, the route won't close when it is full.
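
The close-on-full behavior can be pictured with a non-blocking channel send. This is a rough sketch under stated assumptions, not the package's implementation: `route`, `newRoute`, and `deliver` are hypothetical, messages are plain strings, there is no internal queue, and the choice to block when a fetched message finds the channel full is an assumption (the documentation only says the route won't close in that case).

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two of the documented sentinel errors.
var (
	ErrInvalidRoute = errors.New("Route is invalid. Channel is closed.")
	ErrChannelFull  = errors.New("Route channel is full. Route is closed.")
)

// route is a simplified stand-in for Route: a buffered channel plus a closed
// flag. It is not safe for concurrent deliver calls.
type route struct {
	messages chan string
	closed   bool
}

func newRoute(size int) *route {
	return &route{messages: make(chan string, size)}
}

func (r *route) deliver(msg string, isFromStore bool) error {
	if r.closed {
		return ErrInvalidRoute
	}
	if isFromStore {
		// Assumption: fetched messages wait for room instead of closing the route.
		r.messages <- msg
		return nil
	}
	select {
	case r.messages <- msg:
		return nil
	default:
		// A live message found the channel full: close the slow route.
		r.closed = true
		close(r.messages)
		return ErrChannelFull
	}
}

func main() {
	r := newRoute(1)
	fmt.Println(r.deliver("first", false))  // fits in the buffer
	fmt.Println(r.deliver("second", false)) // buffer full, route is closed
	fmt.Println(r.deliver("third", false))  // route is already closed
}
```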

func (*Route) Equal

func (r *Route) Equal(other *Route, keys ...string) bool

Equal checks whether the route paths match and whether all parameters, or only the subset named by keys, are equal between the two routes.

func (*Route) Key

func (r *Route) Key() string

Key returns a string that uniquely identifies the route by concatenating the route Path and the route params. Example:

/topic user_id:user1 application_id:app1
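
A key of this shape could be assembled as in the sketch below. `routeKey` is a hypothetical function, not the package's code. The documentation does not state how the params are ordered; the sketch sorts the keys alphabetically purely so that its output is deterministic, which means it lists the params in a different order than the example above.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// routeKey joins the path and the "key:value" pairs with single spaces.
// Sorting the keys is an assumption made for deterministic output.
func routeKey(path string, params map[string]string) string {
	keys := make([]string, 0, len(params))
	for k := range params {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	parts := make([]string, 0, len(keys)+1)
	parts = append(parts, path)
	for _, k := range keys {
		parts = append(parts, k+":"+params[k])
	}
	return strings.Join(parts, " ")
}

func main() {
	params := map[string]string{"user_id": "user1", "application_id": "app1"}
	fmt.Println(routeKey("/topic", params))
}
```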

func (*Route) MessagesChannel

func (r *Route) MessagesChannel() <-chan *protocol.Message

MessagesChannel returns the route's receive-only channel, from which delivered messages are read.

func (*Route) Provide

func (r *Route) Provide(router Router, subscribe bool) error

Provide accepts a router to use for fetching/subscribing and a boolean, subscribe, that determines whether the route is subscribed after fetching or closed after fetching without subscribing. The method blocks until the fetch is finished or the route is subscribed.

func (*Route) String

func (r *Route) String() string

type RouteConfig

type RouteConfig struct {
	RouteParams

	Path protocol.Path

	ChannelSize int

	// Matcher if set will be used to check equality of the routes
	Matcher Matcher `json:"-"`

	// FetchRequest to fetch messages before subscribing
	// The Partition field of the FetchRequest is overridden by the Partition of the Route topic
	FetchRequest *store.FetchRequest `json:"-"`
	// contains filtered or unexported fields
}

func (*RouteConfig) Equal

func (rc *RouteConfig) Equal(other RouteConfig, keys ...string) bool

func (*RouteConfig) Filter

func (rc *RouteConfig) Filter(filters map[string]string) bool

Filter returns true if all filters are matched on the route.
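
One way to read "all filters are matched" is that every filter key must be present in the route params with the same value. The sketch below follows that reading; `matchesAll` is a hypothetical function over plain maps, and whether the real Filter also considers fields such as Path is not stated in the documentation.

```go
package main

import "fmt"

// matchesAll reports whether every filter key exists in params with an
// identical value. An empty filter set matches any params.
func matchesAll(params, filters map[string]string) bool {
	for key, want := range filters {
		if got, ok := params[key]; !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	params := map[string]string{"user_id": "user1", "application_id": "app1"}

	fmt.Println(matchesAll(params, map[string]string{"user_id": "user1"}))
	fmt.Println(matchesAll(params, map[string]string{"user_id": "user2"}))
}
```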

type RouteParams

type RouteParams map[string]string

func (*RouteParams) Copy

func (rp *RouteParams) Copy() RouteParams

func (*RouteParams) Equal

func (rp *RouteParams) Equal(other RouteParams, keys ...string) bool

Equal reports whether the receiver's params are the same as the `other` params. When `keys` is given, only those keys are compared instead of all keys.
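
The difference between a full comparison and a comparison restricted to `keys` can be sketched as follows. `equalParams` is a hypothetical free function over a local `RouteParams` type, not the package's method. In the restricted case it treats a missing key and an empty value as equal, which is an assumption of this sketch.

```go
package main

import "fmt"

// RouteParams is a local stand-in for the documented map type.
type RouteParams map[string]string

// equalParams compares only the given keys when any are passed,
// and otherwise requires both maps to hold exactly the same entries.
func equalParams(a, b RouteParams, keys ...string) bool {
	if len(keys) > 0 {
		for _, k := range keys {
			if a[k] != b[k] {
				return false
			}
		}
		return true
	}
	if len(a) != len(b) {
		return false
	}
	for k, v := range a {
		if other, ok := b[k]; !ok || other != v {
			return false
		}
	}
	return true
}

func main() {
	a := RouteParams{"user_id": "user1", "application_id": "app1"}
	b := RouteParams{"user_id": "user1", "application_id": "app2"}

	fmt.Println(equalParams(a, b))            // compares every key
	fmt.Println(equalParams(a, b, "user_id")) // compares user_id only
}
```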

func (*RouteParams) Get

func (rp *RouteParams) Get(key string) string

func (*RouteParams) Key

func (rp *RouteParams) Key() string

func (*RouteParams) Set

func (rp *RouteParams) Set(key, value string)

func (*RouteParams) String

func (rp *RouteParams) String() string

type Router

type Router interface {
	Subscribe(r *Route) (*Route, error)
	Unsubscribe(r *Route)
	HandleMessage(message *protocol.Message) error
	Fetch(*store.FetchRequest) error
	GetSubscribers(topic string) ([]byte, error)

	AccessManager() (auth.AccessManager, error)
	MessageStore() (store.MessageStore, error)
	KVStore() (kvstore.KVStore, error)
	Cluster() *cluster.Cluster

	Done() <-chan bool
}

The Router interface provides a mechanism for PubSub messaging.

func New

func New(accessManager auth.AccessManager, messageStore store.MessageStore, kvStore kvstore.KVStore, cluster *cluster.Cluster) Router

New returns a new Router that uses the given access manager, message store, key-value store, and cluster.
