remotecluster

package
v6.7.2
Warning

This package is not in the latest version of its module.

Published: Jun 14, 2022
License: AGPL-3.0, Apache-2.0
Imports: 22
Imported by: 4

Documentation

Constants

const (
	SendChanBuffer                = 50
	RecvChanBuffer                = 50
	ResultsChanBuffer             = 50
	ResultQueueDrainTimeoutMillis = 10000
	MaxConcurrentSends            = 10
	SendMsgURL                    = "api/v4/remotecluster/msg"
	SendTimeout                   = time.Minute
	SendFileTimeout               = time.Minute * 5
	PingURL                       = "api/v4/remotecluster/ping"
	PingFreq                      = time.Minute
	PingTimeout                   = time.Second * 15
	ConfirmInviteURL              = "api/v4/remotecluster/confirm_invite"
	InvitationTopic               = "invitation"
	PingTopic                     = "ping"
	ResponseStatusOK              = model.StatusOk
	ResponseStatusFail            = model.StatusFail
	InviteExpiresAfter            = time.Hour * 48
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BufferFullError

type BufferFullError struct {
	// contains filtered or unexported fields
}

func NewBufferFullError

func NewBufferFullError(capacity int) BufferFullError

func (BufferFullError) Capacity

func (e BufferFullError) Capacity() int

func (BufferFullError) Error

func (e BufferFullError) Error() string

type ConnectionStateListener

type ConnectionStateListener func(rc *model.RemoteCluster, online bool)

ConnectionStateListener is used to listen to remote cluster connection state changes.

type ProfileImageProvider

type ProfileImageProvider interface {
	GetProfileImage(user *model.User) ([]byte, bool, *model.AppError)
}

type ReaderProvider

type ReaderProvider interface {
	FileReader(path string) (filestore.ReadCloseSeeker, *model.AppError)
}

type RemoteClusterServiceIFace

type RemoteClusterServiceIFace interface {
	Shutdown() error
	Start() error
	Active() bool
	AddTopicListener(topic string, listener TopicListener) string
	RemoveTopicListener(listenerId string)
	AddConnectionStateListener(listener ConnectionStateListener) string
	RemoveConnectionStateListener(listenerId string)
	SendMsg(ctx context.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, f SendMsgResultFunc) error
	SendFile(ctx context.Context, us *model.UploadSession, fi *model.FileInfo, rc *model.RemoteCluster, rp ReaderProvider, f SendFileResultFunc) error
	SendProfileImage(ctx context.Context, userID string, rc *model.RemoteCluster, provider ProfileImageProvider, f SendProfileImageResultFunc) error
	AcceptInvitation(invite *model.RemoteClusterInvite, name string, displayName string, creatorId string, teamId string, siteURL string) (*model.RemoteCluster, error)
	ReceiveIncomingMsg(rc *model.RemoteCluster, msg model.RemoteClusterMsg) Response
}

RemoteClusterServiceIFace allows a remote cluster service to be mocked in tests. It lives in this package because the shared channel service, the app layer, and the server interface all need it; placing it in the app layer would force the shared channel service to import the app package.
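
As an illustration of the mocking use case, the sketch below writes calling code against an interface and substitutes a hand-written fake. To stay self-contained it uses `lifecycle`, a hypothetical narrowed interface containing only three of the documented methods; `fakeService` and `restart` are likewise invented for this example.

```go
package main

import "fmt"

// lifecycle is a hypothetical, narrowed view of RemoteClusterServiceIFace
// that holds only the methods this sketch needs.
type lifecycle interface {
	Start() error
	Shutdown() error
	Active() bool
}

// fakeService is a hand-written test double that records calls.
type fakeService struct {
	active   bool
	started  int
	shutdown int
}

func (f *fakeService) Start() error    { f.started++; f.active = true; return nil }
func (f *fakeService) Shutdown() error { f.shutdown++; f.active = false; return nil }
func (f *fakeService) Active() bool    { return f.active }

// restart is example calling code that depends on the interface,
// not on the concrete *Service type.
func restart(svc lifecycle) error {
	if svc.Active() {
		if err := svc.Shutdown(); err != nil {
			return fmt.Errorf("shutdown: %w", err)
		}
	}
	if err := svc.Start(); err != nil {
		return fmt.Errorf("start: %w", err)
	}
	return nil
}

func main() {
	fake := &fakeService{active: true}
	err := restart(fake)
	fmt.Println(err, fake.started, fake.shutdown, fake.Active())
}
```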

type Response

type Response struct {
	Status  string          `json:"status"`
	Err     string          `json:"err"`
	Payload json.RawMessage `json:"payload"`
}

Response represents the reply returned by a remote server when a message is sent.

func (*Response) IsSuccess

func (r *Response) IsSuccess() bool

IsSuccess returns true if the response status indicates success.

func (*Response) SetPayload

func (r *Response) SetPayload(v interface{}) error

SetPayload serializes an arbitrary struct as a RawMessage.

type SendFileResultFunc

type SendFileResultFunc func(us *model.UploadSession, rc *model.RemoteCluster, resp *Response, err error)

type SendMsgResultFunc

type SendMsgResultFunc func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response, err error)

type SendProfileImageResultFunc

type SendProfileImageResultFunc func(userId string, rc *model.RemoteCluster, resp *Response, err error)

type ServerIface

type ServerIface interface {
	Config() *model.Config
	IsLeader() bool
	AddClusterLeaderChangedListener(listener func()) string
	RemoveClusterLeaderChangedListener(id string)
	GetStore() store.Store
	GetLogger() mlog.LoggerIFace
	GetMetrics() einterfaces.MetricsInterface
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service provides inter-cluster communication via topic-based messages. In the product these are called "Secured Connections".

func NewRemoteClusterService

func NewRemoteClusterService(server ServerIface) (*Service, error)

NewRemoteClusterService creates a RemoteClusterService instance. In the product this is called a "Secured Connection".

func (*Service) AcceptInvitation

func (rcs *Service) AcceptInvitation(invite *model.RemoteClusterInvite, name string, displayName, creatorId string, teamId string, siteURL string) (*model.RemoteCluster, error)

AcceptInvitation is called when accepting an invitation to connect with a remote cluster.

func (*Service) Active

func (rcs *Service) Active() bool

Active returns true if this instance of the remote cluster service is active. The active instance is responsible for pinging and sending messages to remotes.

func (*Service) AddConnectionStateListener

func (rcs *Service) AddConnectionStateListener(listener ConnectionStateListener) string

func (*Service) AddTopicListener

func (rcs *Service) AddTopicListener(topic string, listener TopicListener) string

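AddTopicListener registers a callback for incoming messages on the given topic. The returned string identifies the listener and can be passed to RemoveTopicListener.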

func (*Service) BroadcastMsg

func (rcs *Service) BroadcastMsg(ctx context.Context, msg model.RemoteClusterMsg, f SendMsgResultFunc) error

BroadcastMsg asynchronously sends a message to all remote clusters interested in the message's topic.

`ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a BufferFullError if the message cannot be enqueued before the timeout. A background context will block indefinitely.

An optional callback can be provided that receives the success or failure result of sending to each remote cluster. The result reflects message delivery only. If a callback is provided it should return quickly.

func (*Service) ReceiveIncomingMsg

func (rcs *Service) ReceiveIncomingMsg(rc *model.RemoteCluster, msg model.RemoteClusterMsg) Response

ReceiveIncomingMsg is called by the REST API layer, or websocket layer (future), when a Remote Cluster message is received. Here we route the message to any topic listeners. `rc` and `msg` cannot be nil.

func (*Service) RemoveConnectionStateListener

func (rcs *Service) RemoveConnectionStateListener(listenerId string)

func (*Service) RemoveTopicListener

func (rcs *Service) RemoveTopicListener(listenerId string)

func (*Service) SendFile

func (rcs *Service) SendFile(ctx context.Context, us *model.UploadSession, fi *model.FileInfo, rc *model.RemoteCluster, rp ReaderProvider, f SendFileResultFunc) error

SendFile asynchronously sends a file to a remote cluster.

`ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a BufferFullError if the task cannot be enqueued before the timeout. A background context will block indefinitely.

Nil or error return indicates success or failure of task enqueue only.

An optional callback can be provided that receives the response from the remote cluster. The `err` provided to the callback reflects file delivery only. The `resp` contains the decoded bytes returned from the remote. If a callback is provided it should return quickly.

func (*Service) SendMsg

func (rcs *Service) SendMsg(ctx context.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, f SendMsgResultFunc) error

SendMsg asynchronously sends a message to a remote cluster.

`ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a BufferFullError if the message cannot be enqueued before the timeout. A background context will block indefinitely.

Nil or error return indicates success or failure of message enqueue only.

An optional callback can be provided that receives the response from the remote cluster. The `err` provided to the callback reflects response decoding only. The `resp` contains the decoded bytes returned from the remote. If a callback is provided it should return quickly.

func (*Service) SendProfileImage

func (rcs *Service) SendProfileImage(ctx context.Context, userID string, rc *model.RemoteCluster, provider ProfileImageProvider, f SendProfileImageResultFunc) error

SendProfileImage asynchronously sends a user's profile image to a remote cluster.

`ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a BufferFullError if the task cannot be enqueued before the timeout. A background context will block indefinitely.

Nil or error return indicates success or failure of task enqueue only.

An optional callback can be provided that receives the response from the remote cluster. The `err` provided to the callback reflects image delivery only. The `resp` contains the decoded bytes returned from the remote. If a callback is provided it should return quickly.

func (*Service) Shutdown

func (rcs *Service) Shutdown() error

Shutdown is called by the server on server shutdown.

func (*Service) Start

func (rcs *Service) Start() error

Start is called by the server on server start-up.

type TopicListener

type TopicListener func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) error

TopicListener is a callback signature used to listen for incoming messages for a specific topic.
