Documentation ¶
Overview ¶
Package client implements a Go client for interacting with the gRPC Journal service of Gazette brokers. It concerns itself with operations over journal specifications, fragments and byte-streams. See package message for an implementation of messages layered atop journal streams.
The package provides Reader and RetryReader, which adapt the broker Read RPC to the io.Reader interface. Reader will utilize just one Read RPC, while RetryReader will silently restart Read RPCs as needed:
// Copy a journal byte range to os.Stdout.
io.Copy(os.Stdout, NewReader(ctx, client, pb.ReadRequest{
    Journal:   "a/journal/name",
    Offset:    1234,
    EndOffset: 5678,
}))
It provides Appender, which adapts the Append RPC to an io.WriteCloser:
// Copy os.Stdin to the journal.
var a = NewAppender(ctx, client, pb.AppendRequest{
    Journal: "a/journal/name",
})
if _, err = io.Copy(a, os.Stdin); err == nil {
    err = a.Close() // Commit the append.
} else {
    a.Abort() // Roll back: the broker discards content written so far.
}
Gazette appends are linearizable (atomic) per journal. Appender streams content to brokers as it's written, but no content of an Appender will be visible to any reader until Close is called and succeeds. An implication of this is that once brokers have begun to sequence an append into a journal, they expect the remaining content and Close of that Appender to be forthcoming, and will quickly time it out if it stalls. Uses of Appender should thus be limited to cases where its full content is readily available.
Most clients should instead use an AppendService. It offers automatic retries, an asynchronous API, and supports constraints on the ordering of appends with respect to other ongoing operations (i.e., "append to journal Foo, but not before this append to journal Bar completes"). It also dynamically batches many co-occurring small writes into larger ones for efficiency.
var as = NewAppendService(ctx, client)
var op = as.StartAppend(pb.AppendRequest{
    Journal: "a/journal/name",
}, myOtherOpsWhichMustCompleteFirst)

// Produce content to append into the AsyncAppend's Writer.
// We hold an exclusive lock over it until Release.
op.Writer().WriteString("hello, ")
op.Writer().WriteString("gazette: ")
var _, copyErr = io.Copy(op.Writer(), os.Stdin)
op.Require(copyErr)

// If io.Copy error'd, it aborts the append and is returned by Release.
if err = op.Release(); err == nil {
    err = op.Err() // Blocks until operation completes.
}
The package offers functions for listing Fragments and JournalSpecs and for applying JournalSpecs, while accounting for pagination details. Also notable is WatchedList, which is an important building block for applications scaling to multiple journals.
Index ¶
- Variables
- func Append(ctx context.Context, rjc pb.RoutedJournalClient, req pb.AppendRequest, ...) (pb.AppendResponse, error)
- func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error)
- func ApplyJournalsInBatches(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest, size int) (*pb.ApplyResponse, error)
- func GetJournal(ctx context.Context, jc pb.JournalClient, journal pb.Journal) (*pb.JournalSpec, error)
- func InstallFileTransport(root string) (remove func())
- func ListAllFragments(ctx context.Context, client pb.RoutedJournalClient, req pb.FragmentsRequest) (*pb.FragmentsResponse, error)
- func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRequest) (*pb.ListResponse, error)
- func ReadListResponse(stream pb.Journal_ListClient, req pb.ListRequest) (*pb.ListResponse, error)
- type AppendService
- type Appender
- type AsyncAppend
- func (p *AsyncAppend) Done() <-chan struct{}
- func (p *AsyncAppend) Err() error
- func (p *AsyncAppend) Release() error
- func (p *AsyncAppend) Request() pb.AppendRequest
- func (p *AsyncAppend) Require(err error) *AsyncAppend
- func (p *AsyncAppend) Response() pb.AppendResponse
- func (p *AsyncAppend) Writer() *bufio.Writer
- type AsyncJournalClient
- type AsyncOperation
- type FragmentReader
- type OpFuture
- type OpFutures
- type Reader
- type RetryReader
- func (rr *RetryReader) AdjustedOffset(br *bufio.Reader) int64
- func (rr *RetryReader) AdjustedSeek(offset int64, whence int, br *bufio.Reader) (int64, error)
- func (rr *RetryReader) Journal() pb.Journal
- func (rr *RetryReader) Offset() int64
- func (rr *RetryReader) Read(p []byte) (n int, err error)
- func (rr *RetryReader) Restart(req pb.ReadRequest)
- func (rr *RetryReader) Seek(offset int64, whence int) (int64, error)
- type RouteCache
- type WatchedList
Variables ¶
var (
    // Map common broker error statuses into named errors.
    ErrInsufficientJournalBrokers = errors.New(pb.Status_INSUFFICIENT_JOURNAL_BROKERS.String())
    ErrJournalNotFound            = errors.New(pb.Status_JOURNAL_NOT_FOUND.String())
    ErrNotJournalBroker           = errors.New(pb.Status_NOT_JOURNAL_BROKER.String())
    ErrNotJournalPrimaryBroker    = errors.New(pb.Status_NOT_JOURNAL_PRIMARY_BROKER.String())
    ErrOffsetNotYetAvailable      = errors.New(pb.Status_OFFSET_NOT_YET_AVAILABLE.String())
    ErrRegisterMismatch           = errors.New(pb.Status_REGISTER_MISMATCH.String())
    ErrWrongAppendOffset          = errors.New(pb.Status_WRONG_APPEND_OFFSET.String())

    // ErrOffsetJump is returned by Reader.Read to indicate that the next byte
    // available to be read is at a larger offset than that requested (eg,
    // because a span of the Journal has been deleted). The Reader's ReadResponse
    // should be inspected by the caller, and Read may be invoked again to continue.
    ErrOffsetJump = errors.New("offset jump")
    // ErrSeekRequiresNewReader is returned by Reader.Seek if it is unable to
    // satisfy the requested Seek. A new Reader should be started instead.
    ErrSeekRequiresNewReader = errors.New("seek offset requires new Reader")
    // ErrDidNotReadExpectedEOF is returned by FragmentReader.Read if the
    // underlying file did not return EOF at the expected Fragment End offset.
    ErrDidNotReadExpectedEOF = errors.New("did not read EOF at expected Fragment.End")
)
Functions ¶
func Append ¶
func Append(ctx context.Context, rjc pb.RoutedJournalClient, req pb.AppendRequest, content ...io.ReaderAt) (pb.AppendResponse, error)
Append zero or more ReaderAts to a journal as a single Append transaction. Append retries on transport or routing errors, but fails on all other errors. Each ReaderAt is read from byte zero until EOF, and may be read multiple times. If no ReaderAts are provided, an Append RPC with no content is issued.
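Why Append takes io.ReaderAt values can be illustrated with the standard library alone: a ReaderAt can be re-read from byte zero on every attempt, so a retry never depends on how much a failed attempt consumed. The sketch below is not this package's implementation; appendWithRetry, errTransport, and the simulated send function are hypothetical names introduced for illustration.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"math"
	"strings"
)

// errTransport is a hypothetical stand-in for a retryable transport error.
var errTransport = errors.New("transport error")

// appendWithRetry is a hypothetical helper. On each attempt it re-reads every
// ReaderAt from byte zero, so a failed attempt leaves no partial read state.
func appendWithRetry(send func(io.Reader) error, attempts int, content ...io.ReaderAt) error {
	var err error
	for i := 0; i < attempts; i++ {
		var readers []io.Reader
		for _, c := range content {
			// A SectionReader starting at zero reads the ReaderAt until EOF.
			readers = append(readers, io.NewSectionReader(c, 0, math.MaxInt64))
		}
		if err = send(io.MultiReader(readers...)); !errors.Is(err, errTransport) {
			return err // Success, or an error which is not retried.
		}
	}
	return err
}

// runAppendDemo fails the first attempt part-way through, then succeeds.
func runAppendDemo() string {
	var committed bytes.Buffer
	var calls int
	send := func(r io.Reader) error {
		calls++
		if calls == 1 {
			io.CopyN(io.Discard, r, 3) // Consume some content, then fail.
			return errTransport
		}
		committed.Reset()
		_, err := io.Copy(&committed, r)
		return err
	}
	err := appendWithRetry(send, 3, strings.NewReader("hello, "), strings.NewReader("gazette"))
	if err != nil {
		return "error: " + err.Error()
	}
	return committed.String()
}

func main() {
	fmt.Println(runAppendDemo())
}
```

The second attempt sees the full content even though the first attempt consumed three bytes before failing.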
func ApplyJournals ¶
func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error)
ApplyJournals applies journal changes detailed in the ApplyRequest via the broker Apply RPC. Changes are applied as a single Etcd transaction. If the change list is larger than an Etcd transaction can accommodate, ApplyJournalsInBatches should be used instead. ApplyResponse statuses other than OK are mapped to an error.
func ApplyJournalsInBatches ¶
func ApplyJournalsInBatches(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest, size int) (*pb.ApplyResponse, error)
ApplyJournalsInBatches is like ApplyJournals, but chunks the ApplyRequest into batches of the given size, which should be less than Etcd's maximum configured transaction size (usually 128). If size is 0 all changes will be attempted in a single transaction. Be aware that ApplyJournalsInBatches may only partially succeed, with some batches having applied and others not. The final ApplyResponse is returned, unless an error occurs. ApplyResponse statuses other than OK are mapped to an error.
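The chunking behavior described above can be sketched as index arithmetic over the change list. This is an illustration only, assuming changes are addressed by position; batchRanges is a hypothetical helper and not part of this package.

```go
package main

import "fmt"

// batchRanges is a hypothetical helper returning the [begin, end) index
// ranges which split n changes into batches of at most size changes.
// A size of zero yields a single batch holding every change.
func batchRanges(n, size int) [][2]int {
	if size <= 0 {
		size = n
	}
	var out [][2]int
	for begin := 0; begin < n; begin += size {
		end := begin + size
		if end > n {
			end = n
		}
		out = append(out, [2]int{begin, end})
	}
	return out
}

func main() {
	// Each range would be applied as its own transaction. If a later
	// batch fails, earlier batches have already been applied.
	for _, r := range batchRanges(300, 128) {
		fmt.Printf("apply changes [%d, %d)\n", r[0], r[1])
	}
}
```

Because each batch is a separate transaction, a caller that needs all-or-nothing semantics has to keep the change list small enough for a single ApplyJournals call.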
func GetJournal ¶ added in v0.83.1
func GetJournal(ctx context.Context, jc pb.JournalClient, journal pb.Journal) (*pb.JournalSpec, error)
GetJournal retrieves the JournalSpec of the named Journal, or returns an error.
func InstallFileTransport ¶
func InstallFileTransport(root string) (remove func())
InstallFileTransport registers a file:// protocol handler at the given root with the http.Client used by OpenFragmentURL. It's used in testing contexts, and is also appropriate when brokers share a common NAS file store to which fragments are persisted, and to which this client also has access. The returned cleanup function removes the handler and restores the prior http.Client.
const root = "/mnt/shared-nas-array/path/to/fragment-root"
defer client.InstallFileTransport(root)()

var rr = NewRetryReader(ctx, client, protocol.ReadRequest{
    Journal:    "a/journal/with/nas/fragment/store",
    DoNotProxy: true,
})
// rr.Read will read Fragments directly from NAS.
func ListAllFragments ¶
func ListAllFragments(ctx context.Context, client pb.RoutedJournalClient, req pb.FragmentsRequest) (*pb.FragmentsResponse, error)
ListAllFragments performs multiple Fragments RPCs, as required to join across multiple FragmentsResponse pages, and returns the completed FragmentsResponse. Any encountered error is returned.
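Joining across pages generally follows a token loop: request a page, accumulate its items, and continue from the returned token until none remains. The sketch below models that loop with in-memory data. The page type, fetchPage, and listAll are hypothetical, and the integer token is an assumption made for illustration rather than the wire format of the Fragments RPC.

```go
package main

import "fmt"

// page is a hypothetical stand-in for one paginated response.
type page struct {
	Items         []string
	NextPageToken int // Zero means there are no further pages.
}

// fetchPage is a hypothetical stand-in for a single paginated RPC.
// pageSize must be greater than zero.
func fetchPage(all []string, token, pageSize int) page {
	end := token + pageSize
	if end >= len(all) {
		return page{Items: all[token:]}
	}
	return page{Items: all[token:end], NextPageToken: end}
}

// listAll joins every page into one complete listing.
func listAll(all []string, pageSize int) []string {
	var out []string
	for token := 0; ; {
		p := fetchPage(all, token, pageSize)
		out = append(out, p.Items...)
		if p.NextPageToken == 0 {
			return out
		}
		token = p.NextPageToken
	}
}

func main() {
	fragments := []string{"f0", "f1", "f2", "f3", "f4"}
	fmt.Println(listAll(fragments, 2))
}
```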
func ListAllJournals ¶
func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRequest) (*pb.ListResponse, error)
ListAllJournals performs a unary broker journal listing. Any encountered error is returned.
func ReadListResponse ¶ added in v0.100.0
func ReadListResponse(stream pb.Journal_ListClient, req pb.ListRequest) (*pb.ListResponse, error)
ReadListResponse reads a complete ListResponse snapshot from the stream.
Types ¶
type AppendService ¶
type AppendService struct {
    pb.RoutedJournalClient
    // contains filtered or unexported fields
}
AppendService batches, dispatches, and (if needed) retries asynchronous Append RPCs. Use of an AppendService is appropriate for clients who make large numbers of small writes to a Journal, and where those writes may be pipelined and batched to amortize the cost of broker Append RPCs. It may also simplify implementations for clients who would prefer to simply have writes block until successfully committed, as opposed to handling errors and retries themselves.
For each journal, AppendService manages an ordered list of AsyncAppends, each having buffered content to be appended. The list is dispatched in FIFO order by a journal-specific goroutine.
AsyncAppends are backed by temporary files on the local disk rather than memory buffers. This minimizes the impact of buffering on the heap and garbage collector, and also makes AppendService tolerant to sustained service disruptions (up to the capacity of the disk).
AppendService implements the AsyncJournalClient interface.
func NewAppendService ¶
func NewAppendService(ctx context.Context, client pb.RoutedJournalClient) *AppendService
NewAppendService returns an AppendService with the provided Context and RoutedJournalClient.
func (*AppendService) PendingExcept ¶
func (s *AppendService) PendingExcept(except pb.Journal) OpFutures
PendingExcept implements the AsyncJournalClient interface.
func (*AppendService) StartAppend ¶
func (s *AppendService) StartAppend(req pb.AppendRequest, dependencies OpFutures) *AsyncAppend
StartAppend implements the AsyncJournalClient interface.
type Appender ¶
type Appender struct {
    Request  pb.AppendRequest  // AppendRequest of the Append.
    Response pb.AppendResponse // AppendResponse sent by broker.
    // contains filtered or unexported fields
}
Appender adapts an Append RPC to the io.WriteCloser interface. The first byte written to the Appender initiates the RPC. Subsequent bytes are streamed to brokers as they are written. Writes to the Appender may stall as the RPC window fills, while waiting for brokers to sequence this Append into the journal. Once they do, brokers expect the remaining content to be written to this Appender quickly (and may time out the RPC if it's not).
Content written to this Appender does not commit until Close is called, including cases where the application dies without calling Close. If a call to Close is started and the application dies before Close returns, the append may or may not commit.
The application can cleanly roll-back a started Appender by Aborting it.
func NewAppender ¶
func NewAppender(ctx context.Context, client pb.RoutedJournalClient, req pb.AppendRequest) *Appender
NewAppender returns an initialized Appender of the given AppendRequest.
func (*Appender) Abort ¶
func (a *Appender) Abort()
Abort the append, causing the broker to discard previously written content.
func (*Appender) Close ¶
Close the Append to complete the transaction, committing previously written content. If Close returns without an error, Appender.Response will hold the broker response.
type AsyncAppend ¶
type AsyncAppend struct {
// contains filtered or unexported fields
}
AsyncAppend represents an asynchronous Append RPC started and managed by an AppendService.
func (*AsyncAppend) Done ¶
func (p *AsyncAppend) Done() <-chan struct{}
Done returns a channel which selects when the AsyncAppend has committed or has been aborted along with the AppendService's Context.
func (*AsyncAppend) Err ¶
func (p *AsyncAppend) Err() error
Err blocks until Done, and returns the final operation error.
func (*AsyncAppend) Release ¶
func (p *AsyncAppend) Release() error
Release the AsyncAppend, allowing further writes to queue or for it to be dispatched to the brokers. Release first determines whether a previous Require failed, or if a Writer error occurred, in which case it will roll back all writes queued by the caller, aborting the append transaction, and return the non-nil error. A non-nil error is returned if and only if the Append was rolled back. Otherwise, the caller may then select on Done to determine when the AsyncAppend has committed and its Response may be examined.
func (*AsyncAppend) Request ¶
func (p *AsyncAppend) Request() pb.AppendRequest
Request returns the AppendRequest that was or will be made by this AsyncAppend. Request is safe to call at all times.
func (*AsyncAppend) Require ¶
func (p *AsyncAppend) Require(err error) *AsyncAppend
Require the error to be nil. If Require is called with a non-nil error, the error is retained and later returned by Release, in which case it will also roll back any writes queued by the caller, aborting the append transaction. Require is valid for use only until Release is called. Require returns itself, allowing uses like:
Require(maybeErrors()).Release()
func (*AsyncAppend) Response ¶
func (p *AsyncAppend) Response() pb.AppendResponse
Response returns the AppendResponse from the broker, and may be called only after Done selects.
func (*AsyncAppend) Writer ¶
func (p *AsyncAppend) Writer() *bufio.Writer
Writer returns a bufio.Writer to which content may be appended. Writer is valid for use only until Release is called. Clients may ignore write errors of the Writer, preferring to "fire and forget" a sequence of writes which could fail: Release will itself Require that no error is set on the Writer.
type AsyncJournalClient ¶
type AsyncJournalClient interface {
    pb.RoutedJournalClient

    // StartAppend begins a new asynchronous Append RPC. The caller holds exclusive access
    // to the returned AsyncAppend, and must then:
    //  * Write content to its Writer.
    //  * Optionally Require that one or more errors are nil.
    //  * Release the AsyncAppend, allowing queued writes to commit or,
    //    if an error occurred, to roll back.
    //
    // For performance reasons, an Append will often be batched with other Appends
    // having identical AppendRequests which are dispatched to this AppendService,
    // and note the Response.Fragment will reflect the entire batch written to the
    // broker. In all cases, relative order of Appends is preserved. One or more
    // dependencies may optionally be supplied. The Append RPC will not begin
    // until all dependencies have completed without error. A failure of a
    // dependency will also permanently fail the returned AsyncAppend and prevent
    // any further appends to this journal from starting. For this reason, an
    // OpFuture should only fail if it also invalidates the AsyncJournalClient
    // (eg, because the client is scoped to a context which is invalidated by the
    // OpFuture failure).
    StartAppend(req pb.AppendRequest, dependencies OpFutures) *AsyncAppend

    // PendingExcept returns an OpFutures set of *AsyncAppend instances being
    // evaluated for all Journals other than |except|. It can be used to build
    // "barriers" which ensure that all pending appends have committed prior to the
    // commencement of an append which is about to be issued. Eg, given:
    //
    //   var op = as.StartAppend(pb.AppendRequest{Journal: "target"}, as.PendingExcept("target"))
    //   op.Writer().WriteString("checkpoint")
    //   op.Release()
    //
    // All ongoing appends to journals other than "target" are guaranteed to commit
    // before an Append RPC is begun which writes "checkpoint" to journal "target".
    // PendingExcept("") returns all pending AsyncAppends.
    //
    // If a prior journal append failed (eg, because its dependency failed) a
    // resolved OpFuture with that error will be included in returned OpFutures.
    // This ensures the error will properly cascade to an operation which may
    // depend on these OpFutures.
    PendingExcept(except pb.Journal) OpFutures
}
AsyncJournalClient composes a RoutedJournalClient with an API for performing asynchronous Append operations.
type AsyncOperation ¶ added in v0.83.1
type AsyncOperation struct {
// contains filtered or unexported fields
}
AsyncOperation is a simple, minimal implementation of the OpFuture interface.
func NewAsyncOperation ¶ added in v0.83.1
func NewAsyncOperation() *AsyncOperation
NewAsyncOperation returns a new AsyncOperation.
func (*AsyncOperation) Done ¶ added in v0.83.1
func (o *AsyncOperation) Done() <-chan struct{}
Done selects when Resolve is called.
func (*AsyncOperation) Err ¶ added in v0.83.1
func (o *AsyncOperation) Err() error
Err blocks until Resolve is called, then returns its error.
func (*AsyncOperation) Resolve ¶ added in v0.83.1
func (o *AsyncOperation) Resolve(err error)
Resolve marks the AsyncOperation as completed with the given error.
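The Done, Err, and Resolve contract described above can be modeled with a channel that is closed exactly once. This is a minimal sketch and not the package's AsyncOperation; the operation type and newOperation constructor are hypothetical names, and the sketch assumes Resolve is called at most once.

```go
package main

import (
	"errors"
	"fmt"
)

// operation is a hypothetical, minimal future: doneCh is closed by Resolve,
// and err is only read after doneCh has been closed.
type operation struct {
	doneCh chan struct{}
	err    error
}

func newOperation() *operation {
	return &operation{doneCh: make(chan struct{})}
}

// Done selects once Resolve has been called.
func (o *operation) Done() <-chan struct{} { return o.doneCh }

// Err blocks until Resolve has been called, then returns its error.
func (o *operation) Err() error {
	<-o.doneCh
	return o.err
}

// Resolve records the final error and wakes all waiters.
// It must be called at most once: closing a closed channel panics.
func (o *operation) Resolve(err error) {
	o.err = err
	close(o.doneCh)
}

// runOperationDemo resolves an operation from another goroutine.
func runOperationDemo() error {
	op := newOperation()
	go op.Resolve(errors.New("failed"))
	return op.Err()
}

func main() {
	fmt.Println(runOperationDemo())
}
```

Closing the channel, rather than sending on it, lets any number of goroutines observe completion.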
type FragmentReader ¶
type FragmentReader struct {
    pb.Fragment       // Fragment being read.
    Offset      int64 // Next journal offset to be read, in range [Begin, End).
    // contains filtered or unexported fields
}
FragmentReader directly reads from an opened Fragment file.
func NewFragmentReader ¶
func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*FragmentReader, error)
NewFragmentReader wraps a io.ReadCloser of raw Fragment bytes with a returned *FragmentReader which has been pre-seeked to the given offset.
func OpenFragmentURL ¶
func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, url string) (*FragmentReader, error)
OpenFragmentURL directly opens the Fragment, which must be available at the given URL, and returns a *FragmentReader which has been pre-seeked to the given offset.
func (*FragmentReader) Close ¶
func (fr *FragmentReader) Close() error
Close closes the underlying ReadCloser and associated decompressor (if any).
func (*FragmentReader) Read ¶
func (fr *FragmentReader) Read(p []byte) (n int, err error)
Read returns the next bytes of decompressed Fragment content. When Read returns, Offset has been updated to reflect the next byte to be read. Read returns EOF only if the underlying Reader returns EOF at precisely Offset == Fragment.End. If the underlying Reader is too short, io.ErrUnexpectedEOF is returned. If it's too long, ErrDidNotReadExpectedEOF is returned.
type OpFuture ¶ added in v0.83.1
type OpFuture interface {
    // Done selects when operation background execution has finished.
    Done() <-chan struct{}
    // Err blocks until Done() and returns the final error of the OpFuture.
    Err() error
}
OpFuture represents an operation which is executing in the background. The operation has completed when Done selects. Err may be invoked to determine whether the operation succeeded or failed.
func FinishedOperation ¶ added in v0.83.1
FinishedOperation is a convenience that returns an already-resolved AsyncOperation.
type OpFutures ¶ added in v0.83.1
type OpFutures map[OpFuture]struct{}
OpFutures is a set of OpFuture instances.
func (OpFutures) IsSubsetOf ¶ added in v0.83.1
IsSubsetOf is true if this OpFutures is a subset of the other.
type Reader ¶
type Reader struct {
    Request  pb.ReadRequest  // ReadRequest of the Reader.
    Response pb.ReadResponse // Most recent ReadResponse from broker.
    // contains filtered or unexported fields
}
Reader adapts a Read RPC to the io.Reader interface. The first byte read from the Reader initiates the RPC, and subsequent reads stream from it.
If DoNotProxy is true then the broker may close the RPC after sending a signed Fragment URL, and Reader will directly open the Fragment (decompressing if needed), seek to the requested offset, and read its content.
Reader returns EOF if:
- The broker closes the RPC, eg because its assignment has changed or it's shutting down.
- The requested EndOffset has been read through.
- A Fragment being read by the Reader reaches EOF.
If Block is true, Read may block indefinitely. Otherwise, ErrOffsetNotYetAvailable is returned upon reaching the journal write head.
A Reader is invalidated by its first returned error, with the exception of ErrOffsetJump: this error is returned to notify the client that the next Journal offset to be Read is not the offset that was requested (eg, because a portion of the Journal was deleted), but the Reader is prepared to continue at the updated, strictly larger offset.
func NewReader ¶
func NewReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRequest) *Reader
NewReader returns an initialized Reader of the given ReadRequest.
func (*Reader) AdjustedOffset ¶
AdjustedOffset returns the current journal offset adjusted for content read by the bufio.Reader (which must wrap this Reader), which has not yet been consumed from the bufio.Reader's buffer.
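The adjustment amounts to subtracting the bytes which sit unread in the bufio.Reader's buffer from the offset of the underlying reader. A minimal sketch, assuming a hypothetical countingReader in place of this package's Reader:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// countingReader is a hypothetical stand-in which tracks the journal offset
// of the next byte it will return.
type countingReader struct {
	r      io.Reader
	offset int64
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.offset += int64(n)
	return n, err
}

// adjustedOffset returns the offset of the next byte the *caller* will see:
// the underlying offset, less the bytes still buffered by br.
func adjustedOffset(c *countingReader, br *bufio.Reader) int64 {
	return c.offset - int64(br.Buffered())
}

// runOffsetDemo reads one line through a bufio.Reader and returns both the
// raw and adjusted offsets.
func runOffsetDemo() (raw, adjusted int64) {
	cr := &countingReader{r: strings.NewReader("line one\nline two\n"), offset: 100}
	br := bufio.NewReader(cr)
	br.ReadString('\n') // Consumes 9 bytes; bufio has read ahead further.
	return cr.offset, adjustedOffset(cr, br)
}

func main() {
	raw, adjusted := runOffsetDemo()
	fmt.Println(raw, adjusted)
}
```

The raw offset reflects bufio's read-ahead, while the adjusted offset points at the start of the second line, which is what a caller recording a checkpoint would want.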
func (*Reader) Read ¶
Read from the journal. If this is the first Read of the Reader, a Read RPC is started.
func (*Reader) Seek ¶
Seek provides a limited form of seeking support. Specifically, if:
- A Fragment URL is being directly read, and
- The Seek offset is ahead of the current Reader offset, and
- The Fragment also covers the desired Seek offset
Then a seek is performed by reading and discarding to the seeked offset. Seek will otherwise return ErrSeekRequiresNewReader.
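Seeking by reading and discarding can be sketched over any forward-only io.Reader. The forwardSeeker type below is hypothetical, and errSeekRequiresNewReader is a local stand-in for the package's ErrSeekRequiresNewReader; the range check is a simplification of the conditions listed above.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// errSeekRequiresNewReader is a local stand-in for ErrSeekRequiresNewReader.
var errSeekRequiresNewReader = errors.New("seek offset requires new Reader")

// forwardSeeker is a hypothetical forward-only reader over offsets
// [offset, end).
type forwardSeeker struct {
	r      io.Reader
	offset int64 // Offset of the next byte to be read.
	end    int64 // Exclusive end of the range covered by r.
}

func (f *forwardSeeker) Read(p []byte) (int, error) {
	n, err := f.r.Read(p)
	f.offset += int64(n)
	return n, err
}

// seekTo advances to target by discarding bytes. Backward seeks, and seeks
// outside of the covered range, cannot be satisfied.
func (f *forwardSeeker) seekTo(target int64) error {
	if target < f.offset || target >= f.end {
		return errSeekRequiresNewReader
	}
	_, err := io.CopyN(io.Discard, f, target-f.offset)
	return err
}

// runSeekDemo seeks forward, reads the remainder, then attempts to seek back.
func runSeekDemo() (string, error) {
	f := &forwardSeeker{r: strings.NewReader("0123456789"), end: 10}
	if err := f.seekTo(4); err != nil {
		return "", err
	}
	rest, _ := io.ReadAll(f)
	return string(rest), f.seekTo(2) // Backward seek is refused.
}

func main() {
	fmt.Println(runSeekDemo())
}
```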
type RetryReader ¶
type RetryReader struct {
    // Context of the RetryReader, which parents the Context provided to
    // underlying *Reader instances.
    Context context.Context
    // Client of the RetryReader.
    Client pb.RoutedJournalClient
    // Reader is the current underlying Reader of the RetryReader. This instance
    // may change many times over the lifetime of a RetryReader, as Read RPCs
    // finish or are cancelled and then restarted.
    Reader *Reader
    // Cancel Read operations of the current *Reader. Notably this will cause an
    // ongoing blocked Read (as well as any future Reads) to return a "Cancelled"
    // error. Restart may be called to re-initialize the RetryReader.
    Cancel context.CancelFunc
}
RetryReader wraps Reader with error handling and retry behavior, as well as support for cancellation of an ongoing Read or Seek operation. RetryReader is not thread-safe, with one exception: Cancel may be called from one goroutine to abort an ongoing Read or Seek call in another.
func NewRetryReader ¶
func NewRetryReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRequest) *RetryReader
NewRetryReader returns a RetryReader initialized with the RoutedJournalClient and ReadRequest.
func (*RetryReader) AdjustedOffset ¶
func (rr *RetryReader) AdjustedOffset(br *bufio.Reader) int64
AdjustedOffset delegates to the current Reader's AdjustedOffset.
func (*RetryReader) AdjustedSeek ¶
AdjustedSeek sets the offset for the next Read, accounting for buffered data and, where possible, accomplishing the AdjustedSeek by discarding from the bufio.Reader.
func (*RetryReader) Journal ¶
func (rr *RetryReader) Journal() pb.Journal
Journal being read by this RetryReader.
func (*RetryReader) Offset ¶
func (rr *RetryReader) Offset() int64
Offset of the next Journal byte to be returned by Read.
func (*RetryReader) Read ¶
func (rr *RetryReader) Read(p []byte) (n int, err error)
Read returns the next bytes of journal content. It will return a non-nil error in the following cases:
- Cancel is called, or the RetryReader context is cancelled.
- The broker returns OFFSET_NOT_YET_AVAILABLE (ErrOffsetNotYetAvailable) for a non-blocking ReadRequest.
- An offset jump occurred (ErrOffsetJump), in which case the client should inspect the new Offset and may continue reading if desired.
- The broker returns io.EOF upon reaching the requested EndOffset.
All other errors are retried.
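The retry behavior can be pictured as a wrapper which remembers the current offset and reopens the underlying reader there whenever a retryable error occurs. The sketch below is a simplification under stated assumptions: retryReader, its open callback, and errReader are hypothetical, and the sketch omits the backoff, cancellation, and error classification a production reader would need.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// retryReader is a hypothetical wrapper which restarts reads at its offset.
type retryReader struct {
	open   func(offset int64) io.Reader // Starts a new read at offset.
	cur    io.Reader
	offset int64
}

func (rr *retryReader) Read(p []byte) (int, error) {
	for {
		if rr.cur == nil {
			rr.cur = rr.open(rr.offset)
		}
		n, err := rr.cur.Read(p)
		rr.offset += int64(n)

		if err == nil || err == io.EOF {
			return n, err
		}
		rr.cur = nil // Drop the failed reader; the next read reopens it.
		if n > 0 {
			return n, nil // Deliver bytes read before the failure.
		}
	}
}

// errReader always fails, simulating a broken stream.
type errReader struct{ err error }

func (e errReader) Read([]byte) (int, error) { return 0, e.err }

// runRetryDemo breaks the first stream after five bytes.
func runRetryDemo() (string, int) {
	const content = "hello, gazette"
	var opens int
	rr := &retryReader{open: func(offset int64) io.Reader {
		opens++
		if opens == 1 {
			return io.MultiReader(
				strings.NewReader(content[offset:offset+5]),
				errReader{errors.New("stream broken")},
			)
		}
		return strings.NewReader(content[offset:])
	}}
	b, _ := io.ReadAll(rr)
	return string(b), opens
}

func main() {
	fmt.Println(runRetryDemo())
}
```

Tracking the offset in the wrapper is what allows a restart to resume without duplicating or skipping bytes.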
func (*RetryReader) Restart ¶
func (rr *RetryReader) Restart(req pb.ReadRequest)
Restart the RetryReader with a new ReadRequest. Restart without a prior Cancel will leak resources.
func (*RetryReader) Seek ¶
func (rr *RetryReader) Seek(offset int64, whence int) (int64, error)
Seek sets the offset for the next Read. It returns an error if (and only if) whence is io.SeekEnd, which is not supported. Where possible Seek will delegate to the current Reader, but in most cases a new Read RPC must be started.
type RouteCache ¶
type RouteCache struct {
// contains filtered or unexported fields
}
RouteCache caches observed Routes for items (eg, Journals, or Shards). It implements the protocol.DispatchRouter interface, and where a cached Route of an item is available, it enables applications to dispatch RPCs directly to the most appropriate broker or consumer process. This reduces the overall number of network hops, and especially the number of hops crossing availability zones (which often cost more).
For example, RouteCache can direct an application to a broker in its same availability zone which is replicating a desired journal, and to which a long-lived Read RPC can be dispatched.
// Adapt a JournalClient to a RoutedJournalClient by using a RouteCache.
var jc protocol.JournalClient
var rjc = protocol.NewRoutedJournalClient(jc, NewRouteCache(256, time.Hour))
func NewRouteCache ¶
func NewRouteCache(size int, ttl time.Duration) *RouteCache
NewRouteCache returns a RouteCache of the given size (which must be > 0) and caching Duration.
func (*RouteCache) IsNoopRouter ¶
func (rc *RouteCache) IsNoopRouter() bool
IsNoopRouter returns false.
func (*RouteCache) UpdateRoute ¶
func (rc *RouteCache) UpdateRoute(item string, route *pb.Route)
UpdateRoute caches the provided Route for the item, or invalidates it if the route is nil or empty.
type WatchedList ¶ added in v0.100.0
type WatchedList struct {
// contains filtered or unexported fields
}
WatchedList drives ongoing List RPCs with a given ListRequest, making its most recent result available via List. It's a building block for applications which interact with dynamic journal sets and wish to react to changes in their set membership over time.
var partitions, _ = protocol.ParseLabelSelector("logs=clicks, source=mobile")
var wl = NewWatchedList(ctx, client, protocol.ListRequest{
    Selector: partitions,
}, nil)
func NewWatchedList ¶ added in v0.100.0
func NewWatchedList(ctx context.Context, client pb.JournalClient, req pb.ListRequest, updateCh chan error) *WatchedList
NewWatchedList returns a WatchedList of the ListRequest which will keep up-to-date with watched changes from brokers.
Upon return, the WatchedList is not yet ready, but the caller may await UpdateCh() to be notified when the very first listing (or error) is available.
If `updateCh` is non-nil, then it's used as UpdateCh() instead of allocating a new channel. This is recommended if you have a dynamic set of WatchedList instances and wish to take action if any one of them updates. Otherwise, a new channel with a single buffered item is allocated.
func (*WatchedList) List ¶ added in v0.100.0
func (pl *WatchedList) List() (out *pb.ListResponse)
List returns the most recent ListResponse snapshot, or nil if a snapshot has not been received yet.
func (*WatchedList) UpdateCh ¶ added in v0.100.0
func (pl *WatchedList) UpdateCh() <-chan error
UpdateCh returns a channel which is signaled with each update or error of the WatchedList. Errors are informational only: WatchedList will retry on all errors, but the caller may wish to examine errors and cancel its context. Only one signal is sent per update, so if multiple goroutines select from UpdateCh only one will wake.
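One common way to build a notification channel with a single buffered item is a non-blocking send, which coalesces signals when no one is currently receiving. Whether WatchedList signals in exactly this way is an assumption; the notify helper below is hypothetical and only illustrates the pattern.

```go
package main

import "fmt"

// notify is a hypothetical helper which signals ch without blocking.
// If a signal is already pending in the buffer, this one is dropped.
func notify(ch chan error, err error) {
	select {
	case ch <- err:
	default:
	}
}

// runNotifyDemo signals three times with no receiver and returns the
// number of signals left pending.
func runNotifyDemo() int {
	ch := make(chan error, 1)
	for i := 0; i < 3; i++ {
		notify(ch, nil)
	}
	return len(ch)
}

func main() {
	fmt.Println(runNotifyDemo())
}
```

With this pattern a receiver should treat a signal as "something changed" and re-read the latest snapshot, rather than counting signals.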