MongoDB Protocol Examples¶
Practical code examples and usage patterns for HeliosDB's MongoDB compatibility.
Connection Examples¶
Python (PyMongo)¶
from pymongo import MongoClient
# Basic connection
client = MongoClient("mongodb://localhost:27017/")
db = client.my_database
collection = db.my_collection
# With authentication
client = MongoClient(
"mongodb://user:password@localhost:27017/",
authSource="admin",
authMechanism="SCRAM-SHA-256"
)
# With TLS
client = MongoClient(
"mongodb://localhost:27017/",
tls=True,
tlsCAFile="/path/to/ca.pem"
)
Node.js¶
const { MongoClient } = require('mongodb');
async function connect() {
const client = new MongoClient('mongodb://localhost:27017/', {
auth: { username: 'user', password: 'password' },
authSource: 'admin',
maxPoolSize: 50
});
await client.connect();
const db = client.db('my_database');
return db;
}
Java¶
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
MongoClient client = MongoClients.create(
"mongodb://user:password@localhost:27017/?authSource=admin"
);
MongoDatabase database = client.getDatabase("my_database");
CRUD Operations¶
Insert Documents¶
# Insert one
result = collection.insert_one({
"name": "Alice",
"email": "alice@example.com",
"age": 30,
"created_at": datetime.now()
})
print(f"Inserted: {result.inserted_id}")
# Insert many
docs = [
{"name": "Bob", "age": 25},
{"name": "Charlie", "age": 35},
{"name": "Diana", "age": 28}
]
result = collection.insert_many(docs)
print(f"Inserted {len(result.inserted_ids)} documents")
Find Documents¶
# Find one
user = collection.find_one({"name": "Alice"})
# Find many with filter
adults = collection.find({"age": {"$gte": 18}})
for user in adults:
print(user)
# With projection
users = collection.find(
{"age": {"$gte": 21}},
{"name": 1, "email": 1, "_id": 0}
)
# With sorting and pagination
results = collection.find({"status": "active"}) \
.sort("created_at", -1) \
.skip(10) \
.limit(5)
Update Documents¶
# Update one
collection.update_one(
{"_id": ObjectId("...")},
{"$set": {"status": "active"}}
)
# Update many
collection.update_many(
{"category": "electronics"},
{"$inc": {"view_count": 1}}
)
# Upsert
collection.update_one(
{"email": "new@example.com"},
{"$set": {"name": "New User"}},
upsert=True
)
# Array operations
collection.update_one(
{"_id": user_id},
{"$push": {"tags": "premium"}}
)
collection.update_one(
{"_id": user_id},
{"$pull": {"notifications": {"read": True}}}
)
Delete Documents¶
# Delete one
collection.delete_one({"_id": ObjectId("...")})
# Delete many
result = collection.delete_many({"status": "archived"})
print(f"Deleted {result.deleted_count} documents")
Query Operators¶
Comparison Operators¶
# Greater than
collection.find({"age": {"$gt": 21}})
# Between (range)
collection.find({"age": {"$gte": 18, "$lte": 65}})
# In list
collection.find({"status": {"$in": ["active", "pending"]}})
# Not equal
collection.find({"role": {"$ne": "admin"}})
Logical Operators¶
# AND (implicit)
collection.find({"age": {"$gte": 18}, "status": "active"})
# OR
collection.find({
"$or": [
{"role": "admin"},
{"permissions": {"$in": ["manage_users"]}}
]
})
# AND with OR
collection.find({
"status": "active",
"$or": [
{"age": {"$lt": 18}},
{"guardian": {"$exists": True}}
]
})
Array Operators¶
# Contains all elements
collection.find({"tags": {"$all": ["mongodb", "database"]}})
# Array element matches
collection.find({
"items": {
"$elemMatch": {
"price": {"$gt": 100},
"quantity": {"$gte": 5}
}
}
})
# Array size
collection.find({"tags": {"$size": 3}})
Text Search¶
# Create text index
collection.create_index([("content", "text"), ("title", "text")])
# Text search
collection.find({"$text": {"$search": "mongodb database"}})
# With score
collection.find(
{"$text": {"$search": "mongodb"}},
{"score": {"$meta": "textScore"}}
).sort([("score", {"$meta": "textScore"})])
Aggregation Pipeline¶
Basic Aggregation¶
# Group and count
pipeline = [
{"$match": {"status": "completed"}},
{"$group": {
"_id": "$category",
"total": {"$sum": "$amount"},
"count": {"$sum": 1},
"average": {"$avg": "$amount"}
}},
{"$sort": {"total": -1}},
{"$limit": 10}
]
results = collection.aggregate(pipeline)
Lookup (Join)¶
# Join orders with customers
pipeline = [
{"$lookup": {
"from": "customers",
"localField": "customer_id",
"foreignField": "_id",
"as": "customer"
}},
{"$unwind": "$customer"},
{"$project": {
"order_id": 1,
"total": 1,
"customer_name": "$customer.name",
"customer_email": "$customer.email"
}}
]
Advanced Lookup with Pipeline¶
pipeline = [
{"$lookup": {
"from": "products",
"let": {"order_items": "$items"},
"pipeline": [
{"$match": {
"$expr": {"$in": ["$_id", "$$order_items"]}
}},
{"$project": {"name": 1, "price": 1}}
],
"as": "product_details"
}}
]
Faceted Aggregation¶
pipeline = [
{"$facet": {
"by_category": [
{"$group": {"_id": "$category", "count": {"$sum": 1}}}
],
"by_price_range": [
{"$bucket": {
"groupBy": "$price",
"boundaries": [0, 50, 100, 500, 1000],
"default": "Other",
"output": {"count": {"$sum": 1}}
}}
],
"statistics": [
{"$group": {
"_id": None,
"total_products": {"$sum": 1},
"avg_price": {"$avg": "$price"},
"max_price": {"$max": "$price"}
}}
]
}}
]
Window Functions¶
pipeline = [
{"$setWindowFields": {
"partitionBy": "$department",
"sortBy": {"salary": -1},
"output": {
"rank": {"$rank": {}},
"dept_avg": {
"$avg": "$salary",
"window": {"documents": ["unbounded", "unbounded"]}
}
}
}}
]
Change Streams¶
Watch Collection¶
# Basic change stream
with collection.watch() as stream:
for change in stream:
print(f"Operation: {change['operationType']}")
if 'fullDocument' in change:
print(f"Document: {change['fullDocument']}")
With Pipeline Filter¶
pipeline = [
{"$match": {
"operationType": {"$in": ["insert", "update"]},
"fullDocument.priority": "high"
}}
]
with collection.watch(pipeline) as stream:
for change in stream:
process_high_priority(change)
Resume After Disconnect¶
resume_token = None
try:
with collection.watch() as stream:
for change in stream:
process(change)
resume_token = change['_id']
except Exception:
# Resume from last position
with collection.watch(resume_after=resume_token) as stream:
for change in stream:
process(change)
Async Change Stream (Motor)¶
import motor.motor_asyncio
async def watch_changes():
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017/')
collection = client.mydb.mycollection
async with collection.watch() as stream:
async for change in stream:
print(change)
Transactions¶
Basic Transaction¶
with client.start_session() as session:
with session.start_transaction():
# Transfer money
accounts.update_one(
{"_id": "account_a"},
{"$inc": {"balance": -100}},
session=session
)
accounts.update_one(
{"_id": "account_b"},
{"$inc": {"balance": 100}},
session=session
)
# Transaction commits automatically on exit
With Error Handling¶
def transfer_with_retry(session):
while True:
try:
with session.start_transaction():
# Debit
accounts.update_one(
{"_id": "from_account", "balance": {"$gte": 100}},
{"$inc": {"balance": -100}},
session=session
)
# Credit
accounts.update_one(
{"_id": "to_account"},
{"$inc": {"balance": 100}},
session=session
)
# Log transaction
transactions.insert_one({
"from": "from_account",
"to": "to_account",
"amount": 100,
"timestamp": datetime.now()
}, session=session)
return # Success
except (ConnectionFailure, OperationFailure) as e:
if e.has_error_label("TransientTransactionError"):
continue # Retry
raise
with client.start_session() as session:
transfer_with_retry(session)
Index Management¶
Create Indexes¶
# Single field
collection.create_index("email")
# Compound
collection.create_index([("category", 1), ("price", -1)])
# Unique
collection.create_index("username", unique=True)
# Sparse
collection.create_index("optional_field", sparse=True)
# TTL (expire after 1 hour)
collection.create_index("created_at", expireAfterSeconds=3600)
# Text index
collection.create_index([("title", "text"), ("content", "text")])
# Geospatial
collection.create_index([("location", "2dsphere")])
Manage Indexes¶
# List indexes
for index in collection.list_indexes():
print(index)
# Drop index
collection.drop_index("email_1")
# Drop all indexes
collection.drop_indexes()
Migration from MongoDB¶
Export/Import¶
# Export from MongoDB
mongodump --uri="mongodb://source:27017/mydb" --out=/backup
# Import to HeliosDB
mongorestore --uri="mongodb://heliosdb:27017/mydb" /backup
Connection String Migration¶
import os
# Environment-based connection
MONGODB_URI = os.environ.get(
"MONGODB_URI",
"mongodb://localhost:27017/" # Default to HeliosDB
)
client = MongoClient(MONGODB_URI)
Compatibility Check¶
def check_compatibility():
"""Verify HeliosDB MongoDB compatibility"""
client = MongoClient("mongodb://localhost:27017/")
db = client.test_db
# Test CRUD
result = db.test.insert_one({"test": True})
assert result.inserted_id
doc = db.test.find_one({"test": True})
assert doc is not None
db.test.update_one({"test": True}, {"$set": {"verified": True}})
db.test.delete_one({"test": True})
# Test aggregation
db.test.insert_many([{"x": i} for i in range(10)])
result = list(db.test.aggregate([
{"$group": {"_id": None, "sum": {"$sum": "$x"}}}
]))
assert result[0]["sum"] == 45
print("All compatibility checks passed!")
Related: README.md | CONFIGURATION.md | COMPATIBILITY.md
Last Updated: December 2025