Documentation ¶
Index ¶
- Constants
- type BufferFullError
- type ConnectionStateListener
- type ReaderProvider
- type RemoteClusterServiceIFace
- type Response
- type SendFileResultFunc
- type SendMsgResultFunc
- type ServerIface
- type Service
- func (rcs *Service) AcceptInvitation(invite *model.RemoteClusterInvite, name string, displayName, creatorId string, ...) (*model.RemoteCluster, error)
- func (rcs *Service) Active() bool
- func (rcs *Service) AddConnectionStateListener(listener ConnectionStateListener) string
- func (rcs *Service) AddTopicListener(topic string, listener TopicListener) string
- func (rcs *Service) BroadcastMsg(ctx context.Context, msg model.RemoteClusterMsg, f SendMsgResultFunc) error
- func (rcs *Service) ReceiveIncomingMsg(rc *model.RemoteCluster, msg model.RemoteClusterMsg) Response
- func (rcs *Service) RemoveConnectionStateListener(listenerId string)
- func (rcs *Service) RemoveTopicListener(listenerId string)
- func (rcs *Service) SendFile(ctx context.Context, us *model.UploadSession, fi *model.FileInfo, ...) error
- func (rcs *Service) SendMsg(ctx context.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, ...) error
- func (rcs *Service) Shutdown() error
- func (rcs *Service) Start() error
- type TopicListener
Constants ¶
const ( SendChanBuffer = 50 RecvChanBuffer = 50 ResultsChanBuffer = 50 ResultQueueDrainTimeoutMillis = 10000 MaxConcurrentSends = 10 SendMsgURL = "api/v4/remotecluster/msg" SendTimeout = time.Minute SendFileTimeout = time.Minute * 5 PingURL = "api/v4/remotecluster/ping" PingFreq = time.Minute PingTimeout = time.Second * 15 ConfirmInviteURL = "api/v4/remotecluster/confirm_invite" InvitationTopic = "invitation" PingTopic = "ping" ResponseStatusOK = model.STATUS_OK ResponseStatusFail = model.STATUS_FAIL InviteExpiresAfter = time.Hour * 48 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BufferFullError ¶
type BufferFullError struct {
// contains filtered or unexported fields
}
func NewBufferFullError ¶
func NewBufferFullError(capacity int) BufferFullError
func (BufferFullError) Capacity ¶
func (e BufferFullError) Capacity() int
func (BufferFullError) Error ¶
func (e BufferFullError) Error() string
type ConnectionStateListener ¶
type ConnectionStateListener func(rc *model.RemoteCluster, online bool)
ConnectionStateListener is used to listen to remote cluster connection state changes.
type ReaderProvider ¶
type ReaderProvider interface {
FileReader(path string) (filestore.ReadCloseSeeker, *model.AppError)
}
type RemoteClusterServiceIFace ¶
type RemoteClusterServiceIFace interface { Shutdown() error Start() error Active() bool AddTopicListener(topic string, listener TopicListener) string RemoveTopicListener(listenerId string) AddConnectionStateListener(listener ConnectionStateListener) string RemoveConnectionStateListener(listenerId string) SendMsg(ctx context.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, f SendMsgResultFunc) error SendFile(ctx context.Context, us *model.UploadSession, fi *model.FileInfo, rc *model.RemoteCluster, rp ReaderProvider, f SendFileResultFunc) error AcceptInvitation(invite *model.RemoteClusterInvite, name string, displayName string, creatorId string, teamId string, siteURL string) (*model.RemoteCluster, error) ReceiveIncomingMsg(rc *model.RemoteCluster, msg model.RemoteClusterMsg) Response }
RemoteClusterServiceIFace is used to allow mocking where a remote cluster service is used (for testing). Unfortunately it lives here because the shared channel service, app layer, and server interface all need it. Putting it in app layer means shared channel service must import app package.
type Response ¶
type Response struct { Status string `json:"status"` Err string `json:"err"` Payload json.RawMessage `json:"payload"` }
Response represents the bytes replied from a remote server when a message is sent.
func (*Response) SetPayload ¶
SetPayload serializes an arbitrary struct as a RawMessage.
type SendFileResultFunc ¶
type SendFileResultFunc func(us *model.UploadSession, rc *model.RemoteCluster, resp *Response, err error)
type SendMsgResultFunc ¶
type SendMsgResultFunc func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response, err error)
type ServerIface ¶
type ServerIface interface { Config() *model.Config IsLeader() bool AddClusterLeaderChangedListener(listener func()) string RemoveClusterLeaderChangedListener(id string) GetStore() store.Store GetLogger() mlog.LoggerIFace GetMetrics() einterfaces.MetricsInterface }
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service provides inter-cluster communication via topic based messages.
func NewRemoteClusterService ¶
func NewRemoteClusterService(server ServerIface) (*Service, error)
NewRemoteClusterService creates a RemoteClusterService instance.
func (*Service) AcceptInvitation ¶
func (rcs *Service) AcceptInvitation(invite *model.RemoteClusterInvite, name string, displayName, creatorId string, teamId string, siteURL string) (*model.RemoteCluster, error)
AcceptInvitation is called when accepting an invitation to connect with a remote cluster.
func (*Service) Active ¶
Active returns true if this instance of the remote cluster service is active. The active instance is responsible for pinging and sending messages to remotes.
func (*Service) AddConnectionStateListener ¶
func (rcs *Service) AddConnectionStateListener(listener ConnectionStateListener) string
func (*Service) AddTopicListener ¶
func (rcs *Service) AddTopicListener(topic string, listener TopicListener) string
AddTopicListener registers a callback
func (*Service) BroadcastMsg ¶
func (rcs *Service) BroadcastMsg(ctx context.Context, msg model.RemoteClusterMsg, f SendMsgResultFunc) error
BroadcastMsg asynchronously sends a message to all remote clusters interested in the message's topic.
`ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a BufferFullError if the message cannot be enqueued before the timeout. A background context will block indefinitely.
An optional callback can be provided that receives the success or fail result of sending to each remote cluster. Success or fail is regarding message delivery only. If a callback is provided it should return quickly.
func (*Service) ReceiveIncomingMsg ¶
func (rcs *Service) ReceiveIncomingMsg(rc *model.RemoteCluster, msg model.RemoteClusterMsg) Response
ReceiveIncomingMsg is called by the Rest API layer, or websocket layer (future), when a Remote Cluster message is received. Here we route the message to any topic listeners. `rc` and `msg` cannot be nil.
func (*Service) RemoveConnectionStateListener ¶
func (*Service) RemoveTopicListener ¶
func (*Service) SendFile ¶
func (rcs *Service) SendFile(ctx context.Context, us *model.UploadSession, fi *model.FileInfo, rc *model.RemoteCluster, rp ReaderProvider, f SendFileResultFunc) error
SendFile asynchronously sends a file to a remote cluster.
`ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a BufferFullError if the file cannot be enqueued before the timeout. A background context will block indefinitely.
Nil or error return indicates success or failure of file enqueue only.
An optional callback can be provided that receives the response from the remote cluster. The `err` provided to the callback is regarding file delivery only. The `resp` contains the decoded bytes returned from the remote. If a callback is provided it should return quickly.
func (*Service) SendMsg ¶
func (rcs *Service) SendMsg(ctx context.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, f SendMsgResultFunc) error
SendMsg asynchronously sends a message to a remote cluster.
`ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a BufferFullError if the message cannot be enqueued before the timeout. A background context will block indefinitely.
Nil or error return indicates success or failure of message enqueue only.
An optional callback can be provided that receives the response from the remote cluster. The `err` provided to the callback is regarding response decoding only. The `resp` contains the decoded bytes returned from the remote. If a callback is provided it should return quickly.
type TopicListener ¶
type TopicListener func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) error
TopicListener is a callback signature used to listen for incoming messages for a specific topic.