word_count

package
v2.0.200+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2019 License: MIT Imports: 21 Imported by: 10

Documentation

Overview

Package word_count is an example application which provides a gRPC API for publishing texts and querying running counts of NGrams extracted from previously published texts.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidLengthWordCount = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowWordCount   = fmt.Errorf("proto: integer overflow")
)

Functions

func RegisterNGramServer

func RegisterNGramServer(s *grpc.Server, srv NGramServer)

Types

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

Counter consumes NGramCount messages and aggregates total counts of each NGram. It also provides gRPC APIs for publishing text and querying NGram counts. It implements the following interfaces: - runconsumer.Application - NGramServer (generated gRPC service stub).

func (Counter) ConsumeMessage

func (Counter) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope) error

ConsumeMessage folds an NGramCount into its respective running NGram count. Implements consumer.Application.

func (Counter) FinalizeTxn

func (Counter) FinalizeTxn(shard consumer.Shard, store consumer.Store) error

FinalizeTxn marshals in-memory NGram counts to the |store|, ensuring persistence across consumer transactions. Implements consumer.Application.

func (*Counter) InitApplication

func (counter *Counter) InitApplication(args runconsumer.InitArgs) error

InitModule initializes the application to serve the NGram gRPC service.

func (Counter) NewConfig

func (Counter) NewConfig() runconsumer.Config

NewConfig returns a new configuration instance.

func (Counter) NewMessage

func (Counter) NewMessage(*pb.JournalSpec) (message.Message, error)

NewMessage returns an NGramCount message. Implements consumer.Application.

func (Counter) NewStore

func (Counter) NewStore(shard consumer.Shard, dir string, rec *recoverylog.Recorder) (consumer.Store, error)

NewStore builds a RocksDB store for the Shard. Implements consumer.Application.

func (*Counter) Publish

func (counter *Counter) Publish(ctx context.Context, req *PublishRequest) (*PublishResponse, error)

Publish extracts NGrams of the configured length from the PublishRequest, and publishes an NGramCount message for each. It returns after all published messages have committed to their respective journals.

func (*Counter) Query

func (counter *Counter) Query(ctx context.Context, req *QueryRequest) (resp *QueryResponse, err error)

Query a count for an NGram count (or counts for a prefix thereof). If the requested or imputed Shard does not resolve locally, Query will proxy the request to the responsible process.

type NGram

type NGram string

NGram is a string of N space-delimited tokens, where N is fixed.

type NGramClient

type NGramClient interface {
	Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishResponse, error)
	Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error)
}

NGramClient is the client API for NGram service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.

func NewNGramClient

func NewNGramClient(cc *grpc.ClientConn) NGramClient

type NGramCount

type NGramCount struct {
	NGram                NGram    `protobuf:"bytes,1,opt,name=n_gram,json=nGram,proto3,casttype=NGram" json:"n_gram,omitempty"`
	Count                uint64   `protobuf:"varint,2,opt,name=count,proto3" json:"count,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*NGramCount) Descriptor

func (*NGramCount) Descriptor() ([]byte, []int)

func (*NGramCount) GetCount

func (m *NGramCount) GetCount() uint64

func (*NGramCount) GetNGram

func (m *NGramCount) GetNGram() NGram

func (*NGramCount) Marshal

func (m *NGramCount) Marshal() (dAtA []byte, err error)

func (*NGramCount) MarshalTo

func (m *NGramCount) MarshalTo(dAtA []byte) (int, error)

func (*NGramCount) ProtoMessage

func (*NGramCount) ProtoMessage()

func (*NGramCount) ProtoSize

func (m *NGramCount) ProtoSize() (n int)

func (*NGramCount) Reset

func (m *NGramCount) Reset()

func (*NGramCount) String

func (m *NGramCount) String() string

func (*NGramCount) Unmarshal

func (m *NGramCount) Unmarshal(dAtA []byte) error

func (*NGramCount) XXX_DiscardUnknown

func (m *NGramCount) XXX_DiscardUnknown()

func (*NGramCount) XXX_Marshal

func (m *NGramCount) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*NGramCount) XXX_Merge

func (dst *NGramCount) XXX_Merge(src proto.Message)

func (*NGramCount) XXX_Size

func (m *NGramCount) XXX_Size() int

func (*NGramCount) XXX_Unmarshal

func (m *NGramCount) XXX_Unmarshal(b []byte) error

type NGramServer

type NGramServer interface {
	Publish(context.Context, *PublishRequest) (*PublishResponse, error)
	Query(context.Context, *QueryRequest) (*QueryResponse, error)
}

NGramServer is the server API for NGram service.

type PublishRequest

type PublishRequest struct {
	Text                 string   `protobuf:"bytes,1,opt,name=text,proto3" json:"text,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*PublishRequest) Descriptor

func (*PublishRequest) Descriptor() ([]byte, []int)

func (*PublishRequest) GetText

func (m *PublishRequest) GetText() string

func (*PublishRequest) Marshal

func (m *PublishRequest) Marshal() (dAtA []byte, err error)

func (*PublishRequest) MarshalTo

func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error)

