queue

package
v0.0.0-...-6251ae7 Latest
Warning

This package is not in the latest version of its module.
Published: Oct 23, 2022 License: AGPL-3.0 Imports: 10 Imported by: 0

Documentation

Overview

Package queue provides client-side queueing, an HTTP middleware and a gRPC interceptor for submitting requests to a count gRPC server.

Types

type CountAddQueue

type CountAddQueue struct {
	// contains filtered or unexported fields
}

CountAddQueue provides a countv1.CountService_AddClient stream queue, with automatic reconnect on errors.

func NewCountAddClient

func NewCountAddClient(ctx context.Context, cc *grpc.ClientConn, opts ...grpc.CallOption) (*CountAddQueue, error)

NewCountAddClient initiates a new CountServiceClient.Add stream on the ClientConn. The returned CountAddQueue can be used to queue and send countv1.AddRequest messages. A separate goroutine is started for queue processing and automatic reconnection on failure.

The context needs to remain available for automatic reconnection. When the context is expired or canceled, automatic reconnection will fail. However, existing entries in the queue will still be processed, as long as the stream does not break.

func (*CountAddQueue) Close

func (c *CountAddQueue) Close()

Close the stream. Blocks until the queue is emptied.

func (*CountAddQueue) Middleware

func (c *CountAddQueue) Middleware(next http.Handler) http.Handler

Middleware for net/http which queues request data. The middleware never blocks. If the queue is full, the request message is dropped instead. Dropped messages are reported on the logger in the request context, using the Warn loglevel.

Example
cc, err := grpc.DialContext(context.TODO(), "count.muhlemmer.com:443",
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	grpc.WithBlock(),
)
if err != nil {
	panic(err)
}

q, err := NewCountAddClient(context.TODO(), cc)
if err != nil {
	panic(err)
}

s := &http.Server{
	Addr:    ":8080",
	Handler: q.Middleware(http.DefaultServeMux),
}
if err := s.ListenAndServe(); err != nil {
	panic(err)
}

func (*CountAddQueue) Queue

func (c *CountAddQueue) Queue(ctx context.Context, req *countv1.AddRequest)

Queue an AddRequest. Blocks until space is available if the queue is full. The context is used for logging only. Dropped messages are reported on the logger in context, using the Warn loglevel.

func (*CountAddQueue) QueueOrDrop

func (c *CountAddQueue) QueueOrDrop(ctx context.Context, req *countv1.AddRequest)

QueueOrDrop an AddRequest. The request is dropped if the queue is full, so QueueOrDrop is always a non-blocking action. The context is used for logging only. Dropped messages are reported on the logger in context, using the Warn loglevel.

Example
cc, err := grpc.DialContext(context.TODO(), "count.muhlemmer.com:443",
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	grpc.WithBlock(),
)
if err != nil {
	panic(err)
}

q, err := NewCountAddClient(context.TODO(), cc)
if err != nil {
	panic(err)
}

q.QueueOrDrop(context.TODO(), &countv1.AddRequest{
	Method:           countv1.Method_GET,
	Path:             "/foo/bar",
	RequestTimestamp: timestamppb.Now(),
})

func (*CountAddQueue) UnaryInterceptor

func (c *CountAddQueue) UnaryInterceptor() grpc.UnaryServerInterceptor

UnaryInterceptor for gRPC, which queues request data. The interceptor never blocks. If the queue is full, the request message is dropped instead. Dropped messages are reported on the logger in the request context, using the Warn loglevel.

Example
cc, err := grpc.DialContext(context.TODO(), "count.muhlemmer.com:443",
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	grpc.WithBlock(),
)
if err != nil {
	panic(err)
}

q, err := NewCountAddClient(context.TODO(), cc)
if err != nil {
	panic(err)
}

grpc.NewServer(grpc.ChainUnaryInterceptor(
	q.UnaryInterceptor(),
))
