broadcaster

package
v1.9.68
Published: Aug 12, 2023 License: BSD-3-Clause Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AllEventsFilter

type AllEventsFilter struct {
	Filter
}

An AllEventsFilter delivers all messages to a subscriber, provided that the subscriber is an administrator.

func (*AllEventsFilter) Matches

func (f *AllEventsFilter) Matches(msg *Message, subscriber *Subscriber) bool

Matches returns whether the current AllEventsFilter matches the provided subscriber.

func (*AllEventsFilter) String

func (f *AllEventsFilter) String() string

type Authorization added in v1.9.12

type Authorization struct {
	APIToken string
	Cookie   string
}

Authorization is a structure that contains the possible authentication mechanisms for the frontend. APIToken takes precedence.

type Broadcaster

type Broadcaster struct {
	// contains filtered or unexported fields
}

A Broadcaster can send messages to Subscribers.

func NewBroadcaster

func NewBroadcaster(ctx *common.Context, metrics Metrics) *Broadcaster

NewBroadcaster returns a new Broadcaster.

func (*Broadcaster) Broadcast

func (b *Broadcaster) Broadcast(message *Message) bool

Broadcast delivers the provided message to all matching Subscribers.

func (*Broadcaster) Deauthenticate

func (b *Broadcaster) Deauthenticate(user string)

Deauthenticate removes all the Subscribers belonging to the provided user from the Broadcaster.

func (*Broadcaster) Run

func (b *Broadcaster) Run()

Run is the main Broadcaster loop. It listens for subscribe/unsubscribe/deauth events to manage the Subscribers, as well as new incoming messages that will be sent to all matching Subscribers.

func (*Broadcaster) Subscribe

func (b *Broadcaster) Subscribe(subscriber *Subscriber) bool

Subscribe adds one subscriber to the Broadcaster.

func (*Broadcaster) Unsubscribe

func (b *Broadcaster) Unsubscribe(subscriber *Subscriber) bool

Unsubscribe removes one subscriber from the Broadcaster.

type ContestFilter

type ContestFilter struct {
	Filter
	// contains filtered or unexported fields
}

A ContestFilter is a Filter that only allows Messages that are associated with a particular contest.

func (*ContestFilter) Matches

func (f *ContestFilter) Matches(msg *Message, subscriber *Subscriber) bool

Matches returns whether the current ContestFilter matches the provided message/subscriber combination.

func (*ContestFilter) String

func (f *ContestFilter) String() string

type Filter

type Filter interface {
	String() string
	Matches(msg *Message, subscriber *Subscriber) bool
}

Filter is used to determine whether messages should be sent to a particular subscriber.
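
Because Filter only requires String and Matches, any type with those two methods satisfies it. The following is a minimal sketch, not code from this package: Message and Subscriber are trimmed local stand-ins (the real Subscriber has only unexported fields), and publicOnlyFilter is a hypothetical filter invented for illustration.

```go
package main

import "fmt"

// Message is a trimmed stand-in for broadcaster.Message.
type Message struct {
	Contest string
	Public  bool
	Message string
}

// Subscriber is a stand-in; the real type exposes no exported fields.
type Subscriber struct{}

// Filter mirrors the interface documented above.
type Filter interface {
	String() string
	Matches(msg *Message, subscriber *Subscriber) bool
}

// publicOnlyFilter is a hypothetical Filter that accepts only public messages.
type publicOnlyFilter struct{}

func (f *publicOnlyFilter) String() string { return "public-only" }

func (f *publicOnlyFilter) Matches(msg *Message, _ *Subscriber) bool {
	return msg != nil && msg.Public
}

// Compile-time check that publicOnlyFilter satisfies Filter.
var _ Filter = (*publicOnlyFilter)(nil)

func main() {
	var f Filter = &publicOnlyFilter{}
	sub := &Subscriber{}
	fmt.Println(f, f.Matches(&Message{Public: true}, sub))
	fmt.Println(f, f.Matches(&Message{Public: false}, sub))
}
```

The blank-identifier assignment makes the compiler reject the type if it ever stops satisfying the interface.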

func NewFilter

func NewFilter(filter string) (Filter, error)

NewFilter parses the provided filter string and constructs a new Filter instance.

type Message

type Message struct {
	Contest    string `json:"contest,omitempty"`
	Problemset int64  `json:"problemset,omitempty"`
	Problem    string `json:"problem,omitempty"`
	User       string `json:"user,omitempty"`
	Public     bool   `json:"public"`
	Message    string `json:"message"`
}

A Message is a message that will be broadcast to Subscribers.
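
The struct tags determine the JSON form of a Message: the four identifying fields are dropped when empty, while public and message are always present. The sketch below copies the struct locally to show this with encoding/json; the encode helper and the sample values are illustrative assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is copied from the type definition above.
type Message struct {
	Contest    string `json:"contest,omitempty"`
	Problemset int64  `json:"problemset,omitempty"`
	Problem    string `json:"problem,omitempty"`
	User       string `json:"user,omitempty"`
	Public     bool   `json:"public"`
	Message    string `json:"message"`
}

// encode is a hypothetical helper that marshals a Message to a JSON string.
func encode(m Message) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encode(Message{Contest: "demo", Message: "hello"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// Problemset, Problem, and User are zero-valued, so omitempty drops them.
	fmt.Println(s) // {"contest":"demo","public":false,"message":"hello"}
}
```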

type Metrics

type Metrics interface {
	IncrementWebSocketsCount(delta int)
	IncrementSSECount(delta int)
	IncrementMessagesCount()
	IncrementChannelDropCount()
	ObserveDispatchMessageLatency(latency time.Duration)
	ObserveProcessMessageLatency(latency time.Duration)
}

Metrics is the interface needed to publish performance metrics.

type ProblemFilter

type ProblemFilter struct {
	Filter
	// contains filtered or unexported fields
}

A ProblemFilter is a Filter that only allows Messages that are associated with a particular problem.

func (*ProblemFilter) Matches

func (f *ProblemFilter) Matches(msg *Message, subscriber *Subscriber) bool

Matches returns whether the current ProblemFilter matches the provided message/subscriber combination.

func (*ProblemFilter) String

func (f *ProblemFilter) String() string

type ProblemsetFilter added in v1.1.0

type ProblemsetFilter struct {
	Filter
	// contains filtered or unexported fields
}

A ProblemsetFilter is a Filter that only allows Messages that are associated with a particular problemset.

func (*ProblemsetFilter) Matches added in v1.1.0

func (f *ProblemsetFilter) Matches(msg *Message, subscriber *Subscriber) bool

Matches returns whether the current ProblemsetFilter matches the provided message/subscriber combination.

func (*ProblemsetFilter) String added in v1.1.0

func (f *ProblemsetFilter) String() string

type QueuedMessage

type QueuedMessage struct {
	// contains filtered or unexported fields
}

A QueuedMessage is a message that may be broadcast to relevant Subscribers. It also performs latency analysis.

func (*QueuedMessage) Dispatched

func (m *QueuedMessage) Dispatched()

Dispatched signals that this message has been dispatched. It is used to perform latency analysis.

func (*QueuedMessage) Processed

func (m *QueuedMessage) Processed()

Processed signals that this message has been processed and has been enqueued in all the relevant Subscribers' queues.

type SSETransport

type SSETransport struct {
	// contains filtered or unexported fields
}

An SSETransport is a Transport that uses Server-Sent Events to deliver events.

func (*SSETransport) Close

func (t *SSETransport) Close()

Close signals the close channel.

func (*SSETransport) Init

func (t *SSETransport) Init(close chan<- struct{})

Init sends the necessary headers to signal that this is an SSE response.

func (*SSETransport) Ping

func (t *SSETransport) Ping() error

Ping sends an empty message to prevent the underlying connection from being closed due to inactivity.

func (*SSETransport) ReadLoop

func (t *SSETransport) ReadLoop()

ReadLoop waits until the underlying connection is closed, since the subscriber cannot send anything to us.

func (*SSETransport) Send

func (t *SSETransport) Send(message *QueuedMessage) error

Send sends the provided message.

func (*SSETransport) String

func (t *SSETransport) String() string

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

A Subscriber represents a user that wishes to receive broadcast notifications.

func NewSubscriber

func NewSubscriber(
	ctx *common.Context,
	client *http.Client,
	requestURL *url.URL,
	auth Authorization,
	filterString string,
	transport Transport,
) (*Subscriber, error)

NewSubscriber creates a new Subscriber.

func (*Subscriber) Matches

func (s *Subscriber) Matches(msg *Message) bool

Matches returns whether the provided message should be sent to the current subscriber.

func (*Subscriber) Run

func (s *Subscriber) Run()

Run loops waiting for one of three events to happen: the connection is closed, a new message is ready to be delivered to this subscriber, or a periodic ping tick fires.

func (*Subscriber) Send

func (s *Subscriber) Send() chan<- *QueuedMessage

Send returns the send channel, to which messages can be added.

type Transport

type Transport interface {
	fmt.Stringer

	// Init performs any initialization and sets a channel that will be closed by
	// the Transport if the connection is closed.
	Init(close chan<- struct{})

	// Close writes the WebSockets Close message to notify the subscriber that
	// the socket is going away.
	Close()

	// Ping sends an empty message to the subscriber, to ensure that the
	// underlying channel is not closed due to inactivity.
	Ping() error

	// ReadLoop consumes any input sent by the subscriber.
	ReadLoop()

	// Send sends a message to the subscriber.
	Send(message *QueuedMessage) error
}

A Transport exposes the interface needed to broadcast events to a subscriber.

func NewSSETransport

func NewSSETransport(w http.ResponseWriter) Transport

NewSSETransport creates a new SSETransport for the provided ResponseWriter.

func NewWebSocketTransport

func NewWebSocketTransport(
	sock *websocket.Conn,
	writeDeadlineDelay time.Duration,
) Transport

NewWebSocketTransport creates a new WebSocketTransport for the provided websocket.

type UpstreamError

type UpstreamError struct {
	HTTPStatusCode int
	Contents       []byte
}

An UpstreamError represents an error that comes from the frontend.

func (*UpstreamError) Error

func (e *UpstreamError) Error() string

type UserFilter

type UserFilter struct {
	Filter
	// contains filtered or unexported fields
}

A UserFilter is a Filter that only allows Messages that are associated with a particular user.

func (*UserFilter) Matches

func (f *UserFilter) Matches(msg *Message, subscriber *Subscriber) bool

Matches returns whether the current UserFilter matches the provided message/subscriber combination.

func (*UserFilter) String

func (f *UserFilter) String() string

type ValidateFilterResponse

type ValidateFilterResponse struct {
	User            string   `json:"user"`
	Admin           bool     `json:"admin"`
	ProblemAdmin    []string `json:"problem_admin"`
	ContestAdmin    []string `json:"contest_admin"`
	ProblemsetAdmin []int64  `json:"problemset_admin"`
}

A ValidateFilterResponse holds the results of a Validate request.
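
The JSON field names follow from the struct tags. The following sketch decodes a payload with encoding/json using a local copy of the struct; the decodeValidateFilterResponse helper and the sample payload are made up for illustration and do not come from a real frontend response.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ValidateFilterResponse is copied from the type definition above.
type ValidateFilterResponse struct {
	User            string   `json:"user"`
	Admin           bool     `json:"admin"`
	ProblemAdmin    []string `json:"problem_admin"`
	ContestAdmin    []string `json:"contest_admin"`
	ProblemsetAdmin []int64  `json:"problemset_admin"`
}

// decodeValidateFilterResponse is a hypothetical helper that parses a JSON
// payload into a ValidateFilterResponse.
func decodeValidateFilterResponse(data []byte) (*ValidateFilterResponse, error) {
	var resp ValidateFilterResponse
	if err := json.Unmarshal(data, &resp); err != nil {
		return nil, err
	}
	return &resp, nil
}

func main() {
	// Made-up payload, for illustration only.
	payload := []byte(`{
		"user": "alice",
		"admin": false,
		"problem_admin": ["sumas"],
		"contest_admin": [],
		"problemset_admin": [42]
	}`)
	resp, err := decodeValidateFilterResponse(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(resp.User, resp.Admin, resp.ProblemAdmin, resp.ProblemsetAdmin)
}
```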

type WebSocketTransport

type WebSocketTransport struct {
	// contains filtered or unexported fields
}

A WebSocketTransport is a transport that uses WebSockets to deliver events.

func (*WebSocketTransport) Close

func (t *WebSocketTransport) Close()

Close writes the WebSockets Close message to notify the subscriber that the socket is going away. It does not actually signal the close channel, since that will happen as soon as we receive the Close message from the subscriber.

func (*WebSocketTransport) Init

func (t *WebSocketTransport) Init(close chan<- struct{})

Init sets a channel that will be closed by the Transport if the connection is closed.

func (*WebSocketTransport) Ping

func (t *WebSocketTransport) Ping() error

Ping sends a WebSockets Ping message to prevent the underlying socket from being closed due to inactivity.

func (*WebSocketTransport) ReadLoop

func (t *WebSocketTransport) ReadLoop()

ReadLoop reads (and discards) all incoming messages.

func (*WebSocketTransport) Send

func (t *WebSocketTransport) Send(message *QueuedMessage) error

Send sends the provided message.

func (*WebSocketTransport) String

func (t *WebSocketTransport) String() string
