Documentation ¶
Index ¶
- Constants
- Variables
- func ChangedFilters(prev, next *RequestFilters) bool
- func IncludedRoomIDsInOps(ops []ResponseOp) map[string]struct{}
- type Conn
- type ConnHandler
- type ConnID
- type ConnMap
- type Dispatcher
- func (d *Dispatcher) IsUserJoined(userID, roomID string) bool
- func (d *Dispatcher) OnNewEvents(roomID string, events []json.RawMessage, latestPos int64)
- func (d *Dispatcher) Register(userID string, r Receiver)
- func (d *Dispatcher) Startup(roomToJoinedUsers map[string][]string) error
- func (d *Dispatcher) Unregister(userID string)
- type FilteredSortableRooms
- type JoinedRoomsTracker
- func (t *JoinedRoomsTracker) IsUserJoined(userID, roomID string) bool
- func (t *JoinedRoomsTracker) JoinedRoomsForUser(userID string) []string
- func (t *JoinedRoomsTracker) JoinedUsersForRoom(roomID string) []string
- func (t *JoinedRoomsTracker) UserJoinedRoom(userID, roomID string) bool
- func (t *JoinedRoomsTracker) UserLeftRoom(userID, roomID string)
- type Receiver
- type Request
- func (r *Request) ApplyDelta(nextReq *Request) (result *Request, subs, unsubs []string)
- func (r *Request) GetRequiredState(listIndex int, roomID string) [][2]string
- func (r *Request) GetTimelineLimit(listIndex int, roomID string) int64
- func (r *Request) Same(other *Request) bool
- func (r *Request) SetPos(pos int64)
- func (r *Request) SetTimeoutMSecs(timeout int)
- func (r *Request) TimeoutMSecs() int
- type RequestFilters
- type RequestList
- type Response
- type ResponseOp
- type ResponseOpRange
- type ResponseOpSingle
- type Room
- type RoomConnMetadata
- type RoomSubscription
- type SliceRanges
- func (r SliceRanges) Delta(next SliceRanges) (added SliceRanges, removed SliceRanges, same SliceRanges)
- func (r SliceRanges) Inside(i int64) bool
- func (r SliceRanges) LowerClamp(i int64) (clampIndex int64)
- func (r SliceRanges) SliceInto(slice Subslicer) []Subslicer
- func (r SliceRanges) UpperClamp(i int64) (clampIndex int64)
- func (r SliceRanges) Valid() bool
- type SortableRoomLists
- func (s *SortableRoomLists) Counts() []int
- func (s *SortableRoomLists) ForEach(fn func(index int, fsr *FilteredSortableRooms))
- func (s *SortableRoomLists) List(index int) *FilteredSortableRooms
- func (s *SortableRoomLists) ListExists(index int) bool
- func (s *SortableRoomLists) Set(index int, val *FilteredSortableRooms)
- type SortableRooms
- func (s *SortableRooms) Add(r RoomConnMetadata) bool
- func (s *SortableRooms) Get(index int) RoomConnMetadata
- func (s *SortableRooms) IndexOf(roomID string) (int, bool)
- func (s *SortableRooms) Len() int64
- func (s *SortableRooms) Remove(roomID string) int
- func (s *SortableRooms) RoomIDs() []string
- func (s *SortableRooms) Sort(sortBy []string) error
- func (s *SortableRooms) Subslice(i, j int64) Subslicer
- func (s *SortableRooms) UpdateGlobalRoomMetadata(roomMeta *internal.RoomMetadata) int
- func (s *SortableRooms) UpdateUserRoomMetadata(roomID string, userEvent *caches.UserRoomData) int
- type Subslicer
Constants ¶
const ( OpSync = "SYNC" OpInvalidate = "INVALIDATE" OpInsert = "INSERT" OpDelete = "DELETE" OpUpdate = "UPDATE" )
const DispatcherAllUsers = "-"
Variables ¶
var ( SortByName = "by_name" SortByRecency = "by_recency" SortByNotificationCount = "by_notification_count" SortByHighlightCount = "by_highlight_count" SortBy = []string{SortByHighlightCount, SortByName, SortByNotificationCount, SortByRecency} DefaultTimelineLimit = int64(20) DefaultTimeoutMSecs = 10 * 1000 // 10s )
Functions ¶
func ChangedFilters ¶
func ChangedFilters(prev, next *RequestFilters) bool
func IncludedRoomIDsInOps ¶
func IncludedRoomIDsInOps(ops []ResponseOp) map[string]struct{}
Returns the room IDs that this set of operations returns information on. Information means things like SYNC/INSERT/UPDATE, and not DELETE/INVALIDATE.
Types ¶
type Conn ¶
type Conn struct { ConnID ConnID // contains filtered or unexported fields }
Conn is an abstraction of a long-poll connection. It automatically handles the position values of the /sync request, including sending cached data in the event of retries. It does not handle the contents of the data at all.
func NewConn ¶
func NewConn(connID ConnID, h ConnHandler) *Conn
func (*Conn) OnIncomingRequest ¶
func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request) (resp *Response, herr *internal.HandlerError)
OnIncomingRequest advances the client's position in the stream, returning the response position and data.
type ConnHandler ¶
type ConnHandler interface { // Callback which is allowed to block as long as the context is active. Return the response // to send back or an error. Errors of type *internal.HandlerError are inspected for the correct // status code to send back. OnIncomingRequest(ctx context.Context, cid ConnID, req *Request, isInitial bool) (*Response, error) UserID() string Destroy() Alive() bool }
type ConnMap ¶
type ConnMap struct {
// contains filtered or unexported fields
}
ConnMap stores a collection of Conns.
func NewConnMap ¶
func NewConnMap() *ConnMap
func (*ConnMap) Conn ¶
Conn returns a connection with this ConnID. Returns nil if no connection exists.
func (*ConnMap) CreateConn ¶
func (m *ConnMap) CreateConn(cid ConnID, newConnHandler func() ConnHandler) (*Conn, bool)
Atomically gets or creates a connection with this connection ID. Calls newConnHandler if a new connection is required.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatches live events to caches
func NewDispatcher ¶
func NewDispatcher() *Dispatcher
func (*Dispatcher) IsUserJoined ¶
func (d *Dispatcher) IsUserJoined(userID, roomID string) bool
func (*Dispatcher) OnNewEvents ¶
func (d *Dispatcher) OnNewEvents( roomID string, events []json.RawMessage, latestPos int64, )
Called by v2 pollers when we receive new events
func (*Dispatcher) Register ¶
func (d *Dispatcher) Register(userID string, r Receiver)
func (*Dispatcher) Startup ¶
func (d *Dispatcher) Startup(roomToJoinedUsers map[string][]string) error
Load joined members into the dispatcher. MUST BE CALLED BEFORE V2 POLL LOOPS START.
func (*Dispatcher) Unregister ¶
func (d *Dispatcher) Unregister(userID string)
type FilteredSortableRooms ¶
type FilteredSortableRooms struct { *SortableRooms // contains filtered or unexported fields }
FilteredSortableRooms is SortableRooms but where rooms are filtered before being added to the list. Updates to room metadata may result in rooms being added/removed.
func NewFilteredSortableRooms ¶
func NewFilteredSortableRooms(rooms []RoomConnMetadata, filter *RequestFilters) *FilteredSortableRooms
func (*FilteredSortableRooms) Add ¶
func (f *FilteredSortableRooms) Add(r RoomConnMetadata) bool
func (*FilteredSortableRooms) UpdateGlobalRoomMetadata ¶
func (f *FilteredSortableRooms) UpdateGlobalRoomMetadata(r *internal.RoomMetadata) int
func (*FilteredSortableRooms) UpdateUserRoomMetadata ¶
func (f *FilteredSortableRooms) UpdateUserRoomMetadata(roomID string, userEvent *caches.UserRoomData) int
type JoinedRoomsTracker ¶
type JoinedRoomsTracker struct {
// contains filtered or unexported fields
}
Tracks who is joined to which rooms. This is critical from a security perspective in order to ensure that only the users joined to the room receive events in that room. Consider the situation where Alice and Bob are joined to room X. If Alice gets kicked from X, the proxy server will still receive messages for room X due to Bob being joined to the room. We therefore need to decide which active connections should be pushed events, which is what this tracker does.
func NewJoinedRoomsTracker ¶
func NewJoinedRoomsTracker() *JoinedRoomsTracker
func (*JoinedRoomsTracker) IsUserJoined ¶
func (t *JoinedRoomsTracker) IsUserJoined(userID, roomID string) bool
func (*JoinedRoomsTracker) JoinedRoomsForUser ¶
func (t *JoinedRoomsTracker) JoinedRoomsForUser(userID string) []string
func (*JoinedRoomsTracker) JoinedUsersForRoom ¶
func (t *JoinedRoomsTracker) JoinedUsersForRoom(roomID string) []string
func (*JoinedRoomsTracker) UserJoinedRoom ¶
func (t *JoinedRoomsTracker) UserJoinedRoom(userID, roomID string) bool
Returns true if the state changed.
func (*JoinedRoomsTracker) UserLeftRoom ¶
func (t *JoinedRoomsTracker) UserLeftRoom(userID, roomID string)
type Request ¶
type Request struct { Lists []RequestList `json:"lists"` RoomSubscriptions map[string]RoomSubscription `json:"room_subscriptions"` UnsubscribeRooms []string `json:"unsubscribe_rooms"` Extensions extensions.Request `json:"extensions"` // contains filtered or unexported fields }
func (*Request) ApplyDelta ¶
Apply this delta on top of the request. Returns a new Request with the combined output, ready for persisting into the database. Also returns the DELTA for rooms to subscribe and unsubscribe from.
func (*Request) GetRequiredState ¶
func (*Request) GetTimelineLimit ¶
func (*Request) SetTimeoutMSecs ¶
func (*Request) TimeoutMSecs ¶
type RequestFilters ¶
type RequestFilters struct { Spaces []string `json:"spaces"` IsDM *bool `json:"is_dm"` IsEncrypted *bool `json:"is_encrypted"` IsInvite *bool `json:"is_invite"` IsTombstoned *bool `json:"is_tombstoned"` RoomNameFilter string `json:"room_name_like"` }
func (*RequestFilters) Include ¶
func (rf *RequestFilters) Include(r *RoomConnMetadata) bool
type RequestList ¶
type RequestList struct { RoomSubscription Ranges SliceRanges `json:"ranges"` Sort []string `json:"sort"` Filters *RequestFilters `json:"filters"` }
type Response ¶
type Response struct { Ops []ResponseOp `json:"ops"` RoomSubscriptions map[string]Room `json:"room_subscriptions"` Counts []int `json:"counts"` Extensions extensions.Response `json:"extensions"` Pos string `json:"pos"` Session string `json:"session_id,omitempty"` }
func (*Response) UnmarshalJSON ¶
Custom unmarshal so we can dynamically create the right ResponseOp for Ops
type ResponseOp ¶
type ResponseOpRange ¶
type ResponseOpRange struct { Operation string `json:"op"` List int `json:"list"` Range []int64 `json:"range,omitempty"` Rooms []Room `json:"rooms,omitempty"` }
func (*ResponseOpRange) IncludedRoomIDs ¶
func (r *ResponseOpRange) IncludedRoomIDs() []string
func (*ResponseOpRange) Op ¶
func (r *ResponseOpRange) Op() string
type ResponseOpSingle ¶
type ResponseOpSingle struct { Operation string `json:"op"` List int `json:"list"` Index *int `json:"index,omitempty"` // 0 is a valid value, hence *int Room *Room `json:"room,omitempty"` }
func (*ResponseOpSingle) IncludedRoomIDs ¶
func (r *ResponseOpSingle) IncludedRoomIDs() []string
func (*ResponseOpSingle) Op ¶
func (r *ResponseOpSingle) Op() string
type Room ¶
type Room struct { RoomID string `json:"room_id,omitempty"` Name string `json:"name,omitempty"` RequiredState []json.RawMessage `json:"required_state,omitempty"` Timeline []json.RawMessage `json:"timeline,omitempty"` InviteState []json.RawMessage `json:"invite_state,omitempty"` NotificationCount int64 `json:"notification_count"` HighlightCount int64 `json:"highlight_count"` Initial bool `json:"initial,omitempty"` IsDM bool `json:"is_dm,omitempty"` PrevBatch string `json:"prev_batch,omitempty"` }
type RoomConnMetadata ¶
type RoomConnMetadata struct { internal.RoomMetadata caches.UserRoomData CanonicalisedName string // stripped leading symbols like #, all in lower case }
type RoomSubscription ¶
type SliceRanges ¶
type SliceRanges [][2]int64
func (SliceRanges) Delta ¶
func (r SliceRanges) Delta(next SliceRanges) (added SliceRanges, removed SliceRanges, same SliceRanges)
Delta returns the ranges which are added, removed and unchanged, in that order. Intelligently handles overlaps.
func (SliceRanges) Inside ¶
func (r SliceRanges) Inside(i int64) bool
Inside returns true if i is inside the range
func (SliceRanges) LowerClamp ¶
func (r SliceRanges) LowerClamp(i int64) (clampIndex int64)
LowerClamp returns the end-index (e.g. [0,99] -> 99) of the first range lower than i. This is critical to determine which index to delete when rooms move outside of the tracked range. If `i` is inside a range, returns the clamp for the lower range. Returns -1 if a clamp cannot be found, e.g. [0,99] i=50 -> -1 whereas [0,99][150,199] i=160 -> 99.
func (SliceRanges) SliceInto ¶
func (r SliceRanges) SliceInto(slice Subslicer) []Subslicer
Slice into this range, returning subslices of slice
func (SliceRanges) UpperClamp ¶
func (r SliceRanges) UpperClamp(i int64) (clampIndex int64)
UpperClamp returns the start-index (e.g. [50,99] -> 50) of the first range higher than i. If `i` is inside a range, returns -1. E.g. [50,99] i=30 -> 50, [50,99] i=55 -> -1.
func (SliceRanges) Valid ¶
func (r SliceRanges) Valid() bool
type SortableRoomLists ¶
type SortableRoomLists struct {
// contains filtered or unexported fields
}
SortableRoomLists is a list of FilteredSortableRooms.
func (*SortableRoomLists) Counts ¶
func (s *SortableRoomLists) Counts() []int
Counts returns the counts of all lists
func (*SortableRoomLists) ForEach ¶
func (s *SortableRoomLists) ForEach(fn func(index int, fsr *FilteredSortableRooms))
func (*SortableRoomLists) List ¶
func (s *SortableRoomLists) List(index int) *FilteredSortableRooms
func (*SortableRoomLists) ListExists ¶
func (s *SortableRoomLists) ListExists(index int) bool
func (*SortableRoomLists) Set ¶
func (s *SortableRoomLists) Set(index int, val *FilteredSortableRooms)
type SortableRooms ¶
type SortableRooms struct {
// contains filtered or unexported fields
}
SortableRooms represents a list of rooms which can be sorted and updated. Maintains mappings of room IDs to current index positions after sorting.
func NewSortableRooms ¶
func NewSortableRooms(rooms []RoomConnMetadata) *SortableRooms
func (*SortableRooms) Add ¶
func (s *SortableRooms) Add(r RoomConnMetadata) bool
Add a room to the list. Returns true if the room was added.
func (*SortableRooms) Get ¶
func (s *SortableRooms) Get(index int) RoomConnMetadata
func (*SortableRooms) Len ¶
func (s *SortableRooms) Len() int64
func (*SortableRooms) Remove ¶
func (s *SortableRooms) Remove(roomID string) int
func (*SortableRooms) RoomIDs ¶
func (s *SortableRooms) RoomIDs() []string
func (*SortableRooms) Sort ¶
func (s *SortableRooms) Sort(sortBy []string) error
func (*SortableRooms) Subslice ¶
func (s *SortableRooms) Subslice(i, j int64) Subslicer
func (*SortableRooms) UpdateGlobalRoomMetadata ¶
func (s *SortableRooms) UpdateGlobalRoomMetadata(roomMeta *internal.RoomMetadata) int
func (*SortableRooms) UpdateUserRoomMetadata ¶
func (s *SortableRooms) UpdateUserRoomMetadata(roomID string, userEvent *caches.UserRoomData) int