Documentation
¶
Index ¶
- Constants
- Variables
- type BatchWriter
- type DefaultFirehoseFactory
- type DefaultKinesisFactory
- type Event
- type EventForwarder
- type FirehoseBatchWriter
- type FirehoseFactory
- type JSONRecord
- type KinesisConfig
- type KinesisFactory
- type KinesisWriter
- type Multee
- type Record
- type RotateConditions
- type SpadeWriter
- type SpadeWriterManager
- type Statter
- type StreamBatchWriter
- type WriteRequest
Constants ¶
const (
// RedshiftDatetimeIngestString is the format of timestamps that Redshift understands.
RedshiftDatetimeIngestString = "2006-01-02 15:04:05.999"
)
Variables ¶
var ( // EventsDir is the local subdirectory where successfully-transformed events are written. EventsDir = "events" // NonTrackedDir is the local subdirectory where non-tracked events are written. NonTrackedDir = "nontracked" )
Functions ¶
This section is empty.
Types ¶
type BatchWriter ¶
type BatchWriter interface {
SendBatch([][]byte)
}
BatchWriter is an interface to write batches to an external sink.
type DefaultFirehoseFactory ¶
DefaultFirehoseFactory returns a normal Firehose client.
func (*DefaultFirehoseFactory) New ¶
func (f *DefaultFirehoseFactory) New(region, role string) firehoseiface.FirehoseAPI
New returns a firehose client configured to use the given region/role.
type DefaultKinesisFactory ¶
DefaultKinesisFactory returns a normal Kinesis client.
func (*DefaultKinesisFactory) New ¶
func (f *DefaultKinesisFactory) New(region, role string) kinesisiface.KinesisAPI
New returns a kinesis client configured to use the given region/role.
type EventForwarder ¶
type EventForwarder interface { Submit([]byte) Close() }
EventForwarder receives events and forwards them to Kinesis or another EventForwarder.
type FirehoseBatchWriter ¶
type FirehoseBatchWriter struct {
// contains filtered or unexported fields
}
FirehoseBatchWriter writes batches to Kinesis Firehose
func (*FirehoseBatchWriter) SendBatch ¶
func (w *FirehoseBatchWriter) SendBatch(batch [][]byte)
SendBatch writes the given batch to a firehose, configured by the FirehoseBatchWriter
type FirehoseFactory ¶
type FirehoseFactory interface {
New(region, role string) firehoseiface.FirehoseAPI
}
FirehoseFactory returns a firehose interface from a given session.
type JSONRecord ¶
JSONRecord is a raw JSON record to be sent to Kinesis.
type KinesisConfig ¶
type KinesisConfig struct { StreamConfig scoop_protocol.KinesisWriterConfig CommonFilters map[string]scoop_protocol.EventFilterFunc DefaultFilter scoop_protocol.EventFilterFunc }
KinesisConfig represents a stream's config and some base kinesis config.
type KinesisFactory ¶
type KinesisFactory interface {
New(region, role string) kinesisiface.KinesisAPI
}
KinesisFactory returns a kinesis interface from a given session.
type KinesisWriter ¶
KinesisWriter is a writer that writes events to kinesis
func (*KinesisWriter) Rotate ¶
func (w *KinesisWriter) Rotate() (bool, error)
Rotate doesn't do anything as KinesisWriters don't need to rotate.
func (*KinesisWriter) Write ¶
func (w *KinesisWriter) Write(req *WriteRequest)
type Multee ¶
Multee implements the `SpadeWriter` and `SpadeWriterManager` interfaces and forwards all calls to a map of targets.
func (*Multee) Add ¶
func (t *Multee) Add(key string, w SpadeWriter)
Add adds a new writer to the target map
func (*Multee) Replace ¶
func (t *Multee) Replace(key string, newWriter SpadeWriter)
Replace replaces the writer registered under key in the target map with newWriter.
func (*Multee) Write ¶
func (t *Multee) Write(r *WriteRequest)
Write forwards a WriteRequest to multiple targets.
type RotateConditions ¶
RotateConditions holds the parameters for the maximum time/size allowed before we force a rotation.
type SpadeWriter ¶
type SpadeWriter interface { Write(*WriteRequest) Close() error // Rotate requests a rotation from the SpadeWriter, which *may* write to S3 or Kinesis // depending on timing and amount of information already buffered. This should be // called periodically, as this is the only time a SpadeWriter will write to its // sink (except on Close). It returns a bool indicating whether all sinks were // written to and one of the errors which arose in writing (if any). Rotate() (bool, error) }
SpadeWriter is an interface for writing to external sinks, like S3 or Kinesis.
func NewKinesisWriter ¶
func NewKinesisWriter( kinesisFactory KinesisFactory, firehoseFactory FirehoseFactory, statter statsd.Statter, config *KinesisConfig, errorsBeforeThrottling int, secondsPerError int64) (SpadeWriter, error)
NewKinesisWriter returns an instance of SpadeWriter that writes events to kinesis
func NewWriterController ¶
func NewWriterController( folder string, reporter reporter.Reporter, spadeUploaderPool *uploader.UploaderPool, blueprintUploaderPool *uploader.UploaderPool, maxLogBytes int64, maxLogAgeSecs int64, nontrackedMaxLogAgeSecs int64, ) SpadeWriter
NewWriterController returns a writerController that handles logic to distribute writes across a number of workers. Each worker owns and operates one file. There are several sets of workers. Each set corresponds to an event type. Thus, if we are processing a log file with 2 types of events, we should produce (nWriters * 2) files.
type SpadeWriterManager ¶
type SpadeWriterManager interface { Add(key string, w SpadeWriter) Drop(key string) Replace(key string, newWriter SpadeWriter) }
SpadeWriterManager allows operations on a set of SpadeWriters
type Statter ¶
type Statter struct {
// contains filtered or unexported fields
}
Statter sends stats for a BatchWriter.
func NewStatter ¶
NewStatter returns a Statter for the given stream.
type StreamBatchWriter ¶
type StreamBatchWriter struct {
// contains filtered or unexported fields
}
StreamBatchWriter writes batches to Kinesis Streams
func (*StreamBatchWriter) SendBatch ¶
func (w *StreamBatchWriter) SendBatch(batch [][]byte)
SendBatch writes the given batch to a stream, configured by the KinesisWriter
type WriteRequest ¶
type WriteRequest struct { Category string Version int // Line is the transformed data in tsv format Line string // Record is the transformed data in a key/value map Record map[string]string UUID string // Keep the source around for logging Source json.RawMessage Failure reporter.FailMode Pstart time.Time }
WriteRequest is a processed event with metadata, ready for writing to an output.
func MakeErrorRequest ¶
func MakeErrorRequest(e *parser.MixpanelEvent, err interface{}) *WriteRequest
MakeErrorRequest returns a WriteRequest indicating that a panic happened during processing.
func (*WriteRequest) GetCategory ¶
func (r *WriteRequest) GetCategory() string
GetCategory returns the event type.
func (*WriteRequest) GetMessage ¶
func (r *WriteRequest) GetMessage() string
GetMessage returns the raw JSON of the event.
func (*WriteRequest) GetResult ¶
func (r *WriteRequest) GetResult() *reporter.Result
GetResult returns timing and metadata of the event.
func (*WriteRequest) GetStartTime ¶
func (r *WriteRequest) GetStartTime() time.Time
GetStartTime returns when processing of the event started.