Documentation
¶
Overview ¶
Package ingest provides data ingestion from various external sources into Kusto.
For more information on Kusto Data Ingestion, please see: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/
Create a client ¶
Creating a client simply requires a *kusto.Client, the name of the database and the name of the table to be ingested into.
in, err := ingest.New(kustoClient, "database", "table")
if err != nil {
	panic("add error handling")
}
Ingestion from a local file ¶
Ingesting a local file requires simply passing the path to the file to be ingested:
if err := in.FromFile(ctx, "/path/to/a/local/file"); err != nil {
	panic("add error handling")
}
FromFile() accepts Unix path names on Unix platforms and Windows path names on Windows platforms. The file is not deleted after upload unless the DeleteSource() option is passed.
Ingestion from an Azure Blob Storage file ¶
This package will also accept ingestion from an Azure Blob Storage file:
if err := in.FromFile(ctx, "https://myaccount.blob.core.windows.net/$root/myblob"); err != nil {
	panic("add error handling")
}
This will ingest a file from Azure Blob Storage. Only https:// paths are supported, and your domain name may differ from the one shown here.
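Because only https:// paths are supported for blob sources, it can be useful to check a path before handing it to FromFile(). The following is a minimal sketch using only the standard library. The helper isHTTPSBlobPath is hypothetical and is not part of this package; how the package itself tells local paths from blob URIs is not documented here.

```go
package main

import (
	"fmt"
	"net/url"
)

// isHTTPSBlobPath is a hypothetical helper (not part of the ingest package).
// It reports whether path parses as an absolute https:// URL with a host.
func isHTTPSBlobPath(path string) bool {
	u, err := url.Parse(path)
	if err != nil {
		return false
	}
	return u.Scheme == "https" && u.Host != ""
}

func main() {
	paths := []string{
		"https://myaccount.blob.core.windows.net/$root/myblob",
		"http://myaccount.blob.core.windows.net/$root/myblob",
		"/path/to/a/local/file",
	}
	for _, p := range paths {
		fmt.Printf("%s -> https blob path: %v\n", p, isHTTPSBlobPath(p))
	}
}
```

A path that fails this check may still be a valid local file path, so the check only separates https:// URIs from everything else.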
Ingestion from a Stream ¶
Ingestion from a stream commits blocks of fully formed, encoded data (JSON, AVRO, ...) into Kusto:
if err := in.Stream(ctx, jsonEncodedData, ingest.JSON, "mappingName"); err != nil {
	panic("add error handling")
}
Index ¶
- Variables
- type DataFormat
- type FileOption
- func DeleteSource() FileOption
- func FileFormat(et DataFormat) FileOption
- func FlushImmediately() FileOption
- func IfNotExists(ingestByTag string) FileOption
- func IgnoreSizeLimit() FileOption
- func IngestionMapping(mapping interface{}, mappingKind DataFormat) FileOption
- func IngestionMappingRef(refName string, mappingKind DataFormat) FileOption
- func Tags(tags []string) FileOption
- func ValidationPolicy(policy ValPolicy) FileOption
- type Ingestion
- type ValPolicy
- type ValidationImplication
- type ValidationOption
Variables ¶
var (
	// ErrTooLarge indicates that the data being passed to a StreamBlock is larger than the maximum StreamBlock size of 4MiB.
	ErrTooLarge = errors.ES(errors.OpIngestStream, errors.KClientArgs, "cannot add data larger than 4MiB")
)
Types ¶
type DataFormat ¶
type DataFormat = properties.DataFormat
DataFormat indicates what type of encoding format was used for source data. Not all options can be used in every method.
const (
	// DFUnknown indicates the EncodingType is not set.
	DFUnknown DataFormat = 0
	// CSV indicates the source is encoded in comma-separated values.
	CSV DataFormat = 1
	// JSON indicates the source is encoded in JavaScript Object Notation.
	JSON DataFormat = 2
	// AVRO indicates the source is encoded in Apache Avro format.
	AVRO DataFormat = 3
	// Parquet indicates the source is encoded in Apache Parquet format.
	Parquet DataFormat = 4
	// ORC indicates the source is encoded in Apache Optimized Row Columnar format.
	ORC DataFormat = 5
	// PSV is pipe "|" separated values.
	PSV DataFormat = 6
	// Raw is a text file that has only a single string value.
	Raw DataFormat = 7
	// SCSV is a file containing semicolon ";" separated values.
	SCSV DataFormat = 8
	// SOHSV is a file containing SOH-separated values (ASCII code point 1).
	SOHSV DataFormat = 9
	// TSV is a file containing tab-separated values ("\t").
	TSV DataFormat = 10
	// TXT is a text file with lines delimited by "\n".
	TXT DataFormat = 11
)
Note: any change here must be kept in sync with the version in the properties package. I'm not a fan of having two copies, but I don't think it is worth moving to its own package to allow properties and ingest to both import without a cycle.
type FileOption ¶
type FileOption interface {
// contains filtered or unexported methods
}
FileOption is an optional argument to FromFile().
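FileOption is an interface with unexported methods, which is a common way to implement optional arguments in Go: callers can only obtain values from the package's constructor functions. The sketch below illustrates that general pattern with the standard library only. The names uploadConfig, option, optionFunc and buildConfig are hypothetical, and the package's real internals may differ.

```go
package main

import "fmt"

// uploadConfig is a hypothetical settings struct that options modify.
type uploadConfig struct {
	deleteSource bool
	tags         []string
}

// option mirrors the shape of an interface with an unexported method:
// code outside the defining package cannot implement it.
type option interface {
	apply(*uploadConfig)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(*uploadConfig)

func (f optionFunc) apply(c *uploadConfig) { f(c) }

// deleteSource returns an option that marks the source for deletion.
func deleteSource() option {
	return optionFunc(func(c *uploadConfig) { c.deleteSource = true })
}

// tags returns an option that appends tags to the configuration.
func tags(t []string) option {
	return optionFunc(func(c *uploadConfig) { c.tags = append(c.tags, t...) })
}

// buildConfig applies each option in order to a zero-valued configuration.
func buildConfig(opts ...option) uploadConfig {
	var c uploadConfig
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	c := buildConfig(deleteSource(), tags([]string{"source:demo"}))
	fmt.Println(c.deleteSource, c.tags)
}
```

With this shape, a call that passes no options gets the defaults, and new options can be added later without changing the function signature.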
func DeleteSource ¶
func DeleteSource() FileOption
DeleteSource deletes the source file after it has been uploaded to Kusto.
func FileFormat ¶
func FileFormat(et DataFormat) FileOption
FileFormat can be used to indicate the encoding format of the file. This is only needed if the file name has no extension that identifies the format. A file like "input.json.gz" or "input.json" does not need this option, while "input" would.
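To make the extension rule concrete, here is a minimal sketch of how a format could be inferred from a file name, ignoring a trailing ".gz". The function formatFromPath and its extension table are assumptions for illustration; the package's actual inference logic is not shown in this documentation.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// formatFromPath is a hypothetical helper. It returns a format name based on
// the file extension, ignoring a trailing ".gz", or "" if no known extension
// is present. The extension table below is an assumption.
func formatFromPath(path string) string {
	name := strings.ToLower(filepath.Base(path))
	name = strings.TrimSuffix(name, ".gz")
	switch filepath.Ext(name) {
	case ".csv":
		return "CSV"
	case ".json":
		return "JSON"
	case ".avro":
		return "AVRO"
	case ".parquet":
		return "Parquet"
	case ".orc":
		return "ORC"
	case ".psv":
		return "PSV"
	case ".tsv":
		return "TSV"
	case ".txt":
		return "TXT"
	}
	return ""
}

func main() {
	for _, p := range []string{"input.json.gz", "input.json", "input"} {
		if f := formatFromPath(p); f != "" {
			fmt.Printf("%s: format %s inferred\n", p, f)
		} else {
			fmt.Printf("%s: no extension, an explicit format option would be needed\n", p)
		}
	}
}
```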
func FlushImmediately ¶
func FlushImmediately() FileOption
FlushImmediately tells Kusto to flush on write.
func IfNotExists ¶
func IfNotExists(ingestByTag string) FileOption
IfNotExists provides a string value that, if specified, prevents ingestion from succeeding if the table already has data tagged with an ingest-by: tag with the same value. This ensures idempotent data ingestion. For more information see: https://docs.microsoft.com/en-us/azure/kusto/management/extents-overview#ingest-by-extent-tags
func IgnoreSizeLimit ¶
func IgnoreSizeLimit() FileOption
IgnoreSizeLimit ignores the size limit for data ingestion.
func IngestionMapping ¶
func IngestionMapping(mapping interface{}, mappingKind DataFormat) FileOption
IngestionMapping provides runtime mapping of the data being imported to the fields in the table. mapping will be JSON encoded, so it can be any type that can be JSON marshalled. If you pass a string or []byte, it will be interpreted as already being JSON encoded. mappingKind can only be: CSV, JSON, AVRO, Parquet or ORC.
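The encoding rule described above (strings and byte slices pass through unchanged, everything else is JSON marshalled) can be sketched as follows. encodeMapping and the columnMapping struct are hypothetical names used for illustration only; in particular, the field names in columnMapping are not the Kusto mapping schema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeMapping is a hypothetical helper illustrating the documented rule:
// string and []byte values are treated as already JSON encoded, any other
// value is marshalled with encoding/json.
func encodeMapping(mapping interface{}) ([]byte, error) {
	switch v := mapping.(type) {
	case string:
		return []byte(v), nil
	case []byte:
		return v, nil
	default:
		return json.Marshal(v)
	}
}

// columnMapping is an illustrative type, not the real mapping schema.
type columnMapping struct {
	Column string `json:"column"`
	Path   string `json:"path"`
}

func main() {
	fromStruct, err := encodeMapping([]columnMapping{{Column: "Name", Path: "$.name"}})
	if err != nil {
		panic(err)
	}
	fromString, err := encodeMapping(`[{"column":"Name","path":"$.name"}]`)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(fromStruct))
	fmt.Println(string(fromString))
}
```

Both calls produce the same bytes here, which is the point of the rule: a caller can pass either a Go value or pre-encoded JSON.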
func IngestionMappingRef ¶
func IngestionMappingRef(refName string, mappingKind DataFormat) FileOption
IngestionMappingRef provides the name of a pre-created mapping for the data being imported to the fields in the table. mappingKind can only be: CSV, JSON, AVRO, Parquet or ORC. For more details, see: https://docs.microsoft.com/en-us/azure/kusto/management/create-ingestion-mapping-command
func Tags ¶
func Tags(tags []string) FileOption
Tags are tags to be associated with the ingested data.
func ValidationPolicy ¶
func ValidationPolicy(policy ValPolicy) FileOption
ValidationPolicy uses a ValPolicy to set our ingestion data validation policy. If not set, no validation policy is used. For more information, see: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/
type Ingestion ¶
type Ingestion struct {
// contains filtered or unexported fields
}
Ingestion provides data ingestion from external sources into Kusto.
func (*Ingestion) FromFile ¶
FromFile allows uploading a data file for Kusto from either a local path or a blobstore URI path. This method is thread-safe.
Example ¶
package main

import (
	"context"
	"time"

	"github.com/Azure/azure-kusto-go/kusto"
	"github.com/Azure/azure-kusto-go/kusto/ingest"
	"github.com/Azure/go-autorest/autorest/azure/auth"
)

func main() {
	var err error

	authConfig := auth.NewClientCredentialsConfig("clientID", "clientSecret", "tenantID")
	/*
		Alternatively, you could do something like:
		authorizer, err := auth.NewMSIConfig().Authorizer()
		or
		authorizer, err := auth.NewAuthorizerFromEnvironment()
		or
		auth.New...()

		then
		kusto.Authorization{Authorizer: authorizer}
	*/

	client, err := kusto.New("endpoint", kusto.Authorization{Config: authConfig})
	if err != nil {
		// Do something
	}

	ingestor, err := ingest.New(client, "database", "table")
	if err != nil {
		// Do something
	}

	// Set a maximum time for completion of 10 minutes.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	// Upload our file. When completed, delete the file on local storage we are uploading.
	if err := ingestor.FromFile(ctx, "/path/to/file", ingest.DeleteSource()); err != nil {
		// Do something
	}
}
func (*Ingestion) Stream ¶
func (i *Ingestion) Stream(ctx context.Context, payload []byte, format DataFormat, mappingName string) error
Stream takes a payload encoded in format, compresses it and uploads it to Kusto, applying the server-stored mapping named by mappingName. payload must be a fully formed entry of format and smaller than 4MiB, or the call will fail. The currently supported formats are CSV, TSV, SCSV, SOHSV, PSV, JSON and AVRO.
If using JSON or AVRO, you must provide a mappingName that references the name of the pre-created ingestion mapping defined on the table. Otherwise mappingName can be an empty string. More information can be found here: https://docs.microsoft.com/en-us/azure/kusto/management/create-ingestion-mapping-command
The context object can be used with a timeout or cancel to limit the request time.
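Since each payload must be fully formed and smaller than 4MiB, larger data sets have to be split on record boundaries before streaming. The sketch below groups newline-terminated records into chunks under a size limit, using only the standard library. chunkLines and errRecordTooLarge are hypothetical names, and the assumption that one payload may carry several newline-delimited records should be checked against the format you use.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// maxPayload is the 4MiB limit stated in the Stream documentation.
const maxPayload = 4 * 1024 * 1024

// errRecordTooLarge is a hypothetical error for a single oversized record.
var errRecordTooLarge = errors.New("single record does not fit under the size limit")

// chunkLines groups records, each written with a trailing newline, into
// chunks that are strictly smaller than limit bytes. Records are never split.
func chunkLines(records [][]byte, limit int) ([][]byte, error) {
	var chunks [][]byte
	var cur bytes.Buffer
	for _, rec := range records {
		need := len(rec) + 1 // record plus trailing newline
		if need >= limit {
			return nil, errRecordTooLarge
		}
		if cur.Len()+need >= limit {
			// Copy the buffer contents before resetting it for the next chunk.
			chunks = append(chunks, append([]byte(nil), cur.Bytes()...))
			cur.Reset()
		}
		cur.Write(rec)
		cur.WriteByte('\n')
	}
	if cur.Len() > 0 {
		chunks = append(chunks, append([]byte(nil), cur.Bytes()...))
	}
	return chunks, nil
}

func main() {
	records := [][]byte{[]byte(`{"id":1}`), []byte(`{"id":2}`), []byte(`{"id":3}`)}
	chunks, err := chunkLines(records, maxPayload)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d records fit in %d chunk(s)\n", len(records), len(chunks))
}
```

Each resulting chunk could then be passed to Stream() in its own call, with the caller deciding how to handle a failure partway through.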
type ValPolicy ¶
type ValPolicy struct {
	// Options provides an option that will flag data that does not validate.
	Options ValidationOption `json:"ValidationOptions"`
	// Implications sets what to do when a policy option is violated.
	Implications ValidationImplication `json:"ValidationImplications"`
}
ValPolicy sets a policy for validating data as it is sent for ingestion. For more information, see: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/
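The struct tags on ValPolicy determine the JSON field names used when a policy is serialized. The sketch below mirrors the documented types locally (lowercase names, so they are clearly not the package's own) and marshals a policy with encoding/json. It assumes the real types use default integer marshalling, which this documentation does not confirm.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented types, for illustration only.
type validationOption int8
type validationImplication int8

const (
	sameNumberOfFields validationOption      = 1 // mirrors SameNumberOfFields
	failIngestion      validationImplication = 0 // mirrors FailIngestion
)

// valPolicy mirrors the documented ValPolicy struct and its JSON tags.
type valPolicy struct {
	Options      validationOption      `json:"ValidationOptions"`
	Implications validationImplication `json:"ValidationImplications"`
}

// encodePolicy marshals a policy to its JSON text.
func encodePolicy(p valPolicy) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	s, err := encodePolicy(valPolicy{Options: sameNumberOfFields, Implications: failIngestion})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // prints {"ValidationOptions":1,"ValidationImplications":0}
}
```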
type ValidationImplication ¶
type ValidationImplication int8
ValidationImplication is a setting used to indicate what to do when a Validation Policy is violated. These are defined as constants within this package.
const (
	// FailIngestion indicates that any violation of the ValidationPolicy will cause the entire ingestion to fail.
	FailIngestion ValidationImplication = 0
	// IgnoreFailures indicates that failure of the ValidationPolicy will be ignored.
	IgnoreFailures ValidationImplication = 1
)
type ValidationOption ¶
type ValidationOption int8
ValidationOption is an option for validating the ingestion input data. These are defined as constants within this package.
const (
	// VOUnknown indicates that a ValidationOption was not set.
	VOUnknown ValidationOption = 0
	// SameNumberOfFields indicates that all records ingested must have the same number of fields.
	SameNumberOfFields ValidationOption = 1
	// IgnoreNonDoubleQuotedFields indicates that fields that do not have double quotes should be ignored.
	IgnoreNonDoubleQuotedFields ValidationOption = 2
)
Directories
¶
Path | Synopsis |
---|---|
internal/conn | Package conn holds a streaming ingest connection. |
internal/filesystem | Package filesystem provides a client with the ability to import data into Kusto via a variety of filesystems such as local storage or blobstore. |
internal/gzip | Package gzip provides a streaming object for taking in io.ReadCloser that is being written to and providing an io.ReadCloser that outputs the original content gzip compressed. |
internal/properties | Package properties provides Kusto REST properties that will need to be serialized and sent to Kusto based upon the type of ingestion we are doing. |
internal/resources | Package resources contains objects that are used to gather information about Kusto resources that are used during various ingestion methods. |
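The gzip package's synopsis describes wrapping an io.ReadCloser so that reads return gzip-compressed content. A minimal sketch of that idea, using io.Pipe and compress/gzip from the standard library, is shown below. gzipReader and roundTrip are hypothetical names; the internal package's actual implementation is not shown here and may work differently.

```go
package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

// gzipReader returns a reader that yields the gzip-compressed form of src.
// Compression runs in a goroutine and streams through an in-memory pipe,
// so the whole input is never buffered at once.
func gzipReader(src io.ReadCloser) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		defer src.Close()
		zw := gzip.NewWriter(pw)
		_, err := io.Copy(zw, src)
		// Closing the gzip writer flushes the remaining data and the footer.
		if cerr := zw.Close(); err == nil {
			err = cerr
		}
		// A nil error makes the read side see io.EOF.
		pw.CloseWithError(err)
	}()
	return pr
}

// roundTrip compresses s through gzipReader and decompresses the result.
func roundTrip(s string) (string, error) {
	zr, err := gzip.NewReader(gzipReader(io.NopCloser(strings.NewReader(s))))
	if err != nil {
		return "", err
	}
	defer zr.Close()
	out, err := io.ReadAll(zr)
	return string(out), err
}

func main() {
	out, err := roundTrip("hello kusto")
	if err != nil {
		panic(err)
	}
	fmt.Println(out == "hello kusto")
}
```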