abci

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2024 License: Apache-2.0 Imports: 14 Imported by: 2

README

ABCI and State Streaming Plugin (gRPC)

The BaseApp package contains the interface for a ABCIListener service used to write state changes out from individual KVStores to external systems, as described in ADR-038.

Specific ABCIListener service implementations are written and loaded as hashicorp/go-plugin.

Implementation

In this section we describe the implementation of the ABCIListener interface as a gRPC service.

Service Protocol

The companion service protocol for the ABCIListener interface is described below. See proto/cosmos/store/streaming/abci/grpc.proto for full details.

https://github.com/cosmos/cosmos-sdk/blob/6cee22df52eb0cbb30e351fbb41f66d26c1f8300/proto/cosmos/store/streaming/abci/grpc.proto#L1-L36
Generating the Code

To generate the stubs the local client implementation can call, run the following command:

make proto-gen

For other languages you'll need to download the CosmosSDK protos into your project and compile. For language specific compilation instructions visit https://github.com/grpc and look in the examples folder of your language of choice https://github.com/grpc/grpc-{language}/tree/master/examples and https://grpc.io for the documentation.

gRPC Client and Server

Implementing the ABCIListener gRPC client and server is a simple and straight forward process.

To create the client and server we create a ListenerGRPCPlugin struct that implements the plugin.GRPCPlugin interface and a Impl property that will contain a concrete implementation of the ABCIListener plugin written in Go.

The Interface

The BaseApp ABCIListener interface will be what will define the plugins capabilities.

Boilerplate RPC implementation example of the ABCIListener interface. (store/streaming/abci/grpc.go)

https://github.com/cosmos/cosmos-sdk/blob/f851e188b3b9d46e7c63fa514ad137e6d558fdd9/store/streaming/abci/grpc.go#L13-L79

Our ABCIlistener service plugin. (store/streaming/plugins/abci/v1/interface.go)

https://github.com/cosmos/cosmos-sdk/blob/f851e188b3b9d46e7c63fa514ad137e6d558fdd9/store/streaming/abci/interface.go#L13-L45
Plugin Implementation

Plugin implementations can be in a completely separate package but will need access to the ABCIListener interface. One thing to note here is that plugin implementations defined in the ListenerGRPCPlugin.Impl property are only required when building plugins in Go. They are pre-compiled into Go modules. The GRPCServer.Impl calls methods on this out-of-process plugin.

For Go plugins this is all that is required to process data that is sent over gRPC. This provides the advantage of writing quick plugins that process data to different external systems (i.e: DB, File, DB, Kafka, etc.) without the need for implementing the gRPC server endpoints.

// MyPlugin is the implementation of the ABCIListener interface
// For Go plugins this is all that is required to process data sent over gRPC.
type MyPlugin struct {
	...
}

func (a FilePlugin) ListenFinalizeBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
	// process data
	return nil
}

func (a FilePlugin) ListenCommit(ctx context.Context, res abci.ResponseCommit, changeSet []*store.StoreKVPair) error {
    // process data
    return nil
}

func main() {
	plugin.Serve(&plugin.ServeConfig{
		HandshakeConfig: v1.Handshake,
		Plugins: map[string]plugin.Plugin{
			"abci": &ABCIListenerGRPCPlugin{Impl: &MyPlugin{}},
		},

		// A non-nil value here enables gRPC serving for this streaming...
		GRPCServer: plugin.DefaultGRPCServer,
	})
}

Plugin Loading System

A general purpose plugin loading system has been provided by the SDK to be able to load not just the ABCIListener service plugin but other protocol services as well. You can take a look at how plugins are loaded by the SDK in store/streaming/streaming.go

You'll need to add this in your app.go

// app.go

func NewApp(...) *App {

    ...

    // register streaming services
    streamingCfg := cast.ToStringMap(appOpts.Get(baseapp.StreamingTomlKey))
    for service := range streamingCfg {
        pluginKey := fmt.Sprintf("%s.%s.%s", baseapp.StreamingTomlKey, service, baseapp.StreamingABCIPluginTomlKey)
        pluginName := strings.TrimSpace(cast.ToString(appOpts.Get(pluginKey)))
        if len(pluginName) > 0 {
            logLevel := cast.ToString(appOpts.Get(flags.FlagLogLevel))
            plugin, err := streaming.NewStreamingPlugin(pluginName, logLevel)
            if err != nil {
                tmos.Exit(err.Error())
            }
            if err := baseapp.RegisterStreamingPlugin(bApp, appOpts, keys, plugin); err != nil {
                tmos.Exit(err.Error())
            }
        }
    }

    ...
}

Configuration

Update the streaming section in app.toml

# Streaming allows nodes to stream state to external systems
[streaming]

# streaming.abci specifies the configuration for the ABCI Listener streaming service
[streaming.abci]

# List of kv store keys to stream out via gRPC
# Set to ["*"] to expose all keys.
keys = ["*"]

# The plugin name used for streaming via gRPC
# Supported plugins: abci
plugin = "abci"

# stop-node-on-err specifies whether to stop the node when the 
stop-node-on-err = true

Updating the protocol

If you update the protocol buffers file, you can regenerate the file and plugins using the following commands from the project root directory. You do not need to run this if you're just trying the examples, you can skip ahead to the Testing section.

make proto-gen 
  • stdout plugin; from inside the store/ dir, run:
go build -o streaming/abci/examples/stdout/stdout streaming/abci/examples/stdout/stdout.go
  • file plugin (writes to ~/); from inside the store/ dir, run:
go build -o streaming/abci/examples/file/file streaming/abci/examples/file/file.go
Testing

Export a plugin from one of the Go or Python examples.

  • stdout plugin
export COSMOS_SDK_ABCI="{path to}/cosmos-sdk/store/streaming/abci/examples/stdout/stdout"
  • file plugin (writes to ~/)
export COSMOS_SDK_ABCI="{path to}/cosmos-sdk/store/streaming/abci/examples/file/file"

where {path to} is the parent path to the cosmos-sdk repo on you system.

Test:

make test-sim-nondeterminism-streaming

The plugin system will look for the plugin binary in the env variable COSMOS_SDK_{PLUGIN_NAME} above and if it does not find it, it will error out. The plugin UPPERCASE name is that of the streaming.abci.plugin TOML configuration setting.

Documentation

Overview

Package abci contains shared data between the host and plugins.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidLengthGrpc        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowGrpc          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupGrpc = fmt.Errorf("proto: unexpected end of group")
)
View Source
var Handshake = plugin.HandshakeConfig{

	ProtocolVersion:  1,
	MagicCookieKey:   "ABCI_LISTENER_PLUGIN",
	MagicCookieValue: "ef78114d-7bdf-411c-868f-347c99a78345",
}

Handshake is a common handshake that is shared by streaming and host. This prevents users from executing bad plugins or executing a plugin directory. It is a UX feature, not a security feature.

Functions

func RegisterABCIListenerServiceServer

func RegisterABCIListenerServiceServer(s grpc1.Server, srv ABCIListenerServiceServer)

Types

type ABCIListenerServiceClient

type ABCIListenerServiceClient interface {
	// ListenFinalizeBlock is the corresponding endpoint for ABCIListener.ListenEndBlock
	ListenFinalizeBlock(ctx context.Context, in *ListenFinalizeBlockRequest, opts ...grpc.CallOption) (*ListenFinalizeBlockResponse, error)
	// ListenCommit is the corresponding endpoint for ABCIListener.ListenCommit
	ListenCommit(ctx context.Context, in *ListenCommitRequest, opts ...grpc.CallOption) (*ListenCommitResponse, error)
}

ABCIListenerServiceClient is the client API for ABCIListenerService service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.

func NewABCIListenerServiceClient

func NewABCIListenerServiceClient(cc grpc1.ClientConn) ABCIListenerServiceClient

type ABCIListenerServiceServer

type ABCIListenerServiceServer interface {
	// ListenFinalizeBlock is the corresponding endpoint for ABCIListener.ListenEndBlock
	ListenFinalizeBlock(context.Context, *ListenFinalizeBlockRequest) (*ListenFinalizeBlockResponse, error)
	// ListenCommit is the corresponding endpoint for ABCIListener.ListenCommit
	ListenCommit(context.Context, *ListenCommitRequest) (*ListenCommitResponse, error)
}

ABCIListenerServiceServer is the server API for ABCIListenerService service.

type GRPCClient

type GRPCClient struct {
	// contains filtered or unexported fields
}

GRPCClient is an implementation of the ABCIListener interface that talks over RPC.

func (*GRPCClient) ListenCommit

func (m *GRPCClient) ListenCommit(goCtx context.Context, res abci.ResponseCommit, changeSet []*storetypes.StoreKVPair) error

ListenCommit listens to commit responses and state changes for the current block. In addition, it retrieves a types.Context from a context.Context instance. It panics if a types.Context was not properly attached. When the node is configured to stop on listening errors, it will terminate immediately and exit with a non-zero code.

func (*GRPCClient) ListenFinalizeBlock

func (m *GRPCClient) ListenFinalizeBlock(goCtx context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error

ListenEndBlock listens to end block request and responses. In addition, it retrieves a types.Context from a context.Context instance. It panics if a types.Context was not properly attached. When the node is configured to stop on listening errors, it will terminate immediately and exit with a non-zero code.

type GRPCServer

type GRPCServer struct {
	// This is the real implementation
	Impl storetypes.ABCIListener
}

GRPCServer is the gRPC server that GRPCClient talks to.

func (GRPCServer) ListenCommit

func (m GRPCServer) ListenCommit(ctx context.Context, request *ListenCommitRequest) (*ListenCommitResponse, error)

func (GRPCServer) ListenFinalizeBlock

func (m GRPCServer) ListenFinalizeBlock(ctx context.Context, request *ListenFinalizeBlockRequest) (*ListenFinalizeBlockResponse, error)

type ListenCommitRequest

type ListenCommitRequest struct {
	// explicitly pass in block height as ResponseCommit does not contain this info
	BlockHeight int64                 `protobuf:"varint,1,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"`
	Res         *types.ResponseCommit `protobuf:"bytes,2,opt,name=res,proto3" json:"res,omitempty"`
	ChangeSet   []*types1.StoreKVPair `protobuf:"bytes,3,rep,name=change_set,json=changeSet,proto3" json:"change_set,omitempty"`
}

ListenCommitRequest is the request type for the ListenCommit RPC method

func (*ListenCommitRequest) Descriptor

func (*ListenCommitRequest) Descriptor() ([]byte, []int)

func (*ListenCommitRequest) GetBlockHeight

func (m *ListenCommitRequest) GetBlockHeight() int64

func (*ListenCommitRequest) GetChangeSet

func (m *ListenCommitRequest) GetChangeSet() []*types1.StoreKVPair

func (*ListenCommitRequest) GetRes

func (*ListenCommitRequest) Marshal

func (m *ListenCommitRequest) Marshal() (dAtA []byte, err error)

func (*ListenCommitRequest) MarshalTo

func (m *ListenCommitRequest) MarshalTo(dAtA []byte) (int, error)

func (*ListenCommitRequest) MarshalToSizedBuffer

func (m *ListenCommitRequest) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ListenCommitRequest) ProtoMessage

func (*ListenCommitRequest) ProtoMessage()

func (*ListenCommitRequest) Reset

func (m *ListenCommitRequest) Reset()

func (*ListenCommitRequest) Size

func (m *ListenCommitRequest) Size() (n int)

func (*ListenCommitRequest) String

func (m *ListenCommitRequest) String() string

func (*ListenCommitRequest) Unmarshal

func (m *ListenCommitRequest) Unmarshal(dAtA []byte) error

func (*ListenCommitRequest) XXX_DiscardUnknown

func (m *ListenCommitRequest) XXX_DiscardUnknown()

func (*ListenCommitRequest) XXX_Marshal

func (m *ListenCommitRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ListenCommitRequest) XXX_Merge

func (m *ListenCommitRequest) XXX_Merge(src proto.Message)

func (*ListenCommitRequest) XXX_Size

func (m *ListenCommitRequest) XXX_Size() int

func (*ListenCommitRequest) XXX_Unmarshal

func (m *ListenCommitRequest) XXX_Unmarshal(b []byte) error

type ListenCommitResponse

type ListenCommitResponse struct {
}

ListenCommitResponse is the response type for the ListenCommit RPC method

func (*ListenCommitResponse) Descriptor

func (*ListenCommitResponse) Descriptor() ([]byte, []int)

func (*ListenCommitResponse) Marshal

func (m *ListenCommitResponse) Marshal() (dAtA []byte, err error)

func (*ListenCommitResponse) MarshalTo

func (m *ListenCommitResponse) MarshalTo(dAtA []byte) (int, error)

func (*ListenCommitResponse) MarshalToSizedBuffer

func (m *ListenCommitResponse) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ListenCommitResponse) ProtoMessage

func (*ListenCommitResponse) ProtoMessage()

func (*ListenCommitResponse) Reset

func (m *ListenCommitResponse) Reset()

func (*ListenCommitResponse) Size

func (m *ListenCommitResponse) Size() (n int)

func (*ListenCommitResponse) String

func (m *ListenCommitResponse) String() string

func (*ListenCommitResponse) Unmarshal

func (m *ListenCommitResponse) Unmarshal(dAtA []byte) error

func (*ListenCommitResponse) XXX_DiscardUnknown

func (m *ListenCommitResponse) XXX_DiscardUnknown()

func (*ListenCommitResponse) XXX_Marshal

func (m *ListenCommitResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ListenCommitResponse) XXX_Merge

func (m *ListenCommitResponse) XXX_Merge(src proto.Message)

func (*ListenCommitResponse) XXX_Size

func (m *ListenCommitResponse) XXX_Size() int

func (*ListenCommitResponse) XXX_Unmarshal

func (m *ListenCommitResponse) XXX_Unmarshal(b []byte) error

type ListenFinalizeBlockRequest

type ListenFinalizeBlockRequest struct {
	Req *types.RequestFinalizeBlock  `protobuf:"bytes,1,opt,name=req,proto3" json:"req,omitempty"`
	Res *types.ResponseFinalizeBlock `protobuf:"bytes,2,opt,name=res,proto3" json:"res,omitempty"`
}

ListenEndBlockRequest is the request type for the ListenEndBlock RPC method

func (*ListenFinalizeBlockRequest) Descriptor

func (*ListenFinalizeBlockRequest) Descriptor() ([]byte, []int)

func (*ListenFinalizeBlockRequest) GetReq

func (*ListenFinalizeBlockRequest) GetRes

func (*ListenFinalizeBlockRequest) Marshal

func (m *ListenFinalizeBlockRequest) Marshal() (dAtA []byte, err error)

func (*ListenFinalizeBlockRequest) MarshalTo

func (m *ListenFinalizeBlockRequest) MarshalTo(dAtA []byte) (int, error)

func (*ListenFinalizeBlockRequest) MarshalToSizedBuffer

func (m *ListenFinalizeBlockRequest) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ListenFinalizeBlockRequest) ProtoMessage

func (*ListenFinalizeBlockRequest) ProtoMessage()

func (*ListenFinalizeBlockRequest) Reset

func (m *ListenFinalizeBlockRequest) Reset()

func (*ListenFinalizeBlockRequest) Size

func (m *ListenFinalizeBlockRequest) Size() (n int)

func (*ListenFinalizeBlockRequest) String

func (m *ListenFinalizeBlockRequest) String() string

func (*ListenFinalizeBlockRequest) Unmarshal

func (m *ListenFinalizeBlockRequest) Unmarshal(dAtA []byte) error

func (*ListenFinalizeBlockRequest) XXX_DiscardUnknown

func (m *ListenFinalizeBlockRequest) XXX_DiscardUnknown()

func (*ListenFinalizeBlockRequest) XXX_Marshal

func (m *ListenFinalizeBlockRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ListenFinalizeBlockRequest) XXX_Merge

func (m *ListenFinalizeBlockRequest) XXX_Merge(src proto.Message)

func (*ListenFinalizeBlockRequest) XXX_Size

func (m *ListenFinalizeBlockRequest) XXX_Size() int

func (*ListenFinalizeBlockRequest) XXX_Unmarshal

func (m *ListenFinalizeBlockRequest) XXX_Unmarshal(b []byte) error

type ListenFinalizeBlockResponse

type ListenFinalizeBlockResponse struct {
}

ListenEndBlockResponse is the response type for the ListenEndBlock RPC method

func (*ListenFinalizeBlockResponse) Descriptor

func (*ListenFinalizeBlockResponse) Descriptor() ([]byte, []int)

func (*ListenFinalizeBlockResponse) Marshal

func (m *ListenFinalizeBlockResponse) Marshal() (dAtA []byte, err error)

func (*ListenFinalizeBlockResponse) MarshalTo

func (m *ListenFinalizeBlockResponse) MarshalTo(dAtA []byte) (int, error)

func (*ListenFinalizeBlockResponse) MarshalToSizedBuffer

func (m *ListenFinalizeBlockResponse) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ListenFinalizeBlockResponse) ProtoMessage

func (*ListenFinalizeBlockResponse) ProtoMessage()

func (*ListenFinalizeBlockResponse) Reset

func (m *ListenFinalizeBlockResponse) Reset()

func (*ListenFinalizeBlockResponse) Size

func (m *ListenFinalizeBlockResponse) Size() (n int)

func (*ListenFinalizeBlockResponse) String

func (m *ListenFinalizeBlockResponse) String() string

func (*ListenFinalizeBlockResponse) Unmarshal

func (m *ListenFinalizeBlockResponse) Unmarshal(dAtA []byte) error

func (*ListenFinalizeBlockResponse) XXX_DiscardUnknown

func (m *ListenFinalizeBlockResponse) XXX_DiscardUnknown()

func (*ListenFinalizeBlockResponse) XXX_Marshal

func (m *ListenFinalizeBlockResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ListenFinalizeBlockResponse) XXX_Merge

func (m *ListenFinalizeBlockResponse) XXX_Merge(src proto.Message)

func (*ListenFinalizeBlockResponse) XXX_Size

func (m *ListenFinalizeBlockResponse) XXX_Size() int

func (*ListenFinalizeBlockResponse) XXX_Unmarshal

func (m *ListenFinalizeBlockResponse) XXX_Unmarshal(b []byte) error

type ListenerGRPCPlugin

type ListenerGRPCPlugin struct {
	// GRPCPlugin must still implement the Plugin interface
	plugin.Plugin
	// Concrete implementation, written in Go. This is only used for plugins
	// that are written in Go.
	Impl storetypes.ABCIListener
}

ListenerGRPCPlugin is the implementation of plugin.GRPCPlugin, so we can serve/consume this.

func (*ListenerGRPCPlugin) GRPCClient

func (p *ListenerGRPCPlugin) GRPCClient(
	_ context.Context,
	_ *plugin.GRPCBroker,
	c *grpc.ClientConn,
) (interface{}, error)

func (*ListenerGRPCPlugin) GRPCServer

func (p *ListenerGRPCPlugin) GRPCServer(_ *plugin.GRPCBroker, s *grpc.Server) error

type UnimplementedABCIListenerServiceServer

type UnimplementedABCIListenerServiceServer struct {
}

UnimplementedABCIListenerServiceServer can be embedded to have forward compatible implementations.

func (*UnimplementedABCIListenerServiceServer) ListenCommit

func (*UnimplementedABCIListenerServiceServer) ListenFinalizeBlock

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL