subscription

package
v2.0.0-...-7724a3b
Published: Dec 12, 2024 License: MIT Imports: 13 Imported by: 1

Documentation

Constants

const (
	DefaultKeepAliveInterval          = "15s"
	DefaultSubscriptionUpdateInterval = "1s"
	DefaultReadErrorTimeOut           = "5s"
	DefaultSubscriptionExecutionTries = 5
)
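
The interval and timeout defaults above are strings rather than time.Duration values, so a caller would have to parse them before use. The following is a minimal sketch of that step; the constants are redeclared locally so the example compiles on its own, and mustParse is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented defaults, so the sketch is self-contained.
const (
	DefaultKeepAliveInterval          = "15s"
	DefaultSubscriptionUpdateInterval = "1s"
	DefaultReadErrorTimeOut           = "5s"
)

// mustParse converts a duration string such as "15s" into a time.Duration
// and panics on malformed input. Hypothetical helper for illustration only.
func mustParse(s string) time.Duration {
	d, err := time.ParseDuration(s)
	if err != nil {
		panic(err)
	}
	return d
}

func main() {
	fmt.Println(mustParse(DefaultKeepAliveInterval))
	fmt.Println(mustParse(DefaultSubscriptionUpdateInterval))
	fmt.Println(mustParse(DefaultReadErrorTimeOut))
}
```
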
const (
	MessageTypeConnectionInit      = "connection_init"
	MessageTypeConnectionAck       = "connection_ack"
	MessageTypeConnectionError     = "connection_error"
	MessageTypeConnectionTerminate = "connection_terminate"
	MessageTypeConnectionKeepAlive = "ka"
	MessageTypeStart               = "start"
	MessageTypeStop                = "stop"
	MessageTypeData                = "data"
	MessageTypeError               = "error"
	MessageTypeComplete            = "complete"
)

Variables

var ErrCouldNotReadMessageFromClient = errors.New("could not read message from client")
var (
	ErrSubscriberIDAlreadyExists = errors.New("subscriber id already exists")
)
var ErrTransportClientClosedConnection = errors.New("transport client has a closed connection")

ErrTransportClientClosedConnection is an error indicating that the transport client is using a closed connection.

Functions

func TimeOutChecker

func TimeOutChecker(params TimeOutParams)

TimeOutChecker is a function that can be run in a goroutine to perform a time-out action after a specific duration. The action can be prevented by canceling the time-out context before the duration elapses. Use TimeOutParams for configuration.
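
The behavior described above (wait for a timer, unless a context is canceled first) can be sketched with a select statement. This is not the package's implementation; it is an assumption about how such a checker could work. The local timeOutParams type mirrors TimeOutParams without the Logger field, and runChecker, shortTimeout, and longTimeout are hypothetical names added for the demonstration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Hypothetical durations used by the demonstration below.
const (
	shortTimeout = 10 * time.Millisecond
	longTimeout  = time.Hour
)

// timeOutParams mirrors the documented TimeOutParams, minus the Logger field.
type timeOutParams struct {
	Name           string
	TimeOutContext context.Context
	TimeOutAction  func()
	Timer          *time.Timer
}

// timeOutChecker blocks until either the context is canceled (no action)
// or the timer fires (the action runs).
func timeOutChecker(p timeOutParams) {
	defer p.Timer.Stop()
	select {
	case <-p.TimeOutContext.Done():
		return
	case <-p.Timer.C:
		if p.TimeOutAction != nil {
			p.TimeOutAction()
		}
	}
}

// runChecker starts the checker in a goroutine, optionally cancels the
// context right away, and reports whether the time-out action fired.
func runChecker(timeout time.Duration, cancelFirst bool) bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	fired := false
	done := make(chan struct{})
	go func() {
		defer close(done)
		timeOutChecker(timeOutParams{
			Name:           "example",
			TimeOutContext: ctx,
			TimeOutAction:  func() { fired = true },
			Timer:          time.NewTimer(timeout),
		})
	}()

	if cancelFirst {
		cancel()
	}
	<-done // closing done orders the write to fired before this read
	return fired
}

func main() {
	fmt.Println("timer elapsed, action fired:", runChecker(shortTimeout, false))
	fmt.Println("canceled early, action fired:", runChecker(longTimeout, true))
}
```
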

Types

type Client deprecated

type Client interface {
	// ReadFromClient will invoke a read operation from the client connection.
	ReadFromClient() (*Message, error)
	// WriteToClient will invoke a write operation to the client connection.
	WriteToClient(Message) error
	// IsConnected will indicate if a connection is still established.
	IsConnected() bool
	// Disconnect will close the connection between server and client.
	Disconnect() error
}

Client provides an interface which can be implemented by any possible subscription client like websockets, mqtt, etc.

Deprecated: Use TransportClient instead.

type Engine

type Engine interface {
	StartOperation(ctx context.Context, id string, payload []byte, eventHandler EventHandler) error
	StopSubscription(id string, eventHandler EventHandler) error
	TerminateAllSubscriptions(eventHandler EventHandler) error
}

Engine defines the functions of a subscription engine.

type ErrorTimeoutExecutingSubscription

type ErrorTimeoutExecutingSubscription struct {
	// contains filtered or unexported fields
}

func (*ErrorTimeoutExecutingSubscription) Error

func (*ErrorTimeoutExecutingSubscription) Unwrap

type EventHandler

type EventHandler interface {
	Emit(eventType EventType, id string, data []byte, err error)
}

EventHandler is an interface that handles subscription events.

type EventType

type EventType int

EventType can be used to define subscription events decoupled from any protocols.

const (
	EventTypeOnError EventType = iota
	EventTypeOnSubscriptionData
	EventTypeOnSubscriptionCompleted
	EventTypeOnNonSubscriptionExecutionResult
	EventTypeOnConnectionTerminatedByClient
	EventTypeOnConnectionTerminatedByServer
	EventTypeOnConnectionError
	EventTypeOnConnectionOpened
	EventTypeOnDuplicatedSubscriberID
)
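
Because EventHandler has a single method, a small recording implementation is often enough for tests. The sketch below redeclares EventType and EventHandler locally so it compiles without this package; recordingHandler is a hypothetical type, and it is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented EventType and its values.
type EventType int

const (
	EventTypeOnError EventType = iota
	EventTypeOnSubscriptionData
	EventTypeOnSubscriptionCompleted
	EventTypeOnNonSubscriptionExecutionResult
	EventTypeOnConnectionTerminatedByClient
	EventTypeOnConnectionTerminatedByServer
	EventTypeOnConnectionError
	EventTypeOnConnectionOpened
	EventTypeOnDuplicatedSubscriberID
)

// Local copy of the documented interface.
type EventHandler interface {
	Emit(eventType EventType, id string, data []byte, err error)
}

// recordingHandler is a hypothetical EventHandler that stores a readable
// line for every event it receives.
type recordingHandler struct {
	lines []string
}

func (r *recordingHandler) Emit(eventType EventType, id string, data []byte, err error) {
	switch eventType {
	case EventTypeOnSubscriptionData:
		r.lines = append(r.lines, fmt.Sprintf("data %s: %s", id, data))
	case EventTypeOnSubscriptionCompleted:
		r.lines = append(r.lines, "complete "+id)
	case EventTypeOnError, EventTypeOnConnectionError:
		r.lines = append(r.lines, fmt.Sprintf("error %s: %v", id, err))
	default:
		r.lines = append(r.lines, fmt.Sprintf("event %d for %s", eventType, id))
	}
}

// Compile-time check that recordingHandler satisfies the interface.
var _ EventHandler = (*recordingHandler)(nil)

func main() {
	h := &recordingHandler{}
	h.Emit(EventTypeOnSubscriptionData, "1", []byte(`{"n":1}`), nil)
	h.Emit(EventTypeOnError, "1", nil, errors.New("upstream failed"))
	h.Emit(EventTypeOnSubscriptionCompleted, "1", nil, nil)
	for _, line := range h.lines {
		fmt.Println(line)
	}
}
```
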

type Executor

type Executor interface {
	Execute(writer resolve.SubscriptionResponseWriter) error
	OperationType() ast.OperationType
	SetContext(context context.Context)
	Reset()
}

Executor is an abstraction for executing a GraphQL engine.

type ExecutorEngine

type ExecutorEngine struct {
	// contains filtered or unexported fields
}

ExecutorEngine is an implementation of Engine and works with subscription.Executor.

func (*ExecutorEngine) StartOperation

func (e *ExecutorEngine) StartOperation(ctx context.Context, id string, payload []byte, eventHandler EventHandler) error

StartOperation will start any operation.

func (*ExecutorEngine) StopSubscription

func (e *ExecutorEngine) StopSubscription(id string, eventHandler EventHandler) error

StopSubscription will stop an active subscription.

func (*ExecutorEngine) TerminateAllSubscriptions

func (e *ExecutorEngine) TerminateAllSubscriptions(eventHandler EventHandler) error

TerminateAllSubscriptions will cancel all active subscriptions.

type ExecutorPool

type ExecutorPool interface {
	Get(payload []byte) (Executor, error)
	Put(executor Executor) error
}

ExecutorPool is an abstraction for creating executors.

type ExecutorV2

type ExecutorV2 struct {
	// contains filtered or unexported fields
}

func (*ExecutorV2) Execute

func (*ExecutorV2) OperationType

func (e *ExecutorV2) OperationType() ast.OperationType

func (*ExecutorV2) Reset

func (e *ExecutorV2) Reset()

func (*ExecutorV2) SetContext

func (e *ExecutorV2) SetContext(context context.Context)

type ExecutorV2Pool

type ExecutorV2Pool struct {
	// contains filtered or unexported fields
}

ExecutorV2Pool provides reusable executors.

func NewExecutorV2Pool

func NewExecutorV2Pool(engine *graphql.ExecutionEngineV2, connectionInitReqCtx context.Context, options ...ExecutorV2PoolOptionFunc) *ExecutorV2Pool

func (*ExecutorV2Pool) Get

func (e *ExecutorV2Pool) Get(payload []byte) (Executor, error)

func (*ExecutorV2Pool) Put

func (e *ExecutorV2Pool) Put(executor Executor) error

type ExecutorV2PoolOptionFunc

type ExecutorV2PoolOptionFunc func(options *ExecutorV2PoolOptions)

func WithExecutorV2HeaderModifier

func WithExecutorV2HeaderModifier(headerModifier postprocess.HeaderModifier) ExecutorV2PoolOptionFunc

type ExecutorV2PoolOptions

type ExecutorV2PoolOptions struct {
	HeaderModifier postprocess.HeaderModifier
}

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler is the actual subscription handler, which keeps track of how to handle messages coming from the client.

func NewHandler

func NewHandler(logger abstractlogger.Logger, client Client, executorPool ExecutorPool) (*Handler, error)

NewHandler creates a new subscription handler.

func NewHandlerWithInitFunc

func NewHandlerWithInitFunc(
	logger abstractlogger.Logger,
	client Client,
	executorPool ExecutorPool,
	initFunc WebsocketInitFunc,
) (*Handler, error)

func (*Handler) ActiveSubscriptions

func (h *Handler) ActiveSubscriptions() int

ActiveSubscriptions will return the current number of active subscriptions for that client.

func (*Handler) ChangeKeepAliveInterval

func (h *Handler) ChangeKeepAliveInterval(d time.Duration)

ChangeKeepAliveInterval can be used to change the keep-alive interval.

func (*Handler) ChangeSubscriptionUpdateInterval

func (h *Handler) ChangeSubscriptionUpdateInterval(d time.Duration)

ChangeSubscriptionUpdateInterval can be used to change the update interval.

func (*Handler) Handle

func (h *Handler) Handle(ctx context.Context)

Handle will handle the subscription connection.

type InitPayload

type InitPayload json.RawMessage

InitPayload is a structure that is parsed from the websocket init message payload.

Deprecated: Use websocket.InitPayload instead.

func (InitPayload) Authorization

func (p InitPayload) Authorization() string

Authorization is a shorthand for getting the Authorization header from the payload.

func (InitPayload) GetString

func (p InitPayload) GetString(key string) string

GetString safely gets a string value from the payload. It returns an empty string if the payload is nil or the value isn't set.
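
A getter with the behavior described above can be built by decoding the raw JSON into a map and type-asserting the value. The sketch below uses a local initPayload type and lower-case method names to make clear that it is an illustration, not the package's code. Trying both "Authorization" and "authorization" as keys is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// initPayload is a local stand-in for the documented InitPayload type.
type initPayload json.RawMessage

// getString returns the string stored under key, or "" when the payload is
// empty, is not a JSON object, or holds a non-string value for that key.
func (p initPayload) getString(key string) string {
	if len(p) == 0 {
		return ""
	}
	var fields map[string]interface{}
	if err := json.Unmarshal(p, &fields); err != nil {
		return ""
	}
	s, _ := fields[key].(string)
	return s
}

// authorization looks up the Authorization value. Checking a lower-case
// key as a fallback is an assumption made for this sketch.
func (p initPayload) authorization() string {
	if v := p.getString("Authorization"); v != "" {
		return v
	}
	return p.getString("authorization")
}

func main() {
	p := initPayload(`{"Authorization":"Bearer abc","retries":3}`)
	fmt.Println(p.authorization())
	fmt.Println(p.getString("retries") == "") // non-string values yield ""
	fmt.Println(initPayload(nil).getString("anything") == "")
}
```
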

type InitialHttpRequestContext

type InitialHttpRequestContext struct {
	context.Context
	Request *http.Request
}

func NewInitialHttpRequestContext

func NewInitialHttpRequestContext(r *http.Request) *InitialHttpRequestContext

type Message deprecated

type Message struct {
	Id      string          `json:"id"`
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
}

Message defines the actual subscription message which will be passed from client to server and vice versa.

Deprecated: Prefer using TransportClient that is based on byte slices instead of this Message struct.
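
The struct tags on Message determine its wire format. The following sketch redeclares the struct and one message type constant locally and round-trips a start message through encoding/json. The payload shape with a "query" field is an assumption for illustration, and encodeStart and decode are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the documented message type constant.
const MessageTypeStart = "start"

// Local copy of the documented Message struct.
type Message struct {
	Id      string          `json:"id"`
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
}

// encodeStart builds a start message whose payload carries a query string.
// The payload shape is an assumption made for this sketch.
func encodeStart(id, query string) ([]byte, error) {
	payload, err := json.Marshal(map[string]string{"query": query})
	if err != nil {
		return nil, err
	}
	return json.Marshal(Message{Id: id, Type: MessageTypeStart, Payload: payload})
}

// decode parses raw bytes back into a Message, leaving Payload undecoded.
func decode(raw []byte) (Message, error) {
	var m Message
	err := json.Unmarshal(raw, &m)
	return m, err
}

func main() {
	raw, err := encodeStart("1", "subscription { counter }")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))

	msg, err := decode(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Id, msg.Type, string(msg.Payload))
}
```
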

type Protocol

type Protocol interface {
	Handle(ctx context.Context, engine Engine, message []byte) error
	EventHandler() EventHandler
}

Protocol defines an interface for a subscription protocol decoupled from the underlying transport.
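
To illustrate how a Protocol sits between raw messages and an Engine, the sketch below implements a toy protocol that decodes a JSON frame and dispatches on its type. All interfaces are redeclared locally. toyProtocol, recordingEngine, noopHandler, and dispatch are hypothetical, and the frame layout is an assumption modeled on the Message struct and message type constants listed on this page.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Local copies of the documented types and interfaces.
type EventType int

type EventHandler interface {
	Emit(eventType EventType, id string, data []byte, err error)
}

type Engine interface {
	StartOperation(ctx context.Context, id string, payload []byte, eventHandler EventHandler) error
	StopSubscription(id string, eventHandler EventHandler) error
	TerminateAllSubscriptions(eventHandler EventHandler) error
}

type Protocol interface {
	Handle(ctx context.Context, engine Engine, message []byte) error
	EventHandler() EventHandler
}

// noopHandler discards every event.
type noopHandler struct{}

func (noopHandler) Emit(EventType, string, []byte, error) {}

// toyProtocol decodes a JSON frame and forwards it to the engine.
type toyProtocol struct{ handler EventHandler }

func (p *toyProtocol) EventHandler() EventHandler { return p.handler }

func (p *toyProtocol) Handle(ctx context.Context, engine Engine, message []byte) error {
	var frame struct {
		ID      string          `json:"id"`
		Type    string          `json:"type"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(message, &frame); err != nil {
		return fmt.Errorf("decode frame: %w", err)
	}
	switch frame.Type {
	case "start":
		return engine.StartOperation(ctx, frame.ID, frame.Payload, p.handler)
	case "stop":
		return engine.StopSubscription(frame.ID, p.handler)
	case "connection_terminate":
		return engine.TerminateAllSubscriptions(p.handler)
	default:
		return fmt.Errorf("unsupported message type %q", frame.Type)
	}
}

// recordingEngine is a fake Engine that only records which calls it received.
type recordingEngine struct{ calls []string }

func (e *recordingEngine) StartOperation(_ context.Context, id string, _ []byte, _ EventHandler) error {
	e.calls = append(e.calls, "start:"+id)
	return nil
}

func (e *recordingEngine) StopSubscription(id string, _ EventHandler) error {
	e.calls = append(e.calls, "stop:"+id)
	return nil
}

func (e *recordingEngine) TerminateAllSubscriptions(_ EventHandler) error {
	e.calls = append(e.calls, "terminate")
	return nil
}

// dispatch feeds the messages through the toy protocol in order and returns
// the engine calls made before the first error, if any.
func dispatch(messages ...string) ([]string, error) {
	engine := &recordingEngine{}
	var p Protocol = &toyProtocol{handler: noopHandler{}}
	for _, m := range messages {
		if err := p.Handle(context.Background(), engine, []byte(m)); err != nil {
			return engine.calls, err
		}
	}
	return engine.calls, nil
}

func main() {
	calls, err := dispatch(`{"id":"1","type":"start"}`, `{"id":"1","type":"stop"}`)
	fmt.Println(calls, err)
}
```
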

type TimeOutParams

type TimeOutParams struct {
	Name           string
	Logger         abstractlogger.Logger
	TimeOutContext context.Context
	TimeOutAction  func()
	Timer          *time.Timer
}

TimeOutParams is a struct to configure a TimeOutChecker.

type TransportClient

type TransportClient interface {
	// ReadBytesFromClient will invoke a read operation from the client connection and return a byte slice.
	// This function should return ErrTransportClientClosedConnection when reading on a closed connection.
	ReadBytesFromClient() ([]byte, error)
	// WriteBytesToClient will invoke a write operation to the client connection using a byte slice.
	// This function should return ErrTransportClientClosedConnection when writing on a closed connection.
	WriteBytesToClient([]byte) error
	// IsConnected will indicate if a connection is still established.
	IsConnected() bool
	// Disconnect will close the connection between server and client.
	Disconnect() error
	// DisconnectWithReason will close the connection but is also able to process a reason for closure.
	DisconnectWithReason(reason interface{}) error
}

TransportClient provides an interface that can be implemented by any possible subscription client like websockets, mqtt, etc. It operates with raw byte slices.
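
As an illustration of the contract in the comments above, here is a minimal in-memory implementation that returns ErrTransportClientClosedConnection once the client has been disconnected. The interface and the error are redeclared locally so the sketch is self-contained. memoryClient and its fields are hypothetical and intended for tests, not as a real transport.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Local copy of the documented sentinel error.
var ErrTransportClientClosedConnection = errors.New("transport client has a closed connection")

// Local copy of the documented interface.
type TransportClient interface {
	ReadBytesFromClient() ([]byte, error)
	WriteBytesToClient([]byte) error
	IsConnected() bool
	Disconnect() error
	DisconnectWithReason(reason interface{}) error
}

// memoryClient is a hypothetical in-memory transport.
type memoryClient struct {
	mu     sync.Mutex
	closed bool
	reason interface{}
	done   chan struct{} // closed on disconnect to unblock readers
	inbox  chan []byte   // messages "sent" by the client
	sent   [][]byte      // messages written to the client
}

func newMemoryClient() *memoryClient {
	return &memoryClient{done: make(chan struct{}), inbox: make(chan []byte, 8)}
}

// ReadBytesFromClient blocks until a message arrives or the client is closed.
func (c *memoryClient) ReadBytesFromClient() ([]byte, error) {
	select {
	case <-c.done: // already closed: fail even if messages are still queued
		return nil, ErrTransportClientClosedConnection
	default:
	}
	select {
	case msg := <-c.inbox:
		return msg, nil
	case <-c.done:
		return nil, ErrTransportClientClosedConnection
	}
}

// WriteBytesToClient stores a copy of b unless the client is closed.
func (c *memoryClient) WriteBytesToClient(b []byte) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return ErrTransportClientClosedConnection
	}
	c.sent = append(c.sent, append([]byte(nil), b...))
	return nil
}

func (c *memoryClient) IsConnected() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return !c.closed
}

func (c *memoryClient) Disconnect() error { return c.DisconnectWithReason(nil) }

// DisconnectWithReason records the reason and closes the client once.
func (c *memoryClient) DisconnectWithReason(reason interface{}) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return nil
	}
	c.closed = true
	c.reason = reason
	close(c.done)
	return nil
}

// Compile-time check that memoryClient satisfies the interface.
var _ TransportClient = (*memoryClient)(nil)

func main() {
	c := newMemoryClient()
	c.inbox <- []byte(`{"type":"connection_init"}`)
	msg, err := c.ReadBytesFromClient()
	fmt.Println(string(msg), err)

	_ = c.DisconnectWithReason("going away")
	_, err = c.ReadBytesFromClient()
	fmt.Println(errors.Is(err, ErrTransportClientClosedConnection))
}
```
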

type UniversalProtocolHandler

type UniversalProtocolHandler struct {
	// contains filtered or unexported fields
}

UniversalProtocolHandler can handle any protocol by using the Protocol interface.

func NewUniversalProtocolHandler

func NewUniversalProtocolHandler(client TransportClient, protocol Protocol, executorPool ExecutorPool) (*UniversalProtocolHandler, error)

NewUniversalProtocolHandler creates a new UniversalProtocolHandler.

func NewUniversalProtocolHandlerWithOptions

func NewUniversalProtocolHandlerWithOptions(client TransportClient, protocol Protocol, executorPool ExecutorPool, options UniversalProtocolHandlerOptions) (*UniversalProtocolHandler, error)

NewUniversalProtocolHandlerWithOptions creates a new UniversalProtocolHandler. It requires an options struct.

func (*UniversalProtocolHandler) Handle

func (u *UniversalProtocolHandler) Handle(ctx context.Context)

Handle will handle the subscription logic and forward messages to the actual protocol handler.

type UniversalProtocolHandlerOptions

type UniversalProtocolHandlerOptions struct {
	Logger                           abstractlogger.Logger
	CustomSubscriptionUpdateInterval time.Duration
	CustomReadErrorTimeOut           time.Duration
	CustomSubscriptionExecutionTries int
	CustomEngine                     Engine
}

UniversalProtocolHandlerOptions is a struct that defines options for the UniversalProtocolHandler.

type WebsocketInitFunc

type WebsocketInitFunc func(ctx context.Context, initPayload InitPayload) (context.Context, error)

WebsocketInitFunc is called when the server receives a connection init message from the client. It can be used to inspect the initial payload and decide whether to accept the websocket connection.

Deprecated: Use websocket.InitFunc instead.
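
An init function of this shape typically validates the payload and returns a derived context. The sketch below redeclares InitPayload and WebsocketInitFunc locally. requireToken, tokenFrom, checkInit, tokenKey, and errMissingToken are hypothetical, and reading the token from an "Authorization" key is an assumption.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Local copies of the documented types.
type InitPayload json.RawMessage

type WebsocketInitFunc func(ctx context.Context, initPayload InitPayload) (context.Context, error)

// tokenKey is an unexported context key type, which avoids key collisions.
type tokenKey struct{}

var errMissingToken = errors.New("missing authorization token")

// requireToken rejects connections whose init payload carries no
// "Authorization" string and stores the token in the returned context.
func requireToken(ctx context.Context, initPayload InitPayload) (context.Context, error) {
	var fields map[string]interface{}
	if len(initPayload) > 0 {
		if err := json.Unmarshal(initPayload, &fields); err != nil {
			return ctx, err
		}
	}
	token, _ := fields["Authorization"].(string)
	if token == "" {
		return ctx, errMissingToken
	}
	return context.WithValue(ctx, tokenKey{}, token), nil
}

// tokenFrom reads the token stored by requireToken, or "" if absent.
func tokenFrom(ctx context.Context) string {
	s, _ := ctx.Value(tokenKey{}).(string)
	return s
}

// checkInit runs requireToken through the WebsocketInitFunc type and
// returns the token found in the resulting context.
func checkInit(raw string) (string, error) {
	var initFunc WebsocketInitFunc = requireToken
	ctx, err := initFunc(context.Background(), InitPayload(raw))
	if err != nil {
		return "", err
	}
	return tokenFrom(ctx), nil
}

func main() {
	fmt.Println(checkInit(`{"Authorization":"Bearer abc"}`))
	fmt.Println(checkInit(`{}`))
}
```
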
