Skip to content

Redis RESP3 Examples

Practical examples for HeliosDB's Redis RESP3 protocol support.

Connection Examples

Python (redis-py)

import redis

# Basic connection
# NOTE: the `protocol` argument requires a redis-py release with RESP3 support (5.0+).
client = redis.Redis(
    host='localhost',
    port=6379,
    protocol=3,  # Speak RESP3 instead of the default RESP2
    decode_responses=True  # Return str values instead of raw bytes
)

# Test connection
print(client.ping())  # True

Node.js (node-redis)

const { createClient } = require('redis');

// `require` means this file is a CommonJS module, where top-level `await`
// is a syntax error, so the async calls run inside an async function.
async function main() {
    const client = createClient({
        url: 'redis://localhost:6379',
        RESP: 3  // Negotiate RESP3 (needs node-redis v5 or newer)
    });

    await client.connect();
    console.log(await client.ping());  // PONG
}

// Surface connection errors instead of leaving an unhandled rejection.
main().catch(console.error);

Go (go-redis)

import "github.com/redis/go-redis/v9"

client := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379",
    Protocol: 3,
})

ctx := context.Background()
pong, _ := client.Ping(ctx).Result()
fmt.Println(pong)  // PONG

String Operations

Basic CRUD

# SET and GET
client.set('user:1:name', 'Alice')
name = client.get('user:1:name')  # 'Alice'

# SET with options
client.set('session:xyz', 'token123', ex=3600)  # Expire in 1 hour
# With nx=True the call returns None (and writes nothing) when the key already exists
client.set('lock:resource', 'owner1', nx=True)  # Set if not exists

# Multi-set operations
# MSET writes all pairs in a single command
client.mset({
    'user:1:name': 'Alice',
    'user:1:email': 'alice@example.com',
    'user:1:age': '28'
})

# MGET returns the values in the same order as the requested keys
values = client.mget('user:1:name', 'user:1:email', 'user:1:age')
# ['Alice', 'alice@example.com', '28']

Counters

# Increment/Decrement
# Each call returns the counter's new value (shown in the trailing comments)
client.set('counter', '10')
client.incr('counter')       # 11
client.incrby('counter', 5)  # 16
client.decr('counter')       # 15
# 'score' is not set above; Redis treats a missing key as 0, so this yields 0.5
client.incrbyfloat('score', 0.5)

List Operations

Queue Implementation

# Push operations (append to the tail of the list)
client.rpush('queue:jobs', 'job1', 'job2', 'job3')

# Pop operations
# Pushing at the tail and popping at the head gives first-in, first-out order;
# popping at the tail as well (RPOP) would return the newest job first.
job = client.lpop('queue:jobs')  # FIFO -> 'job1'

# Blocking pop (wait for item)
# Returns a (queue name, value) pair, or None if nothing arrives within 10 seconds
job = client.blpop('queue:jobs', timeout=10)

# Multiple queue blocking (queues are checked in the order listed)
result = client.blpop(['queue:1', 'queue:2', 'queue:3'], timeout=10)

List Range Queries

# Get range
# LRANGE indexes are inclusive on both ends; -1 means the last element
items = client.lrange('queue:jobs', 0, -1)   # All items
first_5 = client.lrange('queue:jobs', 0, 4)  # First 5

# Trim
# Elements outside the inclusive range 0..99 are removed from the list
client.ltrim('queue:jobs', 0, 99)  # Keep first 100

Set Operations

Membership Testing

# Add to set
# Members that are already present are ignored
client.sadd('users:online', 'alice', 'bob', 'charlie')

# Check membership
is_online = client.sismember('users:online', 'alice')  # True

# Get all members
# Sets are unordered, so do not rely on the iteration order of the result
members = client.smembers('users:online')

Set Algebra

# Two overlapping sets: 'dave' is the only member present in both
client.sadd('users:premium', 'alice', 'dave')
client.sadd('users:active', 'bob', 'charlie', 'dave')

# Union
combined = client.sunion('users:premium', 'users:active')
# {'alice', 'bob', 'charlie', 'dave'}

# Intersection
both = client.sinter('users:premium', 'users:active')
# {'dave'}

# Difference
# Order matters: members of the first set that are absent from the second
diff = client.sdiff('users:premium', 'users:active')
# {'alice'}

Sorted Set Operations

Leaderboard

# Add with scores
# The mapping is member -> score
client.zadd('leaderboard:monthly', {
    'alice': 1500,
    'bob': 1200,
    'charlie': 1800
})

# Increment score
# Argument order is (key, amount, member); alice goes from 1500 to 1600
client.zincrby('leaderboard:monthly', 100, 'alice')

# Get top 3
top_3 = client.zrevrange('leaderboard:monthly', 0, 2, withscores=True)
# [('charlie', 1800.0), ('alice', 1600.0), ('bob', 1200.0)]
# Members are str (not bytes) when the client uses decode_responses=True.
# NOTE(review): under RESP3 the pairs may come back as lists rather than tuples - confirm.

# Get rank
rank = client.zrevrank('leaderboard:monthly', 'alice')  # 1 (0-indexed)

Hash Operations

Object Storage

# Set hash fields
# `mapping` writes several fields of the hash in one call
client.hset('user:1', mapping={
    'name': 'Alice',
    'email': 'alice@example.com',
    'age': '28',
    'city': 'NYC'
})

# Get single field
name = client.hget('user:1', 'name')

# Get all fields
# Returns a dict of field -> value
user_data = client.hgetall('user:1')

# Increment numeric field
# 'visits' is not set above; Redis creates a missing field starting from 0
client.hincrby('user:1', 'visits', 1)

Pub/Sub Messaging

Publisher

def publisher(client):
    """Publish five numbered score updates to 'news:sports', one per second."""
    from time import sleep

    update_no = 0
    while update_no < 5:
        client.publish('news:sports', f'Score update {update_no}')
        sleep(1)  # Pace the updates one second apart
        update_no += 1

Subscriber

def subscriber(client):
    """Subscribe to 'news:sports' and print the payload of each message."""
    channel_feed = client.pubsub()
    channel_feed.subscribe('news:sports')

    for event in channel_feed.listen():
        # Subscribe confirmations and other control events carry no payload.
        if event['type'] != 'message':
            continue
        print(f"Received: {event['data']}")

Pattern Subscription

pubsub = client.pubsub()
pubsub.psubscribe('news:*')  # All news channels

# listen() blocks and yields events as they arrive
for message in pubsub.listen():
    # Pattern subscriptions deliver 'pmessage' events; 'channel' is the concrete channel that matched
    if message['type'] == 'pmessage':
        print(f"Channel: {message['channel']}, Data: {message['data']}")

Streams

Basic Stream Operations

# Add to stream
# Returns the ID assigned to the new entry
stream_id = client.xadd('events', {
    'type': 'purchase',
    'user': 'alice',
    'amount': '99.99'
})

# Read from stream
# '0' means "entries with an ID greater than 0", i.e. from the start of the stream
events = client.xread({'events': '0'}, count=10)

Consumer Groups

# Create consumer group
# id='0' starts the group at the beginning of the stream;
# mkstream=True creates the stream if it does not exist yet.
# NOTE(review): Redis rejects this call (BUSYGROUP) when the group already exists - handle on re-runs.
client.xgroup_create('events', 'mygroup', id='0', mkstream=True)

# Read as consumer
events = client.xreadgroup(
    groupname='mygroup',
    consumername='consumer1',
    streams={'events': '>'},  # '>' = entries never delivered to any consumer of this group
    count=10
)

# Acknowledge processing
# NOTE(review): this loop assumes a list of (stream, messages) pairs; with protocol=3
# redis-py may return a dict keyed by stream name instead - confirm the reply shape.
for stream, messages in events:
    for msg_id, data in messages:
        # Process message
        # XACK removes the entry from the group's pending list
        client.xack('events', 'mygroup', msg_id)

Transactions

Basic Transaction

import time

# Start transaction
# Commands are buffered on the pipeline and sent together when execute() is called
pipe = client.pipeline()
pipe.incr('counter')
pipe.hset('stats', 'last_update', time.time())
pipe.lpush('history', 'update')
results = pipe.execute()  # One result per queued command, in queue order

Optimistic Locking

with client.pipeline() as pipe:
    while True:
        try:
            # After WATCH the pipeline executes commands immediately until
            # multi() is called, so the balance is read on the same
            # connection that holds the watch.
            pipe.watch('balance')
            balance = int(pipe.get('balance') or 0)  # Missing key counts as 0

            if balance >= 100:
                # Queue the write; execute() raises WatchError if 'balance'
                # changed since the WATCH above.
                pipe.multi()
                pipe.decrby('balance', 100)
                pipe.execute()
                break
            else:
                pipe.unwatch()
                raise Exception('Insufficient balance')
        except redis.WatchError:
            continue  # Retry

Lua Scripting

Basic Script

# Rate limiter script
# KEYS[1] = counter key, ARGV[1] = max calls per window, ARGV[2] = window in seconds.
# Returns 1 when the call is allowed and 0 when the limit has been reached.
rate_limit_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = tonumber(redis.call('GET', key) or 0)

if current >= limit then
    return 0
end

-- Start the window only when the counter is created. Re-arming EXPIRE on
-- every call would keep pushing the expiry forward, so under steady
-- traffic the counter would never reset.
if redis.call('INCR', key) == 1 then
    redis.call('EXPIRE', key, window)
end
return 1
"""

# register_script returns a callable that runs the script on the server
script = client.register_script(rate_limit_script)
result = script(keys=['rate:user:123'], args=[100, 60])  # 100 calls per 60 seconds

Real-World Patterns

Rate Limiter

class RateLimiter:
    """Fixed-window rate limiter backed by a Redis counter.

    Each user gets a counter under ``rate:<user_id>`` that expires
    ``window`` seconds after it is created.
    """

    def __init__(self, client, limit=100, window=60):
        """Store the Redis client, the per-window limit and the window length in seconds."""
        self.client = client
        self.limit = limit
        self.window = window

    def is_allowed(self, user_id):
        """Count one request for ``user_id`` and return True while within the limit."""
        key = f'rate:{user_id}'
        current = self.client.incr(key)
        # The first hit opens the window. INCR and EXPIRE are separate
        # commands, so a failure between them can leave a counter with no
        # expiry (TTL -1); re-arm it here, otherwise the user would stay
        # blocked forever once the limit is reached.
        if current == 1 or self.client.ttl(key) == -1:
            self.client.expire(key, self.window)
        return current <= self.limit

# Allow at most 100 requests per 60-second window
limiter = RateLimiter(client, limit=100, window=60)
if limiter.is_allowed('user:123'):  # The counter is stored under 'rate:user:123'
    # Process request
    pass

Distributed Lock

import uuid

class DistributedLock:
    """Single-instance Redis lock usable as a context manager.

    The lock is the key ``lock:<name>`` holding a random token that
    identifies this owner. It expires after ``timeout`` seconds so a
    crashed owner cannot hold it forever.
    """

    # Delete the key only if it still holds our token. Running the check and
    # the delete as one script closes the gap in which the lock could expire
    # and be taken by another client between a separate GET and DEL.
    _RELEASE_SCRIPT = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
end
return 0
"""

    def __init__(self, client, name, timeout=10):
        """Prepare a lock on ``name``; nothing is written until ``acquire``."""
        self.client = client
        self.name = f'lock:{name}'
        self.token = str(uuid.uuid4())
        self.timeout = timeout

    def acquire(self):
        """Try once to take the lock; return True on success, False if it is held."""
        acquired = self.client.set(
            self.name, self.token,
            nx=True, ex=self.timeout
        )
        # SET ... NX yields None when the key already exists.
        return bool(acquired)

    def release(self):
        """Release the lock if this instance still owns it.

        Returns True when the key was deleted, False when the lock had
        already expired or belongs to another owner.
        """
        # The token comparison happens on the server, so it also works for
        # clients that return bytes rather than str.
        deleted = self.client.eval(
            self._RELEASE_SCRIPT, 1, self.name, self.token
        )
        return bool(deleted)

    def __enter__(self):
        if not self.acquire():
            raise Exception('Could not acquire lock')
        return self

    def __exit__(self, *args):
        # Deliberately returns None: a truthy value here would swallow
        # exceptions raised inside the ``with`` body.
        self.release()

# Entering raises Exception if the lock is already held; the lock is released on exit
with DistributedLock(client, 'critical_resource'):
    # Critical section
    pass

Session Manager

import json
import uuid
import time

class SessionManager:
    """Store user sessions as Redis hashes with a time-to-live.

    Each session lives in the hash ``session:<id>``; the set
    ``user_sessions:<user_id>`` indexes the session ids of one user.
    ``get`` looks fields up by ``str`` keys, so the client is expected
    to be created with ``decode_responses=True``.
    """

    def __init__(self, client, ttl=3600):
        """Keep the Redis client and the session lifetime in seconds."""
        self.client = client
        self.ttl = ttl

    def create(self, user_id, data):
        """Create a session for ``user_id`` and return its generated id.

        ``data`` must be JSON-serializable; it is stored as a JSON string.
        """
        session_id = str(uuid.uuid4())
        session_key = f'session:{session_id}'
        index_key = f'user_sessions:{user_id}'

        # Queue every write on one pipeline so the hash is never left
        # without its expiry if the process dies between two commands.
        pipe = self.client.pipeline()
        pipe.hset(session_key, mapping={
            'user_id': user_id,
            'created': time.time(),
            'data': json.dumps(data)
        })
        pipe.expire(session_key, self.ttl)

        # Index by user
        pipe.sadd(index_key, session_id)
        # Refresh the index expiry so it outlives the newest session but
        # does not linger forever once every session has expired.
        pipe.expire(index_key, self.ttl)
        pipe.execute()

        return session_id

    def get(self, session_id):
        """Return the session fields as a dict, or an empty dict if absent.

        The ``data`` field is decoded from JSON; other fields are returned
        as stored by Redis.
        """
        session_key = f'session:{session_id}'
        data = self.client.hgetall(session_key)
        if data:
            data['data'] = json.loads(data.get('data', '{}'))
        return data

    def destroy(self, session_id):
        """Delete the session and drop it from its user's index; no-op if absent."""
        session = self.get(session_id)
        if not session:
            return
        user_id = session['user_id']
        pipe = self.client.pipeline()
        pipe.delete(f'session:{session_id}')
        pipe.srem(f'user_sessions:{user_id}', session_id)
        pipe.execute()

Job Queue

import json
import time
import uuid


class JobQueue:
    """Priority job queue built on a sorted set, a hash and a list.

    Keys used, for a queue called ``name``:
      * ``queue:<name>:pending``    - sorted set of JSON jobs, lowest score first
      * ``queue:<name>:processing`` - hash of job id -> JSON job being worked on
      * ``queue:<name>:failed``     - list of JSON jobs that ran out of retries
    """

    def __init__(self, client, name):
        """Bind the queue to a Redis client and derive its three key names."""
        self.client = client
        self.name = name
        self.pending = f'queue:{name}:pending'
        self.processing = f'queue:{name}:processing'
        self.failed = f'queue:{name}:failed'

    def enqueue(self, job_data, priority=0):
        """Add a job and return its generated id.

        ``job_data`` must be JSON-serializable. Jobs with a lower
        ``priority`` value are dequeued first.
        """
        job_id = str(uuid.uuid4())
        job = {
            'id': job_id,
            'data': job_data,
            'created': time.time(),
            'attempts': 0
        }
        self.client.zadd(self.pending, {json.dumps(job): priority})
        return job_id

    def dequeue(self, timeout=0):
        """Pop the lowest-score pending job and mark it as processing.

        Returns the job dict, or None when the blocking pop returns nothing.
        ``timeout`` is passed to BZPOPMIN, where 0 means block indefinitely.
        """
        # Move from pending to processing
        result = self.client.bzpopmin(self.pending, timeout=timeout)
        if result:
            # The reply is (key, member, score); the member is the JSON job.
            job = json.loads(result[1])
            job['attempts'] += 1
            self.client.hset(self.processing, job['id'], json.dumps(job))
            return job
        return None

    def complete(self, job_id):
        """Drop a finished job from the processing hash."""
        self.client.hdel(self.processing, job_id)

    def fail(self, job_id, max_retries=3):
        """Retry a failed job, or move it to the failed list after ``max_retries`` attempts.

        Does nothing when ``job_id`` is not in the processing hash.
        """
        job_json = self.client.hget(self.processing, job_id)
        if job_json:
            job = json.loads(job_json)
            self.client.hdel(self.processing, job_id)

            if job['attempts'] < max_retries:
                # Re-queue behind the other pending work. The score is a
                # timestamp 60 seconds ahead, but dequeue() pops the lowest
                # score regardless of the clock, so this only lowers the
                # job's priority; it does not hold the job back for 60 s.
                self.client.zadd(
                    self.pending,
                    {json.dumps(job): time.time() + 60}
                )
            else:
                # Move to failed queue
                self.client.lpush(self.failed, json.dumps(job))

Geospatial Examples

Location-Based Services

# Add locations
# redis-py takes ONE flat sequence of longitude, latitude, name triples;
# a list of tuples is rejected because a tuple is not an encodable value.
client.geoadd('restaurants', [
    -122.4194, 37.7749, 'sf_diner',
    -122.4094, 37.7849, 'sf_cafe',
    -122.4294, 37.7649, 'sf_bistro'
])

# Find nearby
# withdist=True adds each member's distance from the centre to the result
nearby = client.georadius(
    'restaurants',
    longitude=-122.4194,
    latitude=37.7749,
    radius=5,
    unit='km',
    withdist=True
)

# Get distance
# Distance between two stored members, in the requested unit
distance = client.geodist('restaurants', 'sf_diner', 'sf_cafe', unit='km')

HyperLogLog Examples

Unique Visitors

# Add visitors
for user_id in ['user:1', 'user:2', 'user:3', 'user:1']:  # user:1 duplicate
    client.pfadd('unique_visitors:today', user_id)

# Count unique
# HyperLogLog counts are approximate; small sets like this one are normally exact
count = client.pfcount('unique_visitors:today')  # 3

# Merge multiple days
# The first argument is the destination key; the rest are the sources
client.pfmerge(
    'unique_visitors:week',
    'unique_visitors:mon',
    'unique_visitors:tue',
    'unique_visitors:wed'
)

Pipelining for Performance

# Batch operations
# transaction=False batches the commands without wrapping them in MULTI/EXEC
pipe = client.pipeline(transaction=False)
for i in range(1000):
    pipe.set(f'key:{i}', f'value:{i}')
results = pipe.execute()  # One result per queued command, in order

# Mixed operations
# The default pipeline is transactional: the queued commands run as one MULTI/EXEC
# NOTE: time.time() below needs `import time` to be in scope
pipe = client.pipeline()
pipe.incr('counter')
pipe.lpush('log', 'entry')
pipe.hset('stats', 'last', time.time())
pipe.expire('temp_key', 300)
results = pipe.execute()

Related: README.md | CONFIGURATION.md | COMPATIBILITY.md

Last Updated: December 2025