Skip to content

MongoDB Protocol Examples

Practical code examples and usage patterns for HeliosDB's MongoDB compatibility.

Connection Examples

Python (PyMongo)

from pymongo import MongoClient

# Basic connection
client = MongoClient("mongodb://localhost:27017/")
db = client.my_database
collection = db.my_collection

# With authentication
client = MongoClient(
    "mongodb://user:password@localhost:27017/",
    authSource="admin",
    authMechanism="SCRAM-SHA-256"
)

# With TLS
client = MongoClient(
    "mongodb://localhost:27017/",
    tls=True,
    tlsCAFile="/path/to/ca.pem"
)

Node.js

const { MongoClient } = require('mongodb');

async function connect() {
    const client = new MongoClient('mongodb://localhost:27017/', {
        auth: { username: 'user', password: 'password' },
        authSource: 'admin',
        maxPoolSize: 50
    });

    await client.connect();
    const db = client.db('my_database');
    return db;
}

Java

import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;

MongoClient client = MongoClients.create(
    "mongodb://user:password@localhost:27017/?authSource=admin"
);
MongoDatabase database = client.getDatabase("my_database");

CRUD Operations

Insert Documents

# Insert one
result = collection.insert_one({
    "name": "Alice",
    "email": "alice@example.com",
    "age": 30,
    "created_at": datetime.now()
})
print(f"Inserted: {result.inserted_id}")

# Insert many
docs = [
    {"name": "Bob", "age": 25},
    {"name": "Charlie", "age": 35},
    {"name": "Diana", "age": 28}
]
result = collection.insert_many(docs)
print(f"Inserted {len(result.inserted_ids)} documents")

Find Documents

# Find one
user = collection.find_one({"name": "Alice"})

# Find many with filter
adults = collection.find({"age": {"$gte": 18}})
for user in adults:
    print(user)

# With projection
users = collection.find(
    {"age": {"$gte": 21}},
    {"name": 1, "email": 1, "_id": 0}
)

# With sorting and pagination
results = collection.find({"status": "active"}) \
    .sort("created_at", -1) \
    .skip(10) \
    .limit(5)

Update Documents

# Update one
collection.update_one(
    {"_id": ObjectId("...")},
    {"$set": {"status": "active"}}
)

# Update many
collection.update_many(
    {"category": "electronics"},
    {"$inc": {"view_count": 1}}
)

# Upsert
collection.update_one(
    {"email": "new@example.com"},
    {"$set": {"name": "New User"}},
    upsert=True
)

# Array operations
collection.update_one(
    {"_id": user_id},
    {"$push": {"tags": "premium"}}
)

collection.update_one(
    {"_id": user_id},
    {"$pull": {"notifications": {"read": True}}}
)

Delete Documents

# Delete one
collection.delete_one({"_id": ObjectId("...")})

# Delete many
result = collection.delete_many({"status": "archived"})
print(f"Deleted {result.deleted_count} documents")

Query Operators

Comparison Operators

# Greater than
collection.find({"age": {"$gt": 21}})

# Between (range)
collection.find({"age": {"$gte": 18, "$lte": 65}})

# In list
collection.find({"status": {"$in": ["active", "pending"]}})

# Not equal
collection.find({"role": {"$ne": "admin"}})

Logical Operators

# AND (implicit)
collection.find({"age": {"$gte": 18}, "status": "active"})

# OR
collection.find({
    "$or": [
        {"role": "admin"},
        {"permissions": {"$in": ["manage_users"]}}
    ]
})

# AND with OR
collection.find({
    "status": "active",
    "$or": [
        {"age": {"$lt": 18}},
        {"guardian": {"$exists": True}}
    ]
})

Array Operators

# Contains all elements
collection.find({"tags": {"$all": ["mongodb", "database"]}})

# Array element matches
collection.find({
    "items": {
        "$elemMatch": {
            "price": {"$gt": 100},
            "quantity": {"$gte": 5}
        }
    }
})

# Array size
collection.find({"tags": {"$size": 3}})
# Create text index
collection.create_index([("content", "text"), ("title", "text")])

# Text search
collection.find({"$text": {"$search": "mongodb database"}})

# With score
collection.find(
    {"$text": {"$search": "mongodb"}},
    {"score": {"$meta": "textScore"}}
).sort([("score", {"$meta": "textScore"})])

Aggregation Pipeline

Basic Aggregation

# Group and count
pipeline = [
    {"$match": {"status": "completed"}},
    {"$group": {
        "_id": "$category",
        "total": {"$sum": "$amount"},
        "count": {"$sum": 1},
        "average": {"$avg": "$amount"}
    }},
    {"$sort": {"total": -1}},
    {"$limit": 10}
]
results = collection.aggregate(pipeline)

Lookup (Join)

# Join orders with customers
pipeline = [
    {"$lookup": {
        "from": "customers",
        "localField": "customer_id",
        "foreignField": "_id",
        "as": "customer"
    }},
    {"$unwind": "$customer"},
    {"$project": {
        "order_id": 1,
        "total": 1,
        "customer_name": "$customer.name",
        "customer_email": "$customer.email"
    }}
]

Advanced Lookup with Pipeline

pipeline = [
    {"$lookup": {
        "from": "products",
        "let": {"order_items": "$items"},
        "pipeline": [
            {"$match": {
                "$expr": {"$in": ["$_id", "$$order_items"]}
            }},
            {"$project": {"name": 1, "price": 1}}
        ],
        "as": "product_details"
    }}
]

Faceted Aggregation

pipeline = [
    {"$facet": {
        "by_category": [
            {"$group": {"_id": "$category", "count": {"$sum": 1}}}
        ],
        "by_price_range": [
            {"$bucket": {
                "groupBy": "$price",
                "boundaries": [0, 50, 100, 500, 1000],
                "default": "Other",
                "output": {"count": {"$sum": 1}}
            }}
        ],
        "statistics": [
            {"$group": {
                "_id": None,
                "total_products": {"$sum": 1},
                "avg_price": {"$avg": "$price"},
                "max_price": {"$max": "$price"}
            }}
        ]
    }}
]

Window Functions

pipeline = [
    {"$setWindowFields": {
        "partitionBy": "$department",
        "sortBy": {"salary": -1},
        "output": {
            "rank": {"$rank": {}},
            "dept_avg": {
                "$avg": "$salary",
                "window": {"documents": ["unbounded", "unbounded"]}
            }
        }
    }}
]

Change Streams

Watch Collection

# Basic change stream
with collection.watch() as stream:
    for change in stream:
        print(f"Operation: {change['operationType']}")
        if 'fullDocument' in change:
            print(f"Document: {change['fullDocument']}")

With Pipeline Filter

pipeline = [
    {"$match": {
        "operationType": {"$in": ["insert", "update"]},
        "fullDocument.priority": "high"
    }}
]

with collection.watch(pipeline) as stream:
    for change in stream:
        process_high_priority(change)

Resume After Disconnect

resume_token = None

try:
    with collection.watch() as stream:
        for change in stream:
            process(change)
            resume_token = change['_id']
except Exception:
    # Resume from last position
    with collection.watch(resume_after=resume_token) as stream:
        for change in stream:
            process(change)

Async Change Stream (Motor)

import motor.motor_asyncio

async def watch_changes():
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017/')
    collection = client.mydb.mycollection

    async with collection.watch() as stream:
        async for change in stream:
            print(change)

Transactions

Basic Transaction

with client.start_session() as session:
    with session.start_transaction():
        # Transfer money
        accounts.update_one(
            {"_id": "account_a"},
            {"$inc": {"balance": -100}},
            session=session
        )
        accounts.update_one(
            {"_id": "account_b"},
            {"$inc": {"balance": 100}},
            session=session
        )
        # Transaction commits automatically on exit

With Error Handling

def transfer_with_retry(session):
    while True:
        try:
            with session.start_transaction():
                # Debit
                accounts.update_one(
                    {"_id": "from_account", "balance": {"$gte": 100}},
                    {"$inc": {"balance": -100}},
                    session=session
                )
                # Credit
                accounts.update_one(
                    {"_id": "to_account"},
                    {"$inc": {"balance": 100}},
                    session=session
                )
                # Log transaction
                transactions.insert_one({
                    "from": "from_account",
                    "to": "to_account",
                    "amount": 100,
                    "timestamp": datetime.now()
                }, session=session)
            return  # Success
        except (ConnectionFailure, OperationFailure) as e:
            if e.has_error_label("TransientTransactionError"):
                continue  # Retry
            raise

with client.start_session() as session:
    transfer_with_retry(session)

Index Management

Create Indexes

# Single field
collection.create_index("email")

# Compound
collection.create_index([("category", 1), ("price", -1)])

# Unique
collection.create_index("username", unique=True)

# Sparse
collection.create_index("optional_field", sparse=True)

# TTL (expire after 1 hour)
collection.create_index("created_at", expireAfterSeconds=3600)

# Text index
collection.create_index([("title", "text"), ("content", "text")])

# Geospatial
collection.create_index([("location", "2dsphere")])

Manage Indexes

# List indexes
for index in collection.list_indexes():
    print(index)

# Drop index
collection.drop_index("email_1")

# Drop all indexes
collection.drop_indexes()

Migration from MongoDB

Export/Import

# Export from MongoDB
mongodump --uri="mongodb://source:27017/mydb" --out=/backup

# Import to HeliosDB
mongorestore --uri="mongodb://heliosdb:27017/mydb" /backup

Connection String Migration

import os

# Environment-based connection
MONGODB_URI = os.environ.get(
    "MONGODB_URI",
    "mongodb://localhost:27017/"  # Default to HeliosDB
)

client = MongoClient(MONGODB_URI)

Compatibility Check

def check_compatibility():
    """Verify HeliosDB MongoDB compatibility"""
    client = MongoClient("mongodb://localhost:27017/")
    db = client.test_db

    # Test CRUD
    result = db.test.insert_one({"test": True})
    assert result.inserted_id

    doc = db.test.find_one({"test": True})
    assert doc is not None

    db.test.update_one({"test": True}, {"$set": {"verified": True}})
    db.test.delete_one({"test": True})

    # Test aggregation
    db.test.insert_many([{"x": i} for i in range(10)])
    result = list(db.test.aggregate([
        {"$group": {"_id": None, "sum": {"$sum": "$x"}}}
    ]))
    assert result[0]["sum"] == 45

    print("All compatibility checks passed!")

Related: README.md | CONFIGURATION.md | COMPATIBILITY.md

Last Updated: December 2025