func (*PublishRequest) ProtoMessage

func (*PublishRequest) ProtoMessage()

func (*PublishRequest) ProtoSize

func (m *PublishRequest) ProtoSize() (n int)

func (*PublishRequest) Reset

func (m *PublishRequest) Reset()

func (*PublishRequest) String

func (m *PublishRequest) String() string

func (*PublishRequest) Unmarshal

func (m *PublishRequest) Unmarshal(dAtA []byte) error

func (*PublishRequest) XXX_DiscardUnknown

func (m *PublishRequest) XXX_DiscardUnknown()

func (*PublishRequest) XXX_Marshal

func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*PublishRequest) XXX_Merge

func (dst *PublishRequest) XXX_Merge(src proto.Message)

func (*PublishRequest) XXX_Size

func (m *PublishRequest) XXX_Size() int

func (*PublishRequest) XXX_Unmarshal

func (m *PublishRequest) XXX_Unmarshal(b []byte) error

type PublishResponse

type PublishResponse struct {
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*PublishResponse) Descriptor

func (*PublishResponse) Descriptor() ([]byte, []int)

func (*PublishResponse) Marshal

func (m *PublishResponse) Marshal() (dAtA []byte, err error)

func (*PublishResponse) MarshalTo

func (m *PublishResponse) MarshalTo(dAtA []byte) (int, error)

func (*PublishResponse) ProtoMessage

func (*PublishResponse) ProtoMessage()

func (*PublishResponse) ProtoSize

func (m *PublishResponse) ProtoSize() (n int)

func (*PublishResponse) Reset

func (m *PublishResponse) Reset()

func (*PublishResponse) String

func (m *PublishResponse) String() string

func (*PublishResponse) Unmarshal

func (m *PublishResponse) Unmarshal(dAtA []byte) error

func (*PublishResponse) XXX_DiscardUnknown

func (m *PublishResponse) XXX_DiscardUnknown()

func (*PublishResponse) XXX_Marshal

func (m *PublishResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*PublishResponse) XXX_Merge

func (dst *PublishResponse) XXX_Merge(src proto.Message)

func (*PublishResponse) XXX_Size

func (m *PublishResponse) XXX_Size() int

func (*PublishResponse) XXX_Unmarshal

func (m *PublishResponse) XXX_Unmarshal(b []byte) error

type QueryRequest

type QueryRequest struct {
	// Header attached by a proxy-ing peer. Not directly set by clients.
	Header *protocol.Header `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
	// NGram prefix to query.
	Prefix NGram `protobuf:"bytes,2,opt,name=prefix,proto3,casttype=NGram" json:"prefix,omitempty"`
	// Shard to query. Optional; if not set, shard is inferred from |prefix|'s current mapping.
	Shard                github_com_LiveRamp_gazette_v2_pkg_consumer.ShardID `protobuf:"bytes,3,opt,name=shard,proto3,casttype=github.com/LiveRamp/gazette/v2/pkg/consumer.ShardID" json:"shard,omitempty"`
	XXX_NoUnkeyedLiteral struct{}                                            `json:"-"`
	XXX_unrecognized     []byte                                              `json:"-"`
	XXX_sizecache        int32                                               `json:"-"`
}

func (*QueryRequest) Descriptor

func (*QueryRequest) Descriptor() ([]byte, []int)

func (*QueryRequest) GetHeader

func (m *QueryRequest) GetHeader() *protocol.Header

func (*QueryRequest) GetPrefix

func (m *QueryRequest) GetPrefix() NGram

func (*QueryRequest) GetShard

func (*QueryRequest) Marshal

func (m *QueryRequest) Marshal() (dAtA []byte, err error)

func (*QueryRequest) MarshalTo

func (m *QueryRequest) MarshalTo(dAtA []byte) (int, error)

func (*QueryRequest) ProtoMessage

func (*QueryRequest) ProtoMessage()

func (*QueryRequest) ProtoSize

func (m *QueryRequest) ProtoSize() (n int)

func (*QueryRequest) Reset

func (m *QueryRequest) Reset()

func (*QueryRequest) String

func (m *QueryRequest) String() string

func (*QueryRequest) Unmarshal

func (m *QueryRequest) Unmarshal(dAtA []byte) error

func (*QueryRequest) XXX_DiscardUnknown

func (m *QueryRequest) XXX_DiscardUnknown()

func (*QueryRequest) XXX_Marshal

func (m *QueryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*QueryRequest) XXX_Merge

func (dst *QueryRequest) XXX_Merge(src proto.Message)

func (*QueryRequest) XXX_Size

func (m *QueryRequest) XXX_Size() int

func (*QueryRequest) XXX_Unmarshal

func (m *QueryRequest) XXX_Unmarshal(b []byte) error

type QueryResponse

type QueryResponse struct {
	Grams                []NGramCount `protobuf:"bytes,1,rep,name=grams" json:"grams"`
	XXX_NoUnkeyedLiteral struct{}     `json:"-"`
	XXX_unrecognized     []byte       `json:"-"`
	XXX_sizecache        int32        `json:"-"`
}

func (*QueryResponse) Descriptor

func (*QueryResponse) Descriptor() ([]byte, []int)

func (*QueryResponse) GetGrams

func (m *QueryResponse) GetGrams() []NGramCount

func (*QueryResponse) Marshal

func (m *QueryResponse) Marshal() (dAtA []byte, err error)

func (*QueryResponse) MarshalTo

func (m *QueryResponse) MarshalTo(dAtA []byte) (int, error)

func (*QueryResponse) ProtoMessage

func (*QueryResponse) ProtoMessage()

func (*QueryResponse) ProtoSize

func (m *QueryResponse) ProtoSize() (n int)

func (*QueryResponse) Reset

func (m *QueryResponse) Reset()

func (*QueryResponse) String

func (m *QueryResponse) String() string

func (*QueryResponse) Unmarshal

func (m *QueryResponse) Unmarshal(dAtA []byte) error

func (*QueryResponse) XXX_DiscardUnknown

func (m *QueryResponse) XXX_DiscardUnknown()

func (*QueryResponse) XXX_Marshal

func (m *QueryResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*QueryResponse) XXX_Merge

func (dst *QueryResponse) XXX_Merge(src proto.Message)

func (*QueryResponse) XXX_Size

func (m *QueryResponse) XXX_Size() int

func (*QueryResponse) XXX_Unmarshal

func (m *QueryResponse) XXX_Unmarshal(b []byte) error

Directories

Path Synopsis
Package counter runs the word_count.Counter consumer.
Package counter runs the word_count.Counter consumer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL