azkustoingest

package module
v1.0.0-preview-5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2024 License: MIT Imports: 27 Imported by: 2

Documentation

Overview

Package azkustoingest provides a client for ingesting data into Azure Data Explorer (Kusto) clusters.

This package enables users to use different ingestion methods including queued, streaming, and managed ingestion from various sources such as local files, Azure Blob Storage urls, streams, or any `io.Reader`.

To start using this package, create an instance of the Ingestor, passing in a connection string built using the NewConnectionStringBuilder() function from the azkustodata package.

Example FromFile usage:

kcsb := azkustodata.NewConnectionStringBuilder(`endpoint`).WithAadAppKey("clientID", "clientSecret", "tenentID")
ingestor, err := azkustoingest.New(kcsb, azkustoingest.WithDefaultDatabase("database"), azkustoingest.WithDefaultTable("table"))

if err != nil {
	// Handle error
}

defer ingestor.Close() // Always close the ingestor when done.

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

_, err = ingestor.FromFile(ctx, "/path/to/file", azkustoingest.DeleteSource())

... // Handle any errors and status

The package supports advanced features such as status reporting to Kusto tables, file deletion after ingestion, and handling of retryable errors.

For complete documentation, please visit: https://github.com/Azure/azure-kusto-go https://pkg.go.dev/github.com/Azure/azure-kusto-go/azkustoingest

Index

Examples

Constants

View Source
const (
	FromFile SourceScope = 1 << iota
	FromReader
	FromBlob
	QueuedClient ClientScope = 1 << iota
	StreamingClient
	ManagedClient
)

Variables

This section is empty.

Functions

func GetErrorCode

func GetErrorCode(err error) (string, error)

GetErrorCode extracts the error code from an ingestion error

func IsRetryable

func IsRetryable(err error) bool

IsRetryable indicates whether there's any merit in retying ingestion

func IsStatusRecord

func IsStatusRecord(err error) bool

IsStatusRecord verifies that the given error is a status record.

func StatusFromMapForTests

func StatusFromMapForTests(data map[string]interface{}) error

StatusFromMapForTests converts an ingestion status record to a key value map. This is useful for comparison in tests.

Types

type ClientScope

type ClientScope uint

func (ClientScope) String

func (s ClientScope) String() string

type DataFormat

type DataFormat = properties.DataFormat

DataFormat indicates what type of encoding format was used for source data. Not all options can be used in every method.

const (
	// DFUnknown indicates the EncodingType is not set.
	DFUnknown DataFormat = properties.DFUnknown
	// AVRO indicates the source is encoded in Apache Avro format.
	AVRO DataFormat = properties.AVRO
	// ApacheAVRO indicates the source is encoded in Apache avro2json format.
	ApacheAVRO DataFormat = properties.ApacheAVRO
	// CSV indicates the source is encoded in comma seperated values.
	CSV DataFormat = properties.CSV
	// JSON indicates the source is encoded as one or more lines, each containing a record in Javascript Object Notation.
	JSON DataFormat = properties.JSON
	// MultiJSON indicates the source is encoded in JSON-Array of individual records in Javascript Object Notation. Optionally,
	//multiple documents can be concatenated.
	MultiJSON DataFormat = properties.MultiJSON
	// ORC indicates the source is encoded in Apache Optimized Row Columnar format.
	ORC DataFormat = properties.ORC
	// Parquet indicates the source is encoded in Apache Parquet format.
	Parquet DataFormat = properties.Parquet
	// PSV is pipe "|" separated values.
	PSV DataFormat = properties.PSV
	// Raw is a text file that has only a single string value.
	Raw DataFormat = properties.Raw
	// SCSV is a file containing semicolon ";" separated values.
	SCSV DataFormat = properties.SCSV
	// SOHSV is a file containing SOH-separated values(ASCII codepoint 1).
	SOHSV DataFormat = properties.SOHSV
	// SStream indicates the source is encoded as a Microsoft Cosmos Structured Streams format
	SStream DataFormat = properties.SStream
	// TSV is a file containing tab seperated values ("\t").
	TSV DataFormat = properties.TSV
	// TSVE is a file containing escaped-tab seperated values ("\t").
	TSVE DataFormat = properties.TSVE
	// TXT is a text file with lines ending with "\n".
	TXT DataFormat = properties.TXT
	// W3CLogFile indicates the source is encoded using W3C Extended Log File format
	W3CLogFile DataFormat = properties.W3CLogFile
	// SingleJSON indicates the source is a single JSON value -- newlines are regular whitespace.
	SingleJSON DataFormat = properties.SingleJSON
)

note: any change here needs to be kept up to date with the properties version. I'm not a fan of having two copies, but I don't think it is worth moving to its own package to allow properties and ingest to both import without a cycle.

func InferFormatFromFileName

func InferFormatFromFileName(fName string) DataFormat

InferFormatFromFileName looks at the file name and tries to discern what the file format is

type FailureStatusCode

type FailureStatusCode string

FailureStatusCode indicates the status of failed ingestion attempts

const (
	// Unknown represents an undefined or unset failure state
	Unknown FailureStatusCode = "Unknown"
	// Permanent represnets failure state that will benefit from a retry attempt
	Permanent FailureStatusCode = "Permanent"
	// Transient represnet a retryable failure state
	Transient FailureStatusCode = "Transient"
	// Exhausted represents a retryable failure that has exhusted all retry attempts
	Exhausted FailureStatusCode = "Exhausted"
)

func GetIngestionFailureStatus

func GetIngestionFailureStatus(err error) (FailureStatusCode, error)

GetIngestionFailureStatus extracts the ingestion failure code from an ingestion error

func (FailureStatusCode) IsRetryable

func (i FailureStatusCode) IsRetryable() bool

IsRetryable indicates whether there's any merit in retying ingestion

type FileOption

type FileOption interface {
	fmt.Stringer

	SourceScopes() SourceScope
	ClientScopes() ClientScope

	Run(p *properties.All, clientType ClientScope, sourceType SourceScope) error
}

FileOption is an optional argument to FromFile().

func ClientRequestId

func ClientRequestId(clientRequestId string) FileOption

ClientRequestId is an identifier for the ingestion, that can later be queried.

func CompressionType

func CompressionType(compressionType ingestoptions.CompressionType) FileOption

CompressionType sets the compression type of the data. Use this if the file name does not expose the compression type. This sets DontCompress to true for compressed data.

func Database

func Database(name string) FileOption

Database overrides the default database name.

func DeleteSource

func DeleteSource() FileOption

DeleteSource deletes the source file from when it has been uploaded to Kusto.

func DontCompress

func DontCompress() FileOption

func FileFormat

func FileFormat(et DataFormat) FileOption

FileFormat can be used to indicate what type of encoding is supported for the file. This is only needed if the file extension is not present. A file like: "input.json.gz" or "input.json" does not need this option, while "input" would. If an ingestion mapping is specified, there is no need to specify the file format.

func FlushImmediately

func FlushImmediately() FileOption

FlushImmediately the service batching manager will not aggregate this file, thus overriding the batching policy

func IfNotExists

func IfNotExists(ingestByTag string) FileOption

IfNotExists provides a string value that, if specified, prevents ingestion from succeeding if the table already has data tagged with an ingest-by: tag with the same value. This ensures idempotent data ingestion. For more information see: https://docs.microsoft.com/en-us/azure/kusto/management/extents-overview#ingest-by-extent-tags

func IgnoreFirstRecord

func IgnoreFirstRecord() FileOption

IgnoreFirstRecord tells Kusto to flush on write.

func IgnoreSizeLimit

func IgnoreSizeLimit() FileOption

IgnoreSizeLimit ignores the size limit for data ingestion.

func IngestionMapping

func IngestionMapping(mapping interface{}, format DataFormat) FileOption

IngestionMapping provides runtime mapping of the data being imported to the columns in the table. "mapping" will be JSON encoded, so it can be any type that can be JSON marshalled. If you pass a string or []byte, it will be interpreted as already being JSON encoded. The format parameter will automatically set the FileOption.Format option.

func IngestionMappingRef

func IngestionMappingRef(refName string, format DataFormat) FileOption

IngestionMappingRef provides the name of a pre-created mapping for the data being imported to the fields in the table. For more details, see: https://docs.microsoft.com/azure/kusto/management/create-ingestion-mapping-command The formatparameter will also automatically set the FileOption.Format option.

func RawDataSize

func RawDataSize(size int64) FileOption

RawDataSize is the uncompressed data size. Should be used to comunicate the file size to the service for efficient ingestion. Also used by managed client in the decision to use queued ingestion instead of streaming (if > 4mb)

func ReportResultToTable

func ReportResultToTable() FileOption

ReportResultToTable option requests that the ingestion status will be tracked in an Azure table. Note using Table status reporting is not recommended for high capacity ingestions, as it could slow down the ingestion. In such cases, it's recommended to enable it temporarily for debugging failed ingestions.

func SetCreationTime

func SetCreationTime(t time.Time) FileOption

SetCreationTime option allows the user to override the data creation time the retention policies are considered against If not set the data creation time is considered to be the time of ingestion

func Table

func Table(name string) FileOption

Table overrides the default table name.

func Tags

func Tags(tags []string) FileOption

Tags are tags to be associated with the ingested ata.

func ValidationPolicy

func ValidationPolicy(policy ValPolicy) FileOption

ValidationPolicy uses a ValPolicy to set our ingestion data validation policy. If not set, no validation policy is used. For more information, see: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/

type Ingestion

type Ingestion struct {
	// contains filtered or unexported fields
}

Ingestion provides data ingestion from external sources into Kusto.

func New

func New(kcsb *azkustodata.ConnectionStringBuilder, options ...Option) (*Ingestion, error)

New is a constructor for Ingestion.

func (*Ingestion) Close

func (i *Ingestion) Close() error

func (*Ingestion) FromFile

func (i *Ingestion) FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error)

FromFile allows uploading a data file for Kusto from either a local path or a blobstore URI path. This method is thread-safe.

Example
package main

import (
	"context"
	"github.com/Azure/azure-kusto-go/azkustodata"
	"github.com/Azure/azure-kusto-go/azkustoingest"
	"time"
)

func main() {
	var err error

	kcsb := azkustodata.NewConnectionStringBuilder(`endpoint`).WithAadAppKey("clientID", "clientSecret", "tenentID")

	ingestor, err := azkustoingest.New(kcsb, azkustoingest.WithDefaultDatabase("database"), azkustoingest.WithDefaultTable("table"))
	if err != nil {
		// Do something
	}
	// Closing the ingestor will not close the client (since the client may be used separately),
	//but it is still important to close the ingestor when you're done.
	defer ingestor.Close()

	// Setup a maximum time for completion to be 10 minutes.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	// Upload our file WITHOUT status reporting.
	// When completed, delete the file on local storage we are uploading.
	_, err = ingestor.FromFile(ctx, "/path/to/file", azkustoingest.DeleteSource())
	if err != nil {
		// The ingestion command failed to be sent, Do something
	}

	// Upload our file WITH status reporting.
	// When completed, delete the file on local storage we are uploading.
	status, err := ingestor.FromFile(ctx, "/path/to/file", azkustoingest.DeleteSource(), azkustoingest.ReportResultToTable())
	if err != nil {
		// The ingestion command failed to be sent, Do something
	}

	err = <-status.Wait(ctx)
	if err != nil {
		// the operation complete with an error
		if azkustoingest.IsRetryable(err) {
			// Handle reties
		} else {
			// inspect the failure
			// statusCode, _ := azkustoingest.GetIngestionStatus(err)
			// failureStatus, _ := azkustoingest.GetIngestionFailureStatus(err)
		}
	}
}
Output:

func (*Ingestion) FromReader

func (i *Ingestion) FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error)

FromReader allows uploading a data file for Kusto from an io.Reader. The content is uploaded to Blobstore and ingested after all data in the reader is processed. Content should not use compression as the content will be compressed with gzip. This method is thread-safe.

