Documentation ¶
Overview ¶
Package client provides implementation for clients of the broker API. Notably, it provides io.Reader and io.Writer implementations which map to respective broker RPCs. It also provides a routing BrokerClient which directs requests to known primary and/or same-zone brokers, reducing the number of proxy hops required and, where possible, keeping network traffic within a single availability zone.
Index ¶
- Variables
- func Append(ctx context.Context, rjc pb.RoutedJournalClient, req pb.AppendRequest, ...) (pb.AppendResponse, error)
- func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error)
- func InstallFileTransport(root string) (remove func())
- func ListAllFragments(ctx context.Context, client pb.RoutedJournalClient, req pb.FragmentsRequest) (*pb.FragmentsResponse, error)
- func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRequest) (*pb.ListResponse, error)
- func WaitForPendingAppends(pending []*AsyncAppend)
- type AppendService
- type Appender
- type AsyncAppend
- type AsyncJournalClient
- type FragmentReader
- type PolledList
- type Reader
- type RetryReader
- func (rr *RetryReader) AdjustedOffset(br *bufio.Reader) int64
- func (rr *RetryReader) AdjustedSeek(offset int64, whence int, br *bufio.Reader) (int64, error)
- func (rr *RetryReader) Journal() pb.Journal
- func (rr *RetryReader) Offset() int64
- func (rr *RetryReader) Read(p []byte) (n int, err error)
- func (rr *RetryReader) Restart(req pb.ReadRequest)
- func (rr *RetryReader) Seek(offset int64, whence int) (int64, error)
- type RouteCache
Constants ¶
This section is empty.
Variables ¶
var ( // Map common broker error statuses into named errors. ErrNotJournalBroker = errors.New(pb.Status_NOT_JOURNAL_BROKER.String()) ErrNotJournalPrimaryBroker = errors.New(pb.Status_NOT_JOURNAL_PRIMARY_BROKER.String()) ErrOffsetNotYetAvailable = errors.New(pb.Status_OFFSET_NOT_YET_AVAILABLE.String()) ErrWrongAppendOffset = errors.New(pb.Status_WRONG_APPEND_OFFSET.String()) ErrOffsetJump = errors.New("offset jump") ErrSeekRequiresNewReader = errors.New("seek offset requires new Reader") ErrDidNotReadExpectedEOF = errors.New("did not read EOF at expected Fragment.End") )
Functions ¶
func Append ¶
func Append(ctx context.Context, rjc pb.RoutedJournalClient, req pb.AppendRequest, content ...io.ReaderAt) (pb.AppendResponse, error)
Append zero or more ReaderAts of |content| to a journal as a single Append transaction. Append retries on transport or routing errors, but fails on all other errors. If no ReaderAts are provided, an Append RPC with no content is issued.
func ApplyJournals ¶
func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error)
ApplyJournals invokes the Apply RPC, and maps a validation or !OK status to an error.
func InstallFileTransport ¶
func InstallFileTransport(root string) (remove func())
InstallFileTransport registers a file:// protocol handler rooted at |root| with the http.Client used by OpenFragmentURL. The returned cleanup function removes the handler and restores the prior http.Client.
func ListAllFragments ¶
func ListAllFragments(ctx context.Context, client pb.RoutedJournalClient, req pb.FragmentsRequest) (*pb.FragmentsResponse, error)
ListAllFragments performs multiple Fragments RPCs, as required to join across multiple FragmentsResponse pages, and returns the completed FragmentResponse. Any encountered error is returned.
func ListAllJournals ¶
func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRequest) (*pb.ListResponse, error)
ListAllJournals performs multiple List RPCs, as required to join across multiple ListResponse pages, and returns the complete ListResponse of the ListRequest. Any encountered error is returned.
func WaitForPendingAppends ¶
func WaitForPendingAppends(pending []*AsyncAppend)
WaitForPendingAppends blocks until all |pending| AsyncAppends have completed.
Types ¶
type AppendService ¶
type AppendService struct { pb.RoutedJournalClient // contains filtered or unexported fields }
AppendService batches, dispatches, and (if needed) retries asynchronous Append RPCs. Use of an AppendService is appropriate for clients who make large numbers of small writes to a Journal, and where those writes may be pipelined and batched to amortize the cost of broker Append RPCs. It may also simplify implementations for clients who would prefer to simply have writes block until successfully committed, as opposed to handling errors and retries themselves. AppendService implements the AsyncJournalClient interface.
func NewAppendService ¶
func NewAppendService(ctx context.Context, client pb.RoutedJournalClient) *AppendService
NewAppendService returns an AppendService with the provided Context and BrokerClient.
func (*AppendService) PendingExcept ¶
func (s *AppendService) PendingExcept(except pb.Journal) []*AsyncAppend
PendingExcept implements the AsyncJournalClient interface.
func (*AppendService) StartAppend ¶
func (s *AppendService) StartAppend(name pb.Journal, dependencies ...*AsyncAppend) *AsyncAppend
StartAppend implements the AsyncJournalClient interface.
type Appender ¶
type Appender struct { Request pb.AppendRequest // AppendRequest of the Append. Response pb.AppendResponse // AppendResponse sent by broker. // contains filtered or unexported fields }
Appender adapts an Append RPC to the io.WriteCloser interface. Its usages should be limited to cases where the full and complete buffer to append is already available and can be immediately dispatched as, by design, an in- progress RPC prevents the broker from serving other Append RPCs concurrently.
func NewAppender ¶
func NewAppender(ctx context.Context, client pb.RoutedJournalClient, req pb.AppendRequest) *Appender
NewAppender returns an Appender initialized with the BrokerClient and AppendRequest.
func (*Appender) Abort ¶
func (a *Appender) Abort()
Abort the write, causing the broker to discard previously written content.
func (*Appender) Close ¶
Close the Append to complete the transaction, committing previously written content. If Close returns without an error, Append.Response will hold the broker response.
type AsyncAppend ¶
type AsyncAppend struct {
// contains filtered or unexported fields
}
AsyncAppend is an asynchronous Append RPC.
func (*AsyncAppend) Done ¶
func (p *AsyncAppend) Done() <-chan struct{}
Done returns a channel which selects when the AsyncAppend has committed.
func (*AsyncAppend) Release ¶
func (p *AsyncAppend) Release() error
Release the AsyncAppend, allowing further writes to queue or for it to be dispatched to the brokers. Release first determines whether a previous Require failed, or if a Writer error occurred, in which case it will roll back all writes queued by the caller, aborting the append transaction, and return the non-nil error. A non-nil error is returned if and only if the Append was rolled back. Otherwise, the caller may then select on Done to determine when the AsyncAppend has committed and its Response may be examined.
func (*AsyncAppend) Request ¶
func (p *AsyncAppend) Request() pb.AppendRequest
Request returns the AppendRequest that was or will be made by this AsyncAppend. Request is safe to call at all times.
func (*AsyncAppend) Require ¶
func (p *AsyncAppend) Require(err error) *AsyncAppend
Require the error to be nil. If Require is called with a non-nil error, the error is retained and later returned by Release, in which case it will also roll back any writes queued by the caller, aborting the append transaction. Require is valid for use only until Release is called. Require returns itself, allowing uses like `Require(maybeErrors()).Release()`
func (*AsyncAppend) Response ¶
func (p *AsyncAppend) Response() pb.AppendResponse
Response returns the AppendResponse from the broker. Response may be called only after calling BeginCommit and waiting for the returned channel to select.
func (*AsyncAppend) Writer ¶
func (p *AsyncAppend) Writer() *bufio.Writer
Writer returns a bufio.Writer to which content may be appended. Writer is valid for use only until Release is called. Clients may ignore write errors of the Writer, preferring to "fire and forget" a sequence of writes which could fail: Release will itself Require that no error is set on the Writer.
type AsyncJournalClient ¶
type AsyncJournalClient interface { pb.RoutedJournalClient // StartAppend begins a new asynchronous Append RPC. The caller holds exclusive access // to the returned AsyncAppend, and must then: // * Write content to its Writer // * Optionally Require that one or more errors are nil. // * Release the AsyncAppend, allowing queued writes to commit or, // if an error occurred, to roll back. // // For performance reasons, an Append will often be batched with other Appends // dispatched to this AppendService, and note the Response.Fragment will reflect // the entire batch written to the broker. In all cases, relative order of // Appends is preserved. One or more dependencies may optionally be supplied. // The Append RPC will not begin until all such dependencies have committed. // Dependencies must be ordered on applicable Journal name or StartAppend panics. // StartAppend may retain the slice, and it must not be subsequently modified. StartAppend(journal pb.Journal, dependencies ...*AsyncAppend) *AsyncAppend // PendingExcept returns a snapshot of the AsyncAppends being evaluated for all // Journals _other than_ |except|, ordered on Journal. It can be used to build // "barriers" which ensure that all pending writes commit prior to the // commencement of a write which is about to be issued. Eg, given: // // var aa = as.StartAppend("target", as.PendingExcept("target")...) // aa.Writer().WriteString("checkpoint") // aa.Release() // // All ongoing appends to journals other than "target" are guaranteed to commit // before an Append RPC is begun which writes "checkpoint" to journal "target". // PendingExcept("") returns all pending AsyncAppends. PendingExcept(journal pb.Journal) []*AsyncAppend }
AsyncJournalClient composes a RoutedJournalClient with an API for performing asynchronous Append operations.
type FragmentReader ¶
type FragmentReader struct { pb.Fragment // Fragment being read. Offset int64 // Next journal offset to be read, in range [Begin, End). // contains filtered or unexported fields }
FragmentReader is a io.ReadCloser of a Fragment.
func NewFragmentReader ¶
func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*FragmentReader, error)
NewFragmentReader wraps |rc|, which is a io.ReadCloser of raw Fragment bytes, with a returned *FragmentReader which has been pre-seeked to |offset|.
func OpenFragmentURL ¶
func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, url string) (*FragmentReader, error)
OpenFragmentURL directly opens |fragment|, which must be available at URL |url|, and returns a *FragmentReader which has been pre-seeked to |offset|.
func (*FragmentReader) Close ¶
func (fr *FragmentReader) Close() error
Close closes the underlying ReadCloser and associated decompressor (if any).
func (*FragmentReader) Read ¶
func (fr *FragmentReader) Read(p []byte) (n int, err error)
Read returns the next bytes of decompressed Fragment content. When Read returns, Offset has been updated to reflect the next byte to be read. Read returns EOF only if the underlying Reader returns EOF at precisely Offset == Fragment.End. If the underlying Reader is too short, io.ErrUnexpectedEOF is returned. If it's too long, ErrDidNotReadExpectedEOF is returned.
type PolledList ¶
type PolledList struct {
// contains filtered or unexported fields
}
PolledList performs periodic polls of a ListRequest. Its most recent polled result may be accessed via List.
func NewPolledList ¶
func NewPolledList(ctx context.Context, client pb.JournalClient, dur time.Duration, req pb.ListRequest) (*PolledList, error)
NewPolledList returns a PolledList of the ListRequest which is initialized and ready for immediate use, and which will regularly refresh with interval |dur|. An error encountered in the first List RPC is returned. Subsequent RPC errors will be logged as warnings and retried as part of regular refreshes.
func (*PolledList) List ¶
func (pl *PolledList) List() *pb.ListResponse
List returns the most recent ListResponse.
type Reader ¶
type Reader struct { Request pb.ReadRequest // ReadRequest of the Reader. Response pb.ReadResponse // Most recent ReadResponse from broker. // contains filtered or unexported fields }
Reader adapts a Read RPC to the io.Reader interface. It additionally supports directly reading Fragment URLs advertised but not proxied by the broker (eg, because DoNotProxy of the ReadRequest is true). A Reader is invalidated by its first returned error, with the exception of ErrOffsetJump: this error is returned to notify the client that the next Journal offset to be Read is not the offset that was requested, but the Reader is prepared to continue at the updated offset.
func NewReader ¶
func NewReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRequest) *Reader
NewReader returns a Reader initialized with the given BrokerClient and ReadRequest.
func (*Reader) AdjustedOffset ¶
AdjustedOffset returns the current journal offset, adjusted for content read by |br| (which wraps this Reader) but not yet consumed from |br|'s buffer.
func (*Reader) Seek ¶
Seek provides a limited form of seeking support. Specifically, iff a Fragment URL is being directly read, the Seek offset is ahead of the current Reader offset, and the Fragment also covers the desired Seek offset, then a seek is performed by reading and discarding to the seeked offset. Seek will otherwise return ErrSeekRequiresNewReader.
type RetryReader ¶
type RetryReader struct { // Reader is the current underlying Reader of the RetryReader. This instance // may change many times over the lifetime of a RetryReader, as Read RPCs // finish or are cancelled and then restarted. Reader *Reader // Cancel Read operations of the current Reader. Notably this will cause an // ongoing blocked Read (as well as any future Reads) to return a "Cancelled" // error. Restart may be called to re-initialize the RetryReader. Cancel context.CancelFunc // contains filtered or unexported fields }
RetryReader wraps Reader with error handling and retry behavior, as well as support for cancellation of an ongoing Read or Seek operation. RetryReader is not thread-safe, with one exception: Cancel may be called from one goroutine to abort an ongoing Read or Seek call in another.
func NewRetryReader ¶
func NewRetryReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRequest) *RetryReader
NewRetryReader returns a RetryReader initialized with the BrokerClient and ReadRequest.
func (*RetryReader) AdjustedOffset ¶
func (rr *RetryReader) AdjustedOffset(br *bufio.Reader) int64
AdjustedOffset returns the current journal offset, adjusted for content read by |br| (which wraps this RetryReader) but not yet consumed from |br|'s buffer.
func (*RetryReader) AdjustedSeek ¶
AdjustedSeek sets the offset for the next Read, accounting for buffered data and updating the buffer as needed.
func (*RetryReader) Journal ¶
func (rr *RetryReader) Journal() pb.Journal
Journal being read by this RetryReader.
func (*RetryReader) Offset ¶
func (rr *RetryReader) Offset() int64
Offset of the next Journal byte to be returned by Read.
func (*RetryReader) Read ¶
func (rr *RetryReader) Read(p []byte) (n int, err error)
Read returns the next bytes of journal content. It will return a non-nil error in the following cases:
- Cancel is called, or the RetryReader context is cancelled.
- The broker returns OFFSET_NOT_YET_AVAILABLE (ErrOffsetNotYetAvailable) for a non-blocking ReadRequest.
- An offset jump occurred (ErrOffsetJump), in which case the client should inspect the new Offset may continue reading if desired.
All other errors are retried.
func (*RetryReader) Restart ¶
func (rr *RetryReader) Restart(req pb.ReadRequest)
Restart the RetryReader with a new ReadRequest.
type RouteCache ¶
type RouteCache struct {
// contains filtered or unexported fields
}
RouteCache caches observed Routes for JournalSpecs (and consumer ShardSpecs, or any other allocator.Item)
func NewRouteCache ¶
func NewRouteCache(size int, ttl time.Duration) *RouteCache
NewRouteCache returns a RouteCache of the given size (which must be > 0) and caching Duration.
func (*RouteCache) IsNoopRouter ¶
func (rc *RouteCache) IsNoopRouter() bool
IsNoopRouter returns false.
func (*RouteCache) UpdateRoute ¶
func (rc *RouteCache) UpdateRoute(item string, route *pb.Route)
UpdateRoute caches the provided Route for |item|, or invalidates it if |route| is nil.