Documentation ¶
Index ¶
- Constants
- Variables
- func NewConnection(address, token string, timeout time.Duration, disableTLS, noCtx bool) (*grpc.ClientConn, context.Context, error)
- func TestConnection(cfg *Config) error
- type Config
- type IRelayBackend
- type Relay
- func (r *Relay) CallWithRetry(ctx context.Context, method string, publish func(ctx context.Context) error) error
- func (r *Relay) Run(id int32, conn *grpc.ClientConn, outboundCtx, shutdownCtx context.Context)
- func (r *Relay) StartWorkers(shutdownCtx context.Context) error
- func (r *Relay) WaitForShutdown()
Constants ¶
const ( // DefaultNumWorkers is the number of goroutine relay workers to launch DefaultNumWorkers = 10 // QueueFlushInterval is how often to flush messages to GRPC collector if we don't reach the batch size QueueFlushInterval = 10 * time.Second // DefaultBatchSize is the number of messages to send to GRPC collector in each batch DefaultBatchSize = 100 // number of messages to batch // MaxGRPCRetries is the number of times we will attempt a GRPC call before giving up MaxGRPCRetries = 5 // MaxGRPCMessageSize is the maximum message size for GRPC client in bytes MaxGRPCMessageSize = 1024 * 1024 * 100 // 100MB // GRPCRetrySleep determines how long we sleep between GRPC call retries GRPCRetrySleep = time.Second * 5 )
Variables ¶
var ( ErrNSQMissingChannel = errors.New("NSQ channel cannot be empty") ErrNSQMissingTopic = errors.New("NSQ topic cannot be empty") )
var ( ErrMissingConfig = errors.New("Relay config cannot be nil") ErrMissingToken = errors.New("Token cannot be empty") ErrMissingServiceShutdownCtx = errors.New("ServiceShutdownCtx cannot be nil") ErrMissingGRPCAddress = errors.New("GRPCAddress cannot be empty") ErrMissingRelayCh = errors.New("RelayCh cannot be nil") ErrMissingMessage = errors.New("msg cannot be nil") ErrMissingMessageValue = errors.New("msg.Value cannot be nil") ErrMissingMessageOptions = errors.New("msg.Options cannot be nil") )
Functions ¶
func NewConnection ¶
func TestConnection ¶
Types ¶
type Config ¶
type Config struct { Token string GRPCAddress string NumWorkers int32 BatchSize int32 RelayCh chan interface{} ErrorCh chan *records.ErrorRecord DisableTLS bool Timeout time.Duration // general grpc timeout (used for all grpc calls) Type string ServiceShutdownCtx context.Context MainShutdownFunc context.CancelFunc MainShutdownCtx context.Context }
type IRelayBackend ¶ added in v0.29.0
type IRelayBackend interface {
Relay() error
}
type Relay ¶
type Relay struct { *Config Workers map[int32]struct{} WorkersMutex *sync.RWMutex // contains filtered or unexported fields }
func (*Relay) CallWithRetry ¶ added in v0.16.0
func (r *Relay) CallWithRetry(ctx context.Context, method string, publish func(ctx context.Context) error) error
CallWithRetry will retry a GRPC call until it succeeds or reaches a maximum number of retries defined by MaxGRPCRetries
func (*Relay) Run ¶
Run is a GRPC worker that runs as a goroutine. outboundCtx is used for sending GRPC requests as it will contain metadata, specifically "batch-token". shutdownCtx is passed from the main plumber app to shut down workers TODO: This should also read from errorCh
func (*Relay) WaitForShutdown ¶ added in v0.29.0
func (r *Relay) WaitForShutdown()
WaitForShutdown will wait for service shutdown context to be canceled. It will then start constantly polling the workers map until it is empty. When the workers map is empty, MainShutdownFunc() is called allowing the application to exit gracefully and ensuring we have sent all relay messages to the grpc-collector