Documentation ¶
Index ¶
- Constants
- Variables
- type Conn
- type ConnHandler
- type ConnID
- type ConnMap
- type Dispatcher
- func (d *Dispatcher) IsUserJoined(userID, roomID string) bool
- func (d *Dispatcher) OnNewEvents(roomID string, events []json.RawMessage, latestPos int64)
- func (d *Dispatcher) OnNewInitialRoomState(roomID string, state []json.RawMessage)
- func (d *Dispatcher) Register(userID string, r Receiver) error
- func (d *Dispatcher) Startup(roomToJoinedUsers map[string][]string) error
- func (d *Dispatcher) Unregister(userID string)
- type FilteredSortableRooms
- type InternalRequestLists
- func (s *InternalRequestLists) AssignList(index int, filters *RequestFilters, sort []string, ...) (*FilteredSortableRooms, bool)
- func (s *InternalRequestLists) Count(index int) int
- func (s *InternalRequestLists) DeleteList(index int)
- func (s *InternalRequestLists) Get(listIndex int) *FilteredSortableRooms
- func (s *InternalRequestLists) Len() int
- func (s *InternalRequestLists) RemoveRoom(roomID string)
- func (s *InternalRequestLists) Room(roomID string) *RoomConnMetadata
- func (s *InternalRequestLists) SetRoom(r RoomConnMetadata) (delta RoomDelta)
- type JoinedRoomsTracker
- func (t *JoinedRoomsTracker) IsUserJoined(userID, roomID string) bool
- func (t *JoinedRoomsTracker) JoinedRoomsForUser(userID string) []string
- func (t *JoinedRoomsTracker) JoinedUsersForRoom(roomID string, filter func(userID string) bool) (matchedUserIDs []string, joinCount int)
- func (t *JoinedRoomsTracker) NumInvitedUsersForRoom(roomID string) int
- func (t *JoinedRoomsTracker) Startup(roomToJoinedUsers map[string][]string)
- func (t *JoinedRoomsTracker) UserJoinedRoom(userID, roomID string) bool
- func (t *JoinedRoomsTracker) UserLeftRoom(userID, roomID string)
- func (t *JoinedRoomsTracker) UsersInvitedToRoom(userIDs []string, roomID string)
- func (t *JoinedRoomsTracker) UsersJoinedRoom(userIDs []string, roomID string) bool
- type List
- type ListOp
- type OverwriteVal
- type Receiver
- type Request
- func (r *Request) ApplyDelta(nextReq *Request) (result *Request, delta *RequestDelta)
- func (r *Request) GetTimelineLimit(listIndex int, roomID string) int64
- func (r *Request) Same(other *Request) bool
- func (r *Request) SetPos(pos int64)
- func (r *Request) SetTimeoutMSecs(timeout int)
- func (r *Request) TimeoutMSecs() int
- type RequestDelta
- type RequestFilters
- type RequestList
- func (rl *RequestList) CalculateMoveIndexes(fromIndex, toIndex int) (fromTos [][2]int)
- func (rl *RequestList) FiltersChanged(next *RequestList) bool
- func (rl *RequestList) ShouldGetAllRooms() bool
- func (rl *RequestList) SortOrderChanged(next *RequestList) bool
- func (rl *RequestList) WriteDeleteOp(deletedIndex int) *ResponseOpSingle
- func (rl *RequestList) WriteInsertOp(insertedIndex int, roomID string) *ResponseOpSingle
- func (rl *RequestList) WriteSwapOp(roomID string, fromIndex, toIndex int) []ResponseOp
- type RequestListDelta
- type Response
- type ResponseList
- type ResponseOp
- type ResponseOpRange
- type ResponseOpSingle
- type Room
- type RoomConnMetadata
- type RoomDelta
- type RoomFinder
- type RoomListDelta
- type RoomSubscription
- type SliceRanges
- func (r SliceRanges) ClosestInDirection(i int64, towardsZero bool) (closestIndex int64)
- func (r SliceRanges) Delta(next SliceRanges) (added SliceRanges, removed SliceRanges, same SliceRanges)
- func (r SliceRanges) Inside(i int64) ([2]int64, bool)
- func (r SliceRanges) SliceInto(slice Subslicer) []Subslicer
- func (r SliceRanges) Valid() bool
- type SortableRooms
- func (s *SortableRooms) Add(roomID string) bool
- func (s *SortableRooms) Get(index int) string
- func (s *SortableRooms) IndexOf(roomID string) (int, bool)
- func (s *SortableRooms) Len() int64
- func (s *SortableRooms) Remove(roomID string) int
- func (s *SortableRooms) RoomIDs() []string
- func (s *SortableRooms) Sort(sortBy []string) error
- func (s *SortableRooms) Subslice(i, j int64) Subslicer
- type Subslicer
Constants ¶
const ( OpSync = "SYNC" OpInvalidate = "INVALIDATE" OpInsert = "INSERT" OpDelete = "DELETE" )
const DispatcherAllUsers = "-"
Variables ¶
var ( SortByName = "by_name" SortByRecency = "by_recency" SortByNotificationCount = "by_notification_count" SortByHighlightCount = "by_highlight_count" SortBy = []string{SortByHighlightCount, SortByName, SortByNotificationCount, SortByRecency} DefaultTimelineLimit = int64(20) DefaultTimeoutMSecs = 10 * 1000 // 10s )
Functions ¶
This section is empty.
Types ¶
type Conn ¶
type Conn struct { ConnID ConnID // contains filtered or unexported fields }
Conn is an abstraction of a long-poll connection. It automatically handles the position values of the /sync request, including sending cached data in the event of retries. It does not handle the contents of the data at all.
func NewConn ¶
func NewConn(connID ConnID, h ConnHandler) *Conn
func (*Conn) OnIncomingRequest ¶
func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request) (resp *Response, herr *internal.HandlerError)
OnIncomingRequest advances the client's position in the stream, returning the response position and data.
type ConnHandler ¶
type ConnHandler interface { // Callback which is allowed to block as long as the context is active. Return the response // to send back or an error. Errors of type *internal.HandlerError are inspected for the correct // status code to send back. OnIncomingRequest(ctx context.Context, cid ConnID, req *Request, isInitial bool) (*Response, error) UserID() string Destroy() Alive() bool }
type ConnMap ¶
type ConnMap struct {
// contains filtered or unexported fields
}
ConnMap stores a collection of Conns.
func NewConnMap ¶
func NewConnMap() *ConnMap
func (*ConnMap) Conn ¶
Conn returns a connection with this ConnID. Returns nil if no connection exists.
func (*ConnMap) CreateConn ¶
func (m *ConnMap) CreateConn(cid ConnID, newConnHandler func() ConnHandler) (*Conn, bool)
Atomically gets or creates a connection with this connection ID. Calls newConnHandler if a new connection is required.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatches live events to caches
func NewDispatcher ¶
func NewDispatcher() *Dispatcher
func (*Dispatcher) IsUserJoined ¶
func (d *Dispatcher) IsUserJoined(userID, roomID string) bool
func (*Dispatcher) OnNewEvents ¶
func (d *Dispatcher) OnNewEvents( roomID string, events []json.RawMessage, latestPos int64, )
Called by v2 pollers when we receive new events
func (*Dispatcher) OnNewInitialRoomState ¶ added in v0.5.2
func (d *Dispatcher) OnNewInitialRoomState(roomID string, state []json.RawMessage)
Called by v2 pollers when we receive an initial state block. Very similar to OnNewEvents but done in bulk for speed.
func (*Dispatcher) Startup ¶
func (d *Dispatcher) Startup(roomToJoinedUsers map[string][]string) error
Load joined members into the dispatcher. MUST BE CALLED BEFORE V2 POLL LOOPS START.
func (*Dispatcher) Unregister ¶
func (d *Dispatcher) Unregister(userID string)
type FilteredSortableRooms ¶
type FilteredSortableRooms struct { *SortableRooms // contains filtered or unexported fields }
FilteredSortableRooms is SortableRooms but where rooms are filtered before being added to the list. Updates to room metadata may result in rooms being added/removed.
func NewFilteredSortableRooms ¶
func NewFilteredSortableRooms(finder RoomFinder, roomIDs []string, filter *RequestFilters) *FilteredSortableRooms
func (*FilteredSortableRooms) Add ¶
func (f *FilteredSortableRooms) Add(roomID string) bool
type InternalRequestLists ¶ added in v0.2.0
type InternalRequestLists struct {
// contains filtered or unexported fields
}
InternalRequestLists is a list of lists which matches each index position in the request JSON 'lists'. It contains all the internal metadata for rooms and controls access to, and updates of, said lists.
func NewInternalRequestLists ¶ added in v0.3.1
func NewInternalRequestLists() *InternalRequestLists
func (*InternalRequestLists) AssignList ¶ added in v0.2.0
func (s *InternalRequestLists) AssignList(index int, filters *RequestFilters, sort []string, shouldOverwrite OverwriteVal) (*FilteredSortableRooms, bool)
Assign a new list at the given index. If Overwrite, any existing list is replaced. If DoNotOverwrite, the existing list is returned if one exists, else a new list is created. Returns the list and true if the list was overwritten.
func (*InternalRequestLists) Count ¶ added in v0.2.0
func (s *InternalRequestLists) Count(index int) int
Count returns the count of total rooms in this list
func (*InternalRequestLists) DeleteList ¶ added in v0.2.0
func (s *InternalRequestLists) DeleteList(index int)
func (*InternalRequestLists) Get ¶ added in v0.4.1
func (s *InternalRequestLists) Get(listIndex int) *FilteredSortableRooms
func (*InternalRequestLists) Len ¶ added in v0.2.0
func (s *InternalRequestLists) Len() int
func (*InternalRequestLists) RemoveRoom ¶ added in v0.2.0
func (s *InternalRequestLists) RemoveRoom(roomID string)
Remove a room from all lists, e.g. after retiring an invite or leaving a room.
func (*InternalRequestLists) Room ¶ added in v0.4.1
func (s *InternalRequestLists) Room(roomID string) *RoomConnMetadata
func (*InternalRequestLists) SetRoom ¶ added in v0.4.1
func (s *InternalRequestLists) SetRoom(r RoomConnMetadata) (delta RoomDelta)
type JoinedRoomsTracker ¶
type JoinedRoomsTracker struct {
// contains filtered or unexported fields
}
Tracks who is joined to which rooms. This is critical from a security perspective in order to ensure that only the users joined to the room receive events in that room. Consider the situation where Alice and Bob are joined to room X. If Alice gets kicked from X, the proxy server will still receive messages for room X due to Bob being joined to the room. We therefore need to decide which active connections should be pushed events, which is what this tracker does.
func NewJoinedRoomsTracker ¶
func NewJoinedRoomsTracker() *JoinedRoomsTracker
func (*JoinedRoomsTracker) IsUserJoined ¶
func (t *JoinedRoomsTracker) IsUserJoined(userID, roomID string) bool
func (*JoinedRoomsTracker) JoinedRoomsForUser ¶
func (t *JoinedRoomsTracker) JoinedRoomsForUser(userID string) []string
func (*JoinedRoomsTracker) JoinedUsersForRoom ¶
func (t *JoinedRoomsTracker) JoinedUsersForRoom(roomID string, filter func(userID string) bool) (matchedUserIDs []string, joinCount int)
JoinedUsersForRoom returns the joined users in the given room, filtered by the filter function if provided. If one is not provided, all joined users are returned. Returns the join count at the time this function was called.
func (*JoinedRoomsTracker) NumInvitedUsersForRoom ¶ added in v0.4.1
func (t *JoinedRoomsTracker) NumInvitedUsersForRoom(roomID string) int
func (*JoinedRoomsTracker) Startup ¶ added in v0.5.2
func (t *JoinedRoomsTracker) Startup(roomToJoinedUsers map[string][]string)
Startup efficiently sets up the joined rooms tracker, but isn't safe to call with live traffic, as it replaces all known in-memory state. Panics if called on a non-empty tracker.
func (*JoinedRoomsTracker) UserJoinedRoom ¶
func (t *JoinedRoomsTracker) UserJoinedRoom(userID, roomID string) bool
returns true if the state changed
func (*JoinedRoomsTracker) UserLeftRoom ¶
func (t *JoinedRoomsTracker) UserLeftRoom(userID, roomID string)
func (*JoinedRoomsTracker) UsersInvitedToRoom ¶ added in v0.5.2
func (t *JoinedRoomsTracker) UsersInvitedToRoom(userIDs []string, roomID string)
func (*JoinedRoomsTracker) UsersJoinedRoom ¶ added in v0.5.2
func (t *JoinedRoomsTracker) UsersJoinedRoom(userIDs []string, roomID string) bool
returns true if the state changed for any user in userIDs
type OverwriteVal ¶ added in v0.2.0
type OverwriteVal bool
var ( DoNotOverwrite OverwriteVal = false Overwrite OverwriteVal = true )
type Request ¶
type Request struct { TxnID string `json:"txn_id"` Lists []RequestList `json:"lists"` RoomSubscriptions map[string]RoomSubscription `json:"room_subscriptions"` UnsubscribeRooms []string `json:"unsubscribe_rooms"` Extensions extensions.Request `json:"extensions"` // contains filtered or unexported fields }
func (*Request) ApplyDelta ¶
func (r *Request) ApplyDelta(nextReq *Request) (result *Request, delta *RequestDelta)
Apply this delta on top of the request. Returns a new Request with the combined output, along with the delta operations. `nextReq` cannot be nil, but `r` can be nil in the case of an initial request.
func (*Request) GetTimelineLimit ¶
func (*Request) SetTimeoutMSecs ¶
func (*Request) TimeoutMSecs ¶
type RequestDelta ¶ added in v0.2.0
type RequestDelta struct { // new room IDs to subscribe to Subs []string // room IDs to unsubscribe from Unsubs []string // The complete union of both lists (contains max(a,b) lists) Lists []RequestListDelta }
Internal struct used to represent the diffs between 2 requests
type RequestFilters ¶
type RequestFilters struct { Spaces []string `json:"spaces"` IsDM *bool `json:"is_dm"` IsEncrypted *bool `json:"is_encrypted"` IsInvite *bool `json:"is_invite"` IsTombstoned *bool `json:"is_tombstoned"` RoomTypes []*string `json:"room_types"` NotRoomTypes []*string `json:"not_room_types"` RoomNameFilter string `json:"room_name_like"` Tags []string `json:"tags"` NotTags []string `json:"not_tags"` }
func (*RequestFilters) Include ¶
func (rf *RequestFilters) Include(r *RoomConnMetadata) bool
type RequestList ¶
type RequestList struct { RoomSubscription Ranges SliceRanges `json:"ranges"` Sort []string `json:"sort"` Filters *RequestFilters `json:"filters"` SlowGetAllRooms *bool `json:"slow_get_all_rooms,omitempty"` }
func (*RequestList) CalculateMoveIndexes ¶ added in v0.2.0
func (rl *RequestList) CalculateMoveIndexes(fromIndex, toIndex int) (fromTos [][2]int)
Calculate the real from -> to index positions for the two input index positions. This takes into account the ranges on the list.
func (*RequestList) FiltersChanged ¶ added in v0.2.0
func (rl *RequestList) FiltersChanged(next *RequestList) bool
func (*RequestList) ShouldGetAllRooms ¶ added in v0.2.0
func (rl *RequestList) ShouldGetAllRooms() bool
func (*RequestList) SortOrderChanged ¶ added in v0.2.0
func (rl *RequestList) SortOrderChanged(next *RequestList) bool
func (*RequestList) WriteDeleteOp ¶ added in v0.2.0
func (rl *RequestList) WriteDeleteOp(deletedIndex int) *ResponseOpSingle
Write a delete operation for this list. Can return nil for invalid indexes or if this index isn't being tracked. Useful when rooms are removed from the list e.g left rooms.
func (*RequestList) WriteInsertOp ¶ added in v0.2.0
func (rl *RequestList) WriteInsertOp(insertedIndex int, roomID string) *ResponseOpSingle
Write an insert operation for this list. Can return nil for indexes not being tracked. Useful when rooms are added to the list e.g newly joined rooms.
func (*RequestList) WriteSwapOp ¶ added in v0.2.0
func (rl *RequestList) WriteSwapOp( roomID string, fromIndex, toIndex int, ) []ResponseOp
Move a room from an absolute index position to another absolute position. These positions do not need to be inside a valid range. Returns 0-2 operations. For example:
1,2,3,4,5 tracking range [0,4]
3 bumps to top -> 3,1,2,4,5 -> DELETE index=2, INSERT val=3 index=0
7 bumps to top -> 7,1,2,3,4 -> DELETE index=4, INSERT val=7 index=0
7 bumps to top again -> 7,1,2,3,4 -> no-op as from == to index
new room 8 in i=5 -> 7,1,2,3,4,8 -> no-op as 8 is outside the range.
Returns the list of ops as well as the new toIndex if it wasn't inside a range.
type RequestListDelta ¶ added in v0.2.0
type RequestListDelta struct { // What was there before, nullable Prev *RequestList // What is there now, nullable. Combined result. Curr *RequestList }
Internal struct used to represent a single list delta.
type Response ¶
type Response struct { Lists []ResponseList `json:"lists"` Rooms map[string]Room `json:"rooms"` Extensions extensions.Response `json:"extensions"` Pos string `json:"pos"` TxnID string `json:"txn_id,omitempty"` Session string `json:"session_id,omitempty"` }
func (*Response) UnmarshalJSON ¶
Custom unmarshal so we can dynamically create the right ResponseOp for Ops
type ResponseList ¶ added in v0.2.0
type ResponseList struct { Ops []ResponseOp `json:"ops,omitempty"` Count int `json:"count"` }
type ResponseOp ¶
type ResponseOp interface { Op() string // which rooms are we giving data about IncludedRoomIDs() []string }
func CalculateListOps ¶ added in v0.4.1
func CalculateListOps(reqList *RequestList, list List, roomID string, listOp ListOp) (ops []ResponseOp, subs []string)
CalculateListOps contains the core list moving algorithm. It accepts the client's list of ranges, the underlying list on which to perform operations, and the room which was modified and in what way. It returns a list of INSERT/DELETE operations, which may be zero length, as well as which rooms are newly added into the window.
A,B,C,D,E,F,G,H,I <-- List
`----` `----` <-- RequestList.Ranges
DEL E | ADD J | CHANGE C <-- ListOp RoomID
returns:
[ {op:DELETE, index:2}, {op:INSERT, index:0, room_id:A} ] <--- []ResponseOp
[ "A" ] <--- []string, new room subscriptions, if it wasn't in the window before
This function will modify List to Add/Delete/Sort appropriately.
type ResponseOpRange ¶
type ResponseOpRange struct { Operation string `json:"op"` Range []int64 `json:"range,omitempty"` RoomIDs []string `json:"room_ids,omitempty"` }
func (*ResponseOpRange) IncludedRoomIDs ¶
func (r *ResponseOpRange) IncludedRoomIDs() []string
func (*ResponseOpRange) Op ¶
func (r *ResponseOpRange) Op() string
type ResponseOpSingle ¶
type ResponseOpSingle struct { Operation string `json:"op"` Index *int `json:"index,omitempty"` // 0 is a valid value, hence *int RoomID string `json:"room_id,omitempty"` }
func (*ResponseOpSingle) IncludedRoomIDs ¶
func (r *ResponseOpSingle) IncludedRoomIDs() []string
func (*ResponseOpSingle) Op ¶
func (r *ResponseOpSingle) Op() string
type Room ¶
type Room struct { Name string `json:"name,omitempty"` RequiredState []json.RawMessage `json:"required_state,omitempty"` Timeline []json.RawMessage `json:"timeline,omitempty"` InviteState []json.RawMessage `json:"invite_state,omitempty"` NotificationCount int64 `json:"notification_count"` HighlightCount int64 `json:"highlight_count"` Initial bool `json:"initial,omitempty"` IsDM bool `json:"is_dm,omitempty"` JoinedCount int `json:"joined_count,omitempty"` InvitedCount int `json:"invited_count,omitempty"` PrevBatch string `json:"prev_batch,omitempty"` }
type RoomConnMetadata ¶
type RoomConnMetadata struct { internal.RoomMetadata caches.UserRoomData }
type RoomDelta ¶ added in v0.4.1
type RoomDelta struct { RoomNameChanged bool JoinCountChanged bool InviteCountChanged bool Lists []RoomListDelta }
type RoomFinder ¶ added in v0.4.1
type RoomFinder interface {
Room(roomID string) *RoomConnMetadata
}
type RoomListDelta ¶ added in v0.4.1
type RoomSubscription ¶
type RoomSubscription struct { RequiredState [][2]string `json:"required_state"` TimelineLimit int64 `json:"timeline_limit"` }
func (RoomSubscription) Combine ¶ added in v0.2.0
func (rs RoomSubscription) Combine(other RoomSubscription) RoomSubscription
Combine this subscription with another, returning a union of both as a copy.
func (RoomSubscription) RequiredStateMap ¶ added in v0.2.0
func (rs RoomSubscription) RequiredStateMap() *internal.RequiredStateMap
Calculate the required state map for this room subscription. Given event types A,B,C and state keys 1,2,3, the following Venn diagrams are possible:
.---------[*,*]----------.
|      .---------.       |
|      |   A,2   | A,3   |
| .----+--[B,*]--+-----. |
| |    | .-----. |     | |
| |B,1 | | B,2 | | B,3 | |
| |    | `[B,2]` |     | |
| `----+---------+-----` |
|      |   C,2   | C,3   |
|      `--[*,2]--`       |
`------------------------`
The largest set will be used when returning the required state map. For example, [B,2] + [B,*] = [B,*] because [B,*] encompasses [B,2]. This means [*,*] encompasses everything.
type SliceRanges ¶
type SliceRanges [][2]int64
func (SliceRanges) ClosestInDirection ¶ added in v0.2.0
func (r SliceRanges) ClosestInDirection(i int64, towardsZero bool) (closestIndex int64)
ClosestInDirection returns the index position of a range bound that is closest to `i`, heading either towards 0 or towards infinity. If there is no range boundary in that direction, -1 is returned. For example:
[0,20] i=25,towardsZero=true => 20
[0,20] i=15,towardsZero=true => 0
[0,20] i=15,towardsZero=false => 20
[0,20] i=25,towardsZero=false => -1
[0,20],[40,60] i=25,towardsZero=true => 20
[0,20],[40,60] i=25,towardsZero=false => 40
[0,20],[40,60] i=40,towardsZero=true => 40
[20,40] i=40,towardsZero=true => 20
func (SliceRanges) Delta ¶
func (r SliceRanges) Delta(next SliceRanges) (added SliceRanges, removed SliceRanges, same SliceRanges)
Delta returns the ranges which are added, removed and unchanged, in that order. Intelligently handles overlaps.
func (SliceRanges) Inside ¶
func (r SliceRanges) Inside(i int64) ([2]int64, bool)
Inside returns true if i is inside the range
func (SliceRanges) SliceInto ¶
func (r SliceRanges) SliceInto(slice Subslicer) []Subslicer
Slice into this range, returning subslices of slice
func (SliceRanges) Valid ¶
func (r SliceRanges) Valid() bool
type SortableRooms ¶
type SortableRooms struct {
// contains filtered or unexported fields
}
SortableRooms represents a list of rooms which can be sorted and updated. Maintains mappings of room IDs to current index positions after sorting.
func NewSortableRooms ¶
func NewSortableRooms(finder RoomFinder, rooms []string) *SortableRooms
func (*SortableRooms) Add ¶
func (s *SortableRooms) Add(roomID string) bool
Add a room to the list. Returns true if the room was added.
func (*SortableRooms) Get ¶
func (s *SortableRooms) Get(index int) string
func (*SortableRooms) Len ¶
func (s *SortableRooms) Len() int64
func (*SortableRooms) Remove ¶
func (s *SortableRooms) Remove(roomID string) int
func (*SortableRooms) RoomIDs ¶
func (s *SortableRooms) RoomIDs() []string
func (*SortableRooms) Sort ¶
func (s *SortableRooms) Sort(sortBy []string) error
func (*SortableRooms) Subslice ¶
func (s *SortableRooms) Subslice(i, j int64) Subslicer