Snowflake Protocol Examples
Comprehensive code examples for using HeliosDB with Snowflake protocol compatibility.
Connection Examples
Python
import snowflake.connector

# Connection settings shared by both examples below.
connection_params = {
    "host": "localhost",
    "port": 443,
    "user": "admin",
    "password": "password",
    "account": "heliosdb",
    "warehouse": "COMPUTE_WH",
    "database": "MYDB",
    "schema": "PUBLIC",
}

# Basic connection
conn = snowflake.connector.connect(**connection_params)
try:
    # Execute query
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT * FROM users LIMIT 10")
        rows = cursor.fetchall()
        for row in rows:
            print(row)
    finally:
        # Release the cursor even if the query raised
        cursor.close()
finally:
    # Always close the connection opened above
    conn.close()

# Using context manager: cursor and connection are closed automatically
with snowflake.connector.connect(**connection_params) as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) FROM orders")
        print(cursor.fetchone()[0])
JavaScript/Node.js
const snowflake = require('snowflake-sdk');

// Connection handle shared by the examples in this section.
const connection = snowflake.createConnection({
  host: 'localhost',
  port: 443,
  account: 'heliosdb',
  username: 'admin',
  password: 'password',
  warehouse: 'COMPUTE_WH',
  database: 'MYDB',
  schema: 'PUBLIC'
});

// Connect and execute
connection.connect((connectErr, conn) => {
  if (connectErr) {
    console.error('Connection failed:', connectErr);
    return;
  }

  connection.execute({
    sqlText: 'SELECT * FROM users LIMIT 10',
    complete: (queryErr, stmt, rows) => {
      if (queryErr) {
        console.error('Query failed:', queryErr);
      } else {
        console.log('Results:', rows);
      }
      // Release the connection whether or not the query succeeded,
      // so a failed query does not leave the session open.
      connection.destroy((destroyErr) => {
        if (destroyErr) {
          console.error('Disconnect failed:', destroyErr);
        }
      });
    }
  });
});
// Promise-based (with wrapper)
/**
 * Run a SQL statement on the shared `connection` and expose the callback
 * result as a Promise.
 *
 * @param {string} sql - Statement text passed through as `sqlText`.
 * @returns {Promise<*>} Resolves with the rows handed to `complete`;
 *   rejects with the error handed to `complete`.
 */
async function query(sql) {
  return new Promise((resolve, reject) => {
    const onComplete = (failure, _statement, resultRows) => {
      if (failure) {
        reject(failure);
        return;
      }
      resolve(resultRows);
    };

    connection.execute({ sqlText: sql, complete: onComplete });
  });
}
Time Travel Examples
Query Historical Data
-- Query data from a specific timestamp
-- (the literal is cast explicitly: TIMESTAMP => expects a timestamp value, not a string)
SELECT *
FROM orders
    AT(TIMESTAMP => '2024-01-01 12:00:00'::TIMESTAMP_LTZ)
WHERE status = 'completed';

-- Query data from 1 hour ago (offset of -3600 seconds from now)
SELECT *
FROM inventory
    AT(OFFSET => -3600);

-- Query data before a specific statement
SELECT *
FROM products
    BEFORE(STATEMENT => 'abc123-statement-id');

-- Track changes over time
-- (in Snowflake, CHANGES requires change tracking to be enabled on the table)
SELECT *
FROM orders
    CHANGES(INFORMATION => DEFAULT)
    AT(TIMESTAMP => '2024-01-01'::TIMESTAMP_LTZ)
    END(TIMESTAMP => '2024-01-02'::TIMESTAMP_LTZ);
Restore Dropped Data
-- Undrop a table
UNDROP TABLE deleted_orders;

-- Clone at historical point
-- (the literal is cast explicitly: TIMESTAMP => expects a timestamp value, not a string)
CREATE TABLE orders_backup CLONE orders
    AT(TIMESTAMP => '2024-01-01 00:00:00'::TIMESTAMP_LTZ);
VARIANT Data Type
Working with Semi-Structured Data
-- Create table with VARIANT column
CREATE TABLE events (
    id INT,
    event_time TIMESTAMP,
    data VARIANT
);

-- Insert JSON data
-- (INSERT ... SELECT is used because Snowflake rejects PARSE_JSON inside a VALUES clause)
INSERT INTO events (id, event_time, data)
SELECT
    1,
    CURRENT_TIMESTAMP(),
    PARSE_JSON('{"user": "alice", "action": "login", "metadata": {"ip": "192.168.1.1"}}');

-- Query VARIANT data
SELECT
    id,
    data:user::STRING AS user_name,
    data:action::STRING AS action,
    data:metadata.ip::STRING AS ip_address
FROM events;

-- Filter on VARIANT fields
SELECT
    id,
    event_time,
    data
FROM events
WHERE data:action::STRING = 'login'
    AND data:metadata.ip::STRING LIKE '192.168.%';
Array Operations
-- Work with arrays in VARIANT
-- (INSERT ... SELECT is used because Snowflake rejects PARSE_JSON inside a VALUES clause)
INSERT INTO events (id, data)
SELECT
    2,
    PARSE_JSON('{"user": "bob", "tags": ["admin", "developer"]}');

-- Access array elements (index 0 is the first element)
SELECT
    data:user::STRING AS user_name,
    data:tags[0]::STRING AS first_tag,
    ARRAY_SIZE(data:tags) AS tag_count
FROM events;

-- Check array contains
-- (the searched value comes first and must be a VARIANT; the array comes second)
SELECT
    id,
    event_time,
    data
FROM events
WHERE ARRAY_CONTAINS('admin'::VARIANT, data:tags);
Building Objects
-- Build one JSON object per user, with a nested "metadata" object
SELECT
    OBJECT_CONSTRUCT(
        'id', id,
        'name', name,
        'email', email,
        'metadata', OBJECT_CONSTRUCT(
            'created', created_at,
            'status', status
        )
    ) AS user_json
FROM users;

-- Collect each user's order lines into a single array of objects
SELECT
    user_id,
    ARRAY_AGG(
        OBJECT_CONSTRUCT('product', product_name, 'qty', quantity)
    ) AS items
FROM order_items
GROUP BY user_id;
FLATTEN Examples
Basic FLATTEN
-- Table with array data
CREATE TABLE orders (
    id INT,
    customer_id INT,
    items VARIANT
);

-- Load one order whose items column holds a JSON array
-- (INSERT ... SELECT is used because Snowflake rejects PARSE_JSON inside a VALUES clause)
INSERT INTO orders (id, customer_id, items)
SELECT
    1,
    100,
    PARSE_JSON('[
        {"product_id": 1, "name": "Widget", "price": 9.99, "qty": 2},
        {"product_id": 2, "name": "Gadget", "price": 19.99, "qty": 1}
    ]');

-- Flatten array: one output row per element of items
SELECT
    o.id AS order_id,
    o.customer_id,
    f.value:product_id::INT AS product_id,
    f.value:name::STRING AS product_name,
    -- exact decimal rather than FLOAT, since this is a monetary amount
    f.value:price::NUMBER(10, 2) AS price,
    f.value:qty::INT AS quantity
FROM orders o,
    LATERAL FLATTEN(INPUT => o.items) f;
Recursive FLATTEN
-- Nested structure
CREATE TABLE configs (
    id INT,
    config VARIANT
);

-- Load one nested configuration document
-- (INSERT ... SELECT is used because Snowflake rejects PARSE_JSON inside a VALUES clause)
INSERT INTO configs (id, config)
SELECT
    1,
    PARSE_JSON('{
        "app": {
            "settings": {
                "feature_a": true,
                "feature_b": false
            },
            "limits": {
                "max_users": 100
            }
        }
    }');

-- Recursively flatten
-- The TYPEOF filter drops intermediate objects so only non-object values remain.
SELECT
    f.path AS config_path,
    f.key AS config_key,
    f.value AS config_value
FROM configs c,
    LATERAL FLATTEN(INPUT => c.config, RECURSIVE => TRUE) f
WHERE f.value IS NOT NULL
    AND TYPEOF(f.value) != 'OBJECT';
Virtual Warehouse Management
-- Create a warehouse (starts suspended; AUTO_SUSPEND is in seconds)
CREATE WAREHOUSE analytics_wh WITH
    WAREHOUSE_SIZE = 'LARGE'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = TRUE;

-- Resize warehouse
ALTER WAREHOUSE analytics_wh SET WAREHOUSE_SIZE = 'XLARGE';

-- Resume/suspend
-- The warehouse was created suspended, so it is resumed first: Snowflake rejects
-- SUSPEND on a warehouse that is already suspended. IF SUSPENDED makes the
-- resume a no-op when the warehouse is already running.
ALTER WAREHOUSE analytics_wh RESUME IF SUSPENDED;
ALTER WAREHOUSE analytics_wh SUSPEND;

-- Multi-cluster warehouse
CREATE WAREHOUSE scaling_wh WITH
    WAREHOUSE_SIZE = 'MEDIUM'
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 4
    SCALING_POLICY = 'STANDARD';

-- Switch warehouse for session
USE WAREHOUSE analytics_wh;

-- Check warehouse status
SHOW WAREHOUSES;
COPY INTO Examples
From Cloud Storage
-- From S3
-- PATTERN is a regular expression. The literal dot is written as [.] because a
-- backslash inside a single-quoted string is consumed as a string escape, which
-- would leave an unescaped "." that matches any character.
-- 'xxx' / 'yyy' are placeholders; do not commit real credentials.
COPY INTO my_table
FROM 's3://my-bucket/data/'
CREDENTIALS = (AWS_KEY_ID = 'xxx' AWS_SECRET_KEY = 'yyy')
FILE_FORMAT = (TYPE = 'PARQUET')
PATTERN = '.*[.]parquet';

-- From Azure Blob
COPY INTO my_table
FROM 'azure://myaccount.blob.core.windows.net/container/path/'
CREDENTIALS = (AZURE_SAS_TOKEN = 'xxx')
FILE_FORMAT = (TYPE = 'JSON');

-- From GCS
-- NOTE(review): Snowflake itself accesses GCS through a STORAGE_INTEGRATION rather
-- than CREDENTIALS; confirm that HeliosDB accepts the CREDENTIALS form shown here.
COPY INTO my_table
FROM 'gcs://bucket/path/'
CREDENTIALS = (GCS_CREDENTIALS = 'xxx')
FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1);
With Error Handling
-- ON_ERROR = 'CONTINUE': continue on error
COPY INTO my_table
    FROM @my_stage/data/
    FILE_FORMAT = (TYPE = 'JSON')
    ON_ERROR = 'CONTINUE';

-- ON_ERROR = 'SKIP_FILE': skip file on error
COPY INTO my_table
    FROM @my_stage/data/
    FILE_FORMAT = (TYPE = 'CSV')
    ON_ERROR = 'SKIP_FILE';

-- ON_ERROR = 'ABORT_STATEMENT': abort on error (this is the default)
COPY INTO my_table
    FROM @my_stage/data/
    FILE_FORMAT = (TYPE = 'PARQUET')
    ON_ERROR = 'ABORT_STATEMENT';
Export Data
-- Export to stage
COPY INTO @my_stage/export/
FROM my_table
FILE_FORMAT = (TYPE = 'PARQUET')
HEADER = TRUE
OVERWRITE = TRUE;

-- Export with partitioning
-- The partition expression must evaluate to a string, so the column is converted
-- with TO_VARCHAR and given a "date=" prefix for the output path.
-- NOTE(review): assumes "date" is a DATE or string column - confirm against the table.
COPY INTO @my_stage/export/
FROM (
    SELECT *
    FROM my_table
    WHERE date >= '2024-01-01'
)
PARTITION BY ('date=' || TO_VARCHAR(date))
FILE_FORMAT = (TYPE = 'PARQUET');
Session Management
-- Session parameters: time zone, query tag and date output format
ALTER SESSION SET TIMEZONE = 'UTC';
ALTER SESSION SET QUERY_TAG = 'analytics_pipeline';
ALTER SESSION SET DATE_OUTPUT_FORMAT = 'YYYY-MM-DD';

-- Switch the active database, schema and warehouse
USE DATABASE analytics;
USE SCHEMA reporting;
USE WAREHOUSE compute_wh;

-- Report the context the session is currently using
SELECT
    CURRENT_DATABASE(),
    CURRENT_SCHEMA(),
    CURRENT_WAREHOUSE();
Python Full Example
import snowflake.connector
from datetime import datetime, timedelta, timezone

# Connect. The context managers close the cursor and the connection even if a
# query raises, so no explicit close() calls are needed.
with snowflake.connector.connect(
    host="localhost",
    port=443,
    user="admin",
    password="password",
    account="heliosdb"
) as conn:
    with conn.cursor() as cursor:
        # Set context
        cursor.execute("USE WAREHOUSE compute_wh")
        cursor.execute("USE DATABASE analytics")
        cursor.execute("USE SCHEMA public")

        # Time travel query
        # The timestamp is taken in UTC and carries its offset, so it does not
        # depend on the client's local time zone matching the session's. It is
        # cast explicitly because AT(TIMESTAMP => ...) expects a timestamp value.
        # The value is generated locally (not user input), so interpolating it
        # is safe here; use bind parameters for anything user-supplied.
        # NOTE(review): the table's Time Travel retention must cover 24 hours
        # back for this query to succeed - confirm for your deployment.
        yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S %z')
        cursor.execute(f"""
            SELECT COUNT(*)
            FROM orders
                AT(TIMESTAMP => '{yesterday}'::TIMESTAMP_TZ)
        """)
        print(f"Orders yesterday: {cursor.fetchone()[0]}")

        # VARIANT query: users with activity in the last hour
        cursor.execute("""
            SELECT
                data:user::STRING AS user,
                data:events::ARRAY AS events
            FROM user_activity
            WHERE data:timestamp::TIMESTAMP > DATEADD(hour, -1, CURRENT_TIMESTAMP())
        """)
        for row in cursor.fetchall():
            print(f"User: {row[0]}, Events: {row[1]}")

        # FLATTEN: one row per item of each order created in the last 7 days
        cursor.execute("""
            SELECT
                o.id,
                f.value:name::STRING AS product,
                f.value:price::FLOAT AS price
            FROM orders o,
                LATERAL FLATTEN(input => o.items) f
            WHERE o.created_at > DATEADD(day, -7, CURRENT_TIMESTAMP())
        """)
        for row in cursor.fetchall():
            print(f"Order {row[0]}: {row[1]} - ${row[2]}")
Last Updated: January 2026