awsglue

package
v1.10.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 15, 2020 License: AGPL-3.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
// Names and human-readable descriptions of the Glue databases that hold Panther data.
// The rule-match and rule-error databases reuse the panther_logs table layout, so
// their descriptions are built from LogProcessingDatabaseName.
const (
	// Output of Panther log processing.
	LogProcessingDatabaseName        = "panther_logs"
	LogProcessingDatabaseDescription = "Holds tables with data from Panther log processing"

	// Events that matched a rule.
	RuleMatchDatabaseName        = "panther_rule_matches"
	RuleMatchDatabaseDescription = "Holds tables with data from Panther rule matching (same table structure as " +
		LogProcessingDatabaseName + ")"

	// Views for querying Panther data.
	ViewsDatabaseName        = "panther_views"
	ViewsDatabaseDescription = "Holds views useful for querying Panther data"

	// Events whose rule evaluation failed.
	RuleErrorsDatabaseName        = "panther_rule_errors"
	RuleErrorsDatabaseDescription = "Holds tables with data that failed Panther rule matching (same table structure as " +
		LogProcessingDatabaseName + ")"

	// Scratch tables used by processing tasks.
	TempDatabaseName        = "panther_temp"
	TempDatabaseDescription = "Holds temporary tables used for processing tasks"
)
View Source
// Timestamp layouts for output JSON. Timestamps are written as
// YYYY-MM-DD HH:MM:SS.fffffffff (nine fractional-second digits).
// Background: https://aws.amazon.com/premiumsupport/knowledge-center/query-table-athena-timestamp-empty/
const (
	// TimestampLayout is the Go time layout for the timestamp text itself.
	TimestampLayout = "2006-01-02 15:04:05.000000000"
	// TimestampLayoutJSON is TimestampLayout wrapped in double quotes (a JSON string).
	TimestampLayoutJSON = "\"" + TimestampLayout + "\""
)
View Source
// DefaultMaxCommentLength is the default upper bound on the length of a Glue column comment.
const DefaultMaxCommentLength = 255

// Glue column type names for timestamps and strings.
const (
	GlueTimestampType = "timestamp"
	GlueStringType    = "string"
)

Variables

View Source
var (
	// MaxCommentLength is the maximum size of a column comment; longer comments
	// are clipped. It is a public var so callers can change it to control output.
	MaxCommentLength = DefaultMaxCommentLength

	// GlueMappings maps custom Panther Go types (and other non-builtin types such
	// as time.Time and jsoniter.RawMessage) to the Glue column types emitted for them.
	GlueMappings = []CustomMapping{
		{
			From: reflect.TypeOf(time.Time{}),
			To:   GlueTimestampType,
		},
		{
			From: reflect.TypeOf(jsoniter.RawMessage{}),
			To:   GlueStringType,
		},
		{
			From: reflect.TypeOf(numerics.Integer(0)),
			To:   "bigint",
		},
		{
			From: reflect.TypeOf(numerics.Int64(0)),
			To:   "bigint",
		},
		{
			From: reflect.TypeOf(null.Float64{}),
			To:   "double",
		},
		{
			From: reflect.TypeOf(null.Float32{}),
			To:   "float",
		},
		{
			From: reflect.TypeOf(null.Int64{}),
			To:   "bigint",
		},
		{
			From: reflect.TypeOf(null.Int32{}),
			To:   "int",
		},
		{
			From: reflect.TypeOf(null.Int16{}),
			To:   "smallint",
		},
		{
			From: reflect.TypeOf(null.Int8{}),
			To:   "tinyint",
		},
		// Unsigned types map to the next larger signed Glue integer type
		// (uint32 -> bigint, uint16 -> int, uint8 -> smallint) so every value fits.
		{
			// NOTE(review): uint64 has no larger signed target; bigint is signed
			// 64-bit, so values above math.MaxInt64 cannot be represented — confirm
			// this is acceptable for the data sources that use null.Uint64.
			From: reflect.TypeOf(null.Uint64{}),
			To:   "bigint",
		},
		{
			From: reflect.TypeOf(null.Uint32{}),
			To:   "bigint",
		},
		{
			From: reflect.TypeOf(null.Uint16{}),
			To:   "int",
		},
		{
			From: reflect.TypeOf(null.Uint8{}),
			To:   "smallint",
		},
		{
			From: reflect.TypeOf(null.String{}),
			To:   GlueStringType,
		},
		{
			From: reflect.TypeOf(null.NonEmpty{}),
			To:   GlueStringType,
		},
		{
			From: reflect.TypeOf(null.Bool{}),
			To:   "boolean",
		},
	}

	// RuleMatchColumns are columns added by the rules engine
	RuleMatchColumns = []Column{
		{
			Name:    "p_rule_id",
			Type:    GlueStringType,
			Comment: "Rule id",
		},
		{
			Name:    "p_alert_id",
			Type:    GlueStringType,
			Comment: "Alert id",
		},
		{
			Name:    "p_alert_creation_time",
			Type:    GlueTimestampType,
			Comment: "The time the alert was initially created (first match)",
		},
		{
			Name:    "p_alert_update_time",
			Type:    GlueTimestampType,
			Comment: "The time the alert last updated (last match)",
		},
		{
			Name:    "p_rule_tags",
			Type:    ArrayOf(GlueStringType),
			Comment: "The tags of the rule that generated this alert",
		},
		{
			Name:    "p_rule_reports",
			Type:    MapOf(GlueStringType, ArrayOf(GlueStringType)),
			Comment: "The reporting tags of the rule that generated this alert",
		},
	}

	// RuleErrorColumns are the columns added by the rules engine to rule error
	// tables: every column in RuleMatchColumns followed by p_rule_error.
	RuleErrorColumns = append(
		// The full slice expression caps capacity at length, so append always
		// copies into a fresh backing array. RuleErrorColumns can therefore never
		// share (and later clobber) RuleMatchColumns' storage, even if
		// RuleMatchColumns is ever built with spare capacity.
		RuleMatchColumns[:len(RuleMatchColumns):len(RuleMatchColumns)],
		Column{
			Name:    "p_rule_error",
			Type:    GlueStringType,
			Comment: "The rule error",
		},
	)
)

Functions

func ArrayOf added in v1.7.0

func ArrayOf(typ string) string

func CreateDatabase added in v1.4.0

func CreateDatabase(client glueiface.GlueAPI, name, description string) (*glue.CreateDatabaseOutput, error)

func CreatePartition

func CreatePartition(client glueiface.GlueAPI, databaseName, tableName string,
	partitionValues []*string, storageDescriptor *glue.StorageDescriptor,
	parameters map[string]*string) (*glue.CreatePartitionOutput, error)

func DeleteDatabase added in v1.4.0

func DeleteDatabase(client glueiface.GlueAPI, name string) (*glue.DeleteDatabaseOutput, error)

func DeletePartition

func DeletePartition(client glueiface.GlueAPI, databaseName, tableName string,
	partitionValues []*string) (*glue.DeletePartitionOutput, error)

func DeleteTable

func DeleteTable(client glueiface.GlueAPI, databaseName, tableName string) (*glue.DeleteTableOutput, error)

func GetDataPrefix

func GetDataPrefix(databaseName string) string

func GetPartition

func GetPartition(client glueiface.GlueAPI, databaseName, tableName string,
	partitionValues []*string) (*glue.GetPartitionOutput, error)

func GetPartitionLocation

func GetPartitionLocation(s3Path string) (string, error)

GetPartitionLocation takes an S3 path for an object and returns just the part of the path associated with the partition.

func GetPartitionPrefix

func GetPartitionPrefix(datatype models.DataType, logType string, timebin GlueTableTimebin, time time.Time) string

func GetTable

