Documentation ¶
Index ¶
- Constants
- type Notifier
- func (n *Notifier) CurrentPosition() types.StreamingToken
- func (n *Notifier) GetListener(req syncRequest) UserDeviceStreamListener
- func (n *Notifier) Load(ctx context.Context, db storage.Database) error
- func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string, ...)
- func (n *Notifier) OnNewKeyChange(posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string)
- func (n *Notifier) OnNewPeek(roomID, userID, deviceID string)
- func (n *Notifier) OnNewSendToDevice(userID string, deviceIDs []string, posUpdate types.StreamingToken)
- func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice)
- type RequestPool
- type UserDeviceStream
- type UserDeviceStreamListener
Constants ¶
const DefaultTimelineLimit = 20
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Notifier ¶
type Notifier struct {
// contains filtered or unexported fields
}
Notifier will wake up sleeping requests when there is some new data. It does not tell requests what that data is, only the sync position which they can use to get at it. This is done to prevent races whereby we tell the caller the event, but the token has already advanced by the time they fetch it, resulting in missed events.
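The position-only wake-up described above can be illustrated with a minimal, self-contained sketch. It does not use this package: eventStore, add and between are hypothetical names, and a plain integer stands in for types.StreamingToken. The producer stores events first and then announces only the new position; the consumer fetches everything between its own position and the announced one, so nothing is missed even if several events arrived in between.

```go
package main

import (
	"fmt"
	"sync"
)

// eventStore is a hypothetical stand-in for the sync database: an append-only
// list in which the position of an event is its 1-based index.
type eventStore struct {
	mu     sync.Mutex
	events []string
}

// add stores an event and returns the position assigned to it.
func (s *eventStore) add(ev string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.events = append(s.events, ev)
	return len(s.events)
}

// between returns a copy of the events with positions in (since, upTo].
func (s *eventStore) between(since, upTo int) []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if upTo > len(s.events) {
		upTo = len(s.events)
	}
	if since >= upTo {
		return nil
	}
	return append([]string(nil), s.events[since:upTo]...)
}

func main() {
	store := &eventStore{}
	wake := make(chan int, 1) // carries a position only, never an event

	// Producer: store the events first, then announce the latest position.
	go func() {
		store.add("event A")
		pos := store.add("event B")
		wake <- pos
	}()

	// Consumer: wake up with a position, then fetch the range it covers.
	since := 0
	pos := <-wake
	for _, ev := range store.between(since, pos) {
		fmt.Println(ev)
	}
}
```

Because the consumer asks for a range rather than a single event, one wake-up is enough to pick up both events.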
func NewNotifier ¶
func NewNotifier(pos types.StreamingToken) *Notifier
NewNotifier creates a new notifier set to the given sync position. For the Notifier to be of any use, it must first be told about all rooms and the joined users within each of them by calling Notifier.Load with a storage.Database.
func (*Notifier) CurrentPosition ¶
func (n *Notifier) CurrentPosition() types.StreamingToken
CurrentPosition returns the current sync position.
func (*Notifier) GetListener ¶
func (n *Notifier) GetListener(req syncRequest) UserDeviceStreamListener
GetListener returns a UserDeviceStreamListener that can be used to wait for updates for a user. The listener must be closed once it is no longer needed. Notifications are relative to the sincePos passed to GetNotifyChannel.
func (*Notifier) OnNewEvent ¶
func (n *Notifier) OnNewEvent(ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string, posUpdate types.StreamingToken)
OnNewEvent is called when a new event is received from the room server. It must only be called from a single goroutine, to avoid races between updates which could set the current sync position incorrectly. It chooses which user sync streams to update from a provided *gomatrixserverlib.HeaderedEvent (based on the users in the event's room), a roomID directly, or a list of user IDs, prioritised in parameter order. posUpdate contains the latest position(s) for one or more types of events. A position of 0 in posUpdate means that no updates are available for that type. Typically a consumer supplies a posUpdate with the latest sync position for the event type it handles, leaving the other fields as 0.
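The "0 means no update" convention for posUpdate can be sketched as follows. This is an illustration only: token and its three fields are hypothetical simplifications of types.StreamingToken, and applyUpdate is not a function of this package. Each field of the current position advances to the value in the update only when the update is larger, so a field left at 0 never moves the position backwards.

```go
package main

import "fmt"

// token is a hypothetical, simplified stand-in for types.StreamingToken,
// holding one position per kind of data.
type token struct {
	PDU          int64
	SendToDevice int64
	DeviceList   int64
}

// applyUpdate returns cur advanced by upd. A field in upd only takes effect
// when it is greater than the current value, so zero-valued fields
// ("no update of this type") leave the current position untouched.
func applyUpdate(cur, upd token) token {
	advance := func(c, u int64) int64 {
		if u > c {
			return u
		}
		return c
	}
	return token{
		PDU:          advance(cur.PDU, upd.PDU),
		SendToDevice: advance(cur.SendToDevice, upd.SendToDevice),
		DeviceList:   advance(cur.DeviceList, upd.DeviceList),
	}
}

func main() {
	cur := token{PDU: 10, SendToDevice: 3}

	// A consumer that only handles room events sets just the PDU field.
	cur = applyUpdate(cur, token{PDU: 11})
	fmt.Printf("%+v\n", cur)
}
```

Here only the PDU position advances; the send-to-device and device-list positions keep their previous values.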
func (*Notifier) OnNewKeyChange ¶
func (n *Notifier) OnNewKeyChange(posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string)
func (*Notifier) OnNewSendToDevice ¶
func (n *Notifier) OnNewSendToDevice(userID string, deviceIDs []string, posUpdate types.StreamingToken)
func (*Notifier) PeekingDevices ¶
func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice)
PeekingDevices is not thread-safe: it must only be called on the OnNewEvent goroutine.
type RequestPool ¶
type RequestPool struct {
// contains filtered or unexported fields
}
RequestPool manages HTTP long-poll connections for /sync.
func NewRequestPool ¶
func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI) *RequestPool
NewRequestPool makes a new RequestPool.
func (*RequestPool) OnIncomingKeyChangeRequest ¶
func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse
func (*RequestPool) OnIncomingSyncRequest ¶
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.Device) util.JSONResponse
OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be called in a dedicated goroutine for this request. This function will block the goroutine until a response is ready, or it times out.
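The blocking behaviour described above (wait until a response is ready or the request times out) is the usual long-poll pattern. The sketch below shows one way such a wait could look; it is not the RequestPool implementation. waitForUpdate is a hypothetical helper, and the done channel is assumed to be something like the Done() channel of the request's context, so that a disconnected client also ends the wait.

```go
package main

import (
	"fmt"
	"time"
)

// waitForUpdate blocks the calling goroutine until notify is closed, done is
// closed, or timeout elapses. It reports whether an update arrived.
// Hypothetical helper: a nil done channel is allowed and is simply never ready.
func waitForUpdate(notify, done <-chan struct{}, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-notify:
		return true // new data may be available; build the response
	case <-done:
		return false // caller gave up, e.g. the client disconnected
	case <-timer.C:
		return false // long-poll timeout; respond with what is known
	}
}

func main() {
	// An update that arrives while the request is waiting.
	notify := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(notify)
	}()
	fmt.Println("update arrived:", waitForUpdate(notify, nil, time.Second))

	// No update at all: the wait ends when the timeout fires.
	idle := make(chan struct{})
	fmt.Println("update arrived:", waitForUpdate(idle, nil, 20*time.Millisecond))
}
```

Since the wait occupies the goroutine for up to the full timeout, each request needs a goroutine of its own, which matches the requirement stated above.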
type UserDeviceStream ¶
type UserDeviceStream struct {
	UserID string
	DeviceID string
	// contains filtered or unexported fields
}
UserDeviceStream represents a communication mechanism between the /sync request goroutine and the underlying sync server goroutines. Goroutines can get a UserDeviceStreamListener to wait for updates, and can Broadcast() updates.
func NewUserDeviceStream ¶
func NewUserDeviceStream(userID, deviceID string, currPos types.StreamingToken) *UserDeviceStream
NewUserDeviceStream creates a new user device stream.
func (*UserDeviceStream) Broadcast ¶
func (s *UserDeviceStream) Broadcast(pos types.StreamingToken)
Broadcast a new sync position for this user.
func (*UserDeviceStream) GetListener ¶
func (s *UserDeviceStream) GetListener(ctx context.Context) UserDeviceStreamListener
GetListener returns a UserDeviceStreamListener that a sync request can use to wait for new updates. The UserDeviceStreamListener must be closed.
func (*UserDeviceStream) NumWaiting ¶
func (s *UserDeviceStream) NumWaiting() uint
NumWaiting returns the number of goroutines waiting for updates. Used for metrics and testing.
func (*UserDeviceStream) TimeOfLastNonEmpty ¶
func (s *UserDeviceStream) TimeOfLastNonEmpty() time.Time
TimeOfLastNonEmpty returns the last time at which the number of waiting listeners was non-zero. This may be time.Now() if listeners are currently waiting.
type UserDeviceStreamListener ¶
type UserDeviceStreamListener struct {
// contains filtered or unexported fields
}
UserDeviceStreamListener allows a sync request to wait for updates for a user.
func (*UserDeviceStreamListener) Close ¶
func (s *UserDeviceStreamListener) Close()
Close cleans up the resources used by the listener.
func (*UserDeviceStreamListener) GetNotifyChannel ¶
func (s *UserDeviceStreamListener) GetNotifyChannel(sincePos types.StreamingToken) <-chan struct{}
GetNotifyChannel returns a channel that is closed when there may be an update for the user. sincePos specifies from which point we want to be notified about. If there has already been an update after sincePos we'll return a closed channel immediately.
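The "closed channel as wake-up signal" behaviour can be sketched with a small stand-alone type. This is an assumption-laden illustration, not the UserDeviceStream implementation: stream, newStream, broadcast, notifyChannel and isClosed are hypothetical names, and a single integer replaces types.StreamingToken. The key points are that closing a channel wakes every waiter at once, and that a caller whose sincePos is already behind gets a pre-closed channel and therefore never blocks.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical, simplified analogue of UserDeviceStream that
// tracks a single integer position.
type stream struct {
	mu     sync.Mutex
	pos    int64
	signal chan struct{} // closed and replaced on every broadcast
}

func newStream(pos int64) *stream {
	return &stream{pos: pos, signal: make(chan struct{})}
}

// broadcast records the new position and wakes every current waiter by
// closing the shared channel, then installs a fresh one for later waiters.
func (s *stream) broadcast(pos int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pos = pos
	close(s.signal)
	s.signal = make(chan struct{})
}

// notifyChannel returns a channel that is closed when there may be an update
// after sincePos. If the stream is already past sincePos, the returned
// channel is closed immediately.
func (s *stream) notifyChannel(sincePos int64) <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.pos > sincePos {
		done := make(chan struct{})
		close(done)
		return done
	}
	return s.signal
}

// isClosed reports, without blocking, whether ch has been closed.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	s := newStream(5)

	fmt.Println(isClosed(s.notifyChannel(3))) // stream is already ahead of 3

	ch := s.notifyChannel(5)
	fmt.Println(isClosed(ch)) // nothing new yet, a waiter would block

	s.broadcast(6)
	fmt.Println(isClosed(ch)) // the broadcast woke the waiter
}
```

A closed channel only says that there may be something new; the waiter is still expected to read the current sync position and fetch the data itself.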
func (*UserDeviceStreamListener) GetSyncPosition ¶
func (s *UserDeviceStreamListener) GetSyncPosition() types.StreamingToken
GetSyncPosition returns the last sync position that the UserDeviceStream was notified about.