handler

package
v0.99.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2024 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultSessionID = "default"

Variables

View Source
var BufferWaitTime = time.Second * 5

the amount of time to try to insert into a full buffer before giving up. Customisable for testing

Functions

This section is empty.

Types

type BuiltSubscription

type BuiltSubscription struct {
	RoomSubscription sync3.RoomSubscription
	RoomIDs          []string
}

type ConnState

type ConnState struct {
	// contains filtered or unexported fields
}

ConnState tracks all high-level connection state for this connection, like the combined request and the underlying sorted room list. It doesn't track positions of the connection.

func NewConnState

func NewConnState(
	userID, deviceID string, userCache *caches.UserCache, globalCache *caches.GlobalCache,
	ex extensions.HandlerInterface, joinChecker JoinChecker, setupHistVec *prometheus.HistogramVec, histVec *prometheus.HistogramVec,
	maxPendingEventUpdates int, maxTransactionIDDelay time.Duration,
) *ConnState

func (*ConnState) Alive

func (s *ConnState) Alive() bool

func (*ConnState) Destroy

func (s *ConnState) Destroy()

Called when the connection is torn down

func (*ConnState) OnIncomingRequest

func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req *sync3.Request, isInitial bool, start time.Time) (*sync3.Response, error)

OnIncomingRequest is guaranteed to be called sequentially (it's protected by a mutex in conn.go)

func (*ConnState) OnRoomUpdate

func (s *ConnState) OnRoomUpdate(ctx context.Context, up caches.RoomUpdate)

Called by the user cache when updates arrive

func (*ConnState) OnUpdate

func (s *ConnState) OnUpdate(ctx context.Context, up caches.Update)

func (*ConnState) PublishEventsUpTo added in v0.99.5

func (s *ConnState) PublishEventsUpTo(roomID string, nid int64)

func (*ConnState) SetCancelCallback added in v0.99.12

func (s *ConnState) SetCancelCallback(cancel context.CancelFunc)

func (*ConnState) UserID

func (s *ConnState) UserID() string

type EnsurePoller

type EnsurePoller struct {
	// contains filtered or unexported fields
}

EnsurePoller is a gadget used by the sliding sync request handler to ensure that we are running a v2 poller for a given device.

func NewEnsurePoller

func NewEnsurePoller(notifier pubsub.Notifier, enablePrometheus bool) *EnsurePoller

func (*EnsurePoller) EnsurePolling

func (p *EnsurePoller) EnsurePolling(ctx context.Context, pid sync2.PollerID, tokenHash string) bool

EnsurePolling blocks until the V2InitialSyncComplete response is received for this device. It is the caller's responsibility to call OnInitialSyncComplete when new events arrive. Returns whether or not the token is expired

func (*EnsurePoller) OnExpiredToken added in v0.99.3

func (p *EnsurePoller) OnExpiredToken(payload *pubsub.V2ExpiredToken)

func (*EnsurePoller) OnInitialSyncComplete

func (p *EnsurePoller) OnInitialSyncComplete(payload *pubsub.V2InitialSyncComplete)

func (*EnsurePoller) Teardown

func (p *EnsurePoller) Teardown()

type JoinChecker

type JoinChecker interface {
	IsUserJoined(userID, roomID string) bool
}

type LazyCache

type LazyCache struct {
	// contains filtered or unexported fields
}

func NewLazyCache

func NewLazyCache() *LazyCache

func (*LazyCache) Add

func (lc *LazyCache) Add(roomID string, userIDs ...string)

func (*LazyCache) AddUser

func (lc *LazyCache) AddUser(roomID, userID string) bool

AddUser to this room. Returns true if this is the first time this user has done so, and hence you should include the member event for this user.

func (*LazyCache) IsLazyLoading

func (lc *LazyCache) IsLazyLoading(roomID string) bool

IsLazyLoading returns true if this room is being lazy loaded.

func (*LazyCache) IsSet

func (lc *LazyCache) IsSet(roomID, userID string) bool

type RoomsBuilder

type RoomsBuilder struct {
	// contains filtered or unexported fields
}

RoomsBuilder gradually accumulates and mixes data required in order to populate the top-level rooms key in the Response. It is not thread-safe and should only be called by the ConnState thread.

The top-level `rooms` key is an amalgamation of:

  • Room subscriptions
  • Rooms within all sliding lists.

The purpose of this builder is to remember which rooms we will be returning data for, along with the room subscription for that room. This then allows efficient database accesses. For example:

  • List A will return !a, !b, !c with Room Subscription X
  • List B will return !b, !c, !d with Room Subscription Y
  • Room sub for !a with Room Subscription Z

Rather than performing each operation in isolation and query for rooms multiple times (where the response data will inevitably be dropped), we can instead amalgamate this into:

  • Room Subscription X+Z -> !a
  • Room Subscription X+Y -> !b, !c
  • Room Subscription Y -> !d

This data will not be wasted when it has been retrieved from the database.

func NewRoomsBuilder

func NewRoomsBuilder() *RoomsBuilder

func (*RoomsBuilder) AddRoomsToSubscription

func (rb *RoomsBuilder) AddRoomsToSubscription(ctx context.Context, id int, roomIDs []string)

Add rooms to the subscription ID previously added. E.g rooms from a list.

func (*RoomsBuilder) AddSubscription

func (rb *RoomsBuilder) AddSubscription(rs sync3.RoomSubscription) (id int)

Add a room subscription to the builder, e.g from a list or room subscription. This should NOT be a combined subscription.

func (*RoomsBuilder) BuildSubscriptions

func (rb *RoomsBuilder) BuildSubscriptions() (result []BuiltSubscription)

Work out which subscriptions need to be combined and produce a new set of subscriptions -> room IDs. Any given room ID will appear in exactly one BuiltSubscription.

func (*RoomsBuilder) IncludesRoom

func (rb *RoomsBuilder) IncludesRoom(roomID string) bool

type SyncLiveHandler

type SyncLiveHandler struct {
	V2           sync2.Client
	Storage      *state.Storage
	V2Store      *sync2.Storage
	V2Sub        *pubsub.V2Sub
	EnsurePoller *EnsurePoller
	ConnMap      *sync3.ConnMap
	Extensions   *extensions.Handler

	Dispatcher *sync3.Dispatcher

	GlobalCache *caches.GlobalCache
	// contains filtered or unexported fields
}

This is a net.http Handler for sync v3. It is responsible for pairing requests to Conns and to ensure that the sync v2 poller is running for this client.

func NewSync3Handler

func NewSync3Handler(
	store *state.Storage, storev2 *sync2.Storage, v2Client sync2.Client, secret string,
	pub pubsub.Notifier, sub pubsub.Listener, enablePrometheus bool, maxPendingEventUpdates int,
	maxTransactionIDDelay time.Duration,
) (*SyncLiveHandler, error)

func (*SyncLiveHandler) Accumulate

func (h *SyncLiveHandler) Accumulate(p *pubsub.V2Accumulate)

Called from the v2 poller, implements V2DataReceiver

func (*SyncLiveHandler) CacheForUser

func (h *SyncLiveHandler) CacheForUser(userID string) *caches.UserCache

func (*SyncLiveHandler) DeviceData

func (h *SyncLiveHandler) DeviceData(ctx context.Context, userID, deviceID string, isInitial bool) *internal.DeviceData

Implements E2EEFetcher DeviceData returns the latest device data for this user. isInitial should be set if this is for an initial /sync request.

func (*SyncLiveHandler) Initialise

func (h *SyncLiveHandler) Initialise(p *pubsub.V2Initialise)

Called from the v2 poller, implements V2DataReceiver

func (*SyncLiveHandler) Listen

func (h *SyncLiveHandler) Listen()

Listen starts all consumers

func (*SyncLiveHandler) OnAccountData

func (h *SyncLiveHandler) OnAccountData(p *pubsub.V2AccountData)

func (*SyncLiveHandler) OnDeviceData

func (h *SyncLiveHandler) OnDeviceData(p *pubsub.V2DeviceData)

push device data updates on waiting conns (otk counts, device list changes)

func (*SyncLiveHandler) OnDeviceMessages added in v0.98.1

func (h *SyncLiveHandler) OnDeviceMessages(p *pubsub.V2DeviceMessages)

func (*SyncLiveHandler) OnExpiredToken added in v0.99.2

func (h *SyncLiveHandler) OnExpiredToken(p *pubsub.V2ExpiredToken)

func (*SyncLiveHandler) OnInitialSyncComplete

func (h *SyncLiveHandler) OnInitialSyncComplete(p *pubsub.V2InitialSyncComplete)

func (*SyncLiveHandler) OnInvalidateRoom added in v0.99.11

func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom)