func GetTable(client glueiface.GlueAPI, databaseName, tableName string) (*glue.GetTableOutput, error)

func GetTableName

func GetTableName(logType string) string

func IsJSONPartition

func IsJSONPartition(storageDescriptor *glue.StorageDescriptor) bool

func MapOf added in v1.7.0

func MapOf(key, typ string) string

func MustRegisterMapping added in v1.7.0

func MustRegisterMapping(from reflect.Type, to string)

func NewTimestampEncoder added in v1.7.0

func NewTimestampEncoder() tcodec.TimeEncoder

func ParseS3URL

func ParseS3URL(s3URL string) (bucket, key string, err error)

func PartitionTimeFromValues added in v1.10.0

func PartitionTimeFromValues(values []*string) (tm time.Time, err error)

PartitionTimeFromValues resolves the partition time from a Glue partition's values.

func RegisterExtensions added in v1.7.0

func RegisterExtensions(api jsoniter.API) jsoniter.API

func RegisterMapping added in v1.7.0

func RegisterMapping(from reflect.Type, to string) error

func RewriteFieldName added in v1.7.0

func RewriteFieldName(name string) string

func TableHasPartitions

func TableHasPartitions(client glueiface.GlueAPI, databaseName, tableName string) (hasData bool, err error)

func UpdatePartition

func UpdatePartition(client glueiface.GlueAPI, databaseName, tableName string,
	partitionValues []*string, storageDescriptor *glue.StorageDescriptor,
	parameters map[string]*string) (*glue.UpdatePartitionOutput, error)

Types

type Column added in v1.4.0

// Column describes one column of a Glue table.
type Column struct {
	// Name, Type and Comment are the column's name, Glue type (for example
	// GlueStringType) and comment text, in that field order.
	Name, Type, Comment string
	// Required marks the column as required.
	// NOTE(review): how this flag is consumed is not visible here — confirm with callers.
	Required bool
}

func InferJSONColumns added in v1.4.0

func InferJSONColumns(obj interface{}, customMappings ...CustomMapping) ([]Column, []string)

InferJSONColumns walks the object, creating Glue columns using the types expected by the JSON SerDe. Custom type mappings can be passed as extra arguments. It also returns a slice with the names of all nested fields; these names are used to generate the appropriate field mappings in the SerDe properties. For more information see: https://aws.amazon.com/premiumsupport/knowledge-center/json-duplicate-key-error-athena-config/

type CustomMapping added in v1.4.0

// CustomMapping pairs a Go type with the Glue type to emit for it.
type CustomMapping struct {
	// From is the Go type to map, as returned by reflect.TypeOf().
	From reflect.Type
	// To is the Glue type emitted for values of type From.
	To string
}

type GluePartition

// GluePartition is a partition in Glue containing Panther data.
// Its fields are unexported; read them through the accessor methods
// (GetDatabase, GetTable, GetS3Bucket, GetTime, GetPartitionLocation, ...).
type GluePartition struct {
	// contains filtered or unexported fields
}

A partition in Glue containing Panther data

func GetPartitionFromS3

func GetPartitionFromS3(s3Bucket, s3ObjectKey string) (*GluePartition, error)

GetPartitionFromS3 gets the partition from the S3 bucket and S3 object key. The object key is expected to be in the format `{logs,rules}/{table_name}/year=\d{4}/month=\d{2}/[day=\d{2}/][hour=\d{2}/]{\S+}.json.gz`; otherwise an error is returned.

func GetPartitionFromS3Path

func GetPartitionFromS3Path(s3Path string) (*GluePartition, error)

func (*GluePartition) GetDatabase

func (gp *GluePartition) GetDatabase() string

func (*GluePartition) GetGlueTableMetadata

func (gp *GluePartition) GetGlueTableMetadata() *GlueTableMetadata

func (*GluePartition) GetPartitionColumnsInfo

func (gp *GluePartition) GetPartitionColumnsInfo() []PartitionColumnInfo

