Conduit Connector for Adobe Marketo
Marketo source connector for Conduit which pulls and syncs the Leads(People)
object from Marketo Engage.
Configuration
The config passed to Configure
can contain the following fields.
name |
part of |
description |
required |
default value |
example |
clientID |
source |
The Client ID for Marketo Instance |
true |
NONE |
1de3017c-fe42-4f20-8013-798678c956a9 |
clientSecret |
source |
The Client Secret for Marketo Instance |
true |
NONE |
ZZZv0Mev29vNm5vIyMwTa43lioVoBT7N |
clientEndpoint |
source |
The Endpoint for Marketo Instance |
true |
NONE |
https://<instance>.mktorest.com |
pollingPeriod |
source |
Polling time for CDC mode. Less than 10s is not recommended |
false |
1m |
10s , 1m , 5m , 10m , 30m , 1h |
snapshotInitialDate |
source |
The date from which the snapshot iterator initially starts getting records. |
false |
Creation date of the oldest record. |
2006-01-02T15:04:05Z07:00 |
fields |
source |
comma seperated fields to fetch from Marketo Leads |
false |
id, createdAt, updatedAt, firstName, lastName, email |
company, jobTitle, phone, personSource etc... |
Note: By default id, createdAt, updatedAt
is prepended to fields
config. So no need to add that explictly. For eg: if you want to request email, company, phone
fields, then it will be requested as id, createdAt, updatedAt, email, company, phone
Source
Marketo source connector connects to Marketo instance through the REST API with provided configuration, using clientID
and clientSecret
. Once connector is started Configure
method is called to parse configurations and validate them. After that Open
method is called to establish connection to Marketo instance with provided position. Once connection is established Read
method is called which calls current iterator's Next
method to fetch next record. Teardown
is called when connector is stopped.
Snapshot Iterator
Snapshot iterator is used first to extract bulk data from Marketo instance. Bulk Lead Extract API is used with createdAt
filter which permits datetime ranges up to 31 days, so we will need to run multiple jobs and combine the results. In order to get started we need to find the oldest lead created in the instance. To know the date querry all folders with maxdepth of 1 which will give us a list of all the top-level folders in the instance. Then collecting createdAt
dates, parse them, and find the oldest date. This method works because some default, top-level folders are created with the instance and no leads could be created before then. fields
from config also requested along with createdAt
filter. To find the available fields for your target instance using the Describe Lead 2 endpoint which return an exhaustive list including both standard and custom fields.
Exporting Job involves 4 APIS
- Create a Job ->
/bulk/v1/leads/export/create.json
- Enqueue a Job ->
/bulk/v1/leads/export/{{exportID}}/enqueue.json
- Wait for Job to Complete ->
/bulk/v1/leads/export/{{exportID}}/status.json
- Get Your Leads ->
/bulk/v1/leads/export/{{exportID}}/file.json
After each cycle, obtained records will be flushed to conduit. Once all cycles(export jobs) are completed, connector switches to CDC mode.
Change Data Capture Iterator
Once Snapshot iterator is completed, connector automatically switches to CDC iterator. CDC events are captured using two REST endpoints, Get Lead Changes, Get Lead by Id. In CDC we are intrested in New Lead (12)
and Change Data Value (13)
events. Hence once done with Get Lead Changes api, we filter for these activityTypeId
12 and 13. Once we have list of changed leads ID's, we'll query each leads with Get Lead by Id API to get the changed data for leads. Deleted Leads API is used in order to capture the delete events. Output record will have a metadata of "action":"delete" to handle deletions by Conduit destination connector. No metadata is added for other CDC events such as New leads and Update leads.
From config pollingPeriod
will be used to poll CDC events.
Position Handling
Name |
type |
desc |
Key |
string |
unique id for the record |
CreatedAt |
time.Time |
UTC time |
UpdatedAt |
time.Time |
UTC time |
Type |
IteratorType(int) |
0=snapshot(default), 1=CDC |
To build
Run make build
to build the connector.
Testing
Run make test
to run all the unit tests. Run make test-integration
to run the integration tests.
The Docker compose file at test/docker-compose.yml
can be used to run the required resource locally.
Known Issues & Limitations
- In snapshot mode, the total amount of data that you can export from Marketo is limited to 500MB per day unless you have purchased a higher data limit. This 500MB limit resets daily at 12:00AM CST. Once the limit is hit pipeline stops with error. In order to pull rest of the records you need to run the pipeline again next day.
- Concurrency Limit: Maximum of 10 concurrent API calls.
- Rate Limit: API access per instance limited to 100 calls per 20 seconds.
- Daily Quota: Subscriptions are allocated 50,000 API calls per day (which resets daily at 12:00AM CST). You can increase your daily quota through your account manager.
- The connector is able to send record's
Key
as sdk.RawData
only.