webpush

package
v0.0.0-...-8ac4b46 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: Apache-2.0 Imports: 17 Imported by: 1

README

Webpush Messaging

Goals

  • support push and pubsub using authenticated, e2e encrypted messages
    • for data plane, abstracting other pubsub mechanisms or providing minimal storage-less delivery
    • for control plane, for distributing config without maintaining long-lived connections.
    • interop with Istio/XDS, CNCF, pubsub, etc

The main design idea is that in a 'mesh' or VPC, each workload (Pod, VM) that subscribes to a topic is directly reachable.

This is effectively the same as Webhooks - but using Webpush encryption and auth for messages that cross multiple hops.

A per-node agent - similar to Istio Ambient - can handle the crypto and security and directly send messages to the pod, with network policy restricting the port.

API

  • like CNCF Eventing, Webhooks, CloudRun pubsub - the 'core' is based on HTTP interface.
  • messages are http requests with encrypted body and JWT auth (VAPID)
  • message 'envelope' - headers, url is not encrypted, treated as a request

Receiving messages

  • each subscriber (client) implements the regular push protocol, acting as Webpush server
  • it is assumed that some infrastructure or transport can handle hanging GET equivalent, outside of the scope of this package. The requirement is that messages are injected as if the broker/infra made a HTTP call to the subscriber.

Topics

The topic is represented as a regular URL. Publishing is equivalent with posting to the URL.

URL format: /msg/TOPIC Subscribe (pull long lived connections): /sub/SUBID Upstream brokers or push clients: list of URLs with same /msg/TOPIC

Unlike HTTP, multiple handlers can be registered on a topic.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultMux = NewMux()
View Source
var ReceiveBaseUrl = "https://127.0.0.1:5228/"
View Source
var SharedWPAuth = []byte{1}

Functions

func InitMux

func InitMux(mux *Mux, hmux *http.ServeMux, auth *meshauth.MeshAuth)

Init a mux with a http.Transport Normlly there is a single mux in a server - multiple mux are used for testing.

func NewRequest

func NewRequest(dest string, key, authK []byte,
	message string, ttlSec int, vapid *meshauth.MeshAuth) (*http.Request, error)

NewVapidRequest creates a valid Web Push HTTP request for sending a message to a subscriber, using Vapid authentication.

You can add more headers to configure collapsing, TTL.

Types

type Backoff

type Backoff interface {
	BackoffSleep()
	BackoffReset()
}

type HandlerCallbackFunc

type HandlerCallbackFunc func(ctx context.Context, cmdS string, meta map[string]string, data []byte)

Adapter from func to interface

func (HandlerCallbackFunc) HandleMessage

func (f HandlerCallbackFunc) HandleMessage(ctx context.Context, cmdS string, meta map[string]string, data []byte)

ServeHTTP calls f(w, r).

type Message

type Message struct {
	MessageData

	// VIPs in the path
	Path []string `json:"path,omitempty"`

	// JSON-serializable payload.
	// Interface means will be serialized as base64 if []byte, as String if string or actual Json without encouding
	// otherwise.
	Data interface{} `json:"data,omitempty"`

	// If received from a remote, the connection it was received on.
	// nil if generated locally
	Connection *MsgConnection `json:"-"`
}

Records recent received messages and broadcasts, for debug and UI

func NewMessage

func NewMessage(cmdS string, meta map[string]string) *Message

NewMessage creates a new message, originated locally

func ParseJSON

func ParseJSON(data []byte) *Message

func (*Message) Binary

func (ev *Message) Binary() []byte