func (*GluePartition) GetPartitionLocation

func (gp *GluePartition) GetPartitionLocation() string

func (*GluePartition) GetS3Bucket

func (gp *GluePartition) GetS3Bucket() string

func (*GluePartition) GetTable

func (gp *GluePartition) GetTable() string

func (*GluePartition) GetTime

func (gp *GluePartition) GetTime() time.Time

type GlueTableMetadata

// GlueTableMetadata holds metadata about a Glue table. Construct it with
// NewGlueTableMetadata; its fields are unexported and exposed via methods
// such as DatabaseName, TableName, LogType, Timebin and Prefix.
type GlueTableMetadata struct {
	// contains filtered or unexported fields
}

Metadata about a Glue table.

func NewGlueTableMetadata

func NewGlueTableMetadata(
	dataType models.DataType, logType, logDescription string, timebin GlueTableTimebin, eventStruct interface{}) *GlueTableMetadata

Creates a new GlueTableMetadata object for Panther log sources

func (*GlueTableMetadata) CreateJSONPartition

func (gm *GlueTableMetadata) CreateJSONPartition(client glueiface.GlueAPI, t time.Time) (created bool, err error)

func (*GlueTableMetadata) CreateOrUpdateTable added in v1.4.0

func (gm *GlueTableMetadata) CreateOrUpdateTable(glueClient glueiface.GlueAPI, bucketName string) error

func (*GlueTableMetadata) DataType added in v1.4.0

func (gm *GlueTableMetadata) DataType() models.DataType

func (*GlueTableMetadata) DatabaseName

func (gm *GlueTableMetadata) DatabaseName() string

func (*GlueTableMetadata) Description

func (gm *GlueTableMetadata) Description() string

func (*GlueTableMetadata) EventStruct

func (gm *GlueTableMetadata) EventStruct() interface{}

func (*GlueTableMetadata) GetPartition

func (gm *GlueTableMetadata) GetPartition(client glueiface.GlueAPI, t time.Time) (output *glue.GetPartitionOutput, err error)

GetPartition returns the partition for time t, or nil if it does not exist.

func (*GlueTableMetadata) GetPartitionPrefix

func (gm *GlueTableMetadata) GetPartitionPrefix(t time.Time) string

GetPartitionPrefix returns the S3 prefix for this table's objects at time t, based on Timebin().

func (*GlueTableMetadata) HasPartitions

func (gm *GlueTableMetadata) HasPartitions(glueClient glueiface.GlueAPI) (bool, error)

func (*GlueTableMetadata) LogType

func (gm *GlueTableMetadata) LogType() string

func (*GlueTableMetadata) PartitionKeys

func (gm *GlueTableMetadata) PartitionKeys() (partitions []PartitionKey)

The partition keys for this table

func (*GlueTableMetadata) Prefix

func (gm *GlueTableMetadata) Prefix() string

Prefix returns the S3 prefix under which all data for this table is stored.

func (*GlueTableMetadata) RuleErrorTable added in v1.10.0

func (gm *GlueTableMetadata) RuleErrorTable() *GlueTableMetadata

func (*GlueTableMetadata) RuleTable added in v1.4.0

func (gm *GlueTableMetadata) RuleTable() *GlueTableMetadata

func (*GlueTableMetadata) Signature added in v1.4.0

func (gm *GlueTableMetadata) Signature() (string, error)

func (*GlueTableMetadata) SyncPartitions

func (gm *GlueTableMetadata) SyncPartitions(glueClient glueiface.GlueAPI, s3Client s3iface.S3API,
	startDate time.Time, deadline *time.Time) (*time.Time, error)

SyncPartitions updates a table's partitions using the latest table schema. Used when schemas change. If deadline is non-nil, it will stop when execution time has passed the deadline and will return the _next_ time period needing evaluation. Deadlines are used when this is called in Lambdas to avoid running past the lambda deadline.

func (*GlueTableMetadata) TableName

func (gm *GlueTableMetadata) TableName() string

func (*GlueTableMetadata) Timebin

func (gm *GlueTableMetadata) Timebin() GlueTableTimebin

type GlueTableTimebin

// GlueTableTimebin tags the time partitioning (monthly, daily or hourly) used by a GlueTableMetadata table.
type GlueTableTimebin int

Use this to tag the time partitioning used in a GlueTableMetadata table

// Supported time partitionings for Glue tables. Values start at 1, so the
// zero value of GlueTableTimebin matches none of them.
const (
	GlueTableMonthly GlueTableTimebin = 1
	GlueTableDaily   GlueTableTimebin = 2
	GlueTableHourly  GlueTableTimebin = 3
)

func TimebinFromTable added in v1.10.0

func TimebinFromTable(tbl *glue.TableData) (GlueTableTimebin, error)

TimebinFromTable resolves the timebin from a table storage descriptor

func (GlueTableTimebin) Next

func (tb GlueTableTimebin) Next(t time.Time) (next time.Time)

Next returns the next time interval

func (GlueTableTimebin) PartitionFilter added in v1.10.0

func (tb GlueTableTimebin) PartitionFilter(start, end time.Time) string

PartitionFilter returns a partition filter expression

func (GlueTableTimebin) PartitionHasData

func (tb GlueTableTimebin) PartitionHasData(client s3iface.S3API, t time.Time, tableOutput *glue.GetTableOutput) (bool, error)

PartitionHasData checks if there is at least 1 S3 object in the partition

func (GlueTableTimebin) PartitionPathS3 added in v1.10.0

func (tb GlueTableTimebin) PartitionPathS3(t time.Time) (s3Path string)

PartitionPathS3 constructs the S3 path for this partition

func (GlueTableTimebin) PartitionValuesFromTime

func (tb GlueTableTimebin) PartitionValuesFromTime(t time.Time) (values []*string)

PartitionValuesFromTime returns an []*string values (used for Glue APIs)

func (GlueTableTimebin) PartitionsAfter added in v1.10.0

func (tb GlueTableTimebin) PartitionsAfter(tm time.Time) string

PartitionsAfter returns an expression to scan for partitions after tm. See https://docs.aws.amazon.com/glue/latest/webapi/API_GetPartitions.html

func (GlueTableTimebin) PartitionsBefore added in v1.10.0

func (tb GlueTableTimebin) PartitionsBefore(tm time.Time) string

PartitionsBefore returns an expression to scan for partitions before tm. See https://docs.aws.amazon.com/glue/latest/webapi/API_GetPartitions.html

func (GlueTableTimebin) PartitionsBetween added in v1.10.0

func (tb GlueTableTimebin) PartitionsBetween(start, end time.Time) string

PartitionsBetween returns an expression to scan for partitions between two timestamps. See https://docs.aws.amazon.com/glue/latest/webapi/API_GetPartitions.html

func (GlueTableTimebin) S3PathLayout added in v1.10.0

func (tb GlueTableTimebin) S3PathLayout() string

S3PathLayout returns a go time layout to format/parse S3 paths for Glue partitions

func (GlueTableTimebin) TimeFromS3Path added in v1.10.0

func (tb GlueTableTimebin) TimeFromS3Path(path string) (time.Time, bool)

TimeFromS3Path converts an S3 path to time. The path must not contain any prefixes such as db/table name

func (GlueTableTimebin) Truncate added in v1.10.0

func (tb GlueTableTimebin) Truncate(t time.Time) time.Time

Truncate truncates the date to the time bin time unit

type PartitionColumnInfo

// PartitionColumnInfo describes one partition column: its key and its value.
type PartitionColumnInfo struct {
	Key, Value string
}

Contains information about partition columns

type PartitionKey

// PartitionKey names a partition key column of a table together with its type.
type PartitionKey struct {
	Name, Type string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL