Documentation ¶
Index ¶
- Variables
- func Etag(data []byte) string
- func Get(url *url.URL, timeout time.Duration) (reader io.Reader, header http.Header, status int, length int64, err error)
- func ServeStreamError(writer http.ResponseWriter, status int)
- type AccessController
- type Client
- type Command
- type Connection
- type ConnectionBroker
- type ConnectionRequest
- type Proxy
- type StateManager
- type Streamer
- func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Request)
- func (streamer *Streamer) SetCollector(stats metrics.Collector)
- func (streamer *Streamer) SetInhibit(inhibit bool)
- func (streamer *Streamer) SetNotifier(events event.Notifiable)
- func (streamer *Streamer) SetPreamble(preamble []byte)
- func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrInvalidProtocol is thrown when an invalid protocol was specified.
	// See the docs and example config for a list of supported protocols.
	ErrInvalidProtocol = errors.New("restreamer: unsupported protocol")
	// ErrNoConnection is thrown when trying to read
	// from a stream that is not connected
	ErrNoConnection = errors.New("restreamer: socket not connected")
	// ErrAlreadyConnected is thrown when trying to
	// connect to an already established upstream socket
	ErrAlreadyConnected = errors.New("restreamer: socket is already connected")
	// ErrInvalidResponse is thrown when an unsupported
	// HTTP response code was received
	ErrInvalidResponse = errors.New("restreamer: unsupported response code")
	// ErrNoUrl is thrown when the list of upstream URLs was empty
	ErrNoUrl = errors.New("restreamer: no parseable upstream URL")
)
var (
	ErrNoLength      = errors.New("restreamer: Fetching of remote resource with unknown length not supported")
	ErrLimitExceeded = errors.New("restreamer: Resource too large for cache")
	ErrShortRead     = errors.New("restreamer: Short read, not all data was transferred in one go")
)
var (
	// ErrAlreadyRunning is thrown when trying to connect a stream that is already online.
	ErrAlreadyRunning = errors.New("restreamer: service is already active")
	// ErrNotRunning is thrown when trying to shut down a stopped stream.
	ErrNotRunning = errors.New("restreamer: service is not running")
	// ErrOffline is thrown when receiving a connection while the stream is offline
	ErrOffline = errors.New("restreamer: refusing connection on an offline stream")
	// ErrSlowRead is logged (not thrown) when a client cannot handle the bandwidth.
	ErrSlowRead = errors.New("restreamer: send buffer overrun, increase client bandwidth")
	// ErrPoolFull is logged when the connection pool is full.
	ErrPoolFull = errors.New("restreamer: maximum number of active connections exceeded")
)
Functions ¶
func Etag ¶
Etag calculates a hash value of data and returns it as a hex string. Suitable for HTTP Etags.
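The description above does not say which hash Etag uses. As a minimal sketch of the idea, assuming SHA-256 (an assumption, not confirmed by this documentation) and a hypothetical helper named etagSketch, hashing a byte slice into a hex string could look like this:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// etagSketch is a hypothetical stand-in for Etag.
// SHA-256 is an assumption; the package may use a different hash.
func etagSketch(data []byte) string {
	sum := sha256.Sum256(data)
	// Encode the fixed-size digest as a lowercase hex string.
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(etagSketch([]byte("hello")))
}
```

The same input always yields the same string, which is what makes such a value usable for conditional HTTP requests.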
func Get ¶
func Get(url *url.URL, timeout time.Duration) (reader io.Reader, header http.Header, status int, length int64, err error)
Get opens a remote or local resource specified by URL and returns a reader, upstream HTTP headers, an HTTP status code and the resource data length, or -1 if no length is available. Local resources contain guessed data. Supported protocols: file, http and https.
func ServeStreamError ¶
func ServeStreamError(writer http.ResponseWriter, status int)
ServeStreamError returns an appropriate error response to the client.
Types ¶
type AccessController ¶
type AccessController struct {
// contains filtered or unexported fields
}
AccessController implements a connection broker that limits the maximum number of concurrent connections.
func NewAccessController ¶
func NewAccessController(maxconnections uint) *AccessController
NewAccessController creates a connection broker object that handles access control according to the number of connected clients.
func (*AccessController) Accept ¶
func (control *AccessController) Accept(remoteaddr string, streamer *Streamer) bool
Accept accepts an incoming connection when the maximum number of open connections has not been reached yet.
func (*AccessController) Release ¶
func (control *AccessController) Release(streamer *Streamer)
Release decrements the open connections count.
func (*AccessController) SetInhibit ¶ added in v0.6.0
func (control *AccessController) SetInhibit(inhibit bool)
SetInhibit allows setting and clearing the inhibit flag. If it is set, no further connections are accepted, irrespective of the maxconnections limit.
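The Accept, Release and SetInhibit semantics described above can be illustrated with a small counter guarded by a mutex. This is only a sketch: the type limiter and its methods are hypothetical, and the real AccessController may be implemented differently.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a hypothetical stand-in for AccessController.
type limiter struct {
	mu      sync.Mutex
	max     uint // maximum number of concurrent connections
	open    uint // currently open connections
	inhibit bool // when set, every new connection is refused
}

// accept admits a connection unless the inhibit flag is set
// or the limit has been reached.
func (l *limiter) accept() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.inhibit || l.open >= l.max {
		return false
	}
	l.open++
	return true
}

// release decrements the open connection count.
func (l *limiter) release() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.open > 0 {
		l.open--
	}
}

// setInhibit sets or clears the inhibit flag.
func (l *limiter) setInhibit(inhibit bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.inhibit = inhibit
}

func main() {
	l := &limiter{max: 2}
	fmt.Println(l.accept(), l.accept(), l.accept())
	l.release()
	l.setInhibit(true)
	fmt.Println(l.accept())
}
```

In this sketch the inhibit check comes first, so a set flag refuses connections even when free slots remain, matching the "irrespective of the maxconnections limit" wording.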
type Client ¶
type Client struct {
	// Wait is the time before reconnecting a disconnected upstream.
	// This is a deadline: If a connection (or connection attempt) takes longer
	// than this duration, a reconnection is attempted immediately.
	Wait time.Duration
	// ReadTimeout is the timeout for individual packet reads
	ReadTimeout time.Duration
	// contains filtered or unexported fields
}
Client implements a streaming HTTP client with failover support.
Logging specification:
{
	"time": 1234 | unix timestamp in UTC,
	"module": "client",
	"event": "" | error or upstream-connect or upstream-disconnect or upstream-loss or upstream-timeout or upstream-offline or client-streaming or client-stopped,
	when event=error:
		"error": "error-name"
		"error-specific key": "error-specific data"
	when event=retry:
		"retry": "99999" | seconds until retry
	when event=upstream-*:
		"url": "http://upstream/url" | upstream stream URL,
	when event=client-*:
		"client": "1.2.3.4:12" | client ip:port,
}
func NewClient ¶
func NewClient(name string, uris []string, streamer *Streamer, timeout uint, reconnect uint, readtimeout uint, qsize uint, intf string, bufferSize uint, packetSize uint) (*Client, error)
NewClient constructs a new streaming HTTP client, without connecting the socket yet. You need to call Connect() to do that.
After a connection has been closed, the client will attempt to reconnect after a configurable delay. This delay is cumulative: the time the connection (or connection attempt) lasted counts toward it, so if the connection has been up for longer than the delay, a reconnect is attempted immediately.
Arguments:
	name: a unique name for this streaming client, used for metrics and logging
	uris: a list of upstream URIs, used in random order
	queue: the outgoing packet queue
	timeout: the connect timeout
	reconnect: the minimal reconnect delay
	readtimeout: the read timeout
	qsize: the input queue size
	intf: the network interface to create multicast connections on
	bufferSize: the UDP socket receive buffer size
	packetSize: the UDP packet size
func (*Client) Close ¶
Close closes the active upstream connection.
This will cause the streaming thread to fail and try to reestablish a connection (unless reconnects are disabled).
func (*Client) Connect ¶
func (client *Client) Connect()
Connect spawns the connection loop.
Do not call this method multiple times!
func (*Client) SetCollector ¶
SetCollector assigns a stats collector.
func (*Client) SetInhibit ¶ added in v0.6.0
SetInhibit calls the SetInhibit function on the attached streamer.
func (*Client) Status ¶
Status returns the HTTP status message, or the empty string if not connected.
func (*Client) StatusCode ¶
StatusCode returns the HTTP status code, or 0 if not connected.
type Command ¶
type Command int
Command is one of several possible constants. See StreamerCommandAdd for more information.
const (
	// StreamerCommandAdd signals a stream to add a connection.
	StreamerCommandAdd Command
	// StreamerCommandRemove signals a stream to remove a connection.
	StreamerCommandRemove
	// StreamerCommandInhibit signals that all connections should be closed
	// and no further connections should be allowed
	StreamerCommandInhibit
	// StreamerCommandAllow signals that new connections should be allowed
	StreamerCommandAllow
)
type Connection ¶
type Connection struct {
	// Queue is the per-connection packet queue
	Queue chan protocol.MpegTsPacket
	// ClientAddress is the remote client address
	ClientAddress string
	// Closed is true if Serve was ended because of a closed channel.
	// This is simply there to avoid a double close.
	Closed bool
	// contains filtered or unexported fields
}
Connection is a single active client connection.
This is meant to be called directly from a ServeHTTP handler. No separate thread is created.
func NewConnection ¶
func NewConnection(destination http.ResponseWriter, qsize int, clientaddr string, ctx context.Context) *Connection
NewConnection creates a new connection object. To start sending data to a client, call Serve().
clientaddr should point to the remote address of the connecting client and will be used for logging.
func (*Connection) Serve ¶
func (conn *Connection) Serve(preamble []byte)
Serve starts serving data to a client, continuously feeding packets from the queue. An optional preamble buffer can be passed that will be sent before streaming the live payload (but after the HTTP response headers).
type ConnectionBroker ¶
type ConnectionBroker interface {
	// Accept will be called on each incoming connection,
	// with the remote client address and the streamer that wants to accept the connection.
	Accept(remoteaddr string, streamer *Streamer) bool
	// Release will be called each time a client disconnects.
	// The streamer argument corresponds to a streamer that has previously called Accept().
	Release(streamer *Streamer)
}
ConnectionBroker represents a policy handler for new connections. It is used to determine if new connections can be accepted, based on arbitrary rules.
type ConnectionRequest ¶
type ConnectionRequest struct {
	// Command is the command to execute
	Command Command
	// Address is the remote client address
	Address string
	// Connection is the connection to add (if this is an Add command)
	Connection *Connection
	// Waiter is a WaitGroup that can be used to track handling of the connection
	// in the streaming thread. If it is non-nil, the streamer will signal
	// Done once the request has been handled.
	Waiter *sync.WaitGroup
	// Ok tells the caller if a connection was handled without error.
	// You should always wait on the Waiter before checking it.
	Ok bool
}
ConnectionRequest encapsulates a request that a connection be added or removed.
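The Waiter and Ok fields describe a handshake: the caller submits a request, waits on the WaitGroup, and only then reads Ok. The sketch below models that handshake with a hypothetical trimmed-down request type and a hypothetical handle loop; it is not the package's own streaming loop.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a hypothetical, trimmed-down stand-in for ConnectionRequest.
type request struct {
	Address string
	Waiter  *sync.WaitGroup
	Ok      bool
}

// handle plays the role of the streaming loop: it decides each request,
// stores the result in Ok, then signals Done on the Waiter if present.
func handle(requests <-chan *request, limit int) {
	active := 0
	for req := range requests {
		req.Ok = active < limit
		if req.Ok {
			active++
		}
		if req.Waiter != nil {
			req.Waiter.Done()
		}
	}
}

// submit sends one request and blocks until it has been handled.
// Ok is only read after Wait returns, as the field comment requires.
func submit(requests chan<- *request, addr string) bool {
	req := &request{Address: addr, Waiter: &sync.WaitGroup{}}
	req.Waiter.Add(1)
	requests <- req
	req.Waiter.Wait()
	return req.Ok
}

func main() {
	requests := make(chan *request)
	go handle(requests, 1)
	fmt.Println(submit(requests, "192.0.2.1:1234"))
	fmt.Println(submit(requests, "192.0.2.2:1234"))
	close(requests)
}
```

Reading Ok before Wait returns would race with the handler goroutine; the Done call is what makes the write to Ok visible to the caller.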
type Proxy ¶
type Proxy struct {
// contains filtered or unexported fields
}
Proxy implements a caching HTTP proxy.
func NewProxy ¶
NewProxy constructs a new HTTP proxy. The upstream resource is not fetched until the first request. If cache is non-zero, the resource will be evicted from memory after this number of seconds. If it is zero, the resource will be fetched from upstream every time it is requested. timeout sets the upstream HTTP connection timeout.
func (*Proxy) ServeHTTP ¶
func (proxy *Proxy) ServeHTTP(writer http.ResponseWriter, request *http.Request)
ServeHTTP handles an incoming connection. Satisfies the http.Handler interface, so it can be used in an HTTP server.
func (*Proxy) SetStatistics ¶
func (proxy *Proxy) SetStatistics(stats metrics.Statistics)
SetStatistics assigns a stats collector.
type StateManager ¶
type StateManager struct {
// contains filtered or unexported fields
}
StateManager maintains a list of disconnectable objects, sending them a notification whenever state changes.
After connection closure has been notified, the list is cleared and further notifications have no effect.
func NewStateManager ¶
func NewStateManager() *StateManager
NewStateManager creates a new state manager.
Register notification channels with Register(), and submit state changes with Notify(). Channels can be removed later with Unregister(). After notify has been called, the list of registered channels is cleared.
func (*StateManager) Notify ¶
func (manager *StateManager) Notify()
Notify sends a state change to all registered notification channels and clears the list.
func (*StateManager) Register ¶
func (manager *StateManager) Register(channel chan<- bool)
Register registers a new notification channel.
It is not possible to register a channel twice. Any additional registrations will be ignored.
func (*StateManager) Unregister ¶
func (manager *StateManager) Unregister(channel chan<- bool)
Unregister removes a registered channel.
If this channel was not registered previously, no action is taken.
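The Register, Unregister and Notify behaviour described in this section can be sketched with a set of channels that is cleared after each notification. The type notifier is hypothetical, and the non-blocking send is an assumption of this sketch; the documentation does not say how StateManager treats receivers that are not ready.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for StateManager.
type notifier struct {
	mu       sync.Mutex
	channels map[chan<- bool]struct{}
}

func newNotifier() *notifier {
	return &notifier{channels: make(map[chan<- bool]struct{})}
}

// register adds a channel; registering the same channel twice has no effect
// because the channels are stored as set members.
func (n *notifier) register(ch chan<- bool) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.channels[ch] = struct{}{}
}

// unregister removes a channel; unknown channels are ignored.
func (n *notifier) unregister(ch chan<- bool) {
	n.mu.Lock()
	defer n.mu.Unlock()
	delete(n.channels, ch)
}

// notify signals every registered channel once and then clears the set.
func (n *notifier) notify() {
	n.mu.Lock()
	defer n.mu.Unlock()
	for ch := range n.channels {
		select {
		case ch <- true:
		default: // assumption: skip receivers that are not ready
		}
	}
	n.channels = make(map[chan<- bool]struct{})
}

func main() {
	n := newNotifier()
	ch := make(chan bool, 2)
	n.register(ch)
	n.register(ch) // ignored, already registered
	n.notify()
	fmt.Println("pending after first notify:", len(ch))
	<-ch
	n.notify() // the set was cleared, so nothing is sent
	fmt.Println("pending after second notify:", len(ch))
}
```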
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer implements a TS packet multiplier, distributing received packets on the input queue to the output queues. It also handles and manages HTTP connections when added to an HTTP server.
func NewStreamer ¶
func NewStreamer(name string, qsize uint, broker ConnectionBroker, auth auth.Authenticator) *Streamer
NewStreamer creates a new packet streamer. qsize is the length of each connection's queue (in packets). broker handles policy enforcement. The input packet queue is not a constructor argument; it is passed to Stream(). A statistics collector is assigned separately with SetCollector().
func (*Streamer) ServeHTTP ¶
func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Request)
ServeHTTP handles an incoming HTTP connection. Satisfies the http.Handler interface, so it can be used in an HTTP server.
func (*Streamer) SetCollector ¶
SetCollector assigns a stats collector.
func (*Streamer) SetInhibit ¶ added in v0.6.0
func (*Streamer) SetNotifier ¶ added in v0.5.0
func (streamer *Streamer) SetNotifier(events event.Notifiable)
SetNotifier assigns an event notifier.
func (*Streamer) SetPreamble ¶ added in v0.10.0
func (*Streamer) Stream ¶
func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error
Stream is the main stream multiplier loop. It reads data from the input queue and distributes it to the connections.
This routine will block; you should run it asynchronously like this:
queue := make(chan protocol.MpegTsPacket, inputQueueSize)
go func() { log.Fatal(streamer.Stream(queue)) }()
or simply:
go streamer.Stream(queue)
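To illustrate what "multiplier" means here, the sketch below copies each packet from one input channel to several output channels. The names packet and fanOut are hypothetical. Dropping packets for outputs whose buffer is full is an assumption inspired by the ErrSlowRead message ("send buffer overrun"); the real loop may handle slow clients differently.

```go
package main

import "fmt"

// packet is a hypothetical stand-in for protocol.MpegTsPacket.
type packet []byte

// fanOut distributes every packet from input to all outputs until input
// is closed, then closes the outputs. A full output buffer causes the
// packet to be dropped for that output only, so one slow reader cannot
// stall the others. It returns the number of drops per output.
func fanOut(input <-chan packet, outputs []chan packet) []int {
	dropped := make([]int, len(outputs))
	for pkt := range input {
		for i, out := range outputs {
			select {
			case out <- pkt:
			default:
				dropped[i]++
			}
		}
	}
	for _, out := range outputs {
		close(out)
	}
	return dropped
}

func main() {
	input := make(chan packet, 3)
	for i := 0; i < 3; i++ {
		input <- packet{byte(i)}
	}
	close(input)

	fast := make(chan packet, 3) // large enough for every packet
	slow := make(chan packet, 1) // overruns after the first packet
	fmt.Println(fanOut(input, []chan packet{fast, slow}))
}
```

The per-output buffer size plays the role that qsize has in NewStreamer: a larger queue tolerates longer stalls at the cost of memory and latency.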