Documentation ¶
Index ¶
- Variables
- type ConnPool
- func (c *ConnPool) ClientConn() (*grpc.ClientConn, error)
- func (c *ConnPool) ConnectionPoolPipeline(conn *grpc.ClientConn, pipelineDoneChan chan interface{})
- func (c *ConnPool) EnqueConnBatch(connItems batch.BatchItems)
- func (c *ConnPool) GetConnBatch() batch.BatchItems
- func (c *ConnPool) GetConnPoolSize() uint64
- type ConnectionInterceptor
- type PoolConnOptions
- type PoolOptions
- func WithAddress(address string) PoolOptions
- func WithAuthority(authority string) PoolOptions
- func WithCodes(codes []codes.Code) PoolOptions
- func WithConnectionInterceptor(interceptor ConnectionInterceptor) PoolOptions
- func WithCredentials(credentials credentials.TransportCredentials) PoolOptions
- func WithInsecure(insecure bool) PoolOptions
- func WithRetry(retry int) PoolOptions
- func WithScheme(scheme string) PoolOptions
- type Queue
- type Semaphore
Constants ¶
This section is empty.
Variables ¶
var ( // DefaultConnBatch : maximum number of connection instances stored for a single batch DefaultConnBatch uint64 = 20 // DefaultMaxPoolSize : the total size of the connection pool to store the gRPC connection instances DefaultMaxPoolSize uint64 = 60 // DefaultScheme : gRPC connection scheme to override the default scheme "passthrough" to "dns" DefaultScheme string = "dns" // DefaultGrpcInsecure : the authentication [enable/disable] bool flag DefaultGrpcInsecure bool = true // DefaultInterceptor : the gRPC connection library currently only supports one type of interceptors to send msg to the server that doesn't expect a response DefaultInterceptor ConnectionInterceptor = UnaryClient // DefaultRetriableCodes : possible retriable gRPC connection failure codes DefaultRetriableCodes = []codes.Code{codes.Aborted, codes.Unknown, codes.ResourceExhausted, codes.Unavailable} ConnIndex uint64 = 0 ConnPoolPipeline uint64 = 0 ConnRecreateCount uint64 = 0 IsConnRecreate bool = false )
Functions ¶
This section is empty.
Types ¶
type ConnPool ¶
type ConnPool struct { Conn *grpc.ClientConn MaxPoolSize uint64 ConnInstanceBatch *batch.Batch ConnBatchQueue *Queue Options *PoolConnOptions PipelineDoneChan chan interface{} Log grpclog.LoggerV2 }
ConnPool struct defines several fields to construct a connection pool. The gRPC connection instance replicated using batch processing and queried using reflect.SelectCase that gets the connection object from the list of cases as a pseudo-random choice.
Conn: The base gRPC connection that keeps the initial client connection instance. MaxPoolSize: The maximum number of connections created concurrently in the connection pool. ConnInstanceBatch: The batch processing will help creates multiple replicas of base gRPC connection instances concurrently. https://github.com/Deeptiman/go-batch library used to perform the batch processing. Options: The options will keep the connection pool configurations to create gRPC dialoptions for base connection instances. PipelineDoneChan: The connection pool runs a concurrency pipeline, so the "PipelineDoneChan" channel will be called after all the stages of the pipeline finishes. Log: The gRPC log will show the internal connection lifecycle that will be useful to debug the connection flow.
func NewConnPool ¶
func NewConnPool(opts ...PoolOptions) *ConnPool
NewConnPool will create the connection pool object that will instantiate the configurations for connection batch, retryOptions, interceptor.
func (*ConnPool) ClientConn ¶
func (c *ConnPool) ClientConn() (*grpc.ClientConn, error)
ClientConn will create the initial gRPC client connection instance. The connection factory works as a higher order function for gRPC retry policy in case of connection failure retries.
func (*ConnPool) ConnectionPoolPipeline ¶
func (c *ConnPool) ConnectionPoolPipeline(conn *grpc.ClientConn, pipelineDoneChan chan interface{})
ConnectionPoolPipeline follows the concurrency pipeline technique to create a connection pool in a higher concurrent scenarios. The pipeline has several stages that use the Fan-In, Fan-Out technique to process the data pipeline using channels.
The entire process of creating the connection pool becomes a powerful function using the pipeline technique. There are four different stages in this pipeline that works as a generator pattern to create a connection pool.
func (*ConnPool) EnqueConnBatch ¶
func (c *ConnPool) EnqueConnBatch(connItems batch.BatchItems)
EnqueConnBatch will enqueue the batchItems received from the go-batch supply channel.
func (*ConnPool) GetConnBatch ¶
func (c *ConnPool) GetConnBatch() batch.BatchItems
GetConnBatch will retrieve the batch item from the connection queue that dequeues the items using the pseudo-random technique.
func (*ConnPool) GetConnPoolSize ¶
GetConnPoolSize loads the number of connections created by the connection pool
type ConnectionInterceptor ¶
type ConnectionInterceptor int
ConnectionInterceptor defines the interceptors[UnaryServer:UnaryClient] type for a gRPC connection.
const ( // ConnectionInterceptor constants UnaryServer ConnectionInterceptor = iota UnaryClient )
type PoolConnOptions ¶
type PoolConnOptions struct {
// contains filtered or unexported fields
}
type PoolOptions ¶
type PoolOptions func(*ConnPool)
func WithAddress ¶
func WithAddress(address string) PoolOptions
func WithAuthority ¶
func WithAuthority(authority string) PoolOptions
func WithCodes ¶
func WithCodes(codes []codes.Code) PoolOptions
func WithConnectionInterceptor ¶
func WithConnectionInterceptor(interceptor ConnectionInterceptor) PoolOptions
func WithCredentials ¶
func WithCredentials(credentials credentials.TransportCredentials) PoolOptions
func WithInsecure ¶
func WithInsecure(insecure bool) PoolOptions
func WithRetry ¶
func WithRetry(retry int) PoolOptions
func WithScheme ¶
func WithScheme(scheme string) PoolOptions
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue struct defines the constructors to enqueue/dequeue of batch processing items.
size: The total size of the connection queue is similar to the value of MaxPoolSize. itemSelect: The list of enqueued cases that gets stored as channels and dequeued using pseudo-random selection. enqueCh: The enqueCh will store the array of channel batchItems and get added to the reflect.SelectCase. sem: The semaphore will establish the synchronization between the enqueue/dequeue process by creating Acquire/Release blocking channels. log: The log uses the loggrus library to provide logging into the library.
func NewQueue ¶
NewQueue will instantiate a new Queue with the given size and initialize the connection array of different configurations like []channel and reflect.SelectCase.
func (*Queue) Dequeue ¶
func (q *Queue) Dequeue() batch.BatchItems
Dequeue function will pick a batch item from the channel cases using the pseudo-random technique.
func (*Queue) Enqueue ¶
func (q *Queue) Enqueue(item batch.BatchItems)
Enqueue function will append the received batch item to the array of channel queue and also gets added to the []reflect.SelectCase as a channel case.
func (*Queue) GetEnqueCh ¶
GetEnqueCh will get the enqueued batch item for an index.
type Semaphore ¶
type Semaphore struct {
// contains filtered or unexported fields
}