Documentation
¶
Overview ¶
Package pool provides a wrapper for several NeoFS API clients.
The main component is Pool type. It is a virtual connection to the network and provides methods for executing operations on the server. It also supports a weighted random selection of the underlying client to make requests.
Pool has an auto-session mechanism for object operations. It is enabled by default. The mechanism allows to manipulate objects like upload, download, delete, etc, without explicit session passing. This behavior may be disabled per request by calling IgnoreSession() on the appropriate Prm* argument. Note that if auto-session is disabled, the user MUST provide the appropriate session manually for PUT and DELETE object operations. The user may provide session, for another object operations.
Index ¶
- type InitParameters
- func (x *InitParameters) AddNode(nodeParam NodeParam)
- func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration)
- func (x *InitParameters) SetErrorThreshold(threshold uint32)
- func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration)
- func (x *InitParameters) SetLogger(logger *zap.Logger)
- func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration)
- func (x *InitParameters) SetNodeSessionCacheSize(cacheSize int)
- func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration)
- func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64)
- func (x *InitParameters) SetSigner(signer user.Signer)
- func (x *InitParameters) SetStatisticCallback(statisticCallback stat.OperationCallback)
- type NodeParam
- type Pool
- func (p *Pool) BalanceGet(ctx context.Context, prm client.PrmBalanceGet) (accounting.Decimal, error)
- func (p *Pool) Close()
- func (p *Pool) ContainerDelete(ctx context.Context, id cid.ID, signer neofscrypto.Signer, ...) error
- func (p *Pool) ContainerEACL(ctx context.Context, id cid.ID, prm client.PrmContainerEACL) (eacl.Table, error)
- func (p *Pool) ContainerGet(ctx context.Context, id cid.ID, prm client.PrmContainerGet) (container.Container, error)
- func (p *Pool) ContainerList(ctx context.Context, ownerID user.ID, prm client.PrmContainerList) ([]cid.ID, error)
- func (p *Pool) ContainerPut(ctx context.Context, cont container.Container, signer neofscrypto.Signer, ...) (cid.ID, error)
- func (p *Pool) ContainerSetEACL(ctx context.Context, table eacl.Table, signer user.Signer, ...) error
- func (p *Pool) Dial(ctx context.Context) error
- func (p *Pool) NetMapSnapshot(ctx context.Context, prm client.PrmNetMapSnapshot) (netmap.NetMap, error)
- func (p *Pool) NetworkInfo(ctx context.Context, prm client.PrmNetworkInfo) (netmap.NetworkInfo, error)
- func (p *Pool) ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, ...) (oid.ID, error)
- func (p *Pool) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, ...) (object.Object, *client.PayloadReader, error)
- func (p *Pool) ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, ...) ([][]byte, error)
- func (p *Pool) ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, ...) (*object.Object, error)
- func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, ...) (client.ObjectWriter, error)
- func (p *Pool) ObjectRangeInit(ctx context.Context, containerID cid.ID, objectID oid.ID, ...) (*client.ObjectRangeReader, error)
- func (p *Pool) ObjectSearchInit(ctx context.Context, containerID cid.ID, signer user.Signer, ...) (*client.ObjectListReader, error)
- func (p *Pool) RawClient() (*sdkClient.Client, error)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type InitParameters ¶
type InitParameters struct {
// contains filtered or unexported fields
}
InitParameters contains values used to initialize connection Pool.
func DefaultOptions ¶
func DefaultOptions() InitParameters
DefaultOptions returns default option preset for Pool creation. It may be used like start point for configuration or like main configuration.
func (*InitParameters) AddNode ¶
func (x *InitParameters) AddNode(nodeParam NodeParam)
AddNode append information about the node to which you want to connect.
func (*InitParameters) SetClientRebalanceInterval ¶
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration)
SetClientRebalanceInterval specifies the interval for updating nodes health status.
See also Pool.Dial.
func (*InitParameters) SetErrorThreshold ¶
func (x *InitParameters) SetErrorThreshold(threshold uint32)
SetErrorThreshold specifies the number of errors on connection after which node is considered as unhealthy.
func (*InitParameters) SetHealthcheckTimeout ¶
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration)
SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive.
See also Pool.Dial.
func (*InitParameters) SetLogger ¶
func (x *InitParameters) SetLogger(logger *zap.Logger)
SetLogger specifies logger.
func (*InitParameters) SetNodeDialTimeout ¶
func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration)
SetNodeDialTimeout specifies the timeout for connection to be established.
func (*InitParameters) SetNodeSessionCacheSize ¶
func (x *InitParameters) SetNodeSessionCacheSize(cacheSize int)
SetNodeSessionCacheSize sets cache size for the basic sessions for node.
func (*InitParameters) SetNodeStreamTimeout ¶
func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration)
SetNodeStreamTimeout specifies the timeout for individual operations in streaming RPC.
func (*InitParameters) SetSessionExpirationDuration ¶
func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64)
SetSessionExpirationDuration specifies the session token lifetime in epochs.
func (*InitParameters) SetSigner ¶
func (x *InitParameters) SetSigner(signer user.Signer)
SetSigner specifies default signer to be used for the protocol communication by default. MUST be of neofscrypto.ECDSA_DETERMINISTIC_SHA256 scheme, for example, neofsecdsa.SignerRFC6979 can be used.
func (*InitParameters) SetStatisticCallback ¶
func (x *InitParameters) SetStatisticCallback(statisticCallback stat.OperationCallback)
SetStatisticCallback makes the Pool to pass stat.OperationCallback for external statistic.
type NodeParam ¶
type NodeParam struct {
// contains filtered or unexported fields
}
NodeParam groups parameters of remote node.
func NewFlatNodeParams ¶
NewFlatNodeParams converts endpoints to appropriate NodeParam. It is useful for situations where all endpoints are equivalent.
func NewNodeParam ¶
NewNodeParam creates NodeParam using parameters. Address parameter MUST follow the client requirements, see sdkClient.PrmDial.SetServerURI for details.
func (*NodeParam) SetAddress ¶
SetAddress specifies address of the node.
func (*NodeParam) SetPriority ¶
SetPriority specifies priority of the node. Negative value is allowed. In the result node groups with the same priority will be sorted by descent.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool represents virtual connection to the NeoFS network to communicate with multiple NeoFS servers without thinking about switching between servers due to load balancing proportions or their unavailability. It is designed to provide a convenient abstraction from the multiple sdkClient.client types.
Pool can be created and initialized using NewPool function. Before executing the NeoFS operations using the Pool, connection to the servers MUST BE correctly established (see Dial method). Using the Pool before connecting have been established can lead to a panic. After the work, the Pool SHOULD BE closed (see Close method): it frees internal and system resources which were allocated for the period of work of the Pool. Calling Dial/Close methods during the communication process step strongly discouraged as it leads to undefined behavior.
Each method which produces a NeoFS API call may return an error. Status of underlying server response is casted to built-in error instance. Certain statuses can be checked using `sdkClient` and standard `errors` packages.
See pool package overview to get some examples.
func New ¶
New creates connection pool using simple set of endpoints and parameters.
See also pool.DefaultOptions and pool.NewFlatNodeParams for details.
Returned errors:
Example (AdjustingParameters) ¶
package main import ( "crypto/ecdsa" "crypto/elliptic" "crypto/rand" "github.com/epicchainlabs/epicchain-sdk-go/pool" "github.com/epicchainlabs/epicchain-sdk-go/user" ) func main() { // The key is generated to simplify the example, in reality it's likely to come from configuration/wallet. pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) signer := user.NewAutoIDSignerRFC6979(*pk) opts := pool.DefaultOptions() opts.SetErrorThreshold(10) p, _ := pool.New( pool.NewFlatNodeParams([]string{"grpc://localhost:8080", "grpcs://localhost:8081"}), signer, opts, ) _ = p }
Output:
Example (EasiestWay) ¶
package main import ( "crypto/ecdsa" "crypto/elliptic" "crypto/rand" "github.com/epicchainlabs/epicchain-sdk-go/pool" "github.com/epicchainlabs/epicchain-sdk-go/user" ) func main() { // The key is generated to simplify the example, in reality it's likely to come from configuration/wallet. pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) signer := user.NewAutoIDSignerRFC6979(*pk) p, _ := pool.New( pool.NewFlatNodeParams([]string{"grpc://localhost:8080", "grpcs://localhost:8081"}), signer, pool.DefaultOptions(), ) _ = p }
Output:
func NewPool ¶
func NewPool(options InitParameters) (*Pool, error)
NewPool creates connection pool using parameters.
Returned errors:
Example ¶
Create pool instance with 3 nodes connection. This InitParameters will make pool use 192.168.130.71 node while it is healthy. Otherwise, it will make the pool use 192.168.130.72 for 90% of requests and 192.168.130.73 for remaining 10%.
package main import ( "github.com/epicchainlabs/epicchain-sdk-go/pool" "github.com/epicchainlabs/epicchain-sdk-go/user" ) func main() { // import "github.com/epicchainlabs/epicchain-sdk-go/user" var signer user.Signer var prm pool.InitParameters prm.SetSigner(signer) prm.AddNode(pool.NewNodeParam(1, "192.168.130.71", 1)) prm.AddNode(pool.NewNodeParam(2, "192.168.130.72", 9)) prm.AddNode(pool.NewNodeParam(2, "192.168.130.73", 1)) // ... p, err := pool.NewPool(prm) _ = p _ = err }
Output:
func (*Pool) BalanceGet ¶
func (p *Pool) BalanceGet(ctx context.Context, prm client.PrmBalanceGet) (accounting.Decimal, error)
BalanceGet requests current balance of the NeoFS account.
See details in [client.Client.BalanceGet].
func (*Pool) Close ¶
func (p *Pool) Close()
Close closes the Pool and releases all the associated resources.
func (*Pool) ContainerDelete ¶
func (p *Pool) ContainerDelete(ctx context.Context, id cid.ID, signer neofscrypto.Signer, prm client.PrmContainerDelete) error
ContainerDelete sends request to remove the NeoFS container.
See details in [client.Client.ContainerDelete].
func (*Pool) ContainerEACL ¶
func (p *Pool) ContainerEACL(ctx context.Context, id cid.ID, prm client.PrmContainerEACL) (eacl.Table, error)
ContainerEACL reads eACL table of the NeoFS container.
See details in [client.Client.ContainerEACL].
func (*Pool) ContainerGet ¶
func (p *Pool) ContainerGet(ctx context.Context, id cid.ID, prm client.PrmContainerGet) (container.Container, error)
ContainerGet reads NeoFS container by ID.
See details in [client.Client.ContainerGet].
func (*Pool) ContainerList ¶
func (p *Pool) ContainerList(ctx context.Context, ownerID user.ID, prm client.PrmContainerList) ([]cid.ID, error)
ContainerList requests identifiers of the account-owned containers.
See details in [client.Client.ContainerList].
func (*Pool) ContainerPut ¶
func (p *Pool) ContainerPut(ctx context.Context, cont container.Container, signer neofscrypto.Signer, prm client.PrmContainerPut) (cid.ID, error)
ContainerPut sends request to save container in NeoFS.
See details in [client.Client.ContainerPut].
Example ¶
package main import ( "context" "github.com/epicchainlabs/epicchain-sdk-go/client" "github.com/epicchainlabs/epicchain-sdk-go/container" neofscrypto "github.com/epicchainlabs/epicchain-sdk-go/crypto" "github.com/epicchainlabs/epicchain-sdk-go/pool" "github.com/epicchainlabs/epicchain-sdk-go/waiter" ) func main() { // import "github.com/epicchainlabs/epicchain-sdk-go/waiter" // import "github.com/epicchainlabs/epicchain-sdk-go/container" // import neofscrypto "github.com/epicchainlabs/epicchain-sdk-go/crypto" var p pool.Pool // ... init pool // Connect to the NeoFS server _ = p.Dial(context.Background()) var cont container.Container // ... fill container var signer neofscrypto.Signer // ... create signer var prmPut client.PrmContainerPut // ... fill params, if required // waits until container created or context canceled. w := waiter.NewContainerPutWaiter(&p, waiter.DefaultPollInterval) containerID, err := w.ContainerPut(context.Background(), cont, signer, prmPut) _ = containerID _ = err }
Output:
func (*Pool) ContainerSetEACL ¶
func (p *Pool) ContainerSetEACL(ctx context.Context, table eacl.Table, signer user.Signer, prm client.PrmContainerSetEACL) error
ContainerSetEACL sends request to update eACL table of the NeoFS container.
See details in [client.Client.ContainerSetEACL].
func (*Pool) Dial ¶
Dial establishes a connection to the servers from the NeoFS network. It also starts a routine that checks the health of the nodes and updates the weights of the nodes for balancing. Returns an error describing failure reason.
If failed, the Pool SHOULD NOT be used.
See also InitParameters.SetClientRebalanceInterval.
func (*Pool) NetMapSnapshot ¶
func (p *Pool) NetMapSnapshot(ctx context.Context, prm client.PrmNetMapSnapshot) (netmap.NetMap, error)
NetMapSnapshot requests current network view of the remote server.
See details in [client.Client.NetMapSnapshot].
func (*Pool) NetworkInfo ¶
func (p *Pool) NetworkInfo(ctx context.Context, prm client.PrmNetworkInfo) (netmap.NetworkInfo, error)
NetworkInfo requests information about the NeoFS network of which the remote server is a part.
See details in [client.Client.NetworkInfo].
func (*Pool) ObjectDelete ¶
func (p *Pool) ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error)
ObjectDelete marks an object for deletion from the container using NeoFS API protocol.
Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.
See details in [client.Client.ObjectDelete].
func (*Pool) ObjectGetInit ¶
func (p *Pool) ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
ObjectGetInit initiates reading an object through a remote server using NeoFS API protocol.
Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.
See details in [client.Client.ObjectGetInit].
Example (ExplicitAutoSessionDisabling) ¶
package main import ( "context" "crypto/ecdsa" "crypto/elliptic" "crypto/rand" "github.com/epicchainlabs/epicchain-sdk-go/client" cid "github.com/epicchainlabs/epicchain-sdk-go/container/id" "github.com/epicchainlabs/epicchain-sdk-go/object" oid "github.com/epicchainlabs/epicchain-sdk-go/object/id" "github.com/epicchainlabs/epicchain-sdk-go/pool" "github.com/epicchainlabs/epicchain-sdk-go/user" ) func main() { // The key is generated to simplify the example, in reality it's likely to come from configuration/wallet. pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) signer := user.NewAutoIDSignerRFC6979(*pk) p, _ := pool.New( pool.NewFlatNodeParams([]string{"grpc://localhost:8080", "grpcs://localhost:8081"}), signer, pool.DefaultOptions(), ) _ = p.Dial(context.Background()) var prm client.PrmObjectGet // If you don't provide the session manually with prm.WithinSession, the request will be executed without session. prm.IgnoreSession() var ownerID user.ID var hdr object.Object hdr.SetContainerID(cid.ID{}) hdr.SetOwnerID(&ownerID) var containerID cid.ID // fill containerID var objetID oid.ID // fill objectID // In case of a session wasn't provided with prm.WithinSession, the signer must be for account which is a container // owner, otherwise there will be an error. // In case of a session was provided with prm.WithinSession, the signer can be ether container owner account or // third party, who can use a session token signed by container owner. _, _, _ = p.ObjectGetInit(context.Background(), containerID, objetID, signer, prm) // ... }
Output:
func (*Pool) ObjectHash ¶
func (p *Pool) ObjectHash(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHash) ([][]byte, error)
ObjectHash requests checksum of the range list of the object payload using
Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.
See details in [client.Client.ObjectHash].
func (*Pool) ObjectHead ¶
func (p *Pool) ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
ObjectHead reads object header through a remote server using NeoFS API protocol.
Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.
See details in [client.Client.ObjectHead].
Example ¶
package main import ( "context" "errors" "github.com/epicchainlabs/epicchain-sdk-go/client" apistatus "github.com/epicchainlabs/epicchain-sdk-go/client/status" cid "github.com/epicchainlabs/epicchain-sdk-go/container/id" oid "github.com/epicchainlabs/epicchain-sdk-go/object/id" "github.com/epicchainlabs/epicchain-sdk-go/pool" "github.com/epicchainlabs/epicchain-sdk-go/user" ) func main() { // import "github.com/epicchainlabs/epicchain-sdk-go/waiter" // import "github.com/epicchainlabs/epicchain-sdk-go/container" // import "github.com/epicchainlabs/epicchain-sdk-go/user" var p pool.Pool // ... init pool // Connect to the NeoFS server _ = p.Dial(context.Background()) var signer user.Signer // ... create signer var prmHead client.PrmObjectHead // ... fill params, if required hdr, err := p.ObjectHead(context.Background(), cid.ID{}, oid.ID{}, signer, prmHead) if err != nil { if errors.Is(err, apistatus.ErrObjectNotFound) { return } // ... } _ = hdr p.Close() }
Output:
func (*Pool) ObjectPutInit ¶
func (p *Pool) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error)
ObjectPutInit initiates writing an object through a remote server using NeoFS API protocol. Header length is limited to object.MaxHeaderLen.
Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.
See details in [client.Client.ObjectPutInit].
Example (AutoSessionDisabling) ¶
package main import ( "context" "crypto/ecdsa" "crypto/elliptic" "crypto/rand" "github.com/epicchainlabs/epicchain-sdk-go/client" cid "github.com/epicchainlabs/epicchain-sdk-go/container/id" "github.com/epicchainlabs/epicchain-sdk-go/object" "github.com/epicchainlabs/epicchain-sdk-go/pool" "github.com/epicchainlabs/epicchain-sdk-go/session" "github.com/epicchainlabs/epicchain-sdk-go/user" ) func main() { // The key is generated to simplify the example, in reality it's likely to come from configuration/wallet. pk, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) signer := user.NewAutoIDSignerRFC6979(*pk) p, _ := pool.New( pool.NewFlatNodeParams([]string{"grpc://localhost:8080", "grpcs://localhost:8081"}), signer, pool.DefaultOptions(), ) _ = p.Dial(context.Background()) // Session should be initialized with Client.SessionCreate function. var sess session.Object var prm client.PrmObjectPutInit // Auto-session disabled, because you provided session already. prm.WithinSession(sess) // For ObjectPutInit operation prm.IgnoreSession shouldn't be called ever, because putObject without session is not // acceptable, and it will be an error. // prm.IgnoreSession() var ownerID user.ID var hdr object.Object hdr.SetContainerID(cid.ID{}) hdr.SetOwnerID(&ownerID) // In case of a session wasn't provided with prm.WithinSession, the signer must be for account which is a container // owner, otherwise there will be an error. // In case of a session was provided with prm.WithinSession, the signer can be ether container owner account or // third party, who can use a session token signed by container owner. _, _ = p.ObjectPutInit(context.Background(), hdr, signer, prm) // ... }
Output:
func (*Pool) ObjectRangeInit ¶
func (p *Pool) ObjectRangeInit(ctx context.Context, containerID cid.ID, objectID oid.ID, offset, length uint64, signer user.Signer, prm client.PrmObjectRange) (*client.ObjectRangeReader, error)
ObjectRangeInit initiates reading an object's payload range through a remote
Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.
See details in [client.Client.ObjectRangeInit].
func (*Pool) ObjectSearchInit ¶
func (p *Pool) ObjectSearchInit(ctx context.Context, containerID cid.ID, signer user.Signer, prm client.PrmObjectSearch) (*client.ObjectListReader, error)
ObjectSearchInit initiates object selection through a remote server using NeoFS API protocol.
Operation is executed within a session automatically created by Pool unless parameters explicitly override session settings.
See details in [client.Client.ObjectSearchInit].