func (*SyncLiveHandler) OnInvite

func (h *SyncLiveHandler) OnInvite(p *pubsub.V2InviteRoom)

func (*SyncLiveHandler) OnLeftRoom

func (h *SyncLiveHandler) OnLeftRoom(p *pubsub.V2LeaveRoom)

func (*SyncLiveHandler) OnReceipt

func (h *SyncLiveHandler) OnReceipt(p *pubsub.V2Receipt)

func (*SyncLiveHandler) OnStateRedaction added in v0.99.12

func (h *SyncLiveHandler) OnStateRedaction(p *pubsub.V2StateRedaction)

func (*SyncLiveHandler) OnTransactionID added in v0.99.3

func (h *SyncLiveHandler) OnTransactionID(p *pubsub.V2TransactionID)

OnTransactionID is called from the v2 poller, implements V2DataReceiver.

func (*SyncLiveHandler) OnTyping

func (h *SyncLiveHandler) OnTyping(p *pubsub.V2Typing)

func (*SyncLiveHandler) OnUnreadCounts

func (h *SyncLiveHandler) OnUnreadCounts(p *pubsub.V2UnreadCounts)

func (*SyncLiveHandler) ServeHTTP

func (h *SyncLiveHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*SyncLiveHandler) Startup

func (h *SyncLiveHandler) Startup(storeSnapshot *state.StartupSnapshot) error

func (*SyncLiveHandler) Teardown

func (h *SyncLiveHandler) Teardown()

used in tests to close postgres connections

func (*SyncLiveHandler) TransactionIDForEvents

func (h *SyncLiveHandler) TransactionIDForEvents(userID string, deviceID string, eventIDs []string) (eventIDToTxnID map[string]string)

Implements TransactionIDFetcher

type TxnIDWaiter added in v0.99.5

type TxnIDWaiter struct {
	// contains filtered or unexported fields
}

func NewTxnIDWaiter added in v0.99.5

func NewTxnIDWaiter(userID string, maxDelay time.Duration, publish func(bool, caches.Update)) *TxnIDWaiter

func (*TxnIDWaiter) Ingest added in v0.99.5

func (t *TxnIDWaiter) Ingest(up caches.Update)

func (*TxnIDWaiter) PublishUpToNID added in v0.99.5

func (t *TxnIDWaiter) PublishUpToNID(roomID string, publishNID int64)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL