README

Kapacitor

Open source framework for processing, monitoring, and alerting on time series data

Getting Started

This README gives you a high level overview of what Kapacitor is and what it's like to use it, as well as some details of how it works. To get started using Kapacitor, see this guide.

Workflows

There are two different ways to consume Kapacitor.

  1. Define tasks that process streams of data. This method provides low latency (on the order of 100ms) processing but no aggregations; you work with the raw data stream.
  2. Define tasks that process batches of data. The batches are the results of scheduled queries. This method has higher latency (on the order of 10s) but allows for aggregations or anything else you can do with a query.

Stream workflow

  1. Start Kapacitor

    $ kapacitord
    
  2. Start a data stream. Configure Telegraf with an output to Kapacitor.

  3. Create a replayable snapshot

  • Select data from an existing InfluxDB host and save it:

    $ kapacitor record query -type stream -query 'select value from cpu_idle where time > start and time < stop'
    b6d1de3f-b27f-4420-96ee-b0365d859d1c
    
  • Or record the live stream for a bit:

    $ kapacitor record stream -duration 60s
    b6d1de3f-b27f-4420-96ee-b0365d859d1c
    
  4. Define a Kapacitor stream task. A stream task is an entity that defines what data should be processed and how.

    $ kapacitor define \
        -type stream \
        -name alert_cpu_idle_any_host \
        -tick path/to/tickscript
    
  5. Replay the recording to test the task.

    $ kapacitor replay \
        -id b6d1de3f-b27f-4420-96ee-b0365d859d1c \
        -name alert_cpu_idle_any_host
    
  6. Edit the stream task and test until it's working.

    $ kapacitor define \
        -type stream \
        -name alert_cpu_idle_any_host \
        -tick path/to/tickscript
    $ kapacitor replay \
        -id b6d1de3f-b27f-4420-96ee-b0365d859d1c \
        -name alert_cpu_idle_any_host
    
  7. Enable or push the stream once you are satisfied that it is working.

    $ # enable the stream locally
    $ kapacitor enable alert_cpu_idle_any_host
    $ # or push the tested stream to a prod server
    $ kapacitor push -remote http://address_to_remote_kapacitor alert_cpu_idle_any_host
    

Batch workflow

  1. Start Kapacitor

    $ kapacitord
    
  2. Define a batch task. Like a stream task, a batch task defines what data to process and how, only it operates on batches of data instead of streams.

    $ kapacitor define \
        -type batch \
        -name alert_mean_cpu_idle_logs_by_dc \
        -tick path/to/tickscript
    
  3. Save a batch of data for replaying, using the query defined in the batch task.

    $ kapacitor record batch alert_mean_cpu_idle_logs_by_dc
    e6d1de3f-b27f-4420-96ee-b0365d859d1c
    
  4. Replay the batch of data to the task.

    $ kapacitor replay \
        -id e6d1de3f-b27f-4420-96ee-b0365d859d1c \
        -name alert_mean_cpu_idle_logs_by_dc
    
  5. Iterate on the batch definition until it works

    $ kapacitor define \
        -type batch \
        -name alert_mean_cpu_idle_logs_by_dc \
        -tick path/to/tickscript
    $ kapacitor replay \
        -id e6d1de3f-b27f-4420-96ee-b0365d859d1c \
        -name alert_mean_cpu_idle_logs_by_dc
    
  6. Once it works, enable locally or push to remote

    $ # enable the batch locally
    $ kapacitor enable alert_mean_cpu_idle_logs_by_dc
    $ # or push the tested batch to a prod server
    $ kapacitor push -remote http://address_to_remote_kapacitor alert_mean_cpu_idle_logs_by_dc
    

Data processing with pipelines

Processing data follows a pipeline, and depending on the processing needs that pipeline can vary significantly. Kapacitor models the different data processing pipelines as DAGs (Directed Acyclic Graphs) and allows the user to specify the structure of the DAG via a DSL.

Kapacitor allows you to define the DAG implicitly via operators and invocation chaining in a pipeline API, similar to how Flink and Spark work.

TICKscript DSL

Kapacitor uses a DSL called TICKscript to define the DAG so that you are not required to write and compile Go code.

The following is an example TICKscript that triggers an alert if idle CPU drops below 30%. In the language, the variable stream represents the stream of values.

stream
  .from("cpu_idle")
    .where("cpu = 'cpu-total'")
  .window()
    .period(10s)
    .every(5s)
  .mapReduce(influxql.mean("value"))
  .alert()
    .crit("value < 30")
    .email("oncall@example.com")

This script maintains a window of data for 10s and emits the current window every 5s. Then the average is calculated for each emitted window. Finally, any mean value below 30 matches the crit expression on the alert node, which triggers the alert by sending an email.

The DAG that is constructed from the script looks like this:

[DAG diagram: stream -> window -> mean (every 10s) -> alert (< 30)]

Based on how the DAG is constructed, you can use the TICKscript language to both construct the DAG and define what each node does via built-in functions. Notice how the mapReduce function takes as an argument another function, influxql.mean; this is an example of a built-in function that can be used to process the data stream. It will also be possible to define your own functions via plugins to Kapacitor and reference them in the DSL.

By adding a groupBy statement we can easily partition our data set and process each group independently:

stream
  .groupBy("dc")
  .window()
    .period(10s)
    .every(5s)
  .mapReduce(influxql.mean("value"))
  .alert()
    .crit("value < 30")
    .email("oncall@example.com")
TICKscript and batch processing

Batch processing works similarly to stream processing.

The following is an example TICKscript for batches, where we periodically run a query and want to alert on cpu. The query: select mean(value) from cpu_idle group by dc, time(1m).

batch
  .query("select max(value) from cpu_idle where role = 'logs'")
  .period(15m)
  // or  .cron("*/15 * * * 0")
  .groupBy(time(1h), "dc")
  .alert()
  .crit("value < 30")
  .email("oncall@example.com")

The main difference is that instead of a stream object we start with a batch object. Since batches are already windowed, there is no need to define a new window.

What you can do with TICKscript
  • Define the DAG for your data pipeline needs.
  • Window data. Windowing can be done by time or by number of data points, as well as various other conditions.
  • Aggregate data. The list of aggregation functions currently supported by InfluxQL is probably a good place to start.
  • Transform data via built-in functions.
  • Transform data via custom functions.
  • Filter down streams/batches of data.
  • Emit data into a new stream.
  • Emit data into an InfluxDB database.
  • Trigger events/notifications.
What you can NOT do with TICKscript
  • Define custom functions in the language. You can call out to custom functions defined via plugins, but you cannot define the function itself within the DSL. The DSL would be too slow to actually process the data; it is used simply to define the data flow.
Example TICKscripts

Several examples demonstrating various features of Kapacitor follow:

Set up a dead man's switch

If your stream stops sending data, this may be a serious cause for concern. Setting up a 'dead man's switch' is quite simple:

//Create dead man's switch
stream
  .fork()
  .window()
    .period(1m)
    .every(1m)
  .mapReduce(influxql.count("value"))
  .alert()
    .crit("count = 0")
    .email("oncall@example.com")

//Now define normal processing on the stream
stream
  .fork()
  ...
Set up flapping detection on alerts
stream
  .window()
  ...
  .alert()
    .crit("true")
    .flapping(0.2, 0.5)
    .email("oncall@example.com")
Aggregate alerts

If you are monitoring lots of stats for a service across multiple hosts and a host dies, you would rather get a single alert that the host is dead than 10 alerts, one for each stat.

For example, say we are monitoring a redis cluster and we have the stats cpu and redis.instantaneous_ops_per_sec. Using the following script we set up alerts for each host if cpu gets too high or if redis.instantaneous_ops_per_sec drops too low.

// Alert on redis stats
var redis = stream.fork()
  .from("redis")
  .where("instantaneous_ops_per_sec < 10")
  .groupBy("host")
  .alert()
    .crit("true")
    .email("oncall@example.com")

var cpu = stream.fork()
  .from("cpu")
  .where("idle < 20")
  .groupBy("host")
  .alert()
    .crit("true")
    .email("oncall@example.com")

Now let's say we want to combine the alerts so that if either condition triggers we send a single combined alert.

var redis = stream
  .from("redis")
  .where("instantaneous_ops_per_sec < 10")

var cpu = stream
  .from("cpu")
  .where("idle < 20")

redis.union(cpu)
  .groupBy("host")
  .window()
    .period(10s)
    .every(10s)
  .alert()
    .crit("true")
    .email("oncall@example.com")

This will aggregate the union of all alerts every 10s by host. Then it will send out one alert with the aggregate information. You could easily add more streams to the union like so:

redis.union(cpu, mem, disk, ...)

Or, if you wanted to group by service instead of host, just change the groupBy field:

redis.union(cpu)
  .groupBy("service")
  ...
Persisting results

Now let's say we want to perform some custom processing on a stream of data and then keep the resulting time series data.

stream
    ... //Define custom processing pipeline
    .influxDBOut()
        .database("mydb")
        .retentionPolicy("myrp")
        .measurement("m")

Or maybe you simply need to keep the data cached so you can request it when you need it.

stream
    ... //Define custom processing pipeline
    .httpOut("custom_data_set")

Now you can make a request to http://kapacitorhost:9092/api/v1/<task_name>/custom_data_set. The data returned will be the current value of the result.
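
As a rough sketch of consuming that endpoint from Go, the following uses this package's ResultFromJSON helper (documented below). The host, task name, and import path are placeholders, not confirmed values.

package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/influxdb/kapacitor" // assumed import path
)

func main() {
	// Placeholder host and task name; adjust to your Kapacitor instance.
	resp, err := http.Get("http://kapacitorhost:9092/api/v1/my_task/custom_data_set")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Decode the cached result produced by the httpOut node.
	r := kapacitor.ResultFromJSON(resp.Body)
	fmt.Printf("%+v\n", r)
}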

Using custom functions

What about when you want to do something that is not built into Kapacitor? Simply load your custom functions in the DSL like so:

var fMap = loadMapFunc("./mapFunction.py")
var fReduce = loadReduceFunc("./reduceFunction.py")
stream
	.from("cpu")
	.where("host", "=", "serverA")
	.window()
		.period(1s)
		.every(1s)
	.map(fMap("idle"))
	.reduce(fReduce)
	.httpOut("http://example.com/path")

The mapFunction.py and reduceFunction.py files contain Python scripts that read data from an incoming stream, perform their function, and output the result. More on how to write these custom functions later...

Documentation

Overview

A data pipeline processing engine.

See the README for more complete examples and guides.

Code Organization:

The pipeline package provides an API for how nodes can be connected to form a pipeline. The individual implementations of each node exist in this kapacitor package. The reason for the separation is to keep the pipeline package's exported API clean, as it is consumed via TICKscript (a DSL for Kapacitor).

Other Concepts:

Stream vs Batch -- Use of the word 'stream' indicates data arrives a single data point at a time. Use of the word 'batch' indicates data arrives in sets or batches of data points.

Task -- A task represents a concrete workload to perform. It consists of a pipeline and an identifying name. Basic CRUD operations can be performed on tasks.

Task Master -- Responsible for executing a task in a specific environment.

Replay -- Replays static datasets against tasks.
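
To tie these concepts to the API below, here is a minimal sketch of defining a stream task and handing it to a task master. The import path, the TICKscript, and the trivial LogService implementation are assumptions; error handling is abbreviated.

package main

import (
	"log"
	"os"

	"github.com/influxdb/kapacitor" // assumed import path
)

// stderrLogs is a hypothetical minimal implementation of kapacitor.LogService.
type stderrLogs struct{}

func (stderrLogs) NewLogger(prefix string, flag int) *log.Logger {
	return log.New(os.Stderr, prefix, flag)
}

func main() {
	// The task master executes tasks in a specific environment.
	tm := kapacitor.NewTaskMaster(stderrLogs{})
	if err := tm.Open(); err != nil {
		log.Fatal(err)
	}
	defer tm.Close()

	// A task pairs a name with a pipeline defined by a TICKscript.
	script := `stream.from("cpu_idle")` // placeholder TICKscript
	dbrps := []kapacitor.DBRP{{Database: "mydb", RetentionPolicy: "myrp"}}
	task, err := kapacitor.NewStreamer("example_task", script, dbrps)
	if err != nil {
		log.Fatal(err)
	}

	// Start the task; the returned ExecutingTask exposes outputs and errors.
	et, err := tm.StartTask(task)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("started task:", et.Task.Name)
}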


Constants

const (
	// List of names for top-level exported vars
	ClusterIDVarName = "cluster_id"
	ServerIDVarName  = "server_id"
	HostVarName      = "host"
	ProductVarName   = "product"
	VersionVarName   = "version"

	NumTasksVarName         = "num_tasks"
	NumEnabledTasksVarName  = "num_enabled_tasks"
	NumSubscriptionsVarName = "num_subscriptions"

	// The name of the product
	Product = "kapacitor"
)

Variables

var (
	// Global expvars
	NumTasks         = &expvar.Int{}
	NumEnabledTasks  = &expvar.Int{}
	NumSubscriptions = &expvar.Int{}
)
var ErrWrongTaskType = errors.New("wrong task type")

Functions

func CreateDBRPMap

func CreateDBRPMap(dbrps []DBRP) map[DBRP]bool

func EvalPredicate

func EvalPredicate(se *tick.StatefulExpr, fields models.Fields, tags map[string]string) (bool, error)

Evaluate a given expression as a boolean predicate against a set of fields and tags

func GetFloatVar

func GetFloatVar(name string) float64

Gets an exported var and returns its float value

func GetIntVar

func GetIntVar(name string) int64

Gets an exported var and returns its int value

func GetStatsData

func GetStatsData() ([]client.StatsData, error)

Return all stats data from the expvars.

func GetStringVar

func GetStringVar(name string) string

Gets an exported var and returns its unquoted string contents

func NewStatistics

func NewStatistics(name string, tags map[string]string) *expvar.Map

NewStatistics creates an expvar-based map in which "name" is the measurement name, "tags" are the tags, and values are placed at the key "values". The "values" map is returned so that statistics can be set.
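
For instance, a node might register and update its stats like this. A sketch only: the import path and measurement name are placeholders, and the Add method is assumed to follow the standard expvar.Map semantics.

package example

import "github.com/influxdb/kapacitor" // assumed import path

// registerStats registers a statistics map for a hypothetical "example_node"
// measurement and increments a counter in it.
func registerStats() {
	values := kapacitor.NewStatistics("example_node", map[string]string{"host": "serverA"})
	values.Add("points_processed", 1)
}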

func WriteBatchForRecording

func WriteBatchForRecording(w io.Writer, b models.Batch) error

func WritePointForRecording

func WritePointForRecording(w io.Writer, p models.Point, precision string) error

Types

type AlertData

type AlertData struct {
	ID      string          `json:"id"`
	Message string          `json:"message"`
	Time    time.Time       `json:"time"`
	Level   AlertLevel      `json:"level"`
	Data    influxql.Result `json:"data"`
}

type AlertHandler

type AlertHandler func(ad *AlertData)
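
A handler is just a function that receives the alert data. For example, a sketch of a handler that logs each alert, using only the AlertData fields shown above (import path assumed):

package example

import (
	"log"

	"github.com/influxdb/kapacitor" // assumed import path
)

// logAlerts is a hypothetical AlertHandler that writes alert details to the log.
var logAlerts kapacitor.AlertHandler = func(ad *kapacitor.AlertData) {
	log.Printf("alert %s [%v] at %v: %s", ad.ID, ad.Level, ad.Time, ad.Message)
}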

type AlertLevel

type AlertLevel int
const (
	OKAlert AlertLevel = iota
	InfoAlert
	WarnAlert
	CritAlert
)

func (AlertLevel) MarshalText

func (l AlertLevel) MarshalText() ([]byte, error)

func (AlertLevel) String

func (l AlertLevel) String() string

type AlertNode

type AlertNode struct {
	// contains filtered or unexported fields
}

func (*AlertNode) Err

func (n *AlertNode) Err() error

type BatchCollector

type BatchCollector interface {
	CollectBatch(models.Batch) error
	Close()
}

type BatchNode

type BatchNode struct {
	// contains filtered or unexported fields
}

func (*BatchNode) DBRPs

func (b *BatchNode) DBRPs() ([]DBRP, error)

Return list of databases and retention policies the batcher will query.

func (*BatchNode) Err

func (n *BatchNode) Err() error

func (*BatchNode) Queries

func (b *BatchNode) Queries(start, stop time.Time) []string

func (*BatchNode) Start

func (b *BatchNode) Start(batch BatchCollector)

type DBRP

type DBRP struct {
	Database        string `json:"db"`
	RetentionPolicy string `json:"rp"`
}

func (DBRP) String

func (d DBRP) String() string

type DerivativeNode

type DerivativeNode struct {
	// contains filtered or unexported fields
}

func (*DerivativeNode) Err

func (n *DerivativeNode) Err() error

type Edge

type Edge struct {
	// contains filtered or unexported fields
}

func (*Edge) Close

func (e *Edge) Close()

func (*Edge) CollectBatch

func (e *Edge) CollectBatch(b models.Batch) (err error)

func (*Edge) CollectMaps

func (e *Edge) CollectMaps(m *MapResult) (err error)

func (*Edge) CollectPoint

func (e *Edge) CollectPoint(p models.Point) (err error)

func (*Edge) Next

func (e *Edge) Next() (p models.PointInterface, ok bool)

func (*Edge) NextBatch

func (e *Edge) NextBatch() (b models.Batch, ok bool)

func (*Edge) NextMaps

func (e *Edge) NextMaps() (m *MapResult, ok bool)

func (*Edge) NextPoint

func (e *Edge) NextPoint() (p models.Point, ok bool)

type EvalNode

type EvalNode struct {
	// contains filtered or unexported fields
}

func (*EvalNode) Err

func (n *EvalNode) Err() error

type ExecutingTask

type ExecutingTask struct {
	Task *Task
	// contains filtered or unexported fields
}

A task that is ready for execution.

func NewExecutingTask

func NewExecutingTask(tm *TaskMaster, t *Task) (*ExecutingTask, error)

Create a new executing task from a defined task and task master.

func (*ExecutingTask) BatchCount

func (et *ExecutingTask) BatchCount() (int, error)

func (*ExecutingTask) BatchQueries

func (et *ExecutingTask) BatchQueries(start, stop time.Time) ([][]string, error)

Get the batch queries that the batcher will run between times `start` and `stop`.

func (*ExecutingTask) Err

func (et *ExecutingTask) Err() error

Wait till the task finishes and return any error

func (*ExecutingTask) GetOutput

func (et *ExecutingTask) GetOutput(name string) (Output, error)

Get a named output.

func (*ExecutingTask) StartBatching

func (et *ExecutingTask) StartBatching() error

Instruct source batch node to start querying and sending batches of data

type GroupByNode

type GroupByNode struct {
	// contains filtered or unexported fields
}

func (*GroupByNode) Err

func (n *GroupByNode) Err() error

type HTTPOutNode

type HTTPOutNode struct {
	// contains filtered or unexported fields
}

func (*HTTPOutNode) Endpoint

func (h *HTTPOutNode) Endpoint() string

func (*HTTPOutNode) Err

func (n *HTTPOutNode) Err() error

type InfluxDBOutNode

type InfluxDBOutNode struct {
	// contains filtered or unexported fields
}

func (*InfluxDBOutNode) Err

func (n *InfluxDBOutNode) Err() error

type JoinNode

type JoinNode struct {
	// contains filtered or unexported fields
}

func (*JoinNode) Err

func (n *JoinNode) Err() error

type LogService

type LogService interface {
	NewLogger(prefix string, flag int) *log.Logger
}

type MapFunc

type MapFunc func(in *tsdb.MapInput) interface{}

type MapInfo

type MapInfo struct {
	Field string
	Func  MapFunc
}

type MapNode

type MapNode struct {
	// contains filtered or unexported fields
}

func (*MapNode) Err

func (n *MapNode) Err() error

type MapResult

type MapResult struct {
	Name  string
	Group models.GroupID
	Dims  []string
	Tags  map[string]string
	TMax  time.Time
	Outs  []interface{}
}

type Node

type Node interface {
	pipeline.Node

	// wait for the node to finish processing and return any errors
	Err() error
	// contains filtered or unexported methods
}

A node that can be in an executor.

type Output

type Output interface {
	Endpoint() string
}

An output of a pipeline. Still need to improve this interface to expose different types of outputs.

type Query

type Query struct {
	// contains filtered or unexported fields
}

func NewQuery

func NewQuery(q string) (*Query, error)

func (*Query) DBRPs

func (q *Query) DBRPs() ([]DBRP, error)

Return the database and retention policy pairs of the query.

func (*Query) Dimensions

func (q *Query) Dimensions(dims []interface{}) error

Set the dimensions on the query

func (*Query) Fill

func (q *Query) Fill(option influxql.FillOption, value interface{})

func (*Query) Start

func (q *Query) Start(s time.Time)

Set the start time of the query

func (*Query) Stop

func (q *Query) Stop(s time.Time)

Set the stop time of the query

func (*Query) String

func (q *Query) String() string
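
A sketch of how these methods compose; the query text and import path are illustrative only.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/influxdb/kapacitor" // assumed import path
)

func main() {
	// Build a query and constrain it to a one-hour window.
	q, err := kapacitor.NewQuery(`SELECT mean(value) FROM "mydb"."myrp"."cpu_idle"`)
	if err != nil {
		log.Fatal(err)
	}
	stop := time.Now()
	q.Start(stop.Add(-time.Hour)) // set the start time of the query
	q.Stop(stop)                  // set the stop time of the query
	fmt.Println(q.String())       // the rendered query text
}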

type ReduceFunc

type ReduceFunc func(in []interface{}, tmax time.Time, useTMax bool, as string) interface{}

type ReduceNode

type ReduceNode struct {
	// contains filtered or unexported fields
}

func (*ReduceNode) Err

func (n *ReduceNode) Err() error

type Replay

type Replay struct {
	Setter clock.Setter
	// contains filtered or unexported fields
}

Replay engine that can replay static data sets against a specific executor and its tasks.

func NewReplay

func NewReplay(c clock.Clock) *Replay

Create a new replay engine.

func (*Replay) ReplayBatch

func (r *Replay) ReplayBatch(data []io.ReadCloser, batches []BatchCollector, recTime bool) <-chan error

Replay a data set against an executor. If recTime is true, the replay will use the times stored in the recording instead of the clock time.

func (*Replay) ReplayStream

func (r *Replay) ReplayStream(data io.ReadCloser, stream StreamCollector, recTime bool, precision string) <-chan error

Replay a data set against an executor.

type Result

type Result influxql.Result

The result from an output.

func ResultFromJSON

func ResultFromJSON(in io.Reader) (r Result)

Unmarshal a Result object from JSON.

type SampleNode

type SampleNode struct {
	// contains filtered or unexported fields
}

func (*SampleNode) Err

func (n *SampleNode) Err() error

type SourceBatchNode

type SourceBatchNode struct {
	// contains filtered or unexported fields
}

func (*SourceBatchNode) Count

func (s *SourceBatchNode) Count() int

func (*SourceBatchNode) DBRPs

func (s *SourceBatchNode) DBRPs() ([]DBRP, error)

Return list of databases and retention policies the batcher will query.

func (*SourceBatchNode) Err

func (s *SourceBatchNode) Err() error

func (*SourceBatchNode) Queries

func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string

func (*SourceBatchNode) Start

func (s *SourceBatchNode) Start(collectors []*Edge)

type StreamCollector

type StreamCollector interface {
	CollectPoint(models.Point) error
	Close()
}

type StreamNode

type StreamNode struct {
	// contains filtered or unexported fields
}

func (*StreamNode) Err

func (n *StreamNode) Err() error

type Task

type Task struct {
	Name     string
	Pipeline *pipeline.Pipeline
	Type     TaskType
	DBRPs    []DBRP
}

The complete definition of a task, its name, pipeline and type.

func NewBatcher

func NewBatcher(name, script string, dbrps []DBRP) (*Task, error)

Create a new batcher task from a script.

func NewStreamer

func NewStreamer(name, script string, dbrps []DBRP) (*Task, error)

Create a new streamer task from a script.

func NewTask

func NewTask(name, script string, tt TaskType, dbrps []DBRP) (*Task, error)

func (*Task) Dot

func (t *Task) Dot() []byte
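
Dot returns a DOT representation of the task's pipeline, which is handy for inspecting the DAG a TICKscript compiles to. A sketch, with a placeholder script and an assumed import path:

package main

import (
	"fmt"
	"log"

	"github.com/influxdb/kapacitor" // assumed import path
)

func main() {
	// Placeholder TICKscript.
	script := `
stream
  .from("cpu_idle")
  .window()
    .period(10s)
    .every(5s)
`
	dbrps := []kapacitor.DBRP{{Database: "mydb", RetentionPolicy: "myrp"}}
	t, err := kapacitor.NewStreamer("inspect_dag", script, dbrps)
	if err != nil {
		log.Fatal(err)
	}
	// Print the DAG that the script compiles to.
	fmt.Println(string(t.Dot()))
}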

type TaskMaster

type TaskMaster struct {
	Stream       StreamCollector
	HTTPDService interface {
		AddRoutes([]httpd.Route) error
		DelRoutes([]httpd.Route)
		URL() string
	}
	InfluxDBService interface {
		NewClient() (*client.Client, error)
	}
	SMTPService interface {
		Global() bool
		SendMail(to []string, subject string, msg string)
	}
	VictorOpsService interface {
		Global() bool
		Alert(routingKey, messageType, message, entityID string, t time.Time, extra interface{}) error
	}
	PagerDutyService interface {
		Global() bool
		Alert(incidentKey, desc string, details interface{}) error
	}
	SlackService interface {
		Global() bool
		Alert(channel, message string, level AlertLevel) error
	}
	LogService LogService
	// contains filtered or unexported fields
}

An execution framework for a set of tasks.

func NewTaskMaster

func NewTaskMaster(l LogService) *TaskMaster

Create a new TaskMaster with the given log service.

func (*TaskMaster) BatchCollectors

func (tm *TaskMaster) BatchCollectors(name string) []BatchCollector

func (*TaskMaster) Close

func (tm *TaskMaster) Close() error

func (*TaskMaster) DelFork

func (tm *TaskMaster) DelFork(name string)

func (*TaskMaster) New

func (tm *TaskMaster) New() *TaskMaster

Returns a new TaskMaster instance with the same services as the current one.

func (*TaskMaster) NewFork

func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP) *Edge

func (*TaskMaster) Open

func (tm *TaskMaster) Open() error

func (*TaskMaster) StartTask

func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error)

func (*TaskMaster) StopTask

func (tm *TaskMaster) StopTask(name string)

type TaskType

type TaskType int

The type of a task

const (
	StreamTask TaskType = iota
	BatchTask
)

func (TaskType) String

func (t TaskType) String() string

type UnionNode

type UnionNode struct {
	// contains filtered or unexported fields
}

func (*UnionNode) Err

func (n *UnionNode) Err() error

type WhereNode

type WhereNode struct {
	// contains filtered or unexported fields
}

func (*WhereNode) Err

func (n *WhereNode) Err() error

type WindowNode

type WindowNode struct {
	// contains filtered or unexported fields
}

func (*WindowNode) Err

func (n *WindowNode) Err() error

Directories

Path Synopsis
A clock that provides blocking calls that wait until absolute times have occurred.
cmd
Contains integration and end-to-end tests
Provides a set of structures for passing data around Kapacitor.
Provides an API for constructing data processing pipelines.
services
httpd
Provides an HTTP API exposing many components of Kapacitor.
reporting
Sends anonymous reports to InfluxData
stats
The stats service collects the exported stats and submits them to the Kapacitor stream under the configured database and retention policy.
udp
TICKscript is a simple invocation chaining DSL.
cmd/tickdoc
Tickdoc is a simple utility similar to godoc that generates documentation from comments.
Provides an io.Writer that filters log messages based on a log level.
