sync3

package
v0.1.4 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 26, 2022 | License: Apache-2.0 | Imports: 17 | Imported by: 0

Documentation


Constants

const (
	OpSync       = "SYNC"
	OpInvalidate = "INVALIDATE"
	OpInsert     = "INSERT"
	OpDelete     = "DELETE"
	OpUpdate     = "UPDATE"
)
const DispatcherAllUsers = "-"

Variables

var (
	SortByName              = "by_name"
	SortByRecency           = "by_recency"
	SortByNotificationCount = "by_notification_count"
	SortByHighlightCount    = "by_highlight_count"
	SortBy                  = []string{SortByHighlightCount, SortByName, SortByNotificationCount, SortByRecency}

	DefaultTimelineLimit = int64(20)
	DefaultTimeoutMSecs  = 10 * 1000 // 10s
)

Functions

func ChangedFilters

func ChangedFilters(prev, next *RequestFilters) bool

func IncludedRoomIDsInOps

func IncludedRoomIDsInOps(ops []ResponseOp) map[string]struct{}

IncludedRoomIDsInOps returns the set of room IDs that this set of operations carries information about. "Information" means operations like SYNC/INSERT/UPDATE, and not DELETE/INVALIDATE.
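As a rough illustration of the behaviour described above, the sketch below collects room IDs into a set. The names responseOp, fakeOp and includedRoomIDs are hypothetical stand-ins written for this example, not the package's own code, and the sketch assumes that DELETE/INVALIDATE operations simply report no room IDs.

```go
package main

import "fmt"

// Stand-in constants mirroring the package's operation names.
const (
	OpSync       = "SYNC"
	OpInvalidate = "INVALIDATE"
	OpInsert     = "INSERT"
	OpDelete     = "DELETE"
	OpUpdate     = "UPDATE"
)

// responseOp is a local stand-in for the package's ResponseOp interface.
type responseOp interface {
	Op() string
	IncludedRoomIDs() []string
}

// fakeOp is a hypothetical operation used only for this illustration.
type fakeOp struct {
	op      string
	roomIDs []string
}

func (f fakeOp) Op() string                { return f.op }
func (f fakeOp) IncludedRoomIDs() []string { return f.roomIDs }

// includedRoomIDs gathers the room IDs reported by each op into a set.
// Assumption: ops such as DELETE/INVALIDATE report no room IDs themselves.
func includedRoomIDs(ops []responseOp) map[string]struct{} {
	set := make(map[string]struct{})
	for _, op := range ops {
		for _, roomID := range op.IncludedRoomIDs() {
			set[roomID] = struct{}{}
		}
	}
	return set
}

func main() {
	ops := []responseOp{
		fakeOp{op: OpSync, roomIDs: []string{"!a:example.org", "!b:example.org"}},
		fakeOp{op: OpDelete}, // carries no room data
		fakeOp{op: OpUpdate, roomIDs: []string{"!a:example.org"}},
	}
	fmt.Println(len(includedRoomIDs(ops))) // 2: duplicates collapse in the set
}
```

Returning map[string]struct{} rather than a slice makes membership checks cheap and removes duplicates when several operations mention the same room.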

Types

type Conn

type Conn struct {
	ConnID ConnID
	// contains filtered or unexported fields
}

Conn is an abstraction of a long-poll connection. It automatically handles the position values of the /sync request, including sending cached data in the event of retries. It does not handle the contents of the data at all.
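To make the retry behaviour more concrete, here is a minimal sketch of position tracking with a cached response. It is not the package's implementation: pollConn, onRequest and the "position increments by one per response" rule are assumptions chosen for illustration.

```go
package main

import "fmt"

// pollConn is a hypothetical, simplified stand-in for a long-poll connection.
type pollConn struct {
	serverPos int64  // position of the most recent response; 0 means none sent yet
	cached    string // payload of the most recent response, kept for retries
}

// onRequest takes the position the client sent and a callback that produces
// fresh data. If the client repeats the position it used last time (i.e. it
// never saw our response), the cached payload is returned without calling
// produce again.
func (c *pollConn) onRequest(clientPos int64, produce func() string) (int64, string, error) {
	switch {
	case c.serverPos > 0 && clientPos == c.serverPos-1:
		// Retry: the previous response was lost, so resend it unchanged.
		return c.serverPos, c.cached, nil
	case clientPos == c.serverPos:
		// The client acknowledged the last response: advance the stream.
		c.cached = produce()
		c.serverPos++
		return c.serverPos, c.cached, nil
	default:
		return 0, "", fmt.Errorf("unknown position %d", clientPos)
	}
}

func main() {
	calls := 0
	produce := func() string {
		calls++
		return fmt.Sprintf("data #%d", calls)
	}

	c := &pollConn{}
	pos, data, _ := c.onRequest(0, produce) // first request
	fmt.Println(pos, data)
	pos, data, _ = c.onRequest(0, produce) // retry of the first request
	fmt.Println(pos, data)
	pos, data, _ = c.onRequest(1, produce) // client has advanced
	fmt.Println(pos, data)
}
```

In this sketch the connection never looks inside the payload, which mirrors the statement that Conn does not handle the contents of the data.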

func NewConn

func NewConn(connID ConnID, h ConnHandler) *Conn

func (*Conn) Alive

func (c *Conn) Alive() bool

func (*Conn) OnIncomingRequest

func (c *Conn) OnIncomingRequest(ctx context.Context, req *Request) (resp *Response, herr *internal.HandlerError)

OnIncomingRequest advances the client's position in the stream, returning the response position and data.

type ConnHandler

type ConnHandler interface {
	// Callback which is allowed to block as long as the context is active. Return the response
	// to send back or an error. Errors of type *internal.HandlerError are inspected for the correct
	// status code to send back.
	OnIncomingRequest(ctx context.Context, cid ConnID, req *Request, isInitial bool) (*Response, error)
	UserID() string
	Destroy()
	Alive() bool
}

type ConnID

type ConnID struct {
	DeviceID string
}

func (*ConnID) String

func (c *ConnID) String() string

type ConnMap

type ConnMap struct {
	// contains filtered or unexported fields
}

ConnMap stores a collection of Conns.

func NewConnMap

func NewConnMap() *ConnMap

func (*ConnMap) CloseConn

func (m *ConnMap) CloseConn(connID ConnID)

func (*ConnMap) Conn

func (m *ConnMap) Conn(cid ConnID) *Conn

Conn returns a connection with this ConnID. Returns nil if no connection exists.

func (*ConnMap) CreateConn

func (m *ConnMap) CreateConn(cid ConnID, newConnHandler func() ConnHandler) (*Conn, bool)

CreateConn atomically gets or creates a connection with this connection ID. Calls newConnHandler if a new connection is required.
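The get-or-create pattern described here can be sketched with a mutex-guarded map. Everything below (conn, connMap, getOrCreate) is a hypothetical stand-in, and the meaning given to the boolean result (true when a new connection was created) is an assumption for this example only, since the documentation does not state it.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for a connection.
type conn struct{ id string }

// connMap guards its map with a mutex so lookup and creation happen atomically.
type connMap struct {
	mu    sync.Mutex
	conns map[string]*conn
}

func newConnMap() *connMap {
	return &connMap{conns: make(map[string]*conn)}
}

// getOrCreate returns the existing connection for id, or calls newConn to
// make one. Assumption: the bool reports whether a new connection was created.
func (m *connMap) getOrCreate(id string, newConn func() *conn) (*conn, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if c, ok := m.conns[id]; ok {
		return c, false
	}
	c := newConn()
	m.conns[id] = c
	return c, true
}

func main() {
	m := newConnMap()
	created := 0
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.getOrCreate("device-1", func() *conn {
				created++ // only ever runs while m.mu is held
				return &conn{id: "device-1"}
			})
		}()
	}
	wg.Wait()
	fmt.Println("factory calls:", created) // 1: later callers reuse the first conn
}
```

Taking a factory function instead of a ready-made value means the cost of building a handler is only paid when no connection exists yet.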

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher dispatches live events to caches.

func NewDispatcher

func NewDispatcher() *Dispatcher

func (*Dispatcher) IsUserJoined

func (d *Dispatcher) IsUserJoined(userID, roomID string) bool

func (*Dispatcher) OnNewEvents

func (d *Dispatcher) OnNewEvents(
	roomID string, events []json.RawMessage, latestPos int64,
)

OnNewEvents is called by v2 pollers when new events are received.

func (*Dispatcher) Register

func (d *Dispatcher) Register(userID string, r Receiver)

func (*Dispatcher) Startup

func (d *Dispatcher) Startup(roomToJoinedUsers map[string][]string) error

Load joined members into the dispatcher. MUST BE CALLED BEFORE V2 POLL LOOPS START.

func (*Dispatcher) Unregister

func (d *Dispatcher) Unregister(userID string)

type FilteredSortableRooms

type FilteredSortableRooms struct {
	*SortableRooms
	// contains filtered or unexported fields
}

FilteredSortableRooms is SortableRooms but where rooms are filtered before being added to the list. Updates to room metadata may result in rooms being added/removed.

func NewFilteredSortableRooms

func NewFilteredSortableRooms(rooms []RoomConnMetadata, filter *RequestFilters) *FilteredSortableRooms

func (*FilteredSortableRooms) Add

func (*FilteredSortableRooms) UpdateGlobalRoomMetadata

func (f *FilteredSortableRooms) UpdateGlobalRoomMetadata(r *internal.RoomMetadata) int

func (*FilteredSortableRooms) UpdateUserRoomMetadata

func (f *FilteredSortableRooms) UpdateUserRoomMetadata(roomID string, userEvent *caches.UserRoomData) int

type JoinedRoomsTracker

type JoinedRoomsTracker struct {
	// contains filtered or unexported fields
}

Tracks who is joined to which rooms. This is critical from a security perspective in order to ensure that only the users joined to the room receive events in that room. Consider the situation where Alice and Bob are joined to room X. If Alice gets kicked from X, the proxy server will still receive messages for room X due to Bob being joined to the room. We therefore need to decide which active connections should be pushed events, which is what this tracker does.

func NewJoinedRoomsTracker

func NewJoinedRoomsTracker() *JoinedRoomsTracker

func (*JoinedRoomsTracker) IsUserJoined

func (t *JoinedRoomsTracker) IsUserJoined(userID, roomID string) bool

func (*JoinedRoomsTracker) JoinedRoomsForUser

func (t *JoinedRoomsTracker) JoinedRoomsForUser(userID string) []string

func (*JoinedRoomsTracker) JoinedUsersForRoom

func (t *JoinedRoomsTracker) JoinedUsersForRoom(roomID string) []string

func (*JoinedRoomsTracker) UserJoinedRoom

func (t *JoinedRoomsTracker) UserJoinedRoom(userID, roomID string) bool

UserJoinedRoom returns true if the state changed.

func (*JoinedRoomsTracker) UserLeftRoom

func (t *JoinedRoomsTracker) UserLeftRoom(userID, roomID string)

type Receiver

type Receiver interface {
	OnNewEvent(event *caches.EventData)
}

type Request

type Request struct {
	Lists             []RequestList               `json:"lists"`
	RoomSubscriptions map[string]RoomSubscription `json:"room_subscriptions"`
	UnsubscribeRooms  []string                    `json:"unsubscribe_rooms"`
	Extensions        extensions.Request          `json:"extensions"`
	// contains filtered or unexported fields
}

func (*Request) ApplyDelta

func (r *Request) ApplyDelta(nextReq *Request) (result *Request, subs, unsubs []string)

ApplyDelta applies nextReq as a delta on top of this request. It returns a new Request with the combined output, ready for persisting into the database, along with the delta of rooms to subscribe to and unsubscribe from.

func (*Request) GetRequiredStateForList added in v0.1.4

func (r *Request) GetRequiredStateForList(listIndex int) [][2]string

func (*Request) GetRequiredStateForRoom added in v0.1.4

func (r *Request) GetRequiredStateForRoom(roomID string) [][2]string

func (*Request) GetTimelineLimit

func (r *Request) GetTimelineLimit(listIndex int, roomID string) int64

func (*Request) Same

func (r *Request) Same(other *Request) bool

func (*Request) SetPos

func (r *Request) SetPos(pos int64)

func (*Request) SetTimeoutMSecs

func (r *Request) SetTimeoutMSecs(timeout int)

func (*Request) TimeoutMSecs

func (r *Request) TimeoutMSecs() int

type RequestFilters

type RequestFilters struct {
	Spaces         []string `json:"spaces"`
	IsDM           *bool    `json:"is_dm"`
	IsEncrypted    *bool    `json:"is_encrypted"`
	IsInvite       *bool    `json:"is_invite"`
	IsTombstoned   *bool    `json:"is_tombstoned"`
	RoomNameFilter string   `json:"room_name_like"`
}

func (*RequestFilters) Include

func (rf *RequestFilters) Include(r *RoomConnMetadata) bool

type RequestList

type RequestList struct {
	RoomSubscription
	Ranges  SliceRanges     `json:"ranges"`
	Sort    []string        `json:"sort"`
	Filters *RequestFilters `json:"filters"`
}

type Response

type Response struct {
	Ops []ResponseOp `json:"ops"`

	RoomSubscriptions map[string]Room `json:"room_subscriptions"`
	Counts            []int           `json:"counts"`

	Extensions extensions.Response `json:"extensions"`

	Pos     string `json:"pos"`
	Session string `json:"session_id,omitempty"`
}

func (*Response) PosInt

func (r *Response) PosInt() int64

func (*Response) UnmarshalJSON

func (r *Response) UnmarshalJSON(b []byte) error

UnmarshalJSON is a custom unmarshaller that dynamically creates the right ResponseOp for each entry in Ops.

type ResponseOp

type ResponseOp interface {
	Op() string
	// which rooms are we giving data about
	IncludedRoomIDs() []string
}

type ResponseOpRange

type ResponseOpRange struct {
	Operation string  `json:"op"`
	List      int     `json:"list"`
	Range     []int64 `json:"range,omitempty"`
	Rooms     []Room  `json:"rooms,omitempty"`
}

func (*ResponseOpRange) IncludedRoomIDs

func (r *ResponseOpRange) IncludedRoomIDs() []string

func (*ResponseOpRange) Op

func (r *ResponseOpRange) Op() string

type ResponseOpSingle

type ResponseOpSingle struct {
	Operation string `json:"op"`
	List      int    `json:"list"`
	Index     *int   `json:"index,omitempty"` // 0 is a valid value, hence *int
	Room      *Room  `json:"room,omitempty"`
}
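The comment on Index notes that 0 is a valid value, hence *int. The short sketch below shows why, using two hypothetical structs (withInt and withPtr) that differ only in the field type: with omitempty, a plain int of 0 is dropped from the JSON, while a non-nil pointer to 0 is kept.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// withInt uses a plain int: omitempty treats 0 as "empty" and omits it.
type withInt struct {
	Index int `json:"index,omitempty"`
}

// withPtr uses *int: only a nil pointer counts as "empty".
type withPtr struct {
	Index *int `json:"index,omitempty"`
}

// encode marshals v and returns the JSON text, or the error text on failure.
func encode(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	zero := 0
	fmt.Println(encode(withInt{Index: 0}))     // {}
	fmt.Println(encode(withPtr{Index: &zero})) // {"index":0}
	fmt.Println(encode(withPtr{}))             // {}
}
```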

func (*ResponseOpSingle) IncludedRoomIDs

func (r *ResponseOpSingle) IncludedRoomIDs() []string

func (*ResponseOpSingle) Op

func (r *ResponseOpSingle) Op() string

type Room

type Room struct {
	RoomID            string            `json:"room_id,omitempty"`
	Name              string            `json:"name,omitempty"`
	RequiredState     []json.RawMessage `json:"required_state,omitempty"`
	Timeline          []json.RawMessage `json:"timeline,omitempty"`
	InviteState       []json.RawMessage `json:"invite_state,omitempty"`
	NotificationCount int64             `json:"notification_count"`
	HighlightCount    int64             `json:"highlight_count"`
	Initial           bool              `json:"initial,omitempty"`
	IsDM              bool              `json:"is_dm,omitempty"`
	PrevBatch         string            `json:"prev_batch,omitempty"`
}

type RoomConnMetadata

type RoomConnMetadata struct {
	internal.RoomMetadata
	caches.UserRoomData

	CanonicalisedName string // stripped leading symbols like #, all in lower case
}

type RoomSubscription

type RoomSubscription struct {
	RequiredState [][2]string `json:"required_state"`
	TimelineLimit int64       `json:"timeline_limit"`
}

type SliceRanges

type SliceRanges [][2]int64

func (SliceRanges) Delta

func (r SliceRanges) Delta(next SliceRanges) (added SliceRanges, removed SliceRanges, same SliceRanges)

Delta compares r with next and returns the ranges which were added, removed and left unchanged, in that order. Intelligently handles overlaps.

func (SliceRanges) Inside

func (r SliceRanges) Inside(i int64) bool

Inside returns true if i is inside the range.
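A membership check of this kind might look like the sketch below. The type sliceRanges and method inside are local stand-ins, and the sketch assumes both ends of each range are inclusive, which matches the [0,99] style used elsewhere in this documentation.

```go
package main

import "fmt"

// sliceRanges is a local stand-in for SliceRanges: a list of [start, end] pairs.
type sliceRanges [][2]int64

// inside reports whether i falls within any range.
// Assumption: both the start and end index are inclusive.
func (r sliceRanges) inside(i int64) bool {
	for _, rng := range r {
		if i >= rng[0] && i <= rng[1] {
			return true
		}
	}
	return false
}

func main() {
	r := sliceRanges{{0, 99}, {150, 199}}
	fmt.Println(r.inside(99))  // true: the end index is included
	fmt.Println(r.inside(120)) // false: falls in the gap between ranges
}
```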

func (SliceRanges) LowerClamp

func (r SliceRanges) LowerClamp(i int64) (clampIndex int64)

LowerClamp returns the end index of the first range lower than i, e.g. [0,99] -> 99. This is critical for determining which index to delete when rooms move outside of the tracked range. If i is inside a range, it returns the clamp for the lower range. It returns -1 if a clamp cannot be found. For example, [0,99] with i=50 gives -1, whereas [0,99][150,199] with i=160 gives 99.
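The documented examples can be reproduced with a short sketch. The code below is one possible reading of the description, not the package's implementation: it picks the largest range end that is strictly below i, and sliceRanges and lowerClamp are stand-in names.

```go
package main

import "fmt"

// sliceRanges is a local stand-in for SliceRanges: inclusive [start, end] pairs.
type sliceRanges [][2]int64

// lowerClamp returns the end index of the nearest range that lies entirely
// below i, or -1 if there is none. A range that contains i is skipped
// because its end index is not below i.
func (r sliceRanges) lowerClamp(i int64) int64 {
	clamp := int64(-1)
	for _, rng := range r {
		if rng[1] < i && rng[1] > clamp {
			clamp = rng[1]
		}
	}
	return clamp
}

func main() {
	fmt.Println(sliceRanges{{0, 99}}.lowerClamp(50))              // -1
	fmt.Println(sliceRanges{{0, 99}, {150, 199}}.lowerClamp(160)) // 99
}
```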

func (SliceRanges) SliceInto

func (r SliceRanges) SliceInto(slice Subslicer) []Subslicer

SliceInto slices into these ranges, returning subslices of slice.
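A rough sketch of slicing a collection by several ranges follows. The names subslicer, names and sliceInto are stand-ins, and two details are assumptions made for this example: range ends are inclusive, and Subslice(i, j) is half-open like a Go slice expression. Ranges that run past the end of the collection are truncated here, which may or may not match the package.

```go
package main

import "fmt"

// subslicer is a local stand-in for the Subslicer interface.
type subslicer interface {
	Len() int64
	Subslice(i, j int64) subslicer
}

// names is a hypothetical Subslicer backed by a string slice.
type names []string

func (n names) Len() int64                    { return int64(len(n)) }
func (n names) Subslice(i, j int64) subslicer { return n[i:j] }

// sliceInto returns one subslice per range, skipping ranges that start
// beyond the end of s and truncating ranges that overrun it.
// Assumption: ranges are valid (non-negative, start <= end) and inclusive.
func sliceInto(ranges [][2]int64, s subslicer) []subslicer {
	var out []subslicer
	for _, rng := range ranges {
		start, end := rng[0], rng[1]+1 // convert inclusive end to exclusive
		if start >= s.Len() {
			continue
		}
		if end > s.Len() {
			end = s.Len()
		}
		out = append(out, s.Subslice(start, end))
	}
	return out
}

func main() {
	rooms := names{"a", "b", "c", "d", "e"}
	ranges := [][2]int64{{0, 1}, {3, 10}}
	for _, sub := range sliceInto(ranges, rooms) {
		fmt.Println(sub)
	}
}
```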

func (SliceRanges) UpperClamp

func (r SliceRanges) UpperClamp(i int64) (clampIndex int64)

UpperClamp returns the start index of the first range higher than i, e.g. [50,99] -> 50. If i is inside a range, it returns -1. For example, [50,99] with i=30 gives 50, whereas [50,99] with i=55 gives -1.
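The examples above can be checked against a small sketch. As with the other range helpers, sliceRanges and upperClamp are stand-in names and this is only one reading of the description: return -1 when i sits inside any range, otherwise the smallest range start above i.

```go
package main

import "fmt"

// sliceRanges is a local stand-in for SliceRanges: inclusive [start, end] pairs.
type sliceRanges [][2]int64

// upperClamp returns the start index of the nearest range above i.
// It returns -1 if i is inside a range or if no range starts above i.
func (r sliceRanges) upperClamp(i int64) int64 {
	clamp := int64(-1)
	for _, rng := range r {
		if i >= rng[0] && i <= rng[1] {
			return -1 // i is already inside a tracked range
		}
		if rng[0] > i && (clamp == -1 || rng[0] < clamp) {
			clamp = rng[0]
		}
	}
	return clamp
}

func main() {
	r := sliceRanges{{50, 99}}
	fmt.Println(r.upperClamp(30)) // 50
	fmt.Println(r.upperClamp(55)) // -1
}
```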

func (SliceRanges) Valid

func (r SliceRanges) Valid() bool

type SortableRoomLists

type SortableRoomLists struct {
	// contains filtered or unexported fields
}

SortableRoomLists is a list of FilteredSortableRooms.

func (*SortableRoomLists) Counts

func (s *SortableRoomLists) Counts() []int

Counts returns the counts of all lists

func (*SortableRoomLists) ForEach

func (s *SortableRoomLists) ForEach(fn func(index int, fsr *FilteredSortableRooms))

func (*SortableRoomLists) List

func (*SortableRoomLists) ListExists

func (s *SortableRoomLists) ListExists(index int) bool

func (*SortableRoomLists) Set

func (s *SortableRoomLists) Set(index int, val *FilteredSortableRooms)

type SortableRooms

type SortableRooms struct {
	// contains filtered or unexported fields
}

SortableRooms represents a list of rooms which can be sorted and updated. Maintains mappings of room IDs to current index positions after sorting.
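The idea of keeping a room-ID-to-index mapping in step with the sorted order can be sketched as follows. The types room and sortableRooms and the single sort-by-name order are simplifications invented for this example; the real type sorts RoomConnMetadata by the configured sort keys.

```go
package main

import (
	"fmt"
	"sort"
)

// room is a hypothetical, minimal stand-in for RoomConnMetadata.
type room struct{ ID, Name string }

// sortableRooms keeps the list and an ID-to-index lookup in sync.
type sortableRooms struct {
	rooms     []room
	idToIndex map[string]int
}

func newSortableRooms(rooms []room) *sortableRooms {
	s := &sortableRooms{
		rooms:     append([]room(nil), rooms...), // copy so the caller's slice is untouched
		idToIndex: make(map[string]int, len(rooms)),
	}
	s.reindex()
	return s
}

// reindex rebuilds the lookup after the order of rooms has changed.
func (s *sortableRooms) reindex() {
	for i, r := range s.rooms {
		s.idToIndex[r.ID] = i
	}
}

// sortByName orders rooms alphabetically, then refreshes the index mapping.
func (s *sortableRooms) sortByName() {
	sort.SliceStable(s.rooms, func(i, j int) bool {
		return s.rooms[i].Name < s.rooms[j].Name
	})
	s.reindex()
}

// indexOf returns the current position of a room, if it is in the list.
func (s *sortableRooms) indexOf(roomID string) (int, bool) {
	i, ok := s.idToIndex[roomID]
	return i, ok
}

func main() {
	s := newSortableRooms([]room{
		{ID: "!c:example.org", Name: "charlie"},
		{ID: "!a:example.org", Name: "alpha"},
		{ID: "!b:example.org", Name: "bravo"},
	})
	s.sortByName()
	fmt.Println(s.indexOf("!c:example.org")) // 2 true
}
```

Keeping the lookup map means an update for a single room can find its current list position without scanning the whole list.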

func NewSortableRooms

func NewSortableRooms(rooms []RoomConnMetadata) *SortableRooms

func (*SortableRooms) Add

Add a room to the list. Returns true if the room was added.

func (*SortableRooms) Get

func (s *SortableRooms) Get(index int) RoomConnMetadata

func (*SortableRooms) IndexOf

func (s *SortableRooms) IndexOf(roomID string) (int, bool)

func (*SortableRooms) Len

func (s *SortableRooms) Len() int64

func (*SortableRooms) Remove

func (s *SortableRooms) Remove(roomID string) int

func (*SortableRooms) RoomIDs

func (s *SortableRooms) RoomIDs() []string

func (*SortableRooms) Sort

func (s *SortableRooms) Sort(sortBy []string) error

func (*SortableRooms) Subslice

func (s *SortableRooms) Subslice(i, j int64) Subslicer

func (*SortableRooms) UpdateGlobalRoomMetadata

func (s *SortableRooms) UpdateGlobalRoomMetadata(roomMeta *internal.RoomMetadata) int

func (*SortableRooms) UpdateUserRoomMetadata

func (s *SortableRooms) UpdateUserRoomMetadata(roomID string, userEvent *caches.UserRoomData) int

type Subslicer

type Subslicer interface {
	Len() int64
	Subslice(i, j int64) Subslicer
}
