Documentation ¶
Overview ¶
Package journal contains all runtime components for Gazette journals, including Fragment & Spool for journal content, Head (serving replications), Tail (reads), and Broker (for brokering new writes).
Code generated by mockery v1.0.0 ¶
Index ¶
- Constants
- Variables
- func ErrorFromResponse(response *http.Response) error
- func NewWalkFuncAdapter(callback func(Fragment) error, rewrites ...string) filepath.WalkFunc
- func StatusCodeForError(err error) int
- type AppendArgs
- type AppendOp
- type AppendResult
- type AsyncAppend
- type Broker
- type BrokerConfig
- type Client
- type Creator
- type Doer
- type Fragment
- func (f Fragment) AsDirectURL(cfs cloudstore.FileSystem, duration time.Duration) (*url.URL, error)
- func (f Fragment) ContentName() string
- func (f *Fragment) ContentPath() string
- func (f Fragment) IsLocal() bool
- func (f Fragment) ReaderFromOffset(offset int64, cfs cloudstore.FileSystem) (io.ReadCloser, error)
- func (f Fragment) Size() int64
- type FragmentFile
- type FragmentPersister
- type FragmentSet
- type Getter
- type Head
- type Header
- type IndexWatcher
- type Mark
- type MarkedReader
- type MemoryBroker
- func (j *MemoryBroker) Create(journal Name) error
- func (j *MemoryBroker) Flush()
- func (j *MemoryBroker) Get(args ReadArgs) (ReadResult, io.ReadCloser)
- func (j *MemoryBroker) Head(args ReadArgs) (ReadResult, *url.URL)
- func (j *MemoryBroker) ReadFrom(name Name, r io.Reader) (*AsyncAppend, error)
- func (j *MemoryBroker) Write(name Name, b []byte) (*AsyncAppend, error)
- type MockDoer
- type MockFragmentFile
- func (_m *MockFragmentFile) Close() error
- func (_m *MockFragmentFile) Fd() uintptr
- func (_m *MockFragmentFile) Read(p []byte) (int, error)
- func (_m *MockFragmentFile) ReadAt(p []byte, off int64) (int, error)
- func (_m *MockFragmentFile) Seek(offset int64, whence int) (int64, error)
- func (_m *MockFragmentFile) Write(p []byte) (int, error)
- type MockGetter
- type MockHeader
- type MockWriter
- type Name
- type ReadArgs
- type ReadOp
- type ReadResult
- type Replica
- type ReplicateArgs
- type ReplicateOp
- type ReplicateResult
- type Replicator
- type RetryReader
- type RouteToken
- type Spool
- type Tail
- type WriteCommitter
- type Writer
Constants ¶
const (
AppendOpBufferSize = 100
)
const ReplicateOpBufferSize = 10
Variables ¶
var (
    ErrExists            = errors.New("journal exists")
    ErrNotBroker         = errors.New("not journal broker")
    ErrNotFound          = errors.New("journal not found")
    ErrNotReplica        = errors.New("not journal replica")
    ErrNotYetAvailable   = errors.New("offset not yet available")
    ErrReplicationFailed = errors.New("replication failed")
    ErrWrongRouteToken   = errors.New("wrong route token")
    ErrWrongWriteHead    = errors.New("wrong write head")
)
var ErrInvalidDelta = errors.New("invalid delta")
Functions ¶
func ErrorFromResponse ¶
func ErrorFromResponse(response *http.Response) error
Maps an HTTP status code into a corresponding Journal protocol error, or nil. Unknown status codes are converted into an error.
func NewWalkFuncAdapter ¶
func NewWalkFuncAdapter(callback func(Fragment) error, rewrites ...string) filepath.WalkFunc
NewWalkFuncAdapter returns a filepath.WalkFunc which parses encountered files as Fragments, and passes each to the provided |callback|. Prefix |rewrites| may be included, as pairs of "from", "to" prefixes which are applied in order. For example, NewWalkFuncAdapter(cb, "/from/", "/foo/to/", "/foo/", "/") would rewrite path "/from/bar" => "/to/bar".
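The prefix rewriting described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `applyRewrites` is a hypothetical helper, and it assumes each "from", "to" pair is applied in order to the output of the previous pair.

```go
package main

import (
	"fmt"
	"strings"
)

// applyRewrites is a hypothetical helper (not part of package journal).
// It treats |rewrites| as ("from", "to") prefix pairs and applies them in
// order, each to the output of the previous pair.
func applyRewrites(path string, rewrites ...string) string {
	for i := 0; i+1 < len(rewrites); i += 2 {
		from, to := rewrites[i], rewrites[i+1]
		if strings.HasPrefix(path, from) {
			path = to + strings.TrimPrefix(path, from)
		}
	}
	return path
}

func main() {
	// "/from/bar" => "/foo/to/bar" => "/to/bar"
	fmt.Println(applyRewrites("/from/bar", "/from/", "/foo/to/", "/foo/", "/"))
}
```

Because pairs are applied in sequence, a later pair can match a prefix produced by an earlier one, which is how the documented example arrives at "/to/bar".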
func StatusCodeForError ¶
func StatusCodeForError(err error) int
Maps Journal protocol errors into a unique HTTP status code. Other errors are mapped into http.StatusInternalServerError.
Types ¶
type AppendArgs ¶
type AppendArgs struct {
    Journal Name
    // Content to be appended to |Journal|. The append will consume |Content|
    // until io.EOF, and abort the append (without committing any content)
    // if any other error is returned by |Content.Read()|.
    Content io.Reader
    // Context which may trace, cancel or supply a deadline for the operation.
    Context context.Context
}
func (AppendArgs) String ¶
func (a AppendArgs) String() string
type AppendOp ¶
type AppendOp struct {
    AppendArgs
    // Channel by which broker returns operation status.
    Result chan AppendResult `json:"-"`
}
type AppendResult ¶
type AppendResult struct {
    // Any error that occurred during the append operation (PUT request.)
    Error error
    // Write head at the completion of the operation.
    WriteHead int64
    // RouteToken of the Journal. Set on ErrNotBroker.
    RouteToken
}
func (AppendResult) String ¶
func (a AppendResult) String() string
type AsyncAppend ¶
type AsyncAppend struct {
    // Read-only, and valid only after Ready is signaled.
    AppendResult
    // Signaled when the AppendOp has completed.
    Ready chan struct{}
}
Represents an AppendOp which is being asynchronously executed.
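A caller typically waits on Ready before touching the embedded result. The sketch below uses local stand-in types that mirror the documented shapes rather than importing the package, and it assumes that Ready is signaled by closing the channel. `startAppend` and `wait` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local stand-ins mirroring the documented types (not the package's own).
type AppendResult struct {
	Error     error
	WriteHead int64
}

type AsyncAppend struct {
	AppendResult
	Ready chan struct{}
}

// startAppend is hypothetical: it simulates an append which completes in the
// background and then signals Ready by closing it (an assumption).
func startAppend(writeHead int64) *AsyncAppend {
	op := &AsyncAppend{Ready: make(chan struct{})}
	go func() {
		op.WriteHead = writeHead // set before Ready is signaled
		close(op.Ready)
	}()
	return op
}

// wait reads the result only after Ready is signaled, or gives up on timeout.
func wait(op *AsyncAppend, timeout time.Duration) (int64, error) {
	select {
	case <-op.Ready:
		return op.WriteHead, op.Error
	case <-time.After(timeout):
		return 0, errors.New("timed out waiting for append")
	}
}

func demo() (int64, error) {
	return wait(startAppend(42), time.Second)
}

func main() {
	fmt.Println(demo())
}
```

Reading the result before Ready is signaled would race with the goroutine completing the operation, which is why the struct documents the result as valid only afterwards.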
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker is responsible for scattering journal writes to each replica, i.e., brokering transactions.
func (*Broker) StartServingOps ¶
StartServingOps starts a loop to consume config updates and serves appends. Updates are always handled before appends.
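One common Go pattern for "updates are always handled before appends" is a non-blocking priority check followed by a blocking select. The sketch below illustrates that pattern only; it is not the Broker's actual loop, and `serve`, the string-typed channels, and the close-to-stop convention are all assumptions made for the example.

```go
package main

import "fmt"

// serve is a hypothetical loop (not the package's implementation) which
// always handles a pending update before any append. It returns once both
// channels are closed and drained.
func serve(updates, appends <-chan string) []string {
	var handled []string
	for updates != nil || appends != nil {
		// Priority pass: take a pending update without blocking.
		select {
		case u, ok := <-updates:
			if ok {
				handled = append(handled, "update:"+u)
			} else {
				updates = nil // closed: a nil channel is never selected
			}
			continue
		default:
		}
		// No update is pending, so block for whichever arrives next.
		select {
		case u, ok := <-updates:
			if ok {
				handled = append(handled, "update:"+u)
			} else {
				updates = nil
			}
		case a, ok := <-appends:
			if ok {
				handled = append(handled, "append:"+a)
			} else {
				appends = nil
			}
		}
	}
	return handled
}

// demo queues two appends before two updates, then serves them.
func demo() []string {
	updates, appends := make(chan string, 2), make(chan string, 2)
	appends <- "a1"
	appends <- "a2"
	updates <- "u1"
	updates <- "u2"
	close(updates)
	close(appends)
	return serve(updates, appends)
}

func main() {
	fmt.Println(demo())
}
```

Even though the appends were queued first, both updates are handled ahead of them, because the priority pass runs at the top of every iteration.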
func (*Broker) Stop ¶
func (b *Broker) Stop()
Stop shuts down the broker. It blocks until all pending config updates and appends are handled.
func (*Broker) UpdateConfig ¶
func (b *Broker) UpdateConfig(config BrokerConfig)
type BrokerConfig ¶
type BrokerConfig struct {
    // Replica instances which should be involved in brokered transactions.
    Replicas []Replicator
    // Token representing the Broker's view of the current replication topology.
    // Sent with replication requests and verified for consensus by each remote
    // replica: for a transaction to succeed, all replicas must agree on the
    // current |RouteToken|.
    RouteToken
    // Next offset of the next brokered write transaction. Also sent with
    // replication requests and verified for consensus by each remote replica:
    // for a transaction to succeed, all replicas must agree on the |WriteHead|.
    WriteHead int64
    // contains filtered or unexported fields
}
BrokerConfig is used to periodically update Broker with updated cluster topology and replication configuration.
type Doer ¶
Provides low-level routing and access to a Gazette service, suitable for proxying requests and modeled on http.Client. The client will perform journal-based routing to the appropriate Gazette instance. See gazette.Client.
type Fragment ¶
type Fragment struct {
    Journal    Name
    Begin, End int64
    Sum        [sha1.Size]byte
    // Backing file of the fragment, if present locally.
    File FragmentFile
    // If fragment is remote, the time of last modification.
    // NOTE(joshk): Does not get set in Client use.
    // TODO(johnny): Is this the appropriate factoring?
    RemoteModTime time.Time
}
func LocalFragments ¶
LocalFragments returns fragments of |journal| under the local |directory|.
TODO(johnny): Collapse with NewWalkFuncAdapter above, or deprecate as part of a larger local-fragment change (Issues #30 & #31).
func (Fragment) AsDirectURL ¶
func (f Fragment) AsDirectURL(cfs cloudstore.FileSystem, duration time.Duration) (*url.URL, error)
func (Fragment) ContentName ¶
func (f Fragment) ContentName() string
func (*Fragment) ContentPath ¶
func (f *Fragment) ContentPath() string
func (Fragment) ReaderFromOffset ¶
func (f Fragment) ReaderFromOffset(offset int64, cfs cloudstore.FileSystem) (io.ReadCloser, error)
type FragmentFile ¶
type FragmentFile interface {
    Close() error
    Read(p []byte) (n int, err error)
    ReadAt(p []byte, off int64) (n int, err error)
    Seek(offset int64, whence int) (int64, error)
    Fd() uintptr
    Write(p []byte) (n int, err error)
}
Portions of os.File interface used by Fragment. An interface is used (rather than directly using *os.File) in support of test mocks.
type FragmentPersister ¶
type FragmentPersister interface {
Persist(Fragment)
}
FragmentPersister accepts completed local fragment spools, which should be persisted to long-term storage. See |gazette.Persister|.
type FragmentSet ¶
type FragmentSet []Fragment
Maintains fragments ordered on |Begin| and |End|, with the invariant that no fragment is fully overlapped by another fragment in the set (though it may be overlapped by a combination of other fragments). Larger fragments are preferred (and will replace spans of overlapped smaller fragments). An implication of this invariant is that no two fragments have the same |Begin| or |End| (as that would imply an overlap). Both are monotonically increasing in the set: set[0].Begin represents the minimum offset, and set[len(set)-1].End represents the maximum offset.
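The ordering invariant can be stated as a small check: if both |Begin| and |End| strictly increase from one fragment to the next, no fragment can be fully covered by another single fragment. The sketch below uses simplified local stand-in types, and `isOrdered` is a hypothetical helper rather than a method of the package.

```go
package main

import "fmt"

// Simplified local stand-ins for the documented types.
type Fragment struct{ Begin, End int64 }
type FragmentSet []Fragment

// isOrdered is a hypothetical check of the documented invariant: both Begin
// and End must be strictly increasing across the set.
func isOrdered(s FragmentSet) bool {
	for i := 1; i < len(s); i++ {
		if s[i].Begin <= s[i-1].Begin || s[i].End <= s[i-1].End {
			return false
		}
	}
	return true
}

func main() {
	// Partial overlap is allowed.
	fmt.Println(isOrdered(FragmentSet{{0, 10}, {5, 20}, {25, 30}}))
	// {5, 10} is fully covered by {0, 20}, so the invariant is violated.
	fmt.Println(isOrdered(FragmentSet{{0, 20}, {5, 10}}))
}
```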
func (*FragmentSet) Add ¶
func (s *FragmentSet) Add(fragment Fragment) bool
func (*FragmentSet) BeginOffset ¶
func (s *FragmentSet) BeginOffset() int64
func (*FragmentSet) EndOffset ¶
func (s *FragmentSet) EndOffset() int64
func (*FragmentSet) LongestOverlappingFragment ¶
func (s *FragmentSet) LongestOverlappingFragment(offset int64) int
Finds and returns the fragment covering |offset|, which has the most content after |offset|. If no fragment covers |offset|, the first fragment beginning after |offset| is returned.
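The lookup rule above can be sketched with a linear scan over a set ordered on |Begin|. This is an illustration under stated assumptions, not the package's implementation: fragments are treated as half-open ranges [Begin, End), the return value is an index into the set, and `len(s)` is returned when no fragment covers or follows |offset|.

```go
package main

import "fmt"

// Simplified local stand-ins for the documented types.
type Fragment struct{ Begin, End int64 }
type FragmentSet []Fragment

// longestOverlapping is a hypothetical linear-scan version of the documented
// lookup. It returns the index of the fragment covering |offset| with the
// greatest End; failing that, the index of the first fragment beginning after
// |offset|; failing that, len(s) (an assumption).
func longestOverlapping(s FragmentSet, offset int64) int {
	best := -1
	for i, f := range s {
		if f.Begin > offset {
			if best == -1 {
				return i // nothing covers offset: first fragment after it
			}
			break
		}
		if f.End > offset && (best == -1 || f.End > s[best].End) {
			best = i
		}
	}
	if best == -1 {
		return len(s)
	}
	return best
}

func main() {
	s := FragmentSet{{0, 10}, {5, 20}, {25, 30}}
	fmt.Println(longestOverlapping(s, 7))  // covered by two fragments
	fmt.Println(longestOverlapping(s, 22)) // falls in a gap
	fmt.Println(longestOverlapping(s, 40)) // beyond the set
}
```

Since the set is ordered, a binary search would serve the same purpose on large sets; the linear scan is used here only to keep the rule easy to follow.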
type Getter ¶
type Getter interface {
Get(args ReadArgs) (ReadResult, io.ReadCloser)
}
Performs a Gazette GET operation.
type Head ¶
type Head struct {
// contains filtered or unexported fields
}
func NewHead ¶
func NewHead(journal Name, directory string, persister FragmentPersister, updates chan<- Fragment) *Head
func (*Head) Replicate ¶
func (h *Head) Replicate(op ReplicateOp)
func (*Head) StartServingOps ¶
type Header ¶
type Header interface {
Head(args ReadArgs) (result ReadResult, fragmentLocation *url.URL)
}
Performs a Gazette HEAD operation.
type IndexWatcher ¶
type IndexWatcher struct {
// contains filtered or unexported fields
}
IndexWatcher monitors a journal's storage location in the cloud filesystem for new fragments, by performing periodic directory listings. When new fragment metadata arrives, it's published to the journal Tail via a shared channel, which indexes the fragment and makes it available for read requests.
func NewIndexWatcher ¶
func NewIndexWatcher(journal Name, cfs cloudstore.FileSystem, updates chan<- Fragment) *IndexWatcher
func (*IndexWatcher) StartWatchingIndex ¶
func (w *IndexWatcher) StartWatchingIndex() *IndexWatcher
func (*IndexWatcher) Stop ¶
func (w *IndexWatcher) Stop()
func (*IndexWatcher) WaitForInitialLoad ¶
func (w *IndexWatcher) WaitForInitialLoad()
type MarkedReader ¶
type MarkedReader struct {
    Mark Mark
    io.ReadCloser
}
A MarkedReader delegates reads to an underlying reader, and maintains |Mark| such that it always points to the next byte to be read.
func NewMarkedReader ¶
func NewMarkedReader(mark Mark, r io.ReadCloser) *MarkedReader
func (*MarkedReader) AdjustedMark ¶
func (r *MarkedReader) AdjustedMark(br *bufio.Reader) Mark
AdjustedMark returns the current Mark adjusted for content read by |br| (which must wrap this MarkedReader) but unconsumed from |br|'s buffer.
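The adjustment matters because a bufio.Reader reads ahead: the MarkedReader's Mark advances by everything the buffer pulled in, not by what the caller has consumed. The sketch below re-creates the idea with local stand-in types. The fields of `Mark` are not shown in this documentation, so the `Journal` and `Offset` fields used here are assumptions.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// Mark is a local stand-in; its fields are assumed for this sketch.
type Mark struct {
	Journal string
	Offset  int64
}

// MarkedReader mirrors the documented shape.
type MarkedReader struct {
	Mark Mark
	io.ReadCloser
}

// Read delegates to the underlying reader and advances the Mark.
func (r *MarkedReader) Read(p []byte) (int, error) {
	n, err := r.ReadCloser.Read(p)
	r.Mark.Offset += int64(n)
	return n, err
}

// AdjustedMark subtracts bytes which |br| has buffered but not yet returned.
func (r *MarkedReader) AdjustedMark(br *bufio.Reader) Mark {
	m := r.Mark
	m.Offset -= int64(br.Buffered())
	return m
}

// demo consumes 5 of 11 bytes through a bufio.Reader and reports both the
// raw and the adjusted offsets.
func demo() (raw, adjusted int64) {
	mr := &MarkedReader{
		Mark:       Mark{Journal: "a/journal", Offset: 100},
		ReadCloser: io.NopCloser(strings.NewReader("hello world")),
	}
	br := bufio.NewReader(mr)
	buf := make([]byte, 5)
	if _, err := io.ReadFull(br, buf); err != nil {
		panic(err)
	}
	return mr.Mark.Offset, mr.AdjustedMark(br).Offset
}

func main() {
	fmt.Println(demo()) // prints "111 105"
}
```

The raw offset reflects the 11 bytes the buffer read ahead, while the adjusted offset reflects the 5 bytes the caller actually consumed.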
func (*MarkedReader) Close ¶
func (r *MarkedReader) Close() error
type MemoryBroker ¶
type MemoryBroker struct {
    // DelayWrites indicates that writes should queue (and their promises not resolve) until:
    //  * The next explicit Flush, or
    //  * DelayWrites is set to false and another Write occurs.
    DelayWrites bool
    // Content written to each journal.
    Content map[Name]*bytes.Buffer
    // Pending content which will be written on the next Flush (or write, if !DelayWrites).
    Pending map[Name]*bytes.Buffer
    // contains filtered or unexported fields
}
MemoryBroker provides an in-memory implementation of the Client interface. The intended use is within unit tests which exercise components coordinating through the Client interface.
func NewMemoryBroker ¶
func NewMemoryBroker() *MemoryBroker
NewMemoryBroker returns an initialized, zero-value MemoryBroker.
func (*MemoryBroker) Create ¶
func (j *MemoryBroker) Create(journal Name) error
func (*MemoryBroker) Flush ¶
func (j *MemoryBroker) Flush()
Flush resolves all pending writes and wakes any blocked read operations.
func (*MemoryBroker) Get ¶
func (j *MemoryBroker) Get(args ReadArgs) (ReadResult, io.ReadCloser)
func (*MemoryBroker) Head ¶
func (j *MemoryBroker) Head(args ReadArgs) (ReadResult, *url.URL)
func (*MemoryBroker) ReadFrom ¶
func (j *MemoryBroker) ReadFrom(name Name, r io.Reader) (*AsyncAppend, error)
func (*MemoryBroker) Write ¶
func (j *MemoryBroker) Write(name Name, b []byte) (*AsyncAppend, error)
type MockFragmentFile ¶
MockFragmentFile is an autogenerated mock type for the FragmentFile type
func (*MockFragmentFile) Close ¶
func (_m *MockFragmentFile) Close() error
Close provides a mock function with given fields:
func (*MockFragmentFile) Fd ¶
func (_m *MockFragmentFile) Fd() uintptr
Fd provides a mock function with given fields:
func (*MockFragmentFile) Read ¶
func (_m *MockFragmentFile) Read(p []byte) (int, error)
Read provides a mock function with given fields: p
func (*MockFragmentFile) ReadAt ¶
func (_m *MockFragmentFile) ReadAt(p []byte, off int64) (int, error)
ReadAt provides a mock function with given fields: p, off
type MockGetter ¶
MockGetter is an autogenerated mock type for the Getter type
func (*MockGetter) Get ¶
func (_m *MockGetter) Get(args ReadArgs) (ReadResult, io.ReadCloser)
Get provides a mock function with given fields: args
type MockHeader ¶
MockHeader is an autogenerated mock type for the Header type
func (*MockHeader) Head ¶
func (_m *MockHeader) Head(args ReadArgs) (ReadResult, *url.URL)
Head provides a mock function with given fields: args
type MockWriter ¶
MockWriter is an autogenerated mock type for the Writer type
func (*MockWriter) ReadFrom ¶
func (_m *MockWriter) ReadFrom(journal Name, r io.Reader) (*AsyncAppend, error)
ReadFrom provides a mock function with given fields: journal, r
func (*MockWriter) Write ¶
func (_m *MockWriter) Write(journal Name, buffer []byte) (*AsyncAppend, error)
Write provides a mock function with given fields: journal, buffer
type Name ¶
type Name string
A typed journal name. By convention, journals are named using a forward-slash notation which captures their hierarchical relationships into organizations, topics and partitions. For example, a complete Name might be: "company-journals/interesting-topic/part-1234"
type ReadArgs ¶
type ReadArgs struct {
    Journal Name
    // Desired offset to begin reading from. Value -1 has special handling, where
    // the read is performed from the current write head. All other positive
    // values specify a desired exact byte offset to read from. If the offset is
    // not available (eg, because it represents a portion of Journal which has
    // been permanently deleted), the broker will return the next available
    // offset. Callers should therefore always inspect the ReadResult Offset.
    Offset int64
    // Whether the operation should block until content becomes available.
    // ErrNotYetAvailable is returned if a non-blocking read has no ready content.
    Blocking bool
    // Context which may trace, cancel or supply a deadline for the operation.
    Context context.Context
    // Deprecated: Server-side support for deadlines will be removed. Use
    // context.WithDeadline instead.
    // The time at which blocking will expire
    Deadline time.Time
}
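Since the Deadline field is deprecated in favor of context.WithDeadline, a blocking read from the write head might be set up as sketched below. `ReadArgs` here is a local stand-in carrying only the fields the example needs, and `readArgsWithDeadline` is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ReadArgs is a local stand-in with a subset of the documented fields.
type ReadArgs struct {
	Journal  string
	Offset   int64
	Blocking bool
	Context  context.Context
}

// readArgsWithDeadline is a hypothetical helper which attaches the deadline
// to the Context rather than using the deprecated Deadline field. Offset -1
// requests a read from the current write head.
func readArgsWithDeadline(parent context.Context, journal string, deadline time.Time) (ReadArgs, context.CancelFunc) {
	ctx, cancel := context.WithDeadline(parent, deadline)
	return ReadArgs{Journal: journal, Offset: -1, Blocking: true, Context: ctx}, cancel
}

// demo reports the offset, the blocking flag, and whether a deadline is set.
func demo() (int64, bool, bool) {
	args, cancel := readArgsWithDeadline(context.Background(),
		"company-journals/interesting-topic/part-1234", time.Now().Add(time.Minute))
	defer cancel()
	_, hasDeadline := args.Context.Deadline()
	return args.Offset, args.Blocking, hasDeadline
}

func main() {
	fmt.Println(demo())
}
```

The returned cancel function should be called once the read is finished, so the context's timer is released promptly.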
type ReadOp ¶
type ReadOp struct {
    ReadArgs
    // Channel by which replica returns a ReadResult.
    Result chan ReadResult `json:"-"`
}
type ReadResult ¶
type ReadResult struct {
    Error error
    // The effective offset of the operation.
    Offset int64
    // Write head at the completion of the operation.
    WriteHead int64
    // RouteToken of the Journal. Set on ErrNotReplica.
    RouteToken
    // Result fragment, set iff |Error| is nil.
    Fragment Fragment
}
func (ReadResult) String ¶
func (a ReadResult) String() string
type Replica ¶
type Replica struct {
// contains filtered or unexported fields
}
Replica manages journal components required to serve brokered writes, replications, and reads. A Replica instance is capable of switching roles at any time (and multiple times), from a pure replica which may serve replication requests only, to a broker of the journal.
func NewReplica ¶
func NewReplica(journal Name, localDir string, persister FragmentPersister, cfs cloudstore.FileSystem) *Replica
func (*Replica) Replicate ¶
func (r *Replica) Replicate(op ReplicateOp)
func (*Replica) StartBrokeringWithPeers ¶
func (r *Replica) StartBrokeringWithPeers(routeToken RouteToken, peers []Replicator)
Switch the Replica into broker mode. Appends will be brokered to |peers| with the topology captured by |routeToken|.
func (*Replica) StartReplicating ¶
func (r *Replica) StartReplicating(routeToken RouteToken)
Switch the Replica into pure-replica mode.
type ReplicateArgs ¶
type ReplicateArgs struct {
    Journal Name
    // WriteHead (eg, first byte) of the replicated transaction.
    // Already known and verified by all journal replicas.
    WriteHead int64
    // RouteToken of the transaction, also known and verified by all replicas.
    RouteToken
    // Flags whether replicas should begin a new spool for this transaction.
    NewSpool bool
    // Context which may trace, cancel or supply a deadline for the operation.
    Context context.Context
}
func (ReplicateArgs) String ¶
func (a ReplicateArgs) String() string
type ReplicateOp ¶
type ReplicateOp struct {
    ReplicateArgs
    // Channel by which replica returns a ReplicateResult.
    Result chan ReplicateResult `json:"-"`
}
type ReplicateResult ¶
type ReplicateResult struct {
    Error error
    // Iff |Error| is ErrWrongWriteHead, then |ErrorWriteHead| is the replica's
    // own, strictly greater write head.
    ErrorWriteHead int64
    // Set iff |Error| is nil.
    Writer WriteCommitter
}
func (ReplicateResult) String ¶
func (a ReplicateResult) String() string
type Replicator ¶
type Replicator interface {
Replicate(op ReplicateOp)
}
A Replicator is able to serve a ReplicateOp. It may be backed by a local Spool, or by a remote Gazette process.
type RetryReader ¶
type RetryReader struct {
    // MarkedReader manages the current reader and offset tracking.
    MarkedReader
    // Whether read operations should block (the default). If Blocking is false,
    // then Read operations may return ErrNotYetAvailable.
    Blocking bool
    // LastResult retains the result of the last journal read operation.
    // Callers may access it to inspect metadata returned by the broker.
    // It may be invalidated on every Read call.
    LastResult ReadResult
    // Getter against which to perform read operations.
    Getter Getter
    // Context to use in read operations issued to Getter.
    Context context.Context
}
RetryReader wraps a Getter and MarkedReader to provide callers with a long-lived journal reader. RetryReader transparently handles and retries errors, and will block as needed to await new journal content.
func NewRetryReader (deprecated) ¶
func NewRetryReader(mark Mark, getter Getter) *RetryReader
Deprecated: Use NewRetryReaderContext instead. NewRetryReader returns a RetryReader at |mark|, using the provided |getter| for all operations.
func NewRetryReaderContext ¶
func NewRetryReaderContext(ctx context.Context, mark Mark, getter Getter) *RetryReader
NewRetryReaderContext returns a RetryReader at |mark|, using the provided |getter| and |ctx| for all operations.
func (*RetryReader) AdjustedSeek ¶
AdjustedSeek sets the offset for the next Read, accounting for buffered data and updating the buffer as needed.
func (*RetryReader) Read ¶
func (rr *RetryReader) Read(p []byte) (n int, err error)
Read returns the next available bytes of journal content, retrying as required to handle errors or to await content being written. Read will return a non-nil error in the following cases:
- If the RetryReader context is cancelled.
- If Blocking is false, and ErrNotYetAvailable is returned by the broker.
All other errors are retried.
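The error-handling rules listed above can be sketched as a retry loop. This is a simplified illustration, not RetryReader's implementation: `fetch` is a hypothetical stand-in for a Getter, `readWithRetry` is a hypothetical helper, and the fixed 10ms pause between attempts is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local copy of the documented error value, for a self-contained sketch.
var ErrNotYetAvailable = errors.New("offset not yet available")

// fetch is a hypothetical stand-in for a Getter.
type fetch func(ctx context.Context, offset int64) ([]byte, error)

// readWithRetry returns an error only if |ctx| is cancelled, or if the read is
// non-blocking and the offset is not yet available. Other errors are retried.
func readWithRetry(ctx context.Context, blocking bool, offset int64, get fetch) ([]byte, error) {
	for {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		b, err := get(ctx, offset)
		if err == nil {
			return b, nil
		}
		if !blocking && err == ErrNotYetAvailable {
			return nil, err
		}
		// Pause briefly before retrying (the interval is an assumption).
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(10 * time.Millisecond):
		}
	}
}

// demo fails twice with a transient error, then succeeds.
func demo() (string, int, error) {
	attempts := 0
	get := func(ctx context.Context, offset int64) ([]byte, error) {
		attempts++
		if attempts < 3 {
			return nil, errors.New("transient failure")
		}
		return []byte("content"), nil
	}
	b, err := readWithRetry(context.Background(), true, 0, get)
	return string(b), attempts, err
}

// demoNonBlocking shows ErrNotYetAvailable surfacing on a non-blocking read.
func demoNonBlocking() error {
	get := func(ctx context.Context, offset int64) ([]byte, error) {
		return nil, ErrNotYetAvailable
	}
	_, err := readWithRetry(context.Background(), false, 0, get)
	return err
}

func main() {
	fmt.Println(demo())
	fmt.Println(demoNonBlocking())
}
```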
type RouteToken ¶
type RouteToken string
Token which describes the ordered set of responsible servers for a Journal: the first acts as broker, and the rest serve replications and reads (only). Structured as '|'-separated URLs rooting the server's Journal hierarchy. Ex: "http://srv-2/a/root|https://srv-1|http://12.34.56.7:8080/other/root".
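Splitting a token into its roles follows directly from the '|'-separated structure. In the sketch below, `RouteToken` is a local stand-in for the documented type and `roles` is a hypothetical helper, not a method of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// RouteToken is a local stand-in for the documented type.
type RouteToken string

// roles is a hypothetical helper: the first URL is the broker, and the
// remaining URLs serve replications and reads only.
func roles(t RouteToken) (broker string, replicas []string) {
	parts := strings.Split(string(t), "|")
	return parts[0], parts[1:]
}

func main() {
	broker, replicas := roles("http://srv-2/a/root|https://srv-1|http://12.34.56.7:8080/other/root")
	fmt.Println("broker:", broker)
	fmt.Println("replicas:", replicas)
}
```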
type Tail ¶
type Tail struct {
// contains filtered or unexported fields
}
func (*Tail) StartServingOps ¶
type WriteCommitter ¶
type WriteCommitter interface {
    io.Writer
    // Commits the first |count| bytes of previous Write([]byte) content.
    Commit(count int64) error
}
A WriteCommitter extends io.Writer with a protocol for committing those writes.
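The write-then-commit protocol can be illustrated with an in-memory implementation. `memCommitter` is hypothetical and not part of the package. It assumes that written bytes beyond |count| are discarded on Commit, and it returns a generic error for an out-of-range |count|; the package's actual behavior in both cases may differ.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// memCommitter is a hypothetical in-memory WriteCommitter.
type memCommitter struct {
	pending   bytes.Buffer // written, but not yet committed
	committed bytes.Buffer // durable content
}

// Write buffers |p| as uncommitted content.
func (m *memCommitter) Write(p []byte) (int, error) {
	return m.pending.Write(p)
}

// Commit moves the first |count| pending bytes into committed content and
// discards the remainder (discarding is an assumption of this sketch).
func (m *memCommitter) Commit(count int64) error {
	if count < 0 || count > int64(m.pending.Len()) {
		return errors.New("invalid commit count")
	}
	m.committed.Write(m.pending.Next(int(count)))
	m.pending.Reset()
	return nil
}

// demo writes 11 bytes, commits the first 5, and reports the outcome.
func demo() (string, int, error) {
	var m memCommitter
	m.Write([]byte("hello"))
	m.Write([]byte(" world"))
	err := m.Commit(5)
	return m.committed.String(), m.pending.Len(), err
}

func main() {
	fmt.Println(demo())
}
```

Separating Write from Commit lets a replica stream transaction content as it arrives while deferring the decision of how much of it becomes part of the journal.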
type Writer ¶
type Writer interface {
    // Appends |buffer| to |journal|. Either all of |buffer| is written, or none
    // of it is. Returns a Promise which is resolved when the write has been
    // fully committed.
    Write(journal Name, buffer []byte) (*AsyncAppend, error)
    // Appends |r|'s content to |journal|, by reading until io.EOF. Either all of
    // |r| is written, or none of it is. Returns a Promise which is resolved when
    // the write has been fully committed.
    ReadFrom(journal Name, r io.Reader) (*AsyncAppend, error)
}
A Writer allows for append-only writes to a named journal.