type Ingestor

type Ingestor interface {
	io.Closer
	FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error)
	FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error)
}

type Managed

type Managed struct {
	// contains filtered or unexported fields
}

func NewManaged

func NewManaged(kcsb *azkustodata.ConnectionStringBuilder, options ...Option) (*Managed, error)

NewManaged is a constructor for Managed.

func (*Managed) Close

func (m *Managed) Close() error

func (*Managed) FromFile

func (m *Managed) FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error)

func (*Managed) FromReader

func (m *Managed) FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error)

type Option

type Option func(s *Ingestion)

Option is an optional argument to New().

func WithCustomIngestConnectionString

func WithCustomIngestConnectionString(kcsb *azkustodata.ConnectionStringBuilder) Option

WithCustomIngestConnectionString is relevant to Managed ingestion client only. It configures the ingest client using a custom connection string, as opposed to one derived from the streaming client. This option implies WithoutEndpointCorrection().

func WithDefaultDatabase

func WithDefaultDatabase(db string) Option

WithDefaultDatabase configures the ingest client to use the given database name as the default database for all ingest operations.

func WithDefaultTable

func WithDefaultTable(table string) Option

WithDefaultTable configures the ingest client to use the given table name as the default table for all ingest operations.

func WithStaticBuffer

func WithStaticBuffer(bufferSize int, maxBuffers int) Option

WithStaticBuffer configures the ingest client to upload data to Kusto using a set of one or more static memory buffers with a fixed size. Only relevant for Queued and Managed ingestion.

func WithoutEndpointCorrection

func WithoutEndpointCorrection() Option

WithoutEndpointCorrection disables the automatic correction of the Kusto cluster address. The address will be used as-is, without adding or removing the "ingest-" prefix.

type QueryClient

type QueryClient interface {
	io.Closer
	Auth() azkustodata.Authorization
	Endpoint() string
	Query(ctx context.Context, db string, query azkustodata.Statement, options ...azkustodata.QueryOption) (query.Dataset, error)
	Mgmt(ctx context.Context, db string, query azkustodata.Statement, options ...azkustodata.QueryOption) (v1.Dataset, error)
	IterativeQuery(ctx context.Context, db string, query azkustodata.Statement, options ...azkustodata.QueryOption) (query.IterativeDataset, error)
	HttpClient() *http.Client
	ClientDetails() *azkustodata.ClientDetails
}

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result provides a way for users track the state of ingestion jobs.

func (*Result) Wait

func (r *Result) Wait(ctx context.Context) chan error

Wait returns a channel that can be checked for ingestion results. In order to check actual status please use the ReportResultToTable option when ingesting data.

type SourceScope

type SourceScope uint

func (SourceScope) String

func (s SourceScope) String() string

type StatusCode

type StatusCode string

StatusCode is the ingestion status

const (
	// Pending status represents a temporary status.
	// Might change during the course of ingestion based on the
	// outcome of the data ingestion operation into Kusto.
	Pending StatusCode = "Pending"
	// Succeeded status represents a permanent status.
	// The data has been successfully ingested to Kusto.
	Succeeded StatusCode = "Succeeded"
	// Failed Status represents a permanent status.
	// The data has not been ingested to Kusto.
	Failed StatusCode = "Failed"
	// Queued status represents a permanent status.
	// The data has been queued for ingestion &  status tracking was not requested.
	// (This does not indicate that the ingestion was successful.)
	Queued StatusCode = "Queued"
	// Skipped status represents a permanent status.
	// No data was supplied for ingestion. The ingest operation was skipped.
	Skipped StatusCode = "Skipped"
	// PartiallySucceeded status represents a permanent status.
	// Part of the data was successfully ingested to Kusto, while other parts failed.
	PartiallySucceeded StatusCode = "PartiallySucceeded"

	// StatusRetrievalFailed means the client ran into truble reading the status from the service
	StatusRetrievalFailed StatusCode = "StatusRetrievalFailed"
	// StatusRetrievalCanceled means the user canceld the status check
	StatusRetrievalCanceled StatusCode = "StatusRetrievalCanceled"
)

func GetIngestionStatus

func GetIngestionStatus(err error) (StatusCode, error)

GetIngestionStatus extracts the ingestion status code from an ingestion error

func (StatusCode) IsFinal

func (i StatusCode) IsFinal() bool

IsFinal returns true if the ingestion status is a final status, or false if the status is temporary

func (StatusCode) IsSuccess

func (i StatusCode) IsSuccess() bool

IsSuccess returns true if the status code is a final successfull status code

type Streaming

type Streaming struct {
	// contains filtered or unexported fields
}

Streaming provides data ingestion from external sources into Kusto.

func NewStreaming

func NewStreaming(kcsb *azkustodata.ConnectionStringBuilder, options ...Option) (*Streaming, error)

NewStreaming is the constructor for Streaming. More information can be found here: https://docs.microsoft.com/en-us/azure/kusto/management/create-ingestion-mapping-command

func (*Streaming) Close

func (i *Streaming) Close() error

func (*Streaming) FromFile

func (i *Streaming) FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error)

FromFile allows uploading a data file for Kusto from either a local path or a blobstore URI path. This method is thread-safe.

func (*Streaming) FromReader

func (i *Streaming) FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error)

FromReader allows uploading a data file for Kusto from an io.Reader. The content is uploaded to Blobstore and ingested after all data in the reader is processed. Content should not use compression as the content will be compressed with gzip. This method is thread-safe.

type ValPolicy

type ValPolicy struct {
	// Options provides an option that will flag data that does not validate.
	Options ValidationOption `json:"ValidationOptions"`
	// Implications sets what to do when a policy option is violated.
	Implications ValidationImplication `json:"ValidationImplications"`
}

ValPolicy sets a policy for validating data as it is sent for ingestion. For more information, see: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/

type ValidationImplication

type ValidationImplication int8

ValidationImplication is a setting used to indicate what to do when a Validation Policy is violated. These are defined as constants within this package.

const (
	// FailIngestion indicates that any violation of the ValidationPolicy will cause the entire ingestion to fail.
	FailIngestion ValidationImplication = 0
	// IgnoreFailures indicates that failure of the ValidationPolicy will be ignored.
	IgnoreFailures ValidationImplication = 1
)

type ValidationOption

type ValidationOption int8

ValidationOption is an an option for validating the ingestion input data. These are defined as constants within this package.

const (
	// VOUnknown indicates that a ValidationOption was not set.
	VOUnknown ValidationOption = 0
	// SameNumberOfFields indicates that all records ingested must have the same number of fields.
	SameNumberOfFields ValidationOption = 1
	// IgnoreNonDoubleQuotedFields indicates that fields that do not have double quotes should be ignored.
	IgnoreNonDoubleQuotedFields ValidationOption = 2
)

Directories

Path Synopsis
internal
gzip
Package gzip provides a streaming object for taking in io.ReadCloser that is being written to and providing an io.ReadCloser that outputs the original content gzip compressed.
Package gzip provides a streaming object for taking in io.ReadCloser that is being written to and providing an io.ReadCloser that outputs the original content gzip compressed.
properties
Package properties provides Kusto REST properties that will need to be serialized and sent to Kusto based upon the type of ingestion we are doing.
Package properties provides Kusto REST properties that will need to be serialized and sent to Kusto based upon the type of ingestion we are doing.
queued
Package filesystem provides a client with the ability to import data into Kusto via a variety of fileystems such as local storage or blobstore.
Package filesystem provides a client with the ability to import data into Kusto via a variety of fileystems such as local storage or blobstore.
resources
Package resources contains objects that are used to gather information about Kusto resources that are used during various ingestion methods.
Package resources contains objects that are used to gather information about Kusto resources that are used during various ingestion methods.
utils
Package resources contains objects that are used to gather information about Kusto resources that are used during various ingestion methods.
Package resources contains objects that are used to gather information about Kusto resources that are used during various ingestion methods.
test

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL