Documentation ¶
Overview ¶
Package word_count is an example application which provides a gRPC API for publishing texts and querying running counts of NGrams extracted from previously published texts.
Index ¶
- Variables
- func RegisterNGramServer(s *grpc.Server, srv NGramServer)
- type Counter
- func (Counter) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope) error
- func (Counter) FinalizeTxn(shard consumer.Shard, store consumer.Store) error
- func (counter *Counter) InitApplication(args runconsumer.InitArgs) error
- func (Counter) NewConfig() runconsumer.Config
- func (Counter) NewMessage(*pb.JournalSpec) (message.Message, error)
- func (Counter) NewStore(shard consumer.Shard, dir string, rec *recoverylog.Recorder) (consumer.Store, error)
- func (counter *Counter) Publish(ctx context.Context, req *PublishRequest) (*PublishResponse, error)
- func (counter *Counter) Query(ctx context.Context, req *QueryRequest) (resp *QueryResponse, err error)
- type NGram
- type NGramClient
- type NGramCount
- func (*NGramCount) Descriptor() ([]byte, []int)
- func (m *NGramCount) GetCount() uint64
- func (m *NGramCount) GetNGram() NGram
- func (m *NGramCount) Marshal() (dAtA []byte, err error)
- func (m *NGramCount) MarshalTo(dAtA []byte) (int, error)
- func (*NGramCount) ProtoMessage()
- func (m *NGramCount) ProtoSize() (n int)
- func (m *NGramCount) Reset()
- func (m *NGramCount) String() string
- func (m *NGramCount) Unmarshal(dAtA []byte) error
- func (m *NGramCount) XXX_DiscardUnknown()
- func (m *NGramCount) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *NGramCount) XXX_Merge(src proto.Message)
- func (m *NGramCount) XXX_Size() int
- func (m *NGramCount) XXX_Unmarshal(b []byte) error
- type NGramServer
- type PublishRequest
- func (*PublishRequest) Descriptor() ([]byte, []int)
- func (m *PublishRequest) GetText() string
- func (m *PublishRequest) Marshal() (dAtA []byte, err error)
- func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error)
- func (*PublishRequest) ProtoMessage()
- func (m *PublishRequest) ProtoSize() (n int)
- func (m *PublishRequest) Reset()
- func (m *PublishRequest) String() string
- func (m *PublishRequest) Unmarshal(dAtA []byte) error
- func (m *PublishRequest) XXX_DiscardUnknown()
- func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *PublishRequest) XXX_Merge(src proto.Message)
- func (m *PublishRequest) XXX_Size() int
- func (m *PublishRequest) XXX_Unmarshal(b []byte) error
- type PublishResponse
- func (*PublishResponse) Descriptor() ([]byte, []int)
- func (m *PublishResponse) Marshal() (dAtA []byte, err error)
- func (m *PublishResponse) MarshalTo(dAtA []byte) (int, error)
- func (*PublishResponse) ProtoMessage()
- func (m *PublishResponse) ProtoSize() (n int)
- func (m *PublishResponse) Reset()
- func (m *PublishResponse) String() string
- func (m *PublishResponse) Unmarshal(dAtA []byte) error
- func (m *PublishResponse) XXX_DiscardUnknown()
- func (m *PublishResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *PublishResponse) XXX_Merge(src proto.Message)
- func (m *PublishResponse) XXX_Size() int
- func (m *PublishResponse) XXX_Unmarshal(b []byte) error
- type QueryRequest
- func (*QueryRequest) Descriptor() ([]byte, []int)
- func (m *QueryRequest) GetHeader() *protocol.Header
- func (m *QueryRequest) GetPrefix() NGram
- func (m *QueryRequest) GetShard() github_com_LiveRamp_gazette_v2_pkg_consumer.ShardID
- func (m *QueryRequest) Marshal() (dAtA []byte, err error)
- func (m *QueryRequest) MarshalTo(dAtA []byte) (int, error)
- func (*QueryRequest) ProtoMessage()
- func (m *QueryRequest) ProtoSize() (n int)
- func (m *QueryRequest) Reset()
- func (m *QueryRequest) String() string
- func (m *QueryRequest) Unmarshal(dAtA []byte) error
- func (m *QueryRequest) XXX_DiscardUnknown()
- func (m *QueryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *QueryRequest) XXX_Merge(src proto.Message)
- func (m *QueryRequest) XXX_Size() int
- func (m *QueryRequest) XXX_Unmarshal(b []byte) error
- type QueryResponse
- func (*QueryResponse) Descriptor() ([]byte, []int)
- func (m *QueryResponse) GetGrams() []NGramCount
- func (m *QueryResponse) Marshal() (dAtA []byte, err error)
- func (m *QueryResponse) MarshalTo(dAtA []byte) (int, error)
- func (*QueryResponse) ProtoMessage()
- func (m *QueryResponse) ProtoSize() (n int)
- func (m *QueryResponse) Reset()
- func (m *QueryResponse) String() string
- func (m *QueryResponse) Unmarshal(dAtA []byte) error
- func (m *QueryResponse) XXX_DiscardUnknown()
- func (m *QueryResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *QueryResponse) XXX_Merge(src proto.Message)
- func (m *QueryResponse) XXX_Size() int
- func (m *QueryResponse) XXX_Unmarshal(b []byte) error
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthWordCount = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowWordCount = fmt.Errorf("proto: integer overflow") )
Functions ¶
func RegisterNGramServer ¶
func RegisterNGramServer(s *grpc.Server, srv NGramServer)
Types ¶
type Counter ¶
type Counter struct {
// contains filtered or unexported fields
}
Counter consumes NGramCount messages and aggregates total counts of each NGram. It also provides gRPC APIs for publishing text and querying NGram counts. It implements the following interfaces: - runconsumer.Application - NGramServer (generated gRPC service stub).
func (Counter) ConsumeMessage ¶
func (Counter) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope) error
ConsumeMessage folds an NGramCount into its respective running NGram count. Implements consumer.Application.
func (Counter) FinalizeTxn ¶
FinalizeTxn marshals in-memory NGram counts to the |store|, ensuring persistence across consumer transactions. Implements consumer.Application.
func (*Counter) InitApplication ¶
func (counter *Counter) InitApplication(args runconsumer.InitArgs) error
InitModule initializes the application to serve the NGram gRPC service.
func (Counter) NewConfig ¶
func (Counter) NewConfig() runconsumer.Config
NewConfig returns a new configuration instance.
func (Counter) NewMessage ¶
NewMessage returns an NGramCount message. Implements consumer.Application.
func (Counter) NewStore ¶
func (Counter) NewStore(shard consumer.Shard, dir string, rec *recoverylog.Recorder) (consumer.Store, error)
NewStore builds a RocksDB store for the Shard. Implements consumer.Application.
func (*Counter) Publish ¶
func (counter *Counter) Publish(ctx context.Context, req *PublishRequest) (*PublishResponse, error)
Publish extracts NGrams of the configured length from the PublishRequest, and publishes an NGramCount message for each. It returns after all published messages have committed to their respective journals.
func (*Counter) Query ¶
func (counter *Counter) Query(ctx context.Context, req *QueryRequest) (resp *QueryResponse, err error)
Query a count for an NGram count (or counts for a prefix thereof). If the requested or imputed Shard does not resolve locally, Query will proxy the request to the responsible process.
type NGramClient ¶
type NGramClient interface { Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishResponse, error) Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) }
NGramClient is the client API for NGram service.
For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
func NewNGramClient ¶
func NewNGramClient(cc *grpc.ClientConn) NGramClient
type NGramCount ¶
type NGramCount struct { NGram NGram `protobuf:"bytes,1,opt,name=n_gram,json=nGram,proto3,casttype=NGram" json:"n_gram,omitempty"` Count uint64 `protobuf:"varint,2,opt,name=count,proto3" json:"count,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*NGramCount) Descriptor ¶
func (*NGramCount) Descriptor() ([]byte, []int)
func (*NGramCount) GetCount ¶
func (m *NGramCount) GetCount() uint64
func (*NGramCount) GetNGram ¶
func (m *NGramCount) GetNGram() NGram
func (*NGramCount) Marshal ¶
func (m *NGramCount) Marshal() (dAtA []byte, err error)
func (*NGramCount) ProtoMessage ¶
func (*NGramCount) ProtoMessage()
func (*NGramCount) ProtoSize ¶
func (m *NGramCount) ProtoSize() (n int)
func (*NGramCount) Reset ¶
func (m *NGramCount) Reset()
func (*NGramCount) String ¶
func (m *NGramCount) String() string
func (*NGramCount) Unmarshal ¶
func (m *NGramCount) Unmarshal(dAtA []byte) error
func (*NGramCount) XXX_DiscardUnknown ¶
func (m *NGramCount) XXX_DiscardUnknown()
func (*NGramCount) XXX_Marshal ¶
func (m *NGramCount) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*NGramCount) XXX_Merge ¶
func (dst *NGramCount) XXX_Merge(src proto.Message)
func (*NGramCount) XXX_Size ¶
func (m *NGramCount) XXX_Size() int
func (*NGramCount) XXX_Unmarshal ¶
func (m *NGramCount) XXX_Unmarshal(b []byte) error
type NGramServer ¶
type NGramServer interface { Publish(context.Context, *PublishRequest) (*PublishResponse, error) Query(context.Context, *QueryRequest) (*QueryResponse, error) }
NGramServer is the server API for NGram service.
type PublishRequest ¶
type PublishRequest struct { Text string `protobuf:"bytes,1,opt,name=text,proto3" json:"text,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*PublishRequest) Descriptor ¶
func (*PublishRequest) Descriptor() ([]byte, []int)
func (*PublishRequest) GetText ¶
func (m *PublishRequest) GetText() string
func (*PublishRequest) Marshal ¶
func (m *PublishRequest) Marshal() (dAtA []byte, err error)
func (*PublishRequest) ProtoMessage ¶
func (*PublishRequest) ProtoMessage()
func (*PublishRequest) ProtoSize ¶
func (m *PublishRequest) ProtoSize() (n int)
func (*PublishRequest) Reset ¶
func (m *PublishRequest) Reset()
func (*PublishRequest) String ¶
func (m *PublishRequest) String() string
func (*PublishRequest) Unmarshal ¶
func (m *PublishRequest) Unmarshal(dAtA []byte) error
func (*PublishRequest) XXX_DiscardUnknown ¶
func (m *PublishRequest) XXX_DiscardUnknown()
func (*PublishRequest) XXX_Marshal ¶
func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*PublishRequest) XXX_Merge ¶
func (dst *PublishRequest) XXX_Merge(src proto.Message)
func (*PublishRequest) XXX_Size ¶
func (m *PublishRequest) XXX_Size() int
func (*PublishRequest) XXX_Unmarshal ¶
func (m *PublishRequest) XXX_Unmarshal(b []byte) error
type PublishResponse ¶
type PublishResponse struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*PublishResponse) Descriptor ¶
func (*PublishResponse) Descriptor() ([]byte, []int)
func (*PublishResponse) Marshal ¶
func (m *PublishResponse) Marshal() (dAtA []byte, err error)
func (*PublishResponse) ProtoMessage ¶
func (*PublishResponse) ProtoMessage()
func (*PublishResponse) ProtoSize ¶
func (m *PublishResponse) ProtoSize() (n int)
func (*PublishResponse) Reset ¶
func (m *PublishResponse) Reset()
func (*PublishResponse) String ¶
func (m *PublishResponse) String() string
func (*PublishResponse) Unmarshal ¶
func (m *PublishResponse) Unmarshal(dAtA []byte) error
func (*PublishResponse) XXX_DiscardUnknown ¶
func (m *PublishResponse) XXX_DiscardUnknown()
func (*PublishResponse) XXX_Marshal ¶
func (m *PublishResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*PublishResponse) XXX_Merge ¶
func (dst *PublishResponse) XXX_Merge(src proto.Message)
func (*PublishResponse) XXX_Size ¶
func (m *PublishResponse) XXX_Size() int
func (*PublishResponse) XXX_Unmarshal ¶
func (m *PublishResponse) XXX_Unmarshal(b []byte) error
type QueryRequest ¶
type QueryRequest struct { // Header attached by a proxy-ing peer. Not directly set by clients. Header *protocol.Header `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` // NGram prefix to query. Prefix NGram `protobuf:"bytes,2,opt,name=prefix,proto3,casttype=NGram" json:"prefix,omitempty"` // Shard to query. Optional; if not set, shard is inferred from |prefix|'s current mapping. Shard github_com_LiveRamp_gazette_v2_pkg_consumer.ShardID `protobuf:"bytes,3,opt,name=shard,proto3,casttype=github.com/LiveRamp/gazette/v2/pkg/consumer.ShardID" json:"shard,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*QueryRequest) Descriptor ¶
func (*QueryRequest) Descriptor() ([]byte, []int)
func (*QueryRequest) GetHeader ¶
func (m *QueryRequest) GetHeader() *protocol.Header
func (*QueryRequest) GetPrefix ¶
func (m *QueryRequest) GetPrefix() NGram
func (*QueryRequest) GetShard ¶
func (m *QueryRequest) GetShard() github_com_LiveRamp_gazette_v2_pkg_consumer.ShardID
func (*QueryRequest) Marshal ¶
func (m *QueryRequest) Marshal() (dAtA []byte, err error)
func (*QueryRequest) ProtoMessage ¶
func (*QueryRequest) ProtoMessage()
func (*QueryRequest) ProtoSize ¶
func (m *QueryRequest) ProtoSize() (n int)
func (*QueryRequest) Reset ¶
func (m *QueryRequest) Reset()
func (*QueryRequest) String ¶
func (m *QueryRequest) String() string
func (*QueryRequest) Unmarshal ¶
func (m *QueryRequest) Unmarshal(dAtA []byte) error
func (*QueryRequest) XXX_DiscardUnknown ¶
func (m *QueryRequest) XXX_DiscardUnknown()
func (*QueryRequest) XXX_Marshal ¶
func (m *QueryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*QueryRequest) XXX_Merge ¶
func (dst *QueryRequest) XXX_Merge(src proto.Message)
func (*QueryRequest) XXX_Size ¶
func (m *QueryRequest) XXX_Size() int
func (*QueryRequest) XXX_Unmarshal ¶
func (m *QueryRequest) XXX_Unmarshal(b []byte) error
type QueryResponse ¶
type QueryResponse struct { Grams []NGramCount `protobuf:"bytes,1,rep,name=grams" json:"grams"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*QueryResponse) Descriptor ¶
func (*QueryResponse) Descriptor() ([]byte, []int)
func (*QueryResponse) GetGrams ¶
func (m *QueryResponse) GetGrams() []NGramCount
func (*QueryResponse) Marshal ¶
func (m *QueryResponse) Marshal() (dAtA []byte, err error)
func (*QueryResponse) ProtoMessage ¶
func (*QueryResponse) ProtoMessage()
func (*QueryResponse) ProtoSize ¶
func (m *QueryResponse) ProtoSize() (n int)
func (*QueryResponse) Reset ¶
func (m *QueryResponse) Reset()
func (*QueryResponse) String ¶
func (m *QueryResponse) String() string
func (*QueryResponse) Unmarshal ¶
func (m *QueryResponse) Unmarshal(dAtA []byte) error
func (*QueryResponse) XXX_DiscardUnknown ¶
func (m *QueryResponse) XXX_DiscardUnknown()
func (*QueryResponse) XXX_Marshal ¶
func (m *QueryResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*QueryResponse) XXX_Merge ¶
func (dst *QueryResponse) XXX_Merge(src proto.Message)
func (*QueryResponse) XXX_Size ¶
func (m *QueryResponse) XXX_Size() int
func (*QueryResponse) XXX_Unmarshal ¶
func (m *QueryResponse) XXX_Unmarshal(b []byte) error
Directories ¶
Path | Synopsis |
---|---|
Package counter runs the word_count.Counter consumer.
|
Package counter runs the word_count.Counter consumer. |