HTTP/REST API Examples
Comprehensive code examples for using HeliosDB's HTTP/REST API.
Authentication
API Key Authentication
# Send the API key as a bearer token in the Authorization header
curl --request GET \
  --header "Authorization: Bearer YOUR_API_KEY" \
  https://localhost:443/api/v1/tables

# Send the API key in the X-API-Key header instead
curl --request GET \
  --header "X-API-Key: YOUR_API_KEY" \
  https://localhost:443/api/v1/tables
JWT Authentication
from datetime import datetime, timedelta, timezone

import jwt
import requests
# Generate JWT token
def generate_token(secret, user_id):
    """Build a signed HS256 JWT for ``user_id`` that expires in one hour.

    Args:
        secret: Key passed to ``jwt.encode`` to sign the token.
        user_id: Value stored in the ``sub`` claim.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Take one timezone-aware timestamp so "iat" and "exp" are exactly one
    # hour apart. datetime.utcnow() is deprecated and returns a naive value.
    issued_at = datetime.now(timezone.utc)
    payload = {
        "sub": user_id,
        "iat": issued_at,
        "exp": issued_at + timedelta(hours=1),
        "iss": "heliosdb",
    }
    return jwt.encode(payload, secret, algorithm="HS256")


token = generate_token("your-secret", "user123")

# Use token
response = requests.get(
    "https://localhost:443/api/v1/tables",
    headers={"Authorization": f"Bearer {token}"},
)
Query Operations
Execute SQL Query
import requests
def execute_query(sql: str, params: dict = None):
    """POST a SQL statement to the query endpoint and return the JSON reply.

    Args:
        sql: SQL text sent under the ``"sql"`` key.
        params: Optional bind parameters sent under the ``"params"`` key;
            an empty dict is sent when this is omitted or empty.

    Returns:
        The response body decoded with ``response.json()``.
    """
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    body = {"sql": sql, "params": params if params else {}}
    reply = requests.post(
        "https://localhost:443/api/v1/query",
        headers=request_headers,
        json=body,
    )
    return reply.json()


# Simple query
result = execute_query("SELECT * FROM users WHERE status = 'active' LIMIT 10")
print(result["data"])

# Parameterized query
result = execute_query(
    "SELECT * FROM orders WHERE user_id = :user_id AND created_at > :date",
    {"user_id": 123, "date": "2024-01-01"},
)
Async Query Execution
import requests
import time
def execute_async_query(sql: str, poll_interval: float = 1.0, timeout: float = None):
    """Start an asynchronous query and poll until it completes or fails.

    Args:
        sql: SQL text to execute.
        poll_interval: Seconds to sleep between status checks.
        timeout: Maximum number of seconds to keep polling. ``None`` (the
            default) polls indefinitely.

    Returns:
        The ``"data"`` field of the status response once its ``"status"``
        is ``"completed"``.

    Raises:
        RuntimeError: The status response reported ``"failed"``; the message
            is the response's ``"error"`` field.
        TimeoutError: ``timeout`` elapsed before the query finished.
    """
    # Start async query
    response = requests.post(
        "https://localhost:443/api/v1/query/async",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
        },
        json={"sql": sql},
    )
    query_id = response.json()["query_id"]

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout

    # Poll for results
    while True:
        status_response = requests.get(
            f"https://localhost:443/api/v1/query/{query_id}",
            headers={"Authorization": "Bearer YOUR_API_KEY"},
        )
        result = status_response.json()
        status = result["status"]
        if status == "completed":
            return result["data"]
        if status == "failed":
            raise RuntimeError(result["error"])
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Query {query_id} did not finish within {timeout} seconds"
            )
        time.sleep(poll_interval)


# Long-running query
result = execute_async_query("""
SELECT customer_id, SUM(amount) as total
FROM orders
WHERE created_at > '2023-01-01'
GROUP BY customer_id
ORDER BY total DESC
""")
Streaming Results
import requests
def stream_query(sql: str):
    """Run a query against the streaming endpoint and yield rows one by one.

    The request asks for ``application/x-ndjson`` and each non-empty line of
    the response body is decoded as one JSON document.

    Args:
        sql: SQL text to execute.

    Yields:
        One decoded JSON value per non-empty response line.
    """
    # The surrounding example imports only `requests`, so import json here.
    import json

    # The context manager releases the connection even when the consumer
    # stops iterating early or a line fails to decode.
    with requests.post(
        "https://localhost:443/api/v1/query/stream",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
            "Accept": "application/x-ndjson",
        },
        json={"sql": sql},
        stream=True,
    ) as response:
        for line in response.iter_lines():
            if line:  # skip empty lines
                yield json.loads(line)


# Process large result set
for row in stream_query("SELECT * FROM large_table"):
    process_row(row)
CRUD Operations
Create (Insert)
// JavaScript/Node.js example
const axios = require('axios');
/**
 * Insert one or more rows into a table.
 *
 * @param {string} tableName - Table to insert into; URL-encoded before being
 *   placed in the request path.
 * @param {Array<Object>} rows - Row objects sent as `{ rows }` in the body.
 * @returns {Promise<*>} The `data` property of the axios response.
 */
async function insertRows(tableName, rows) {
  const response = await axios.post(
    `https://localhost:443/api/v1/tables/${encodeURIComponent(tableName)}/rows`,
    { rows },
    {
      headers: {
        'Authorization': 'Bearer YOUR_API_KEY',
        'Content-Type': 'application/json'
      }
    }
  );
  return response.data;
}

// Top-level `await` is not allowed in a CommonJS script (this example uses
// `require`), so the calls run inside an async entry point.
async function main() {
  // Single insert
  await insertRows('users', [{
    name: 'Alice',
    email: 'alice@example.com',
    created_at: new Date().toISOString()
  }]);

  // Bulk insert
  await insertRows('users', [
    { name: 'Bob', email: 'bob@example.com' },
    { name: 'Charlie', email: 'charlie@example.com' },
    { name: 'Diana', email: 'diana@example.com' }
  ]);
}

main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
Read (Query)
import requests
def get_rows(table_name: str, filters: dict = None, limit: int = 100):
    """Fetch rows from a table, optionally restricted by equality filters.

    Args:
        table_name: Table whose rows are requested; placed in the URL path.
        filters: Optional mapping of column name to required value. Each
            entry becomes ``column='value'`` and entries are joined with
            ``AND`` in the ``filter`` query parameter.
        limit: Value sent as the ``limit`` query parameter.

    Returns:
        The response body decoded with ``response.json()``.
    """
    params = {"limit": limit}
    if filters:
        # Double embedded single quotes so a value such as O'Brien cannot
        # terminate the string literal early or inject extra conditions.
        # NOTE(review): assumes the filter syntax uses SQL-style '' escaping;
        # confirm against the HeliosDB filter grammar.
        # Column names are interpolated as-is and must come from trusted code.
        params["filter"] = " AND ".join(
            "{}='{}'".format(column, str(value).replace("'", "''"))
            for column, value in filters.items()
        )
    response = requests.get(
        f"https://localhost:443/api/v1/tables/{table_name}/rows",
        headers={"Authorization": "Bearer YOUR_API_KEY"},
        params=params,
    )
    return response.json()


# Get all active users
users = get_rows("users", filters={"status": "active"}, limit=50)
Update
import requests
def update_rows(table_name: str, filter_expr: str, updates: dict):
    """PUT a filter expression and column updates to a table's rows endpoint.

    Args:
        table_name: Table to update; placed in the URL path.
        filter_expr: Expression sent under the ``"filter"`` key.
        updates: Mapping sent under the ``"updates"`` key.

    Returns:
        The response body decoded with ``response.json()``.
    """
    endpoint = f"https://localhost:443/api/v1/tables/{table_name}/rows"
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    body = {"filter": filter_expr, "updates": updates}
    reply = requests.put(endpoint, headers=request_headers, json=body)
    return reply.json()


# Update user status
result = update_rows(
    "users",
    "id = 123",
    {"status": "inactive", "updated_at": "2024-01-15T10:00:00Z"},
)
print(f"Updated {result['affected_rows']} rows")
Delete
import requests
def delete_rows(table_name: str, filter_expr: str):
    """Send a DELETE request carrying a filter expression for a table's rows.

    Args:
        table_name: Table to delete from; placed in the URL path.
        filter_expr: Expression sent under the ``"filter"`` key of the body.

    Returns:
        The response body decoded with ``response.json()``.
    """
    endpoint = f"https://localhost:443/api/v1/tables/{table_name}/rows"
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    reply = requests.delete(
        endpoint,
        headers=request_headers,
        json={"filter": filter_expr},
    )
    return reply.json()


# Delete old sessions
result = delete_rows("sessions", "expires_at < '2024-01-01'")
print(f"Deleted {result['affected_rows']} rows")
Vector Operations
Vector Search
import requests
import numpy as np
def vector_search(vector: list, top_k: int = 10, filter_expr: str = None):
    """POST a query vector to the vector search endpoint.

    Args:
        vector: Query vector sent under the ``"vector"`` key.
        top_k: Value sent under the ``"top_k"`` key.
        filter_expr: Optional expression; included under ``"filter"`` only
            when it is truthy.

    Returns:
        The response body decoded with ``response.json()``.
    """
    body = {"vector": vector, "top_k": top_k, "include_metadata": True}
    if filter_expr:
        body["filter"] = filter_expr

    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    reply = requests.post(
        "https://localhost:443/api/v1/vectors/search",
        headers=request_headers,
        json=body,
    )
    return reply.json()


# Search for similar documents
query_embedding = [0.1, 0.2, 0.3, ...]  # Your embedding
results = vector_search(query_embedding, top_k=5)
for match in results["matches"]:
    print(f"ID: {match['id']}, Score: {match['score']}")
Upsert Vectors
import requests
def upsert_vectors(vectors: list):
    """POST a list of vector records to the upsert endpoint.

    Args:
        vectors: Records sent under the ``"vectors"`` key of the body.

    Returns:
        The response body decoded with ``response.json()``.
    """
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    reply = requests.post(
        "https://localhost:443/api/v1/vectors/upsert",
        headers=request_headers,
        json={"vectors": vectors},
    )
    return reply.json()


# Upsert with metadata
vectors = [
    {
        "id": "doc1",
        "values": [0.1, 0.2, 0.3, ...],
        "metadata": {"title": "Document 1", "category": "tech"},
    },
    {
        "id": "doc2",
        "values": [0.4, 0.5, 0.6, ...],
        "metadata": {"title": "Document 2", "category": "science"},
    },
]
result = upsert_vectors(vectors)
print(f"Upserted {result['upserted_count']} vectors")
Schema Management
Create Table
import requests
def create_table(name: str, schema: dict):
    """POST a table definition to the tables endpoint.

    Args:
        name: Table name sent under the ``"name"`` key.
        schema: Mapping that must contain ``"columns"`` and may contain
            ``"primary_key"`` (sent as ``None`` when absent) and
            ``"indexes"`` (sent as an empty list when absent).

    Returns:
        The response body decoded with ``response.json()``.

    Raises:
        KeyError: ``schema`` has no ``"columns"`` entry.
    """
    body = {
        "name": name,
        "columns": schema["columns"],
        "primary_key": schema.get("primary_key"),
        "indexes": schema.get("indexes", []),
    }
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    reply = requests.post(
        "https://localhost:443/api/v1/tables",
        headers=request_headers,
        json=body,
    )
    return reply.json()


# Create users table
create_table(
    "users",
    {
        "columns": [
            {"name": "id", "type": "BIGINT", "nullable": False},
            {"name": "name", "type": "VARCHAR(255)", "nullable": False},
            {"name": "email", "type": "VARCHAR(255)", "nullable": False},
            {"name": "created_at", "type": "TIMESTAMP", "default": "NOW()"},
        ],
        "primary_key": ["id"],
        "indexes": [
            {"name": "idx_email", "columns": ["email"], "unique": True},
        ],
    },
)
Create Index
import requests
def create_index(table_name: str, index_name: str, columns: list, index_type: str = "btree"):
    """POST an index definition to the indexes endpoint.

    Args:
        table_name: Sent under the ``"table"`` key.
        index_name: Sent under the ``"name"`` key.
        columns: Sent under the ``"columns"`` key.
        index_type: Sent under the ``"type"`` key; defaults to ``"btree"``.

    Returns:
        The response body decoded with ``response.json()``.
    """
    body = {
        "table": table_name,
        "name": index_name,
        "columns": columns,
        "type": index_type,
    }
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    reply = requests.post(
        "https://localhost:443/api/v1/indexes",
        headers=request_headers,
        json=body,
    )
    return reply.json()


# Create vector index
create_index("documents", "idx_embedding", ["embedding"], "hnsw")
Error Handling
import requests
def safe_query(sql: str):
    """Run a query and report failures by printing instead of raising.

    Args:
        sql: SQL text sent under the ``"sql"`` key.

    Returns:
        The decoded JSON body on success, or ``None`` after printing a
        message when the request fails with an HTTP error status, a timeout,
        or another ``requests`` exception.
    """
    try:
        response = requests.post(
            "https://localhost:443/api/v1/query",
            headers={
                "Authorization": "Bearer YOUR_API_KEY",
                "Content-Type": "application/json",
            },
            json={"sql": sql},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        status_code = e.response.status_code
        if status_code == 400:
            # The error body may not be JSON or may lack the expected shape
            # (for example a proxy error page). Fall back to the raw text so
            # the handler itself never raises.
            try:
                message = e.response.json()["error"]["message"]
            except (ValueError, KeyError, TypeError):
                message = e.response.text
            print(f"Query error: {message}")
        elif status_code == 429:
            retry_after = e.response.headers.get("Retry-After", 60)
            print(f"Rate limited. Retry after {retry_after} seconds")
        elif status_code == 401:
            print("Authentication failed")
        else:
            print(f"HTTP error: {e}")
        return None
    except requests.exceptions.Timeout:
        print("Request timed out")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return None
Pagination
import requests
def paginate_query(sql: str, page_size: int = 100):
    """Yield every row of a query by requesting it one page at a time.

    Each request sends ``limit`` and ``offset`` alongside the SQL text.
    Iteration ends when a page is empty or holds fewer than ``page_size``
    rows.

    Args:
        sql: SQL text sent under the ``"sql"`` key.
        page_size: Number of rows requested per page.

    Yields:
        Each element of the ``"data"`` list of every page, in order.
    """
    request_headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    offset = 0
    while True:
        reply = requests.post(
            "https://localhost:443/api/v1/query",
            headers=request_headers,
            json={"sql": sql, "limit": page_size, "offset": offset},
        )
        rows = reply.json()["data"]
        if not rows:
            return
        for row in rows:
            yield row
        # A short page means the server has no further rows to return.
        if len(rows) < page_size:
            return
        offset += page_size


# Iterate through all orders
for order in paginate_query("SELECT * FROM orders ORDER BY id"):
    process_order(order)
Last Updated: January 2026