Documentation ¶
Index ¶
- Constants
- type ErrorStore
- type ErrorStreamLogger
- func (esl *ErrorStreamLogger) Debugf(format string, v ...interface{})
- func (esl *ErrorStreamLogger) Errorf(format string, v ...interface{})
- func (esl *ErrorStreamLogger) Infof(format string, v ...interface{})
- func (esl *ErrorStreamLogger) Panicf(format string, v ...interface{})
- func (esl *ErrorStreamLogger) Printf(format string, v ...interface{})
- func (esl *ErrorStreamLogger) Warnf(format string, v ...interface{})
- func (esl *ErrorStreamLogger) WithPrefix(prefix string) logger.Logger
- type ErrorType
- type Main
- type Record
- type ShardOffset
- type ShardRecord
- type SinkErrorPayload
- type SinkErrorQueue
- type Source
- type StreamOffsets
- type StreamReader
- type StreamReaderConfig
Constants ¶
const (
	RecoverableErrorType = ErrorType("Error")
	PanicErrorType       = ErrorType("Panic") // Runtime errors.
)
These are the currently supported ErrorType values that can be emitted to an ErrorStore.
const (
	ShardStatusActive  = "ACTIVE"
	ShardStatusPending = "PENDING"
	ShardStatusClosed  = "CLOSED"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ErrorStore ¶
type ErrorStore interface {
	// Available checks if the backing resource can receive error
	// messages via Push.
	Available() bool

	// Push emits an error message of type ErrorType to the backing resource.
	//
	// The caller should NOT assume that Available is called implicitly
	// to check for ErrorStore availability.
	Push(ErrorType, string, logger.Logger) error
}
ErrorStore is an abstraction over a resource like external storage, a database, a queue, etc. that can receive and store error messages.
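As a rough illustration of the interface contract, the sketch below implements an in-memory ErrorStore. It is not taken from the package: ErrorType, Logger, and ErrorStore are redeclared locally as stand-ins so the example compiles on its own, and memoryStore is a hypothetical type. It also shows the caller checking Available explicitly, since Push is not assumed to do so.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrorType is a local stand-in for the package's ErrorType.
type ErrorType string

const (
	RecoverableErrorType = ErrorType("Error")
	PanicErrorType       = ErrorType("Panic")
)

// Logger is a hypothetical, trimmed-down stand-in for logger.Logger.
type Logger interface {
	Warnf(format string, v ...interface{})
}

// ErrorStore mirrors the documented interface, with the local Logger swapped in.
type ErrorStore interface {
	Available() bool
	Push(ErrorType, string, Logger) error
}

// memoryStore is a hypothetical ErrorStore that keeps messages in a slice.
type memoryStore struct {
	mu       sync.Mutex
	open     bool
	messages []string
}

// Available reports whether the store currently accepts messages.
func (m *memoryStore) Available() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.open
}

// Push records the message, or returns an error if the store is closed.
func (m *memoryStore) Push(t ErrorType, msg string, _ Logger) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.open {
		return errors.New("memory store is closed")
	}
	m.messages = append(m.messages, fmt.Sprintf("%s: %s", t, msg))
	return nil
}

func main() {
	var store ErrorStore = &memoryStore{open: true}
	// Push is not assumed to call Available, so the caller checks first.
	if store.Available() {
		if err := store.Push(RecoverableErrorType, "bad record", nil); err != nil {
			fmt.Println("push failed:", err)
		}
	}
}
```

Whether a real implementation returns an error or silently drops the message when unavailable is a design choice; this sketch returns an error.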
type ErrorStreamLogger ¶
type ErrorStreamLogger struct {
// contains filtered or unexported fields
}
ErrorStreamLogger is a logger.Logger implementation that decorates a base logger.Logger instance and emits error and panic messages to an ErrorStore.
All other log levels delegate to the base logger.Logger implementation.
func NewErrorStreamLogger ¶
func NewErrorStreamLogger(base logger.Logger, store ErrorStore) *ErrorStreamLogger
NewErrorStreamLogger constructs an ErrorStreamLogger from a logger.Logger and an ErrorStore.
func (*ErrorStreamLogger) Debugf ¶
func (esl *ErrorStreamLogger) Debugf(format string, v ...interface{})
Debugf just delegates to the wrapped/base logger.Logger's Debugf implementation.
func (*ErrorStreamLogger) Errorf ¶
func (esl *ErrorStreamLogger) Errorf(format string, v ...interface{})
Errorf delegates to the wrapped/base logger.Logger's Errorf implementation and additionally pushes an error message of type RecoverableErrorType to the ErrorStore.
If an error occurs during a Push to the ErrorStore, the error is logged to the wrapped/base logger.Logger using Errorf.
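The decorator behavior described above can be sketched roughly as follows. This is a simplified illustration, not the package's implementation: Logger and ErrorStore are cut-down local stand-ins, decoratedLogger, recordingLogger, and failingStore are hypothetical names, and the order of delegating before pushing is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorType is a local stand-in for the package's ErrorType.
type ErrorType string

const RecoverableErrorType = ErrorType("Error")

// Logger is a hypothetical stand-in for logger.Logger with only Errorf.
type Logger interface {
	Errorf(format string, v ...interface{})
}

// ErrorStore is a cut-down stand-in exposing only Push.
type ErrorStore interface {
	Push(ErrorType, string, Logger) error
}

// decoratedLogger is a hypothetical simplification of ErrorStreamLogger.
type decoratedLogger struct {
	base  Logger
	store ErrorStore
}

// Errorf logs through the base logger, then pushes the formatted message
// to the store. A failed push is reported through the base logger's Errorf.
func (d *decoratedLogger) Errorf(format string, v ...interface{}) {
	d.base.Errorf(format, v...)
	msg := fmt.Sprintf(format, v...)
	if err := d.store.Push(RecoverableErrorType, msg, d.base); err != nil {
		d.base.Errorf("pushing error to store: %v", err)
	}
}

// recordingLogger collects formatted messages so they can be inspected.
type recordingLogger struct{ lines []string }

func (r *recordingLogger) Errorf(format string, v ...interface{}) {
	r.lines = append(r.lines, fmt.Sprintf(format, v...))
}

// failingStore rejects every push, to exercise the failure path.
type failingStore struct{}

func (failingStore) Push(ErrorType, string, Logger) error {
	return errors.New("queue unreachable")
}

func main() {
	base := &recordingLogger{}
	log := &decoratedLogger{base: base, store: failingStore{}}
	log.Errorf("record %d rejected", 7)
	for _, line := range base.lines {
		fmt.Println(line)
	}
}
```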
func (*ErrorStreamLogger) Infof ¶
func (esl *ErrorStreamLogger) Infof(format string, v ...interface{})
Infof just delegates to the wrapped/base logger.Logger's Infof implementation.
func (*ErrorStreamLogger) Panicf ¶
func (esl *ErrorStreamLogger) Panicf(format string, v ...interface{})
Panicf delegates to the wrapped/base logger.Logger's Panicf implementation and additionally pushes an error message with PanicErrorType ErrorType to the ErrorStore.
If an error occurs during a Push to the ErrorStore, the error is logged to the wrapped/base logger.Logger using Errorf, NOT Panicf.
func (*ErrorStreamLogger) Printf ¶
func (esl *ErrorStreamLogger) Printf(format string, v ...interface{})
Printf just delegates to the wrapped/base logger.Logger's Printf implementation.
func (*ErrorStreamLogger) Warnf ¶
func (esl *ErrorStreamLogger) Warnf(format string, v ...interface{})
Warnf just delegates to the wrapped/base logger.Logger's Warnf implementation.
func (*ErrorStreamLogger) WithPrefix ¶ added in v3.32.0
func (esl *ErrorStreamLogger) WithPrefix(prefix string) logger.Logger
type Main ¶
type Main struct {
	idk.Main           `flag:"!embed"`
	Timeout            time.Duration `help:"Time to wait for more records from Kinesis before flushing a batch. 0 to disable."`
	Header             string        `help:"Path to the static schema, in JSON header format. May be a path on the local filesystem, or an S3 URI."`
	AWSRegion          string        `help:"AWS Region. Alternatively, use environment variable AWS_REGION."`
	AllowMissingFields bool          `help:"Will proceed with ingest even if a field is missing from a record but specified in the JSON config file. Default false"`
	StreamName         string        `help:"Name of AWS Kinesis stream to consume records from."`
	OffsetsPath        string        `help:"Path where the offsets file will be written. May be a path on the local filesystem, or an S3 URI."`
	AWSProfile         string        `help:"Name of AWS profile to use. Alternatively, use environment variable AWS_PROFILE."`
	ErrorQueueName     string        `help:"SQS queue name to send error and panic/runtime errors to."`
}
Main is the holder of all configurations for a Kinesis stream consumer.
Along with the additional configuration fields, kinesis.Main also gains all fields and methods from idk.Main via composition.
func NewMain ¶
func NewMain() *Main
NewMain returns a new instance of a Kinesis stream consumer configuration object.
It specifies a callback NewSource that can be invoked to create a kinesis.Source object. This callback implicitly initializes an AWS session and uses that session to initialize clients for the following AWS resources: S3, Kinesis, and SQS. Client creation happens regardless of configuration. (For example, an S3 client is still created when OffsetsPath and Header are both local paths.)
The default BatchSize is 20000 and Concurrency is 1. Any Concurrency value > 1 is NOT supported. These values are set on the returned kinesis.Main instance.
The Logger instance on the kinesis.Source is always decorated when NewSource is invoked. Assuming no errors occur during AWS client initialization, the decorated Logger instance is propagated back to the kinesis.Main so that callers that configured it can also emit errors and panics to the SQS queue specified by ErrorQueueName. Error emission to SQS is active only if all of the following hold: ErrorQueueName is non-empty; an SQS queue with that name exists in AWSRegion; the StreamName field has the format 'PREFIX'-VALID_UUID; and a valid SQS queue URL can be resolved at the time of Logger initialization. If any of these is false, error emission to SQS is not activated: the Logger instance behaves identically to its wrapped Logger and emits a warning to the caller that errors are not propagated to SQS.
type Record ¶
type Record struct {
// contains filtered or unexported fields
}
func (*Record) StreamOffset ¶
type ShardOffset ¶
type ShardRecord ¶
ShardRecord wraps a Kinesis record, including metadata about its shard and an IDK-local index.
type SinkErrorPayload ¶
type SinkErrorPayload struct {
	SinkId       string    `json:"sink_id"`
	ErrorType    ErrorType `json:"error_type"`
	ErrorMessage string    `json:"error_msg"`
	Timestamp    string    `json:"time"`
}
SinkErrorPayload contains all data about a single IDK error message that will be emitted to an ErrorStore; includes a timestamp and a valid sink ID.
This payload is not meant to be used outside the context of error propagation to an ErrorStore.
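To make the wire format concrete, the sketch below marshals a payload with the struct tags shown above. The struct is redeclared locally so the example is self-contained; encodePayload is a hypothetical helper, and the sink ID and RFC 3339 timestamp values are made-up examples, since the documentation does not state the timestamp format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ErrorType is a local stand-in for the package's ErrorType.
type ErrorType string

// SinkErrorPayload is a local copy of the documented struct and its JSON tags.
type SinkErrorPayload struct {
	SinkId       string    `json:"sink_id"`
	ErrorType    ErrorType `json:"error_type"`
	ErrorMessage string    `json:"error_msg"`
	Timestamp    string    `json:"time"`
}

// encodePayload is a hypothetical helper that returns the JSON form of p.
func encodePayload(p SinkErrorPayload) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodePayload(SinkErrorPayload{
		SinkId:       "sink-123e4567-e89b-12d3-a456-426614174000", // example value
		ErrorType:    ErrorType("Error"),
		ErrorMessage: "bad record",
		Timestamp:    "2024-01-02T03:04:05Z", // format is an assumption
	})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(out)
}
```

The keys in the output follow the struct tags (sink_id, error_type, error_msg, time) rather than the Go field names.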
type SinkErrorQueue ¶
type SinkErrorQueue struct {
// contains filtered or unexported fields
}
SinkErrorQueue is an ErrorStore implementation that uses an SQS queue as its backing resource to emit error and panic messages to.
It also maps 1-to-1 to a Kinesis stream via a unique sink ID.
func NewSinkErrorQueue ¶
func NewSinkErrorQueue(queue sqsiface.SQSAPI, queueName, sinkId string) (*SinkErrorQueue, error)
NewSinkErrorQueue attempts to construct a SinkErrorQueue instance from an AWS SQS client, a queue name, and a sink ID.
On success, callers can assume that a backing SQS queue resource exists and is fully initialized.
Returns nil and an SQS error if an SQS queue URL cannot be resolved from the queue name and/or the AWS SQS client.
This method assumes the sink ID argument is valid.
func SinkErrorQueueFrom ¶
func SinkErrorQueueFrom(queue sqsiface.SQSAPI, source *Source) *SinkErrorQueue
SinkErrorQueueFrom always constructs a SinkErrorQueue instance from an AWS SQS client and a kinesis.Source.
Unlike NewSinkErrorQueue, this does NOT return an error if a queue URL cannot be resolved from the queue name and/or the AWS SQS client. Instead it will collapse to a SinkErrorQueue instance with a backing SQS resource that is ALWAYS unavailable. Attempting to invoke Push on this instance will not result in an error; instead it will just emit a warning that no backing SQS resource could be written to.
This does check if the sink ID has a valid form: 'PREFIX'-VALID_UUID. If not, this collapses to a SinkErrorQueue instance that is ALWAYS unavailable.
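One way to picture the 'PREFIX'-VALID_UUID check is the sketch below. It is an assumption-laden approximation, not the package's validation logic: it treats PREFIX as any non-empty text and VALID_UUID as a canonical 8-4-4-4-12 hexadecimal UUID, and validSinkID and sinkIDPattern are hypothetical names.

```go
package main

import (
	"fmt"
	"regexp"
)

// sinkIDPattern encodes an assumed reading of 'PREFIX'-VALID_UUID:
// a non-empty prefix, a hyphen, then a canonical hexadecimal UUID.
var sinkIDPattern = regexp.MustCompile(
	`^.+-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$`)

// validSinkID reports whether id matches the assumed format.
func validSinkID(id string) bool {
	return sinkIDPattern.MatchString(id)
}

func main() {
	ids := []string{
		"sink-123e4567-e89b-12d3-a456-426614174000", // prefix plus UUID
		"123e4567-e89b-12d3-a456-426614174000",      // UUID without prefix
		"sink-not-a-uuid",                           // prefix without UUID
	}
	for _, id := range ids {
		fmt.Printf("%s valid=%t\n", id, validSinkID(id))
	}
}
```

A stricter check could parse the UUID portion with a dedicated UUID library instead of a regular expression.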
func (*SinkErrorQueue) Available ¶
func (seq *SinkErrorQueue) Available() bool
Available checks that a valid SQS queue resource exists.
func (*SinkErrorQueue) Push ¶
Push attempts to emit a single error message of type ErrorType to an SQS queue resource.
If the backing SQS queue resource is not available, this function is a no-op and does NOT return an error. Instead it emits a warning to the logger.Logger instance specified by the log argument.
The warning can be ignored entirely by passing a nil log argument.
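The no-op behavior for an unavailable queue, including the nil log argument, might look roughly like the sketch below. It does not talk to SQS: stubQueue is a hypothetical stand-in that treats an empty URL as "unavailable" and appends to a slice instead of sending, and Logger is a trimmed-down local stand-in for logger.Logger.

```go
package main

import "fmt"

// ErrorType is a local stand-in for the package's ErrorType.
type ErrorType string

// Logger is a hypothetical stand-in for logger.Logger with only Warnf.
type Logger interface {
	Warnf(format string, v ...interface{})
}

// stubQueue is a hypothetical stand-in for SinkErrorQueue.
// An empty url models a queue whose URL could not be resolved.
type stubQueue struct {
	url  string
	sent []string
}

// Available reports whether a queue URL is known.
func (q *stubQueue) Available() bool { return q.url != "" }

// Push records the message when the queue is available. Otherwise it is a
// no-op that returns nil and warns only if a non-nil logger was supplied.
func (q *stubQueue) Push(t ErrorType, msg string, log Logger) error {
	if !q.Available() {
		if log != nil {
			log.Warnf("no backing queue; dropping %s message", t)
		}
		return nil
	}
	q.sent = append(q.sent, msg)
	return nil
}

// printLogger writes warnings to standard output.
type printLogger struct{}

func (printLogger) Warnf(format string, v ...interface{}) {
	fmt.Printf("WARN: "+format+"\n", v...)
}

func main() {
	unavailable := &stubQueue{}
	_ = unavailable.Push("Error", "bad record", printLogger{}) // warns
	_ = unavailable.Push("Error", "bad record", nil)           // silent
}
```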
type Source ¶
type Source struct {
	Log                logger.Logger
	Timeout            time.Duration
	Header             string
	AWSRegion          string
	AWSProfile         string
	AllowMissingFields bool
	StreamName         string
	OffsetsPath        string
	ErrorQueueName     string
	// contains filtered or unexported fields
}
Source implements the idk.Source interface using Kinesis as a data source. It is not thread-safe! To achieve concurrency, create multiple Sources (note that kinesis.Main does not support a Concurrency value > 1).
type StreamOffsets ¶
type StreamOffsets struct {
	// synchronize updates/reads to the Shards map
	sync.RWMutex
	StreamName string                  `json:"stream_name"`
	Shards     map[string]*ShardOffset `json:"shards"`
}
func ReadOffsets ¶
func ReadOffsets(cfg StreamReaderConfig) (*StreamOffsets, error)
func (*StreamOffsets) Load ¶
func (o *StreamOffsets) Load(shardID string) (*ShardOffset, bool)
Load returns the ShardOffset for the shard in a thread-safe manner.
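A thread-safe lookup over the Shards map could be sketched as below, using the embedded sync.RWMutex for a read lock. This is an illustration under assumptions: the types are redeclared locally, and the SequenceNumber field on ShardOffset is hypothetical because the documentation does not show that type's fields.

```go
package main

import (
	"fmt"
	"sync"
)

// ShardOffset is a local stand-in; SequenceNumber is a hypothetical field.
type ShardOffset struct {
	SequenceNumber string
}

// StreamOffsets mirrors the documented struct layout.
type StreamOffsets struct {
	// synchronize updates/reads to the Shards map
	sync.RWMutex
	StreamName string                  `json:"stream_name"`
	Shards     map[string]*ShardOffset `json:"shards"`
}

// Load returns the offset for shardID under a read lock, so concurrent
// readers do not block each other while writers are excluded.
func (o *StreamOffsets) Load(shardID string) (*ShardOffset, bool) {
	o.RLock()
	defer o.RUnlock()
	off, ok := o.Shards[shardID]
	return off, ok
}

func main() {
	offsets := &StreamOffsets{
		StreamName: "example-stream",
		Shards: map[string]*ShardOffset{
			"shard-0": {SequenceNumber: "100"},
		},
	}
	if off, ok := offsets.Load("shard-0"); ok {
		fmt.Println("shard-0 at", off.SequenceNumber)
	}
	if _, ok := offsets.Load("shard-9"); !ok {
		fmt.Println("shard-9 has no stored offset")
	}
}
```

Writers would take the full lock (Lock/Unlock) before modifying Shards; StreamOffsets should be passed by pointer so the mutex is never copied.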
type StreamReader ¶
type StreamReader struct {
	StreamReaderConfig
	// contains filtered or unexported fields
}
func NewStreamReader ¶
func NewStreamReader(cfg StreamReaderConfig) (*StreamReader, error)
func (*StreamReader) Close ¶
func (r *StreamReader) Close()
func (*StreamReader) CommitMessages ¶
func (r *StreamReader) CommitMessages(ctx context.Context, msgs ...ShardRecord) error
func (*StreamReader) FetchMessage ¶
func (r *StreamReader) FetchMessage(ctx context.Context) (ShardRecord, error)
func (*StreamReader) ProcessShards ¶
func (r *StreamReader) ProcessShards() error
func (*StreamReader) Start ¶
func (r *StreamReader) Start() error
type StreamReaderConfig ¶
type StreamReaderConfig struct {
// contains filtered or unexported fields
}