Documentation ¶
Index ¶
- Constants
- Variables
- type BuiltSubscription
- type ConnState
- func (s *ConnState) Alive() bool
- func (s *ConnState) Destroy()
- func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req *sync3.Request, isInitial bool, ...) (*sync3.Response, error)
- func (s *ConnState) OnRoomUpdate(ctx context.Context, up caches.RoomUpdate)
- func (s *ConnState) OnUpdate(ctx context.Context, up caches.Update)
- func (s *ConnState) PublishEventsUpTo(roomID string, nid int64)
- func (s *ConnState) SetCancelCallback(cancel context.CancelFunc)
- func (s *ConnState) UserID() string
- type EnsurePoller
- type JoinChecker
- type LazyCache
- type RoomsBuilder
- func (rb *RoomsBuilder) AddRoomsToSubscription(ctx context.Context, id int, roomIDs []string)
- func (rb *RoomsBuilder) AddSubscription(rs sync3.RoomSubscription) (id int)
- func (rb *RoomsBuilder) BuildSubscriptions() (result []BuiltSubscription)
- func (rb *RoomsBuilder) IncludesRoom(roomID string) bool
- type SyncLiveHandler
- func (h *SyncLiveHandler) Accumulate(p *pubsub.V2Accumulate)
- func (h *SyncLiveHandler) CacheForUser(userID string) *caches.UserCache
- func (h *SyncLiveHandler) DeviceData(ctx context.Context, userID, deviceID string, isInitial bool) *internal.DeviceData
- func (h *SyncLiveHandler) Initialise(p *pubsub.V2Initialise)
- func (h *SyncLiveHandler) Listen()
- func (h *SyncLiveHandler) OnAccountData(p *pubsub.V2AccountData)
- func (h *SyncLiveHandler) OnDeviceData(p *pubsub.V2DeviceData)
- func (h *SyncLiveHandler) OnDeviceMessages(p *pubsub.V2DeviceMessages)
- func (h *SyncLiveHandler) OnExpiredToken(p *pubsub.V2ExpiredToken)
- func (h *SyncLiveHandler) OnInitialSyncComplete(p *pubsub.V2InitialSyncComplete)
- func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom)
- func (h *SyncLiveHandler) OnInvite(p *pubsub.V2InviteRoom)
- func (h *SyncLiveHandler) OnLeftRoom(p *pubsub.V2LeaveRoom)
- func (h *SyncLiveHandler) OnReceipt(p *pubsub.V2Receipt)
- func (h *SyncLiveHandler) OnStateRedaction(p *pubsub.V2StateRedaction)
- func (h *SyncLiveHandler) OnTransactionID(p *pubsub.V2TransactionID)
- func (h *SyncLiveHandler) OnTyping(p *pubsub.V2Typing)
- func (h *SyncLiveHandler) OnUnreadCounts(p *pubsub.V2UnreadCounts)
- func (h *SyncLiveHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
- func (h *SyncLiveHandler) Startup(storeSnapshot *state.StartupSnapshot) error
- func (h *SyncLiveHandler) Teardown()
- func (h *SyncLiveHandler) TransactionIDForEvents(userID string, deviceID string, eventIDs []string) (eventIDToTxnID map[string]string)
- type TxnIDWaiter
Constants ¶
const DefaultSessionID = "default"
Variables ¶
var BufferWaitTime = time.Second * 5
the amount of time to try to insert into a full buffer before giving up. Customisable for testing
Functions ¶
This section is empty.
Types ¶
type BuiltSubscription ¶
type BuiltSubscription struct { RoomSubscription sync3.RoomSubscription RoomIDs []string }
type ConnState ¶
type ConnState struct {
// contains filtered or unexported fields
}
ConnState tracks all high-level connection state for this connection, like the combined request and the underlying sorted room list. It doesn't track positions of the connection.
func NewConnState ¶
func NewConnState( userID, deviceID string, userCache *caches.UserCache, globalCache *caches.GlobalCache, ex extensions.HandlerInterface, joinChecker JoinChecker, setupHistVec *prometheus.HistogramVec, histVec *prometheus.HistogramVec, maxPendingEventUpdates int, maxTransactionIDDelay time.Duration, ) *ConnState
func (*ConnState) OnIncomingRequest ¶
func (s *ConnState) OnIncomingRequest(ctx context.Context, cid sync3.ConnID, req *sync3.Request, isInitial bool, start time.Time) (*sync3.Response, error)
OnIncomingRequest is guaranteed to be called sequentially (it's protected by a mutex in conn.go)
func (*ConnState) OnRoomUpdate ¶
func (s *ConnState) OnRoomUpdate(ctx context.Context, up caches.RoomUpdate)
Called by the user cache when updates arrive
func (*ConnState) PublishEventsUpTo ¶ added in v0.99.5
func (*ConnState) SetCancelCallback ¶ added in v0.99.12
func (s *ConnState) SetCancelCallback(cancel context.CancelFunc)
type EnsurePoller ¶
type EnsurePoller struct {
// contains filtered or unexported fields
}
EnsurePoller is a gadget used by the sliding sync request handler to ensure that we are running a v2 poller for a given device.
func NewEnsurePoller ¶
func NewEnsurePoller(notifier pubsub.Notifier, enablePrometheus bool) *EnsurePoller
func (*EnsurePoller) EnsurePolling ¶
func (p *EnsurePoller) EnsurePolling(ctx context.Context, pid sync2.PollerID, tokenHash string) bool
EnsurePolling blocks until the V2InitialSyncComplete response is received for this device. It is the caller's responsibility to call OnInitialSyncComplete when new events arrive. Returns whether or not the token is expired
func (*EnsurePoller) OnExpiredToken ¶ added in v0.99.3
func (p *EnsurePoller) OnExpiredToken(payload *pubsub.V2ExpiredToken)
func (*EnsurePoller) OnInitialSyncComplete ¶
func (p *EnsurePoller) OnInitialSyncComplete(payload *pubsub.V2InitialSyncComplete)
func (*EnsurePoller) Teardown ¶
func (p *EnsurePoller) Teardown()
type JoinChecker ¶
type LazyCache ¶
type LazyCache struct {
// contains filtered or unexported fields
}
func NewLazyCache ¶
func NewLazyCache() *LazyCache
func (*LazyCache) AddUser ¶
AddUser to this room. Returns true if this is the first time this user has done so, and hence you should include the member event for this user.
func (*LazyCache) IsLazyLoading ¶
IsLazyLoading returns true if this room is being lazy loaded.
type RoomsBuilder ¶
type RoomsBuilder struct {
// contains filtered or unexported fields
}
RoomsBuilder gradually accumulates and mixes data required in order to populate the top-level rooms key in the Response. It is not thread-safe and should only be called by the ConnState thread.
The top-level `rooms` key is an amalgamation of:
- Room subscriptions
- Rooms within all sliding lists.
The purpose of this builder is to remember which rooms we will be returning data for, along with the room subscription for that room. This then allows efficient database accesses. For example:
- List A will return !a, !b, !c with Room Subscription X
- List B will return !b, !c, !d with Room Subscription Y
- Room sub for !a with Room Subscription Z
Rather than performing each operation in isolation and query for rooms multiple times (where the response data will inevitably be dropped), we can instead amalgamate this into:
- Room Subscription X+Z -> !a
- Room Subscription X+Y -> !b, !c
- Room Subscription Y -> !d
This data will not be wasted when it has been retrieved from the database.
func NewRoomsBuilder ¶
func NewRoomsBuilder() *RoomsBuilder
func (*RoomsBuilder) AddRoomsToSubscription ¶
func (rb *RoomsBuilder) AddRoomsToSubscription(ctx context.Context, id int, roomIDs []string)
Add rooms to the subscription ID previously added. E.g rooms from a list.
func (*RoomsBuilder) AddSubscription ¶
func (rb *RoomsBuilder) AddSubscription(rs sync3.RoomSubscription) (id int)
Add a room subscription to the builder, e.g from a list or room subscription. This should NOT be a combined subscription.
func (*RoomsBuilder) BuildSubscriptions ¶
func (rb *RoomsBuilder) BuildSubscriptions() (result []BuiltSubscription)
Work out which subscriptions need to be combined and produce a new set of subscriptions -> room IDs. Any given room ID will appear in exactly one BuiltSubscription.
func (*RoomsBuilder) IncludesRoom ¶
func (rb *RoomsBuilder) IncludesRoom(roomID string) bool
type SyncLiveHandler ¶
type SyncLiveHandler struct { V2 sync2.Client Storage *state.Storage V2Store *sync2.Storage V2Sub *pubsub.V2Sub EnsurePoller *EnsurePoller ConnMap *sync3.ConnMap Extensions *extensions.Handler Dispatcher *sync3.Dispatcher GlobalCache *caches.GlobalCache // contains filtered or unexported fields }
This is a net.http Handler for sync v3. It is responsible for pairing requests to Conns and to ensure that the sync v2 poller is running for this client.
func NewSync3Handler ¶
func (*SyncLiveHandler) Accumulate ¶
func (h *SyncLiveHandler) Accumulate(p *pubsub.V2Accumulate)
Called from the v2 poller, implements V2DataReceiver
func (*SyncLiveHandler) CacheForUser ¶
func (h *SyncLiveHandler) CacheForUser(userID string) *caches.UserCache
func (*SyncLiveHandler) DeviceData ¶
func (h *SyncLiveHandler) DeviceData(ctx context.Context, userID, deviceID string, isInitial bool) *internal.DeviceData
Implements E2EEFetcher DeviceData returns the latest device data for this user. isInitial should be set if this is for an initial /sync request.
func (*SyncLiveHandler) Initialise ¶
func (h *SyncLiveHandler) Initialise(p *pubsub.V2Initialise)
Called from the v2 poller, implements V2DataReceiver
func (*SyncLiveHandler) OnAccountData ¶
func (h *SyncLiveHandler) OnAccountData(p *pubsub.V2AccountData)
func (*SyncLiveHandler) OnDeviceData ¶
func (h *SyncLiveHandler) OnDeviceData(p *pubsub.V2DeviceData)
push device data updates on waiting conns (otk counts, device list changes)
func (*SyncLiveHandler) OnDeviceMessages ¶ added in v0.98.1
func (h *SyncLiveHandler) OnDeviceMessages(p *pubsub.V2DeviceMessages)
func (*SyncLiveHandler) OnExpiredToken ¶ added in v0.99.2
func (h *SyncLiveHandler) OnExpiredToken(p *pubsub.V2ExpiredToken)
func (*SyncLiveHandler) OnInitialSyncComplete ¶
func (h *SyncLiveHandler) OnInitialSyncComplete(p *pubsub.V2InitialSyncComplete)
func (*SyncLiveHandler) OnInvalidateRoom ¶ added in v0.99.11
func (h *SyncLiveHandler) OnInvalidateRoom(p *pubsub.V2InvalidateRoom)
func (*SyncLiveHandler) OnInvite ¶
func (h *SyncLiveHandler) OnInvite(p *pubsub.V2InviteRoom)
func (*SyncLiveHandler) OnLeftRoom ¶
func (h *SyncLiveHandler) OnLeftRoom(p *pubsub.V2LeaveRoom)
func (*SyncLiveHandler) OnReceipt ¶
func (h *SyncLiveHandler) OnReceipt(p *pubsub.V2Receipt)
func (*SyncLiveHandler) OnStateRedaction ¶ added in v0.99.12
func (h *SyncLiveHandler) OnStateRedaction(p *pubsub.V2StateRedaction)
func (*SyncLiveHandler) OnTransactionID ¶ added in v0.99.3
func (h *SyncLiveHandler) OnTransactionID(p *pubsub.V2TransactionID)
OnTransactionID is called from the v2 poller, implements V2DataReceiver.
func (*SyncLiveHandler) OnTyping ¶
func (h *SyncLiveHandler) OnTyping(p *pubsub.V2Typing)
func (*SyncLiveHandler) OnUnreadCounts ¶
func (h *SyncLiveHandler) OnUnreadCounts(p *pubsub.V2UnreadCounts)
func (*SyncLiveHandler) ServeHTTP ¶
func (h *SyncLiveHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
func (*SyncLiveHandler) Startup ¶
func (h *SyncLiveHandler) Startup(storeSnapshot *state.StartupSnapshot) error
func (*SyncLiveHandler) Teardown ¶
func (h *SyncLiveHandler) Teardown()
used in tests to close postgres connections
func (*SyncLiveHandler) TransactionIDForEvents ¶
func (h *SyncLiveHandler) TransactionIDForEvents(userID string, deviceID string, eventIDs []string) (eventIDToTxnID map[string]string)
Implements TransactionIDFetcher
type TxnIDWaiter ¶ added in v0.99.5
type TxnIDWaiter struct {
// contains filtered or unexported fields
}
func NewTxnIDWaiter ¶ added in v0.99.5
func (*TxnIDWaiter) Ingest ¶ added in v0.99.5
func (t *TxnIDWaiter) Ingest(up caches.Update)
func (*TxnIDWaiter) PublishUpToNID ¶ added in v0.99.5
func (t *TxnIDWaiter) PublishUpToNID(roomID string, publishNID int64)