Skip to content

HTTP/REST API Examples

Comprehensive code examples for using HeliosDB's HTTP/REST API, shown with curl, Python (requests), and Node.js (axios).

Authentication

API Key Authentication

# Using Authorization header (the API key is sent as a Bearer token)
curl -X GET https://localhost:443/api/v1/tables \
  -H "Authorization: Bearer YOUR_API_KEY"

# Using X-API-Key header (same endpoint, key sent in a dedicated header)
curl -X GET https://localhost:443/api/v1/tables \
  -H "X-API-Key: YOUR_API_KEY"

JWT Authentication

import requests
import jwt
from datetime import datetime, timedelta

# Generate JWT token
def generate_token(secret, user_id, expires_in=timedelta(hours=1)):
    """Create a signed HS256 JWT identifying ``user_id``.

    Args:
        secret: Shared secret used to sign the token.
        user_id: Value stored in the ``sub`` (subject) claim.
        expires_in: Token lifetime as a ``timedelta``; defaults to one hour.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Local import: the surrounding snippet only imports ``datetime`` and
    # ``timedelta`` from the ``datetime`` module.
    from datetime import timezone

    # Read the clock once, timezone-aware, so ``iat`` and ``exp`` are derived
    # from the same instant. ``datetime.utcnow()`` is deprecated since
    # Python 3.12 and returns a naive value.
    issued_at = datetime.now(timezone.utc)
    payload = {
        "sub": user_id,
        "iat": issued_at,
        "exp": issued_at + expires_in,
        "iss": "heliosdb",
    }
    return jwt.encode(payload, secret, algorithm="HS256")

# Sign a token for the example user; replace the placeholder secret.
token = generate_token("your-secret", "user123")

# Use token
# The timeout keeps the call from blocking forever if the server never answers.
response = requests.get(
    "https://localhost:443/api/v1/tables",
    headers={"Authorization": f"Bearer {token}"},
    timeout=30,
)

Query Operations

Execute SQL Query

import requests

def execute_query(sql: str, params: dict = None, timeout: float = 30):
    """Run a SQL statement through the ``/api/v1/query`` endpoint.

    Args:
        sql: SQL text to execute.
        params: Mapping of query parameters sent alongside the SQL;
            ``None`` (or an empty mapping) sends ``{}``.
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests``; use ``None`` to wait indefinitely.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.exceptions.Timeout: If the server does not respond in time.
    """
    response = requests.post(
        "https://localhost:443/api/v1/query",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
        },
        json={
            "sql": sql,
            "params": params or {},
        },
        # Without a timeout, requests waits forever on an unresponsive server.
        timeout=timeout,
    )
    return response.json()

# Simple query: only the SQL text is passed, no parameters
active_users_sql = "SELECT * FROM users WHERE status = 'active' LIMIT 10"
result = execute_query(active_users_sql)
print(result["data"])

# Parameterized query: values are sent separately, keyed by the names used
# in the :user_id and :date placeholders
orders_sql = "SELECT * FROM orders WHERE user_id = :user_id AND created_at > :date"
order_params = {"user_id": 123, "date": "2024-01-01"}
result = execute_query(orders_sql, order_params)

Async Query Execution

import requests
import time

def execute_async_query(
    sql: str,
    poll_interval: float = 1,
    max_wait: float = None,
    timeout: float = 30,
):
    """Submit a query for asynchronous execution and wait for its result.

    The query is posted to ``/api/v1/query/async``; the returned
    ``query_id`` is then polled at ``/api/v1/query/{query_id}`` until the
    reported status is ``"completed"`` or ``"failed"``.

    Args:
        sql: SQL text to execute.
        poll_interval: Seconds to sleep between status checks.
        max_wait: Upper bound, in seconds, on the total time spent polling.
            ``None`` (the default) polls until the query finishes.
        timeout: Per-request timeout in seconds passed to ``requests``.

    Returns:
        The ``"data"`` field of the completed query's status response.

    Raises:
        requests.exceptions.HTTPError: If submitting the query is rejected.
        RuntimeError: If the server reports the query as failed.
        TimeoutError: If ``max_wait`` elapses before the query finishes.
    """
    auth_header = {"Authorization": "Bearer YOUR_API_KEY"}

    # Start async query
    response = requests.post(
        "https://localhost:443/api/v1/query/async",
        headers={**auth_header, "Content-Type": "application/json"},
        json={"sql": sql},
        timeout=timeout,
    )
    # Surface a rejected submission as an HTTP error instead of a KeyError
    # on the missing "query_id" below.
    response.raise_for_status()
    query_id = response.json()["query_id"]

    # monotonic() is unaffected by wall-clock adjustments.
    deadline = None if max_wait is None else time.monotonic() + max_wait

    # Poll for results
    while True:
        status_response = requests.get(
            f"https://localhost:443/api/v1/query/{query_id}",
            headers=auth_header,
            timeout=timeout,
        )
        result = status_response.json()

        status = result["status"]
        if status == "completed":
            return result["data"]
        if status == "failed":
            raise RuntimeError(result["error"])

        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Query {query_id} did not finish within {max_wait} seconds"
            )

        time.sleep(poll_interval)

# Long-running query: total order amount per customer, largest first
top_customers_sql = """
    SELECT customer_id, SUM(amount) as total
    FROM orders
    WHERE created_at > '2023-01-01'
    GROUP BY customer_id
    ORDER BY total DESC
"""
result = execute_async_query(top_customers_sql)

Streaming Results

import requests

def stream_query(sql: str, timeout: float = 30):
    """Yield the rows of a query one at a time as the server streams them.

    The request asks for newline-delimited JSON
    (``Accept: application/x-ndjson``) and decodes each non-empty line of
    the response body as one JSON value.

    Args:
        sql: SQL text to execute.
        timeout: Seconds passed to ``requests`` as the request timeout;
            ``None`` waits indefinitely.

    Yields:
        One decoded JSON value per non-empty line of the response.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status, so an error body is never yielded as if it were a row.
    """
    # ``json`` is not among this snippet's imports, so import it here;
    # without it the first decoded line raises NameError.
    import json

    # Using the response as a context manager releases the connection even
    # when the caller stops iterating before the stream is exhausted.
    with requests.post(
        "https://localhost:443/api/v1/query/stream",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
            "Accept": "application/x-ndjson",
        },
        json={"sql": sql},
        stream=True,
        timeout=timeout,
    ) as response:
        response.raise_for_status()
        for line in response.iter_lines():
            if line:  # skip empty lines in the stream
                yield json.loads(line)

# Process large result set row by row.
# process_row is not defined in this example; supply your own handler.
row_stream = stream_query("SELECT * FROM large_table")
for row in row_stream:
    process_row(row)

CRUD Operations

Create (Insert)

// JavaScript/Node.js example
const axios = require('axios');

/**
 * Insert one or more rows into a table via POST /api/v1/tables/{table}/rows.
 *
 * @param {string} tableName - Target table; URL-encoded before it is placed
 *   in the request path.
 * @param {Object[]} rows - Row objects to insert, sent as `{ rows }`.
 * @returns {Promise<*>} The response body (`response.data`).
 */
async function insertRows(tableName, rows) {
  // Encode the table name so characters such as "/" or "?" cannot change
  // which path the request is sent to.
  const url =
    `https://localhost:443/api/v1/tables/${encodeURIComponent(tableName)}/rows`;

  const response = await axios.post(
    url,
    { rows },
    {
      headers: {
        'Authorization': 'Bearer YOUR_API_KEY',
        'Content-Type': 'application/json'
      },
      // axios applies no timeout by default; fail after 30 s instead of hanging.
      timeout: 30000
    }
  );
  return response.data;
}

// Top-level `await` is a syntax error in a CommonJS module (this example
// loads axios with `require`), so the calls run inside an async function.
async function main() {
  // Single insert
  await insertRows('users', [{
    name: 'Alice',
    email: 'alice@example.com',
    created_at: new Date().toISOString()
  }]);

  // Bulk insert
  await insertRows('users', [
    { name: 'Bob', email: 'bob@example.com' },
    { name: 'Charlie', email: 'charlie@example.com' },
    { name: 'Diana', email: 'diana@example.com' }
  ]);
}

// Report a failed insert instead of leaving an unhandled promise rejection.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

Read (Query)

import requests

def _quote_filter_value(value):
    """Render ``value`` as a single-quoted literal with embedded quotes doubled."""
    # NOTE(review): assumes the filter grammar follows SQL quoting rules
    # ('' inside a quoted literal stands for one quote) - confirm against
    # the server's filter syntax.
    return "'" + f"{value}".replace("'", "''") + "'"


def get_rows(table_name: str, filters: dict = None, limit: int = 100,
             timeout: float = 30):
    """Fetch rows from ``table_name``, optionally restricted by equality filters.

    Args:
        table_name: Table to read; URL-encoded before it is placed in the path.
        filters: Mapping of column name to required value. The entries are
            combined with ``AND`` into a single ``filter`` query parameter.
            Column names are used as given and must come from trusted code.
        limit: Value sent as the ``limit`` query parameter.
        timeout: Seconds passed to ``requests`` as the request timeout;
            ``None`` waits indefinitely.

    Returns:
        The decoded JSON response body.
    """
    from urllib.parse import quote

    params = {"limit": limit}
    if filters:
        # Values are escaped so a quote inside a value cannot terminate the
        # literal and alter the rest of the filter expression.
        params["filter"] = " AND ".join(
            f"{column}={_quote_filter_value(value)}"
            for column, value in filters.items()
        )

    # safe="" also encodes "/", so the name cannot add path segments.
    table_path = quote(table_name, safe="")
    response = requests.get(
        f"https://localhost:443/api/v1/tables/{table_path}/rows",
        headers={"Authorization": "Bearer YOUR_API_KEY"},
        params=params,
        timeout=timeout,
    )
    return response.json()

# Get all active users (requesting at most 50 rows)
active_filter = {"status": "active"}
users = get_rows("users", filters=active_filter, limit=50)

Update

import requests

def update_rows(table_name: str, filter_expr: str, updates: dict,
                timeout: float = 30):
    """Apply ``updates`` to the rows of ``table_name`` selected by ``filter_expr``.

    Args:
        table_name: Table to modify; URL-encoded before it is placed in the path.
        filter_expr: Filter expression sent as the ``filter`` field of the body.
        updates: Mapping of column name to new value, sent as ``updates``.
        timeout: Seconds passed to ``requests`` as the request timeout;
            ``None`` waits indefinitely.

    Returns:
        The decoded JSON response body.
    """
    from urllib.parse import quote

    # safe="" also encodes "/", so the name cannot add path segments.
    table_path = quote(table_name, safe="")
    response = requests.put(
        f"https://localhost:443/api/v1/tables/{table_path}/rows",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
        },
        json={
            "filter": filter_expr,
            "updates": updates,
        },
        # Without a timeout, requests waits forever on an unresponsive server.
        timeout=timeout,
    )
    return response.json()

# Update user status: mark user 123 inactive and stamp the change time
status_change = {"status": "inactive", "updated_at": "2024-01-15T10:00:00Z"}
result = update_rows("users", "id = 123", status_change)
print(f"Updated {result['affected_rows']} rows")

Delete

import requests

def delete_rows(table_name: str, filter_expr: str, timeout: float = 30):
    """Delete the rows of ``table_name`` selected by ``filter_expr``.

    Args:
        table_name: Table to delete from; URL-encoded before it is placed
            in the path.
        filter_expr: Filter expression sent as the ``filter`` field of the body.
        timeout: Seconds passed to ``requests`` as the request timeout;
            ``None`` waits indefinitely.

    Returns:
        The decoded JSON response body.
    """
    from urllib.parse import quote

    # safe="" also encodes "/", so the name cannot add path segments.
    table_path = quote(table_name, safe="")
    response = requests.delete(
        f"https://localhost:443/api/v1/tables/{table_path}/rows",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
        },
        json={"filter": filter_expr},
        # Without a timeout, requests waits forever on an unresponsive server.
        timeout=timeout,
    )
    return response.json()

# Delete old sessions: everything that expired before 2024-01-01
expired_filter = "expires_at < '2024-01-01'"
result = delete_rows("sessions", expired_filter)
print(f"Deleted {result['affected_rows']} rows")

Vector Operations

import requests
import numpy as np

def vector_search(vector: list, top_k: int = 10, filter_expr: str = None,
                  timeout: float = 30):
    """Search for the vectors nearest to ``vector`` via ``/api/v1/vectors/search``.

    Args:
        vector: Query embedding, sent as the ``vector`` field.
        top_k: Value sent as ``top_k``.
        filter_expr: Optional filter expression; only included in the
            request when it is non-empty.
        timeout: Seconds passed to ``requests`` as the request timeout;
            ``None`` waits indefinitely.

    Returns:
        The decoded JSON response body.
    """
    payload = {
        "vector": vector,
        "top_k": top_k,
        "include_metadata": True,
    }
    if filter_expr:
        payload["filter"] = filter_expr

    response = requests.post(
        "https://localhost:443/api/v1/vectors/search",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
        },
        json=payload,
        # Without a timeout, requests waits forever on an unresponsive server.
        timeout=timeout,
    )
    return response.json()

# Search for similar documents
# NOTE: the trailing `...` is a placeholder, not data; replace the whole
# list with a real embedding before running this.
query_embedding = [0.1, 0.2, 0.3, ...]  # Your embedding
results = vector_search(query_embedding, top_k=5)

# Print the id and score of every returned match
matches = results["matches"]
for match in matches:
    print(f"ID: {match['id']}, Score: {match['score']}")

Upsert Vectors

import requests

def upsert_vectors(vectors: list, timeout: float = 30):
    """Send ``vectors`` to the ``/api/v1/vectors/upsert`` endpoint.

    Args:
        vectors: List of vector records, sent as the ``vectors`` field.
        timeout: Seconds passed to ``requests`` as the request timeout;
            ``None`` waits indefinitely.

    Returns:
        The decoded JSON response body.
    """
    response = requests.post(
        "https://localhost:443/api/v1/vectors/upsert",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
        },
        json={"vectors": vectors},
        # Without a timeout, requests waits forever on an unresponsive server.
        timeout=timeout,
    )
    return response.json()

# Upsert with metadata: each record has an id, its values and a metadata dict.
# The trailing `...` in each list is a placeholder for the remaining values.
first_doc = {
    "id": "doc1",
    "values": [0.1, 0.2, 0.3, ...],
    "metadata": {"title": "Document 1", "category": "tech"}
}
second_doc = {
    "id": "doc2",
    "values": [0.4, 0.5, 0.6, ...],
    "metadata": {"title": "Document 2", "category": "science"}
}
vectors = [first_doc, second_doc]
result = upsert_vectors(vectors)
print(f"Upserted {result['upserted_count']} vectors")

Schema Management

Create Table

import requests

def create_table(name: str, schema: dict, timeout: float = 30):
    """Create a table by posting its definition to ``/api/v1/tables``.

    Args:
        name: Name of the table to create.
        schema: Table definition. ``"columns"`` is required;
            ``"primary_key"`` is optional (sent as ``None`` when absent) and
            ``"indexes"`` is optional (sent as ``[]`` when absent).
        timeout: Seconds passed to ``requests`` as the request timeout;
            ``None`` waits indefinitely.

    Returns:
        The decoded JSON response body.

    Raises:
        KeyError: If ``schema`` has no ``"columns"`` entry.
    """
    response = requests.post(
        "https://localhost:443/api/v1/tables",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
        },
        json={
            "name": name,
            "columns": schema["columns"],
            "primary_key": schema.get("primary_key"),
            "indexes": schema.get("indexes", []),
        },
        # Without a timeout, requests waits forever on an unresponsive server.
        timeout=timeout,
    )
    return response.json()

# Create users table: four columns, primary key on id, unique index on email
users_schema = {
    "columns": [
        {"name": "id", "type": "BIGINT", "nullable": False},
        {"name": "name", "type": "VARCHAR(255)", "nullable": False},
        {"name": "email", "type": "VARCHAR(255)", "nullable": False},
        {"name": "created_at", "type": "TIMESTAMP", "default": "NOW()"}
    ],
    "primary_key": ["id"],
    "indexes": [
        {"name": "idx_email", "columns": ["email"], "unique": True}
    ]
}
create_table("users", users_schema)

Create Index

import requests

def create_index(table_name: str, index_name: str, columns: list,
                 index_type: str = "btree", timeout: float = 30):
    """Create an index by posting its definition to ``/api/v1/indexes``.

    Args:
        table_name: Table the index belongs to, sent as ``table``.
        index_name: Name of the new index, sent as ``name``.
        columns: Columns covered by the index.
        index_type: Index type sent as ``type``; defaults to ``"btree"``.
        timeout: Seconds passed to ``requests`` as the request timeout;
            ``None`` waits indefinitely.

    Returns:
        The decoded JSON response body.
    """
    response = requests.post(
        "https://localhost:443/api/v1/indexes",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
        },
        json={
            "table": table_name,
            "name": index_name,
            "columns": columns,
            "type": index_type,
        },
        # Without a timeout, requests waits forever on an unresponsive server.
        timeout=timeout,
    )
    return response.json()

# Create vector index: an "hnsw" index on the documents.embedding column
create_index(
    "documents",
    "idx_embedding",
    columns=["embedding"],
    index_type="hnsw",
)

Error Handling

import requests

def _error_message(response):
    """Return ``error.message`` from a JSON error body, or the raw text."""
    try:
        return response.json()["error"]["message"]
    except (ValueError, KeyError, TypeError):
        # Body is not JSON or lacks the expected shape; show it as-is.
        return response.text


def safe_query(sql: str):
    """Run a query and report failures by printing instead of raising.

    Posts ``sql`` to ``/api/v1/query`` with a 30-second timeout. HTTP
    errors, timeouts and other request failures are printed and turned
    into a ``None`` return value.

    Args:
        sql: SQL text to execute.

    Returns:
        The decoded JSON response body on success, otherwise ``None``.
    """
    try:
        response = requests.post(
            "https://localhost:443/api/v1/query",
            headers={
                "Authorization": "Bearer YOUR_API_KEY",
                "Content-Type": "application/json",
            },
            json={"sql": sql},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()

    except requests.exceptions.HTTPError as e:
        status = e.response.status_code
        if status == 400:
            # Parsing the error body must not raise from inside this handler,
            # otherwise a malformed 400 response escapes the "safe" wrapper.
            print(f"Query error: {_error_message(e.response)}")
        elif status == 429:
            retry_after = e.response.headers.get("Retry-After", 60)
            print(f"Rate limited. Retry after {retry_after} seconds")
        elif status == 401:
            print("Authentication failed")
        else:
            print(f"HTTP error: {e}")
        return None

    except requests.exceptions.Timeout:
        print("Request timed out")
        return None

    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return None

    except ValueError as e:
        # A successful response whose body is not valid JSON; some versions
        # of requests raise a plain ValueError subclass for this.
        print(f"Request failed: {e}")
        return None

Pagination

import requests

def paginate_query(sql: str, page_size: int = 100, timeout: float = 30):
    """Yield the rows of a query, fetching them one page at a time.

    Each request posts ``sql`` together with ``limit`` and ``offset`` to
    ``/api/v1/query``. Iteration stops when a page is empty or shorter
    than ``page_size``.

    Args:
        sql: SQL text to execute. NOTE(review): offset paging presumably
            needs a stable ORDER BY to avoid skipped or repeated rows -
            confirm against the server's behavior.
        page_size: Number of rows requested per page.
        timeout: Per-request timeout in seconds passed to ``requests``;
            ``None`` waits indefinitely.

    Yields:
        Each element of the ``"data"`` list of every page, in order.

    Raises:
        requests.exceptions.HTTPError: If a page request is answered with
            an error status.
    """
    # Built once: the headers are identical for every page.
    headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }

    offset = 0
    while True:
        response = requests.post(
            "https://localhost:443/api/v1/query",
            headers=headers,
            json={
                "sql": sql,
                "limit": page_size,
                "offset": offset,
            },
            timeout=timeout,
        )
        # Report an HTTP error as such rather than as a KeyError on "data".
        response.raise_for_status()
        rows = response.json()["data"]

        if not rows:
            break

        yield from rows

        # A short page means the result set is exhausted.
        if len(rows) < page_size:
            break

        offset += page_size

# Iterate through all orders, page by page.
# process_order is not defined in this example; supply your own handler.
all_orders = paginate_query("SELECT * FROM orders ORDER BY id")
for order in all_orders:
    process_order(order)

Last Updated: January 2026