Documentation ¶
Index ¶
- Variables
- func InternalDeduplicate(ctx context.Context, in <-chan Event, out chan<- Event, pastEvents int)
- func KubernetesAtomicWrite(t *testing.T, dir, fileName, content string)
- func NewChangeFeedConnection(ctx context.Context, l *logrusx.Logger, dsn string) (*sqlx.DB, error)
- func WatchAndServeWS(ctx context.Context, u *url.URL, writer herodot.Writer) (http.HandlerFunc, error)
- type ChangeEvent
- type ErrorEvent
- type Event
- type EventChannel
- type RemoveEvent
- type Watcher
- func Watch(ctx context.Context, u *url.URL, c EventChannel) (Watcher, error)
- func WatchChangeFeed(ctx context.Context, cx *sqlx.DB, tableName string, out EventChannel, ...) (_ Watcher, err error)
- func WatchDirectory(ctx context.Context, dir string, c EventChannel) (Watcher, error)
- func WatchFile(ctx context.Context, file string, c EventChannel) (Watcher, error)
- func WatchWebsocket(ctx context.Context, u *url.URL, c EventChannel) (Watcher, error)
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrSchemeUnknown is just for checking with errors.Is()
	ErrSchemeUnknown     = &errSchemeUnknown{}
	ErrWatcherNotRunning = fmt.Errorf("watcher is not running")
)
Functions ¶
func InternalDeduplicate ¶ added in v0.0.643
InternalDeduplicate sends events from the `in` channel to the `out` channel, dropping events that were sent multiple times. This is necessary because the CockroachDB changefeed has an at-least-once guarantee for change events, meaning that the same event can be delivered more than once.
For deduplication, only the last `pastEvents` events are considered.
func KubernetesAtomicWrite ¶ added in v0.0.539
func NewChangeFeedConnection ¶ added in v0.0.247
NewChangeFeedConnection opens a new connection to the database and enables the CHANGEFEED feature.
The caller is responsible for closing the connection when done.
You must register the `pgx` driver before calling this function:
import _ "github.com/jackc/pgx/v4/stdlib"
Types ¶
type ChangeEvent ¶
type ChangeEvent struct {
// contains filtered or unexported fields
}
func (*ChangeEvent) MarshalJSON ¶ added in v0.0.145
func (e *ChangeEvent) MarshalJSON() ([]byte, error)
func (*ChangeEvent) Reader ¶
func (e *ChangeEvent) Reader() io.Reader
func (*ChangeEvent) String ¶ added in v0.0.171
func (e *ChangeEvent) String() string
type ErrorEvent ¶
type ErrorEvent struct {
// contains filtered or unexported fields
}
func NewErrorEvent ¶ added in v0.0.643
func NewErrorEvent(err error, source_ string) *ErrorEvent
func (*ErrorEvent) MarshalJSON ¶ added in v0.0.145
func (e *ErrorEvent) MarshalJSON() ([]byte, error)
func (*ErrorEvent) Reader ¶
func (e *ErrorEvent) Reader() io.Reader
func (*ErrorEvent) String ¶ added in v0.0.171
func (e *ErrorEvent) String() string
type EventChannel ¶
type EventChannel chan Event
type RemoveEvent ¶
type RemoveEvent struct {
// contains filtered or unexported fields
}
func (*RemoveEvent) MarshalJSON ¶ added in v0.0.145
func (e *RemoveEvent) MarshalJSON() ([]byte, error)
func (*RemoveEvent) Reader ¶
func (e *RemoveEvent) Reader() io.Reader
func (*RemoveEvent) String ¶ added in v0.0.171
func (e *RemoveEvent) String() string
type Watcher ¶ added in v0.0.168
type Watcher interface {
	// DispatchNow fires the watcher and causes an event.
	//
	// WARNING: The returned channel must be read or no further events will
	// be propagated due to a deadlock.
	DispatchNow() (<-chan int, error)
}
func WatchChangeFeed ¶ added in v0.0.247
func WatchChangeFeed(ctx context.Context, cx *sqlx.DB, tableName string, out EventChannel, cursor time.Time) (_ Watcher, err error)
WatchChangeFeed sends changed rows on the channel. To cancel the execution, cancel the context!
Watcher.DispatchNow() has no effect on a watcher returned by this function.
This function spawns the goroutines needed to process the changefeed events and deduplicate them.
func WatchDirectory ¶ added in v0.0.142
func WatchFile ¶
WatchFile spawns a background goroutine to watch file, reporting any changes to c. Watching stops when ctx is canceled.