Export Events to CSV

Paginate and export visitor data to CSV with multi-day chunking

Use Case

  • Export resolved visitor data to CSV for CRM import, reporting, or offline analysis
  • Handle multi-day exports by chunking into 24-hour windows
  • Paginate through large result sets

Prerequisites

  • A valid JWT token and your organization ID, sent as the Authorization and x-organization-id headers
  • The pixel ID you want to export events for
  • Python 3 with the requests library for the export scripts, and curl for the size estimate

Steps

1. Estimate the Export Size

Before fetching rows, check how many you'll get:

END_MS=$(python3 -c "import time; print(int(time.time()*1000))")
START_MS=$(python3 -c "import time; print(int((time.time()-86400)*1000))")

curl "https://apiv3.delivr.ai/api/v1/event_counts?pixel_id=YOUR_PIXEL_ID&start_ms=$START_MS&end_ms=$END_MS&filter=resolved%3Aeq%3Atrue" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -H "x-organization-id: YOUR_ORGANIZATION_ID"

Response:

{
  "count": 153,
  "meta": {
    "files_scanned": 29,
    "buckets_scanned": 1,
    "took_ms": 58
  }
}
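
A quick way to gauge how many paginated requests the export will need is to divide this count by your page size. A minimal sketch, assuming the count above and the 100-row page size used in the scripts below:

import math

count = 153   # "count" from the /event_counts response above
limit = 100   # page size used in the export scripts below

pages = math.ceil(count / limit)
print(f"~{pages} request(s) at {limit} rows per page")  # ~2 request(s) at 100 rows per page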

2. Choose Your Fields

Use select to return only the fields you need. This reduces response size and makes the CSV cleaner.

select=first_name,last_name,email,company_name,company_domain,job_title,event_data,timestamp

Fields that are null for a given row are omitted from the response. See the Events API schema endpoint for all available fields.
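
Because null fields are dropped per row, a quick spot check can show which selected fields actually come back before you build the full export. A minimal sketch with placeholder credentials, requesting a handful of recent resolved events:

import time
import requests

TOKEN = "your_jwt_token"          # placeholder
ORG_ID = "your_organization_id"   # placeholder
PIXEL_ID = "your_pixel_id"        # placeholder

FIELDS = ["first_name", "last_name", "email", "company_name",
          "company_domain", "job_title", "event_data", "timestamp"]

end_ms = int(time.time() * 1000)
start_ms = end_ms - 24 * 60 * 60 * 1000

resp = requests.get(
    "https://apiv3.delivr.ai/api/v1/events",
    headers={"Authorization": f"Bearer {TOKEN}", "x-organization-id": ORG_ID},
    params={
        "pixel_id": PIXEL_ID,
        "start_ms": start_ms,
        "end_ms": end_ms,
        "limit": 5,
        "filter": "resolved:eq:true",
        "select": ",".join(FIELDS),
    },
)
resp.raise_for_status()

for row in resp.json().get("rows", []):
    # Fields that are null for this row are simply absent from the response
    missing = [f for f in FIELDS if f not in row]
    print(f"present: {len(row)} fields, missing: {missing}")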

3. Export with Pagination

The API returns at most limit rows per request; use offset to page through larger result sets.

import csv
import time
import requests

TOKEN = "your_jwt_token"
ORG_ID = "your_organization_id"
PIXEL_ID = "your_pixel_id"

HEADERS = {
    "Authorization": f"Bearer {TOKEN}",
    "x-organization-id": ORG_ID,
}

FIELDS = [
    "first_name", "last_name", "email", "current_business_email",
    "company_name", "company_domain", "job_title", "department",
    "seniority_level", "company_industry", "company_employee_count_range",
    "linkedin_url", "event_data", "timestamp",
]

# Last 24 hours
end_ms = int(time.time() * 1000)
start_ms = end_ms - (24 * 60 * 60 * 1000)

all_rows = []
offset = 0
limit = 100

while True:
    resp = requests.get(
        "https://apiv3.delivr.ai/api/v1/events",
        headers=HEADERS,
        params={
            "pixel_id": PIXEL_ID,
            "start_ms": start_ms,
            "end_ms": end_ms,
            "limit": limit,
            "offset": offset,
            "filter": "resolved:eq:true",
            "select": ",".join(FIELDS),
            "orderby": "timestamp:desc",
        },
    )
    resp.raise_for_status()
    data = resp.json()
    rows = data.get("rows", [])
    all_rows.extend(rows)

    if len(rows) < limit:
        break  # No more rows
    offset += limit

# Write CSV
with open("visitors.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=FIELDS, extrasaction="ignore")
    writer.writeheader()
    for row in all_rows:
        writer.writerow(row)

print(f"Exported {len(all_rows)} rows to visitors.csv")

Multi-Day Export

The Events API has a 25-hour maximum time window per request. For longer ranges, loop through 24-hour chunks.

import csv
import time
import datetime
import requests

TOKEN = "your_jwt_token"
ORG_ID = "your_organization_id"
PIXEL_ID = "your_pixel_id"

HEADERS = {
    "Authorization": f"Bearer {TOKEN}",
    "x-organization-id": ORG_ID,
}

FIELDS = [
    "first_name", "last_name", "email", "current_business_email",
    "company_name", "company_domain", "job_title", "timestamp",
]

ONE_DAY_MS = 24 * 60 * 60 * 1000
DAYS_BACK = 7

now_ms = int(time.time() * 1000)
all_rows = []

for day in range(DAYS_BACK):
    end_ms = now_ms - (day * ONE_DAY_MS)
    start_ms = end_ms - ONE_DAY_MS
    offset = 0
    day_count = 0

    while True:
        resp = requests.get(
            "https://apiv3.delivr.ai/api/v1/events",
            headers=HEADERS,
            params={
                "pixel_id": PIXEL_ID,
                "start_ms": start_ms,
                "end_ms": end_ms,
                "limit": 100,
                "offset": offset,
                "filter": "resolved:eq:true",
                "select": ",".join(FIELDS),
            },
        )
        resp.raise_for_status()
        rows = resp.json().get("rows", [])
        all_rows.extend(rows)
        day_count += len(rows)

        if len(rows) < 100:
            break
        offset += 100

    date_str = datetime.datetime.fromtimestamp(
        start_ms / 1000, tz=datetime.timezone.utc
    ).strftime("%Y-%m-%d")
    print(f"  {date_str}: fetched {day_count} rows")

with open("visitors_7day.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=FIELDS, extrasaction="ignore")
    writer.writeheader()
    for row in all_rows:
        writer.writerow(row)

print(f"Exported {len(all_rows)} rows to visitors_7day.csv")

Variations

Deduplicated Export (One Row Per Person)

Add distinct=hem to get one row per unique visitor instead of one row per page view:

params={
    "pixel_id": PIXEL_ID,
    "start_ms": start_ms,
    "end_ms": end_ms,
    "limit": 100,
    "offset": offset,
    "filter": "resolved:eq:true",
    "distinct": "hem",
    "has_valuable_data": "true",
    "select": ",".join(FIELDS),
}

Deduplicated by Company

One row per unique company:

"distinct": "company_domain",
"has_valuable_data": "true",

Sort Order

Use orderby to control row order:

orderby=timestamp:desc    # Newest first
orderby=timestamp:asc     # Oldest first

Notes

  • The select parameter only returns fields that have values. If a field is null for a row, it's omitted from the response. Your CSV writer should handle missing keys gracefully: DictWriter writes its restval default (an empty string) for fields missing from a row, and extrasaction="ignore" drops any keys that aren't in fieldnames.
  • Offset pagination does not overlap: each page returns distinct rows.
  • For very large exports (10,000+ rows per day), consider adding a short delay between requests to avoid rate limits; see the sketch after these notes.
  • The has_valuable_data parameter filters to rows with either a displayable email or a name + company.
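
For the rate-limit note above, one option is a small helper that pauses between pages and backs off on HTTP 429 responses. A minimal sketch; the delay and retry values are assumptions, not documented limits, and the function is a drop-in replacement for the requests.get calls in the pagination loops:

import time
import requests

def get_with_retry(url, *, headers, params, delay_s=0.25, max_retries=3):
    """GET with a short pause between pages and simple backoff on HTTP 429."""
    for attempt in range(max_retries):
        resp = requests.get(url, headers=headers, params=params, timeout=30)
        if resp.status_code == 429 and attempt < max_retries - 1:
            time.sleep(delay_s * (2 ** attempt))  # back off, then retry
            continue
        resp.raise_for_status()  # raise on any other error (or a final 429)
        time.sleep(delay_s)      # brief pause before the caller's next page
        return resp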

Next Steps