package client

v2.0.201+incompatible
Published: Feb 13, 2019 License: MIT Imports: 20 Imported by: 45

Overview

Package client provides implementation for clients of the broker API. Notably, it provides io.Reader and io.Writer implementations which map to respective broker RPCs. It also provides a routing BrokerClient which directs requests to known primary and/or same-zone brokers, reducing the number of proxy hops required and, where possible, keeping network traffic within a single availability zone.

Variables

var (
	// Map common broker error statuses into named errors.
	ErrNotJournalBroker        = errors.New(pb.Status_NOT_JOURNAL_BROKER.String())
	ErrNotJournalPrimaryBroker = errors.New(pb.Status_NOT_JOURNAL_PRIMARY_BROKER.String())
	ErrOffsetNotYetAvailable   = errors.New(pb.Status_OFFSET_NOT_YET_AVAILABLE.String())
	ErrWrongAppendOffset       = errors.New(pb.Status_WRONG_APPEND_OFFSET.String())

	ErrOffsetJump            = errors.New("offset jump")
	ErrSeekRequiresNewReader = errors.New("seek offset requires new Reader")
	ErrDidNotReadExpectedEOF = errors.New("did not read EOF at expected Fragment.End")
)
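As a rough illustration of how named errors like these are typically consumed, the sketch below maps a status code to a sentinel error and then branches on it with errors.Is. The `status` type, its constants, and the helper names are hypothetical stand-ins and are not part of this package.

```go
package main

import "errors"

// status is a hypothetical stand-in for the broker's pb.Status enum.
type status int

const (
	statusOK status = iota
	statusNotJournalBroker
	statusOffsetNotYetAvailable
)

// Sentinel errors, one per status of interest (assumed names).
var (
	errNotJournalBroker      = errors.New("NOT_JOURNAL_BROKER")
	errOffsetNotYetAvailable = errors.New("OFFSET_NOT_YET_AVAILABLE")
)

// statusToError maps a status to its sentinel error, or to nil for statusOK.
func statusToError(s status) error {
	switch s {
	case statusOK:
		return nil
	case statusNotJournalBroker:
		return errNotJournalBroker
	case statusOffsetNotYetAvailable:
		return errOffsetNotYetAvailable
	default:
		return errors.New("unexpected status")
	}
}

// caughtUp reports whether err means the reader has simply reached the
// current end of the journal, as opposed to a routing or other failure.
func caughtUp(err error) bool {
	return errors.Is(err, errOffsetNotYetAvailable)
}
```

Because the sentinels are package-level values, callers can compare against them directly rather than matching on error strings.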

Functions

func Append

func Append(ctx context.Context, rjc pb.RoutedJournalClient, req pb.AppendRequest,
	content ...io.ReaderAt) (pb.AppendResponse, error)

Append zero or more ReaderAts of |content| to a journal as a single Append transaction. Append retries on transport or routing errors, but fails on all other errors. If no ReaderAts are provided, an Append RPC with no content is issued.
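One plausible reason for accepting io.ReaderAt rather than io.Reader is that content can be re-read from offset zero on every retry. The sketch below shows that idea using only the standard library. The `send` callback (standing in for one RPC attempt), `errTransient`, and the attempt limit are all assumptions made for illustration, and this is not the package's implementation.

```go
package main

import (
	"errors"
	"io"
	"math"
	"strings"
)

// errTransient is a hypothetical marker for transport or routing failures.
var errTransient = errors.New("transient")

// joined returns a fresh Reader over all of |content|, each read from offset zero.
func joined(content ...io.ReaderAt) io.Reader {
	var readers = make([]io.Reader, 0, len(content))
	for _, ra := range content {
		readers = append(readers, io.NewSectionReader(ra, 0, math.MaxInt64))
	}
	return io.MultiReader(readers...)
}

// appendWithRetry invokes |send| with the joined content until it succeeds,
// fails with a non-transient error, or |maxAttempts| is reached. It returns
// the number of attempts made and the final error.
func appendWithRetry(send func(io.Reader) error, maxAttempts int, content ...io.ReaderAt) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = send(joined(content...)); err == nil || !errors.Is(err, errTransient) {
			return attempt, err
		}
	}
	return maxAttempts, err
}

// demo sends two pieces of content through a |send| which fails once.
func demo() (payload string, attempts int, err error) {
	var calls int
	var send = func(r io.Reader) error {
		calls++
		b, _ := io.ReadAll(r) // The first attempt fully consumes the content.
		if calls == 1 {
			return errTransient
		}
		payload = string(b)
		return nil
	}
	attempts, err = appendWithRetry(send, 3, strings.NewReader("hello, "), strings.NewReader("world"))
	return
}
```

In `demo` the second attempt still observes the complete payload, because each attempt builds new section readers over the same ReaderAts.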

func ApplyJournals

func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error)

ApplyJournals invokes the Apply RPC, and maps a validation failure or non-OK status to an error.

func InstallFileTransport

func InstallFileTransport(root string) (remove func())

InstallFileTransport registers a file:// protocol handler rooted at |root| with the http.Client used by OpenFragmentURL. The returned cleanup function removes the handler and restores the prior http.Client.

func ListAllFragments

func ListAllFragments(ctx context.Context, client pb.RoutedJournalClient, req pb.FragmentsRequest) (*pb.FragmentsResponse, error)

ListAllFragments performs multiple Fragments RPCs, as required to join across multiple FragmentsResponse pages, and returns the complete FragmentsResponse. Any encountered error is returned.

func ListAllJournals

func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRequest) (*pb.ListResponse, error)

ListAllJournals performs multiple List RPCs, as required to join across multiple ListResponse pages, and returns the complete ListResponse of the ListRequest. Any encountered error is returned.
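The page-joining behavior of ListAllFragments and ListAllJournals can be pictured with a generic loop like the one below. The `page` type, its `nextToken` field, and the `fetch` callback are hypothetical; the actual request and response messages may carry pagination state differently.

```go
package main

// page is a hypothetical response page: items plus a token for the next page.
type page struct {
	items     []string
	nextToken string // Empty when this is the final page.
}

// listAll invokes |fetch| repeatedly, following nextToken until it is empty,
// and returns all items joined in order. The first error is returned as-is.
func listAll(fetch func(token string) (page, error)) ([]string, error) {
	var all []string
	var token string
	for {
		p, err := fetch(token)
		if err != nil {
			return nil, err
		}
		all = append(all, p.items...)
		if p.nextToken == "" {
			return all, nil
		}
		token = p.nextToken
	}
}
```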

func WaitForPendingAppends

func WaitForPendingAppends(pending []*AsyncAppend)

WaitForPendingAppends blocks until all |pending| AsyncAppends have completed.

Types

type AppendService

type AppendService struct {
	pb.RoutedJournalClient
	// contains filtered or unexported fields
}

AppendService batches, dispatches, and (if needed) retries asynchronous Append RPCs. Use of an AppendService is appropriate for clients who make large numbers of small writes to a Journal, and where those writes may be pipelined and batched to amortize the cost of broker Append RPCs. It may also simplify implementations for clients who would prefer to simply have writes block until successfully committed, as opposed to handling errors and retries themselves. AppendService implements the AsyncJournalClient interface.

func NewAppendService

func NewAppendService(ctx context.Context, client pb.RoutedJournalClient) *AppendService

NewAppendService returns an AppendService with the provided Context and BrokerClient.

func (*AppendService) PendingExcept

func (s *AppendService) PendingExcept(except pb.Journal) []*AsyncAppend

PendingExcept implements the AsyncJournalClient interface.

func (*AppendService) StartAppend

func (s *AppendService) StartAppend(name pb.Journal, dependencies ...*AsyncAppend) *AsyncAppend

StartAppend implements the AsyncJournalClient interface.

type Appender

type Appender struct {
	Request  pb.AppendRequest  // AppendRequest of the Append.
	Response pb.AppendResponse // AppendResponse sent by broker.
	// contains filtered or unexported fields
}

Appender adapts an Append RPC to the io.WriteCloser interface. Its use should be limited to cases where the full and complete buffer to append is already available and can be immediately dispatched as, by design, an in-progress RPC prevents the broker from serving other Append RPCs concurrently.

func NewAppender

func NewAppender(ctx context.Context, client pb.RoutedJournalClient, req pb.AppendRequest) *Appender

NewAppender returns an Appender initialized with the BrokerClient and AppendRequest.

func (*Appender) Abort

func (a *Appender) Abort()

Abort the write, causing the broker to discard previously written content.

func (*Appender) Close

func (a *Appender) Close() (err error)

Close the Append to complete the transaction, committing previously written content. If Close returns without an error, Appender.Response will hold the broker response.

func (*Appender) Reset

func (a *Appender) Reset()

Reset the Appender to its post-construction state, allowing it to be re-used or re-tried.

func (*Appender) Write

func (a *Appender) Write(p []byte) (n int, err error)

type AsyncAppend

type AsyncAppend struct {
	// contains filtered or unexported fields
}

AsyncAppend is an asynchronous Append RPC.

func (*AsyncAppend) Done

func (p *AsyncAppend) Done() <-chan struct{}

Done returns a channel which selects when the AsyncAppend has committed.

func (*AsyncAppend) Release

func (p *AsyncAppend) Release() error

Release the AsyncAppend, allowing further writes to queue or for it to be dispatched to the brokers. Release first determines whether a previous Require failed, or if a Writer error occurred, in which case it will roll back all writes queued by the caller, aborting the append transaction, and return the non-nil error. A non-nil error is returned if and only if the Append was rolled back. Otherwise, the caller may then select on Done to determine when the AsyncAppend has committed and its Response may be examined.

func (*AsyncAppend) Request

func (p *AsyncAppend) Request() pb.AppendRequest

Request returns the AppendRequest that was or will be made by this AsyncAppend. Request is safe to call at all times.

func (*AsyncAppend) Require

func (p *AsyncAppend) Require(err error) *AsyncAppend

Require the error to be nil. If Require is called with a non-nil error, the error is retained and later returned by Release, in which case it will also roll back any writes queued by the caller, aborting the append transaction. Require is valid for use only until Release is called. Require returns itself, allowing uses like `Require(maybeErrors()).Release()`.

func (*AsyncAppend) Response

func (p *AsyncAppend) Response() pb.AppendResponse

Response returns the AppendResponse from the broker. Response may be called only after calling Release and waiting for the channel returned by Done to select.

func (*AsyncAppend) Writer

func (p *AsyncAppend) Writer() *bufio.Writer

Writer returns a bufio.Writer to which content may be appended. Writer is valid for use only until Release is called. Clients may ignore write errors of the Writer, preferring to "fire and forget" a sequence of writes which could fail: Release will itself Require that no error is set on the Writer.
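The interplay of Writer, Require, and Release described above might look roughly like the following. `pendingWrite` is a hypothetical stand-in for AsyncAppend which buffers in memory. Keeping only the first error and detecting Writer errors via Flush are assumptions of this sketch, not statements about the real implementation.

```go
package main

import (
	"bufio"
	"bytes"
	"errors"
)

// pendingWrite is a hypothetical, in-memory stand-in for an AsyncAppend.
type pendingWrite struct {
	buf bytes.Buffer  // Content queued by the caller.
	w   *bufio.Writer // Writer handed to the caller.
	err error         // First non-nil error passed to require.
}

func newPendingWrite() *pendingWrite {
	var p = new(pendingWrite)
	p.w = bufio.NewWriter(&p.buf)
	return p
}

// require retains the first non-nil error, and returns p to allow chaining.
func (p *pendingWrite) require(err error) *pendingWrite {
	if err != nil && p.err == nil {
		p.err = err
	}
	return p
}

// release flushes the writer (requiring that the flush succeeds), and rolls
// back all queued content if any error was retained.
func (p *pendingWrite) release() error {
	p.require(p.w.Flush())
	if p.err != nil {
		p.buf.Reset() // Roll back: discard everything the caller queued.
	}
	return p.err
}

// demo queues one write, optionally fails a requirement, then releases.
func demo(fail bool) (queued string, err error) {
	var p = newPendingWrite()
	p.w.WriteString("event-1\n") // Write errors surface later, in release.
	if fail {
		p.require(errors.New("validation failed"))
	}
	err = p.release()
	return p.buf.String(), err
}
```

The caller never checks the result of the write itself: any failure is surfaced once, at release time, together with the rollback.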

type AsyncJournalClient

type AsyncJournalClient interface {
	pb.RoutedJournalClient

	// StartAppend begins a new asynchronous Append RPC. The caller holds exclusive access
	// to the returned AsyncAppend, and must then:
	//  * Write content to its Writer
	//  * Optionally Require that one or more errors are nil.
	//  * Release the AsyncAppend, allowing queued writes to commit or,
	//    if an error occurred, to roll back.
	//
	// For performance reasons, an Append will often be batched with other Appends
	// dispatched to this AppendService, and note the Response.Fragment will reflect
	// the entire batch written to the broker. In all cases, relative order of
	// Appends is preserved. One or more dependencies may optionally be supplied.
	// The Append RPC will not begin until all such dependencies have committed.
	// Dependencies must be ordered on applicable Journal name or StartAppend panics.
	// StartAppend may retain the slice, and it must not be subsequently modified.
	StartAppend(journal pb.Journal, dependencies ...*AsyncAppend) *AsyncAppend

	// PendingExcept returns a snapshot of the AsyncAppends being evaluated for all
	// Journals _other than_ |except|, ordered on Journal. It can be used to build
	// "barriers" which ensure that all pending writes commit prior to the
	// commencement of a write which is about to be issued. Eg, given:
	//
	//   var aa = as.StartAppend("target", as.PendingExcept("target")...)
	//   aa.Writer().WriteString("checkpoint")
	//   aa.Release()
	//
	// All ongoing appends to journals other than "target" are guaranteed to commit
	// before an Append RPC is begun which writes "checkpoint" to journal "target".
	// PendingExcept("") returns all pending AsyncAppends.
	PendingExcept(journal pb.Journal) []*AsyncAppend
}

AsyncJournalClient composes a RoutedJournalClient with an API for performing asynchronous Append operations.
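The dependency "barrier" described in the StartAppend and PendingExcept comments can be approximated with plain channels. In this sketch `startAfter` is a hypothetical helper: each dependency is represented only by its Done-style channel, and the write is an arbitrary function.

```go
package main

// startAfter runs |write| in a new goroutine once every channel of |deps| has
// closed, and returns a channel which closes after |write| has returned.
func startAfter(deps []<-chan struct{}, write func()) <-chan struct{} {
	var done = make(chan struct{})
	go func() {
		defer close(done)
		for _, d := range deps {
			<-d // Block until this dependency has committed.
		}
		write()
	}()
	return done
}
```

A caller would pass the Done channels of the pending appends as |deps|, so that a "checkpoint" write cannot begin until all of them have committed.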

type FragmentReader

type FragmentReader struct {
	pb.Fragment       // Fragment being read.
	Offset      int64 // Next journal offset to be read, in range [Begin, End).
	// contains filtered or unexported fields
}

FragmentReader is an io.ReadCloser of a Fragment.

func NewFragmentReader

func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*FragmentReader, error)

NewFragmentReader wraps |rc|, which is an io.ReadCloser of raw Fragment bytes, with a returned *FragmentReader which has been pre-seeked to |offset|.

func OpenFragmentURL

func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, url string) (*FragmentReader, error)

OpenFragmentURL directly opens |fragment|, which must be available at URL |url|, and returns a *FragmentReader which has been pre-seeked to |offset|.

func (*FragmentReader) Close

func (fr *FragmentReader) Close() error

Close closes the underlying ReadCloser and associated decompressor (if any).

func (*FragmentReader) Read

func (fr *FragmentReader) Read(p []byte) (n int, err error)

Read returns the next bytes of decompressed Fragment content. When Read returns, Offset has been updated to reflect the next byte to be read. Read returns EOF only if the underlying Reader returns EOF at precisely Offset == Fragment.End. If the underlying Reader is too short, io.ErrUnexpectedEOF is returned. If it's too long, ErrDidNotReadExpectedEOF is returned.
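The three outcomes described for Read (exact EOF, too short, too long) can be sketched with a small wrapper around any io.Reader. `boundedReader` and `readFragment` are hypothetical names; decompression and Close are omitted.

```go
package main

import (
	"errors"
	"io"
	"strings"
)

var errDidNotReadExpectedEOF = errors.New("did not read EOF at expected Fragment.End")

// boundedReader is a hypothetical stand-in for FragmentReader which tracks the
// next offset to be read and verifies that EOF arrives exactly at |end|.
type boundedReader struct {
	r      io.Reader
	offset int64 // Next offset to be read.
	end    int64 // Exclusive end offset of the fragment.
}

func (b *boundedReader) Read(p []byte) (int, error) {
	n, err := b.r.Read(p)
	b.offset += int64(n)

	if b.offset > b.end {
		// The underlying reader is too long: trim the excess and fail.
		n -= int(b.offset - b.end)
		b.offset = b.end
		return n, errDidNotReadExpectedEOF
	}
	if err == io.EOF && b.offset != b.end {
		// The underlying reader ended before reaching |end|.
		return n, io.ErrUnexpectedEOF
	}
	return n, err
}

// readFragment reads |content| as a fragment spanning offsets [begin, end).
func readFragment(content string, begin, end int64) (string, error) {
	var br = &boundedReader{r: strings.NewReader(content), offset: begin, end: end}
	b, err := io.ReadAll(br)
	return string(b), err
}
```

With a five-byte fragment, `readFragment("hello", 0, 5)` succeeds, `readFragment("hel", 0, 5)` fails with io.ErrUnexpectedEOF, and `readFragment("hello!", 0, 5)` fails with errDidNotReadExpectedEOF.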

type PolledList

type PolledList struct {
	// contains filtered or unexported fields
}

PolledList performs periodic polls of a ListRequest. Its most recent polled result may be accessed via List.

func NewPolledList

func NewPolledList(ctx context.Context, client pb.JournalClient, dur time.Duration, req pb.ListRequest) (*PolledList, error)

NewPolledList returns a PolledList of the ListRequest which is initialized and ready for immediate use, and which will regularly refresh with interval |dur|. An error encountered in the first List RPC is returned. Subsequent RPC errors will be logged as warnings and retried as part of regular refreshes.
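A periodically refreshed value with these error semantics (fail fast on the first fetch, log and retry afterwards) could be sketched as below. The `polled` type, the `fetch` callback, and `errUnavailable` are hypothetical, and a string slice stands in for the ListResponse.

```go
package main

import (
	"context"
	"errors"
	"log"
	"sync"
	"time"
)

// errUnavailable is a hypothetical fetch error, used for demonstration.
var errUnavailable = errors.New("broker unavailable")

// polled holds the most recent successful result of a periodic fetch.
type polled struct {
	mu     sync.RWMutex
	latest []string
}

// newPolled performs an initial fetch, returning its error if it fails, and
// then refreshes every |dur| until |ctx| is done.
func newPolled(ctx context.Context, dur time.Duration, fetch func() ([]string, error)) (*polled, error) {
	var p = new(polled)
	if err := p.refresh(fetch); err != nil {
		return nil, err
	}
	go func() {
		var ticker = time.NewTicker(dur)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := p.refresh(fetch); err != nil {
					log.Printf("refresh failed (will retry): %v", err)
				}
			}
		}
	}()
	return p, nil
}

// refresh replaces the latest result on success, and retains it on error.
func (p *polled) refresh(fetch func() ([]string, error)) error {
	list, err := fetch()
	if err != nil {
		return err
	}
	p.mu.Lock()
	p.latest = list
	p.mu.Unlock()
	return nil
}

// list returns the most recent successfully fetched result.
func (p *polled) list() []string {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.latest
}
```

A failed refresh leaves the previous result in place, so readers of `list` keep seeing the last good value between successful polls.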

func (*PolledList) List

func (pl *PolledList) List() *pb.ListResponse

List returns the most recent ListResponse.

type Reader

type Reader struct {
	Request  pb.ReadRequest  // ReadRequest of the Reader.
	Response pb.ReadResponse // Most recent ReadResponse from broker.
	// contains filtered or unexported fields
}

Reader adapts a Read RPC to the io.Reader interface. It additionally supports directly reading Fragment URLs advertised but not proxied by the broker (eg, because DoNotProxy of the ReadRequest is true). A Reader is invalidated by its first returned error, with the exception of ErrOffsetJump: this error is returned to notify the client that the next Journal offset to be Read is not the offset that was requested, but the Reader is prepared to continue at the updated offset.
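A read loop which treats an offset jump as a notification rather than a failure might be structured as follows. `readAcrossJumps`, its `onJump` callback, and the `scriptedReader` test double are hypothetical, and the loop ends at io.EOF purely so the sketch terminates.

```go
package main

import (
	"errors"
	"io"
)

var errOffsetJump = errors.New("offset jump")

// readAcrossJumps reads |r| until io.EOF. An errOffsetJump is reported to
// |onJump| and reading continues; any other error ends the loop.
func readAcrossJumps(r io.Reader, onJump func()) ([]byte, error) {
	var out []byte
	var buf = make([]byte, 4096)
	for {
		n, err := r.Read(buf)
		out = append(out, buf[:n]...)
		switch {
		case err == nil:
			// Keep reading.
		case errors.Is(err, errOffsetJump):
			onJump() // The reader remains valid: note the gap and continue.
		case err == io.EOF:
			return out, nil
		default:
			return out, err // The reader is invalidated by this error.
		}
	}
}

// step is one scripted Read result of a scriptedReader.
type step struct {
	data string
	err  error
}

// scriptedReader returns each of its steps in turn, then io.EOF.
type scriptedReader struct{ steps []step }

func (s *scriptedReader) Read(p []byte) (int, error) {
	if len(s.steps) == 0 {
		return 0, io.EOF
	}
	var st = s.steps[0]
	s.steps = s.steps[1:]
	return copy(p, st.data), st.err
}
```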

func NewReader

func NewReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRequest) *Reader

NewReader returns a Reader initialized with the given BrokerClient and ReadRequest.

func (*Reader) AdjustedOffset

func (r *Reader) AdjustedOffset(br *bufio.Reader) int64

AdjustedOffset returns the current journal offset, adjusted for content read by |br| (which wraps this Reader) but not yet consumed from |br|'s buffer.
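The adjustment amounts to subtracting the bytes which the bufio.Reader has read ahead but not yet handed to the caller. A minimal sketch, in which `adjustedOffset` and `demo` are hypothetical helpers and a strings.Reader stands in for the journal Reader:

```go
package main

import (
	"bufio"
	"io"
	"strings"
)

// adjustedOffset returns the offset of the next byte the caller will consume
// from |br|, given |readerOffset|, the offset of the next byte which the
// wrapped reader would return.
func adjustedOffset(readerOffset int64, br *bufio.Reader) int64 {
	return readerOffset - int64(br.Buffered())
}

// demo consumes 4 bytes through a bufio.Reader, which reads ahead of that.
func demo() (underlying, adjusted int64) {
	var src = strings.NewReader("0123456789")
	var br = bufio.NewReader(src)

	if _, err := io.ReadFull(br, make([]byte, 4)); err != nil {
		panic(err)
	}
	// Offset of the wrapped reader: total size less what remains unread.
	underlying = src.Size() - int64(src.Len())
	return underlying, adjustedOffset(underlying, br)
}
```

However far the bufio.Reader has read ahead, the adjusted offset equals the number of bytes actually consumed, which is 4 in `demo`.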

func (*Reader) Read

func (r *Reader) Read(p []byte) (n int, err error)

func (*Reader) Seek

func (r *Reader) Seek(offset int64, whence int) (int64, error)

Seek provides a limited form of seeking support. Specifically, iff a Fragment URL is being directly read, the Seek offset is ahead of the current Reader offset, and the Fragment also covers the desired Seek offset, then a seek is performed by reading and discarding to the seeked offset. Seek will otherwise return ErrSeekRequiresNewReader.
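Seeking by reading and discarding can be sketched with io.CopyN and io.Discard. The `seekForward` helper and its parameters are hypothetical; in particular, the check against |fragmentEnd| is an interpretation of "the Fragment also covers the desired Seek offset".

```go
package main

import (
	"errors"
	"io"
	"strings"
)

var errSeekRequiresNewReader = errors.New("seek offset requires new Reader")

// seekForward advances |r| from offset |cur| to |target| by reading and
// discarding, provided |target| is not behind |cur| and falls before
// |fragmentEnd|. It returns the resulting offset.
func seekForward(r io.Reader, cur, target, fragmentEnd int64) (int64, error) {
	if target < cur || target >= fragmentEnd {
		return cur, errSeekRequiresNewReader
	}
	n, err := io.CopyN(io.Discard, r, target-cur)
	return cur + n, err
}

// demo positions a reader of a ten-byte fragment at |cur|, seeks it to
// |target|, and returns the content which remains.
func demo(cur, target int64) (string, error) {
	const content = "0123456789"
	var r = strings.NewReader(content[cur:])

	if _, err := seekForward(r, cur, target, int64(len(content))); err != nil {
		return "", err
	}
	rest, err := io.ReadAll(r)
	return string(rest), err
}
```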

type RetryReader

type RetryReader struct {
	// Reader is the current underlying Reader of the RetryReader. This instance
	// may change many times over the lifetime of a RetryReader, as Read RPCs
	// finish or are cancelled and then restarted.
	Reader *Reader
	// Cancel Read operations of the current Reader. Notably this will cause an
	// ongoing blocked Read (as well as any future Reads) to return a "Cancelled"
	// error. Restart may be called to re-initialize the RetryReader.
	Cancel context.CancelFunc
	// contains filtered or unexported fields
}

RetryReader wraps Reader with error handling and retry behavior, as well as support for cancellation of an ongoing Read or Seek operation. RetryReader is not thread-safe, with one exception: Cancel may be called from one goroutine to abort an ongoing Read or Seek call in another.

func NewRetryReader

func NewRetryReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRequest) *RetryReader

NewRetryReader returns a RetryReader initialized with the BrokerClient and ReadRequest.

func (*RetryReader) AdjustedOffset

func (rr *RetryReader) AdjustedOffset(br *bufio.Reader) int64

AdjustedOffset returns the current journal offset, adjusted for content read by |br| (which wraps this RetryReader) but not yet consumed from |br|'s buffer.

func (*RetryReader) AdjustedSeek

func (rr *RetryReader) AdjustedSeek(offset int64, whence int, br *bufio.Reader) (int64, error)

AdjustedSeek sets the offset for the next Read, accounting for buffered data and updating the buffer as needed.

func (*RetryReader) Journal

func (rr *RetryReader) Journal() pb.Journal

Journal being read by this RetryReader.

func (*RetryReader) Offset

func (rr *RetryReader) Offset() int64

Offset of the next Journal byte to be returned by Read.

func (*RetryReader) Read

func (rr *RetryReader) Read(p []byte) (n int, err error)

Read returns the next bytes of journal content. It will return a non-nil error in the following cases:

  • Cancel is called, or the RetryReader context is cancelled.
  • The broker returns OFFSET_NOT_YET_AVAILABLE (ErrOffsetNotYetAvailable) for a non-blocking ReadRequest.
  • An offset jump occurred (ErrOffsetJump), in which case the client should inspect the new Offset and may continue reading if desired.

All other errors are retried.

func (*RetryReader) Restart

func (rr *RetryReader) Restart(req pb.ReadRequest)

Restart the RetryReader with a new ReadRequest.

func (*RetryReader) Seek

func (rr *RetryReader) Seek(offset int64, whence int) (int64, error)

Seek sets the offset for the next Read. It returns an error if (and only if) |whence| is io.SeekEnd, which is not supported.
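Resolving |whence| without support for io.SeekEnd could look like the sketch below. `resolveSeek` is a hypothetical helper; unlike the documented behavior, it also rejects unknown whence values, which is a choice of this sketch.

```go
package main

import (
	"errors"
	"io"
)

// resolveSeek computes the absolute offset implied by |offset| and |whence|,
// relative to the |current| offset. io.SeekEnd is rejected, since the end of
// a journal which is still being written is not a fixed position.
func resolveSeek(current, offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
		return offset, nil
	case io.SeekCurrent:
		return current + offset, nil
	default:
		return current, errors.New("unsupported whence")
	}
}
```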

type RouteCache

type RouteCache struct {
	// contains filtered or unexported fields
}

RouteCache caches observed Routes for JournalSpecs (and consumer ShardSpecs, or any other allocator.Item).

func NewRouteCache

func NewRouteCache(size int, ttl time.Duration) *RouteCache

NewRouteCache returns a RouteCache of the given size (which must be > 0) and caching Duration.

func (*RouteCache) IsNoopRouter

func (rc *RouteCache) IsNoopRouter() bool

IsNoopRouter returns false.

func (*RouteCache) Route

func (rc *RouteCache) Route(ctx context.Context, item string) pb.Route

Route queries for a cached Route of |item|.

func (*RouteCache) UpdateRoute

func (rc *RouteCache) UpdateRoute(item string, route *pb.Route)

UpdateRoute caches the provided Route for |item|, or invalidates it if |route| is nil.
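The update, invalidate, and expire behavior of a route cache can be illustrated with a small map-backed sketch. Everything here is hypothetical: a string stands in for pb.Route, the clock is injected, and the size bound and concurrency safety of the real RouteCache are omitted.

```go
package main

import "time"

// cachedRoute pairs a route with its expiry time.
type cachedRoute struct {
	route   string // Hypothetical stand-in for a pb.Route.
	expires time.Time
}

// routeCache is a simplified, single-goroutine TTL cache of routes.
type routeCache struct {
	ttl     time.Duration
	now     func() time.Time // Clock, injectable for testing.
	entries map[string]cachedRoute
}

func newRouteCache(ttl time.Duration, now func() time.Time) *routeCache {
	return &routeCache{ttl: ttl, now: now, entries: make(map[string]cachedRoute)}
}

// update caches |route| for |item|, or invalidates the entry if |route| is nil.
func (c *routeCache) update(item string, route *string) {
	if route == nil {
		delete(c.entries, item)
		return
	}
	c.entries[item] = cachedRoute{route: *route, expires: c.now().Add(c.ttl)}
}

// lookup returns the cached route of |item|, or "" if absent or expired.
func (c *routeCache) lookup(item string) string {
	var e, ok = c.entries[item]
	if !ok || !c.now().Before(e.expires) {
		delete(c.entries, item)
		return ""
	}
	return e.route
}

// demo exercises a fresh hit, an expiry, and an explicit invalidation.
func demo() (fresh, expired, invalidated string) {
	var clock = time.Unix(0, 0)
	var c = newRouteCache(time.Minute, func() time.Time { return clock })
	var route = "broker-a"

	c.update("journal/one", &route)
	fresh = c.lookup("journal/one")

	clock = clock.Add(2 * time.Minute) // Advance beyond the TTL.
	expired = c.lookup("journal/one")

	c.update("journal/one", &route)
	c.update("journal/one", nil) // A nil route invalidates the entry.
	invalidated = c.lookup("journal/one")
	return
}
```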
