Documentation ¶
Index ¶
- func BuildMergedS3ObjectKey(prefix, schema, partition, chunkKey string) string
- type AthenaTableName
- type Chunk
- type ComposeQueue
- type IndexRecord
- type LogQueue
- type MergeQueue
- type MessageRecord
- type ParquetSchemaName
- type PartitionQueue
- type RawObject
- type RawObjectPrefix
- type Record
- type RecordQueue
- type S3Object
- type S3Objects
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildMergedS3ObjectKey ¶
BuildMergedS3ObjectKey creates the S3 key of a merged object from the parameters
Types ¶
type AthenaTableName ¶
type AthenaTableName string
AthenaTableName indicates a table name and directory name
const ( // AthenaTableIndex is table name for index objects and directory name AthenaTableIndex AthenaTableName = "indices" // AthenaTableMessage is table name for message objects and directory name AthenaTableMessage = "messages" )
type Chunk ¶
type Chunk struct { Schema string `dynamo:"schema"` RecordIDs []string `dynamo:"record_ids,set"` TotalSize int64 `dynamo:"total_size"` Partition string `dynamo:"partition"` CreatedAt int64 `dynamo:"created_at"` ChunkKey string `dynamo:"chunk_key"` Freezed bool `dynamo:"freezed"` // For DynamoDB PK string `dynamo:"pk"` SK string `dynamo:"sk"` }
func NewChunkFromDynamoEvent ¶
func NewChunkFromDynamoEvent(image map[string]events.DynamoDBAttributeValue) (*Chunk, error)
NewChunkFromDynamoEvent builds a Chunk from a map of DynamoDBAttributeValue
type ComposeQueue ¶
type ComposeQueue struct { RecordID string `json:"record_id"` S3Object S3Object `json:"s3_object"` Size int64 `json:"size"` Schema string `json:"schema"` Partition string `json:"partition"` }
ComposeQueue is sent by indexer and received by composer
type IndexRecord ¶
type IndexRecord struct { Tag string `parquet:"name=tag, type=UTF8, encoding=PLAIN_DICTIONARY" json:"tag" msgpack:"tag"` // Timestamp is unixtime (second) of original log. Timestamp int64 `parquet:"name=timestamp, type=INT64" json:"timestamp" msgpack:"timestamp"` Field string `parquet:"name=field, type=UTF8, encoding=PLAIN_DICTIONARY" json:"field" msgpack:"field"` Term string `parquet:"name=term, type=UTF8, encoding=PLAIN_DICTIONARY" json:"term" msgpack:"term"` ObjectID int64 `parquet:"name=object_id, type=INT64" json:"object_id" msgpack:"object_id"` Seq int32 `parquet:"name=seq, type=INT32" json:"seq" msgpack:"seq"` }
IndexRecord is used for inverted index of log files on S3 bucket.
type LogQueue ¶
type LogQueue struct { Err error Timestamp time.Time Tag string Message string Value interface{} Seq int32 Src S3Object }
LogQueue is used in indexer
type MergeQueue ¶
type MergeQueue struct { Schema ParquetSchemaName `json:"schema"` TotalSize int64 `json:"total_size"` RecordIDs []string `json:"record_ids"` DstObject S3Object `json:"dst_object"` }
MergeQueue specify src object locations to be merged and destination object location.
type MessageRecord ¶
type MessageRecord struct { // Timestamp is unixtime (second) of original log. Timestamp int64 `parquet:"name=timestamp, type=INT64" json:"timestamp" msgpack:"timestamp"` ObjectID int64 `parquet:"name=object_id, type=INT64" json:"object_id" msgpack:"object_id"` Seq int32 `parquet:"name=seq, type=INT32" json:"seq" msgpack:"seq"` Message string `parquet:"name=message, type=UTF8, encoding=PLAIN_DICTIONARY" json:"message" msgpack:"message"` }
MessageRecord stores original log message that is encoded to JSON.
type ParquetSchemaName ¶
type ParquetSchemaName string
ParquetSchemaName identifies schema name
const ( // ParquetSchemaIndex indicates IndexRecord parquet schema ParquetSchemaIndex ParquetSchemaName = "index" // ParquetSchemaMessage indicates MessageRecord ParquetSchemaMessage ParquetSchemaName = "message" )
type PartitionQueue ¶
type PartitionQueue struct { Location string `json:"location"` TableName string `json:"table_name"` Keys map[string]string `json:"keys"` }
PartitionQueue is arguments of partitioner to add a new partition
type RawObject ¶
type RawObject struct { DataSize int64 // contains filtered or unexported fields }
RawObject is converted from original log message, but not merged. File format of the object is not defined and any encoding is acceptable by DumpService. This structure is used to indicate path of S3 and partition for Athena.
func NewRawObject ¶
func NewRawObject(prefix *RawObjectPrefix, ext string) *RawObject
NewRawObject is the constructor of RawObject. *ext* is the file extension of the object.
func (*RawObject) PartitionKeys ¶
PartitionKeys returns map of partition name and value
func (*RawObject) PartitionPath ¶
PartitionPath returns the S3 path to the top of the partition. The path includes the s3:// prefix and bucket name, e.g. s3://your-bucket/prefix/indices/dt=2020-01-02-03/
type RawObjectPrefix ¶
type RawObjectPrefix struct {
// contains filtered or unexported fields
}
RawObjectPrefix is the basic information of a RawObject. Basically a RawObject has a one-to-one relationship with an original log S3 object. However, sometimes multiple RawObjects are generated from one original log object because the object size is too large. (A large object cannot be converted to a Parquet file because of OOM errors.) Therefore the base parameters are kept independent from RawObject.
func NewRawObjectPrefix ¶
func NewRawObjectPrefix(schema ParquetSchemaName, base, src S3Object, ts time.Time) *RawObjectPrefix
NewRawObjectPrefix is the constructor of RawObjectPrefix. *base* must have the destination S3 bucket and prefix. *src* indicates the S3 object of the original logs. *ts* is the log timestamp used to identify the partition.
func (*RawObjectPrefix) Key ¶
func (x *RawObjectPrefix) Key() string
Key of RawObjectPrefix returns unique key for RawObject
func (*RawObjectPrefix) Schema ¶
func (x *RawObjectPrefix) Schema() ParquetSchemaName
Schema is getter of schema
type RecordQueue ¶
RecordQueue is used for RecordService.Load
type S3Object ¶
type S3Object struct { Region string `json:"region" dynamo:"s3_region"` Bucket string `json:"bucket" dynamo:"s3_bucket"` Key string `json:"key" dynamo:"s3_key"` }
func DecodeS3Object ¶
func NewS3Object ¶
func NewS3ObjectFromRecord ¶
func NewS3ObjectFromRecord(record events.S3EventRecord) S3Object
type S3Objects ¶
type S3Objects struct { Raw string `json:"raw"` // contains filtered or unexported fields }