Documentation ¶
Overview ¶
Package connpool is a general purpose object pool which can be used as a connection pool or a freelist.
Below is a demo showing how to use it.
The following two files can be found under github.com/marlonche/connpool/example/.
streampool.go
package main import ( "bufio" "fmt" "github.com/marlonche/connpool" "net" "sync" ) type PooledStreamErr string func (self PooledStreamErr) Error() string { return string(self) } var invalidStream = PooledStreamErr("invalid pooled stream") // Implemented PoolItem: pooled TCP stream type PooledStream struct { sync.RWMutex stream net.Conn pool *StreamPool err error container connpool.PoolItem closed bool } func NewPooledStream(stream net.Conn, pool *StreamPool) *PooledStream { return &PooledStream{ stream: stream, pool: pool, closed: false, } } // Just save the parameter passed in func (self *PooledStream) SetContainer(container connpool.PoolItem) { self.container = container } // Just return the saved parameter of SetContainer() func (self *PooledStream) GetContainer() connpool.PoolItem { return self.container } // This method is called by connpool as well as by users who encounter errors when // using PooledStream, e.g., PooledStream.Read(). // You can keep the error if it's unrecoverable and you want to discard the PooledStream, // or you can ignore the error if it doesn't affect the reuse of the PooledStream. // In PooledStream.Close(), you can check the kept error to discard or reuse PooledStream. func (self *PooledStream) SetErr(err error) { self.Lock() defer self.Unlock() if self.closed { return } if err != nil { if self.err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { return } } self.err = err } } // Return the error set by SetErr(). func (self *PooledStream) GetErr() error { self.RLock() defer self.RUnlock() return self.err } // Called after finishing using the PooledStream. // If the item is in error state, clear it by calling Pool.ClearItem(), // otherwise give it back by calling Pool.GiveBack(). 
func (self *PooledStream) Close() error { self.Lock() defer self.Unlock() if self.closed { return invalidStream } err := self.err if err != nil { self.pool.clearConn(self) self.pool = nil self.closed = true self.stream.Close() } else { self.pool.giveBack(self) } return nil } // Not part of PoolItem interface, just application logic. func (self *PooledStream) Write(b []byte) (int, error) { return self.stream.Write(b) } func (self *PooledStream) Read(b []byte) (int, error) { return self.stream.Read(b) } // Implemented PoolItem creator type streamCreator struct { pool *StreamPool addr string } // Called by connpool when more PoolItems are needed. func (self *streamCreator) NewItem() (connpool.PoolItem, error) { conn, err := net.Dial("tcp", self.addr) if err != nil { return nil, err } pooledStream := NewPooledStream(conn, self.pool) return pooledStream, nil } // Called by connpool every time before Pool.Get()'s return. // n = 1 means the first time. func (self *streamCreator) InitItem(item connpool.PoolItem, n uint64) error { if 1 == n { // first Get() if stream, _ := item.(*PooledStream); stream != nil { // receive from stream go func() { r := bufio.NewReader(stream) for { s, err := r.ReadString('\n') if err != nil { stream.SetErr(err) stream.Close() break } fmt.Printf("get echo from server: %v", s) } }() } } return nil } func (self *streamCreator) Close() error { self.pool = nil return nil } // pool wrapper type StreamPool struct { pool *connpool.Pool } func NewStreamPool(name string, addr string, maxTotal int, maxIdle int, idleTimeout int) *StreamPool { creator := &streamCreator{ addr: addr, } streamPool := &StreamPool{ pool: connpool.NewPool(name, creator, maxTotal, maxIdle, idleTimeout), } creator.pool = streamPool return streamPool } // wrapper of Pool.Get() func (self *StreamPool) Get() (*PooledStream, error) { item, err := self.pool.Get() if err != nil { return nil, err } if stream, ok := item.(*PooledStream); ok && stream != nil { return stream, nil } return 
nil, invalidStream } // wrapper of Pool.ClearItem() func (self *StreamPool) clearConn(pooledStream *PooledStream) { self.pool.ClearItem(pooledStream) } // wrapper of Pool.GiveBack() func (self *StreamPool) giveBack(pooledStream *PooledStream) { self.pool.GiveBack(pooledStream) } // wrapper of Pool.Close() func (self *StreamPool) Close() { self.pool.Close() }
main.go
package main import ( "bufio" "flag" "fmt" "net" "time" ) var flagAsServer = flag.Bool("asServer", false, "run as server demo") func main() { flag.Parse() asServer := *flagAsServer if asServer { runAsServer() } else { runAsClient() } } func runAsClient() { pool := NewStreamPool("pool-name", "127.0.0.1:9999", 10, 5, 60) for i := 0; i < 5000; i++ { go func() { conn, err := pool.Get() if err != nil { fmt.Printf("pool.Get() error: %v", err) return } defer conn.Close() content := fmt.Sprintf("Hello, my id is %v\n", time.Now().Nanosecond()) _, err = conn.Write([]byte(content)) if err != nil { fmt.Printf("conn write error: %v", err) conn.SetErr(err) } }() } time.Sleep(time.Hour) } func runAsServer() { l, err := net.Listen("tcp", ":9999") if err != nil { fmt.Printf("listen error: %v", err) return } for { conn, err := l.Accept() if err != nil { fmt.Printf("Accept error: %v", err) return } go func() { r := bufio.NewReader(conn) for { s, err := r.ReadString('\n') if err != nil { fmt.Printf("ReadString err: %v", err) return } if _, err = conn.Write([]byte(s)); err != nil { fmt.Printf("Write error: %v", err) return } } }() } }
Index ¶
- Variables
- type Creator
- type Pool
- func (self *Pool) ClearItem(item PoolItem)
- func (self *Pool) Close()
- func (self *Pool) Closed() bool
- func (self *Pool) Get() (_item PoolItem, _err error)
- func (self *Pool) GetIdleNum() int
- func (self *Pool) GetName() string
- func (self *Pool) GetTotalNum() int
- func (self *Pool) GiveBack(item PoolItem)
- func (self *Pool) IsItemActive(_item PoolItem) bool
- func (self *Pool) SetGetTimeout(timeout int)
- type PoolItem
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type Creator ¶
type Creator interface { // Used to create a new item which will be returned by Pool.Get(). // It will be called if there are not enough items // and there is still capacity space to allocate new items. NewItem() (PoolItem, error) // This method will be called every time before Pool.Get() returning a PoolItem to user. // Every item returned from Get() will be initialized with this method. // // item is the one will be returned by Pool.Get(). // n is the use count of this item. // n = 1 means the first use of this item. // // If the returned error is not nil, item.SetErr() and item.Close() will be // called sequentially by connpool, and item will not be returned by Pool.Get() InitItem(item PoolItem, n uint64) error Close() error }
Users should implement this interface to create PoolItems.
type Pool ¶
// Pool is the main pool struct.
type Pool struct {
// contains filtered or unexported fields
}
The main pool struct.
func NewPool ¶
Create a connection pool.
name is a unique id of this pool;
creator is the Creator interface implemented by user;
maxTotalNum is the maximum total number of active and idle connections held by this pool;
Here active refers to an item being held by a user after Pool.Get(), while idle refers to an item in the pool waiting for Pool.Get().
maxIdleNum is the maximum number of idle connections held by this pool;
idleTimeout is the timeout in seconds of idle connections, 0 means no timeout; if an item is in idle state for at least idleTimeout seconds, the item will be closed with error ErrIdleTimeout.
func (*Pool) ClearItem ¶
Call this method to clear items with error from the pool.
This method is called by user in the implementation of PoolItem.Close() when an error previously set by PoolItem.SetErr() is detected.
func (*Pool) Get ¶
Get pooled item originally created by Creator.NewItem().
If SetGetTimeout() is called with non-zero value, Get() will return with error ErrGetTimeout after timeout.
func (*Pool) GetTotalNum ¶
Get the total number of all items including active and idle.
func (*Pool) GiveBack ¶
Call this method to give normal (non-error) items back to the pool after you have finished using them.
This method is called by the user in the implementation of PoolItem.Close() when no error has been detected on the item.
If idle items are full, this item will be closed with error ErrIdleFull.
func (*Pool) IsItemActive ¶
Check whether an item is active or not.
func (*Pool) SetGetTimeout ¶
Set Get()'s timeout in seconds; 0 means no timeout (the default). Get() will return with error ErrGetTimeout on timeout.
This method can be called after NewPool().
type PoolItem ¶
// PoolItem is implemented by the items managed by a pool.
type PoolItem interface {
	// Close is called after finishing using the PoolItem.
	// If the item is in error state, clear it by calling pool.ClearItem();
	// otherwise give it back by calling pool.GiveBack().
	Close() error

	// SetErr saves the error if it is not recoverable.
	// It is called by connpool as well as by users who encounter
	// errors when using the PoolItem.
	SetErr(error)

	// GetErr returns the error previously saved by SetErr().
	GetErr() error

	// SetContainer and GetContainer are called only by connpool.
	// SetContainer just needs to save the parameter passed in.
	SetContainer(PoolItem)

	// GetContainer just needs to return the parameter saved by SetContainer().
	GetContainer() PoolItem
}
Pooled items should implement this interface.