Return a binary representation of the data: either the []byte for raw data, or the marshalled json starting with {.

func (*Message) MarshalJSON

func (ev *Message) MarshalJSON() []byte

func (*Message) SetDataJSON

func (ev *Message) SetDataJSON(data interface{}) *Message

type MessageData

type MessageData struct {
	Id    string
	To    string
	Meta  map[string]string
	From  string
	Topic string

	Time int64
}

type MessageHandler

type MessageHandler interface {
	// Handle a message. Context may provide access to the actual message object
	// and mux.
	HandleMessage(ctx context.Context, cmdS string, meta map[string]string, data []byte)
}

Local processing of messages. Interface doesn't use any specific struct, to avoid creating deps.

type MsgConnection

type MsgConnection struct {

	// Key used in mux to track this connection
	Name string

	// Authenticated Vip associated with the connection. Messages will not be forwarded if
	// the VIP is in Path or From of the message.
	VIP string

	// Broadcast subscriptions to forward to the remote. Will have a 'From' set to current node.
	// VPN and upstream server use "*" to receive/pass up all events.
	// TODO: keep some messages local, by using To=., and indicate broadcasts as *.
	SubscriptionsToSend []string

	// OnMessage is called when a message for this connection is dispatched.
	// The message should be either a broadcast, have as To the vip of the connection or
	// another vip reachable from the connection.
	//
	// The topic of the message should be in the Subscription list if the destination is this vip.
	//
	// Internal handlers may use the same interface.
	SendMessageToRemote func(ev *Message) error

	Conn net.Conn
	// contains filtered or unexported fields
}

One connection - incoming or outgoing. Can send messages to the remote end, which may in turn forward messages for other nodes.

Incoming messages are dispatched to the mux, which may deliver locally or forward.

func (*MsgConnection) Close

func (mc *MsgConnection) Close()

type Mux

type Mux struct {

	// Allows regular HTTP Handlers to process messages.
	// A message is mapped to a request. Like CloudEvents, response from the
	// http request can be mapped to a Send (not supported yet).
	ServeMux *http.ServeMux

	// Auth holds the private key and muxID of this node. Used to encrypt and decrypt.
	Auth *meshauth.MeshAuth
	// contains filtered or unexported fields
}

Mux handles processing messages for this node, and sending messages from local code.

func NewMux

func NewMux() *Mux

func (*Mux) AddConnection

func (mux *Mux) AddConnection(id string, cp *MsgConnection)

id - remote id. "uds" for the primary upstream uds connection to host (android app or wifi/root app)

func (*Mux) AddHandler

func (mux *Mux) AddHandler(path string, cp MessageHandler)

StartListener a local handler for a specific message type. Special topics: *, /open, /close

func (*Mux) HTTPHandlerSend

func (mux *Mux) HTTPHandlerSend(w http.ResponseWriter, r *http.Request)

HTTPHandlerSend can send regular messages. Can be exposed without auth on 127.0.0.1, or use normal auth.

Mapped to /s/[DESTID]/topic?...

q or path can be used to pass command. Body and query string are sent. TODO: compatibility with cloud events and webpush TODO: RBAC (including admin check for system notifications)

func (*Mux) HTTPHandlerWebpush

func (mux *Mux) HTTPHandlerWebpush(w http.ResponseWriter, r *http.Request)

Webpush handler - on /push[/VIP]/topic[?params], on the HTTPS handler

Auth: VAPID or client cert - results in VIP of sender

func (*Mux) HandleMessageForNode

func (mux *Mux) HandleMessageForNode(ev *Message) error

Called for local events (host==. or empty). Called when a message is received from one of the local streams ( UDS, etc ), if the final destination is the current node.

Message will be passed to one or more of the local handlers, based on type.

func (*Mux) RemoveConnection

func (mux *Mux) RemoveConnection(id string, cp *MsgConnection)

func (*Mux) Send

func (mux *Mux) Send(msgType string, data interface{}, meta ...string) error

Send a message to the default mux. Will serialize the event and save it for debugging.

Local handlers and debug tools/admin can subscribe. Calls the internal SendMessage method.

func (*Mux) SendMessage

func (mux *Mux) SendMessage(ev *Message) error

Publish a message. Will be distributed to local and remote listeners.

TODO: routing for directed messages (to specific destination) TODO: up/down indication for multicast, subscription

func (*Mux) SendMeta

func (mux *Mux) SendMeta(msgType string, meta map[string]string, data interface{}) error

Send a message to the default mux. Will serialize the event and save it for debugging.

Local handlers and debug tools/admin can subscribe. Calls the internal SendMessage method.

func (*Mux) SubscribeHandler

func (mux *Mux) SubscribeHandler(res http.ResponseWriter, req *http.Request)

Subscribe creates a subscription. Initial version is just a random - some interface will be added later, to allow sets.

type UA

type UA struct {
	// URL of the subscribe for the push service
	PushService string
}

UA represents a "user agent" - or client using the webpush protocol

func (*UA) Subscribe

func (ua *UA) Subscribe() (sub *meshauth.Subscription, err error)

Create a subscription, using the Webpush standard protocol.

URL is "/subscribe", no header required ( but passing a VAPID or mtls), response in 'location' for read and Link for sub endpoint.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL