# Redis RESP3 Examples

Practical examples for HeliosDB's Redis RESP3 protocol support.
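Before wiring up a client library, you can sanity-check the RESP3 handshake from the command line. A minimal check, assuming HeliosDB is listening on the default port (`redis-cli`'s `-3` flag negotiates protocol version 3 via `HELLO`):

```bash
# Connect with RESP3 and verify the server responds
redis-cli -3 -h localhost -p 6379 PING
# PONG
```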
## Connection Examples

### Python (redis-py)

```python
import redis

# Basic connection negotiating RESP3
client = redis.Redis(
    host='localhost',
    port=6379,
    protocol=3,
    decode_responses=True
)

# Test connection
print(client.ping())  # True
```
### Node.js (node-redis)

```javascript
const { createClient } = require('redis');

const client = createClient({
  url: 'redis://localhost:6379',
  RESP: 3  // opt in to RESP3 (supported in recent node-redis releases)
});

await client.connect();
console.log(await client.ping()); // PONG
```
### Go (go-redis)

```go
import (
    "context"
    "fmt"

    "github.com/redis/go-redis/v9"
)

client := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379",
    Protocol: 3, // go-redis v9 defaults to RESP3; set explicitly for clarity
})

ctx := context.Background()
pong, err := client.Ping(ctx).Result()
if err != nil {
    panic(err)
}
fmt.Println(pong) // PONG
```
## String Operations

### Basic CRUD

```python
# SET and GET
client.set('user:1:name', 'Alice')
name = client.get('user:1:name')  # 'Alice'

# SET with options
client.set('session:xyz', 'token123', ex=3600)  # Expire in 1 hour
client.set('lock:resource', 'owner1', nx=True)  # Set only if not exists

# Multi-set operations
client.mset({
    'user:1:name': 'Alice',
    'user:1:email': 'alice@example.com',
    'user:1:age': '28'
})
values = client.mget('user:1:name', 'user:1:email', 'user:1:age')
# ['Alice', 'alice@example.com', '28']
```
### Counters

```python
# Increment/Decrement
client.set('counter', '10')
client.incr('counter')            # 11
client.incrby('counter', 5)       # 16
client.decr('counter')            # 15
client.incrbyfloat('score', 0.5)  # Works on float values too
```
## List Operations

### Queue Implementation

```python
# Push onto the tail
client.rpush('queue:jobs', 'job1', 'job2', 'job3')

# Pop from the head (RPUSH + LPOP = FIFO)
job = client.lpop('queue:jobs')

# Blocking pop (wait up to 10 seconds for an item)
job = client.blpop('queue:jobs', timeout=10)

# Block on multiple queues at once
result = client.blpop(['queue:1', 'queue:2', 'queue:3'], timeout=10)
```
### List Range Queries

```python
# Get ranges
items = client.lrange('queue:jobs', 0, -1)   # All items
first_5 = client.lrange('queue:jobs', 0, 4)  # First 5

# Trim
client.ltrim('queue:jobs', 0, 99)  # Keep only the first 100
```
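Combining LPUSH with LTRIM in one pipeline gives a fixed-size "most recent items" list; a minimal sketch (the `recent:logins` key name is illustrative):

```python
# Keep only the 100 most recent login events
pipe = client.pipeline(transaction=False)
pipe.lpush('recent:logins', 'alice@10.0.0.1')
pipe.ltrim('recent:logins', 0, 99)  # drop everything past index 99
pipe.execute()
```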
## Set Operations

### Membership Testing

```python
# Add to a set
client.sadd('users:online', 'alice', 'bob', 'charlie')

# Check membership
is_online = client.sismember('users:online', 'alice')  # True

# Get all members
members = client.smembers('users:online')
```
### Set Algebra

```python
client.sadd('users:premium', 'alice', 'dave')
client.sadd('users:active', 'bob', 'charlie', 'dave')

# Union
combined = client.sunion('users:premium', 'users:active')
# {'alice', 'bob', 'charlie', 'dave'}

# Intersection
both = client.sinter('users:premium', 'users:active')
# {'dave'}

# Difference (premium minus active)
diff = client.sdiff('users:premium', 'users:active')
# {'alice'}
```
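Each of these operations has a STORE variant that writes the result to a key instead of returning it, which is handy when the result feeds later commands; a small sketch (the `users:premium_active` destination key is illustrative):

```python
# Persist the intersection so it can be queried or expired later
count = client.sinterstore('users:premium_active',
                           ['users:premium', 'users:active'])
client.expire('users:premium_active', 300)  # cache the result for 5 minutes
print(count)  # number of members stored (1 here: 'dave')
```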
## Sorted Set Operations

### Leaderboard

```python
# Add members with scores
client.zadd('leaderboard:monthly', {
    'alice': 1500,
    'bob': 1200,
    'charlie': 1800
})

# Increment a score
client.zincrby('leaderboard:monthly', 100, 'alice')

# Get top 3 (highest score first)
top_3 = client.zrevrange('leaderboard:monthly', 0, 2, withscores=True)
# [('charlie', 1800.0), ('alice', 1600.0), ('bob', 1200.0)]

# Get a member's rank
rank = client.zrevrank('leaderboard:monthly', 'alice')  # 1 (0-indexed)
```
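Combining `zrevrank` with `zrevrange` gives a "players around you" view; a minimal sketch, assuming the member exists in the leaderboard:

```python
# Show up to two neighbors on either side of alice
rank = client.zrevrank('leaderboard:monthly', 'alice')
if rank is not None:
    start = max(rank - 2, 0)
    window = client.zrevrange('leaderboard:monthly', start, rank + 2,
                              withscores=True)
```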
## Hash Operations

### Object Storage

```python
# Set hash fields
client.hset('user:1', mapping={
    'name': 'Alice',
    'email': 'alice@example.com',
    'age': '28',
    'city': 'NYC'
})

# Get a single field
name = client.hget('user:1', 'name')

# Get all fields
user_data = client.hgetall('user:1')

# Increment a numeric field
client.hincrby('user:1', 'visits', 1)
```
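When a hash holds a large object, `hmget` fetches only the fields you need instead of the whole hash:

```python
# Partial read: values come back in the order requested
name, email = client.hmget('user:1', 'name', 'email')
# ['Alice', 'alice@example.com']
```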
## Pub/Sub Messaging

### Publisher

```python
import time

def publisher(client):
    for i in range(5):
        client.publish('news:sports', f'Score update {i}')
        time.sleep(1)
```
### Subscriber

```python
def subscriber(client):
    pubsub = client.pubsub()
    pubsub.subscribe('news:sports')
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data']}")
```
### Pattern Subscription

```python
pubsub = client.pubsub()
pubsub.psubscribe('news:*')  # All channels matching news:*

for message in pubsub.listen():
    if message['type'] == 'pmessage':
        print(f"Channel: {message['channel']}, Data: {message['data']}")
```
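`listen()` blocks the calling thread. If the subscriber has other work to do, `get_message()` polls with a timeout instead; a minimal sketch:

```python
pubsub = client.pubsub()
pubsub.subscribe('news:sports')

while True:
    # Returns None if nothing arrives within the timeout
    message = pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
    if message:
        print(f"Received: {message['data']}")
    # ... do other periodic work here ...
```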
## Streams

### Basic Stream Operations

```python
# Append an entry; the server assigns the ID
stream_id = client.xadd('events', {
    'type': 'purchase',
    'user': 'alice',
    'amount': '99.99'
})

# Read from the beginning of the stream
events = client.xread({'events': '0'}, count=10)
```
### Consumer Groups

```python
# Create a consumer group (mkstream creates the stream if missing)
client.xgroup_create('events', 'mygroup', id='0', mkstream=True)

# Read new entries as a named consumer
events = client.xreadgroup(
    groupname='mygroup',
    consumername='consumer1',
    streams={'events': '>'},
    count=10
)

# Acknowledge after processing
for stream, messages in events:
    for msg_id, data in messages:
        # ... process message ...
        client.xack('events', 'mygroup', msg_id)
```
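If a consumer crashes before acknowledging, its entries stay in the group's pending list. A recovery sketch using `XAUTOCLAIM` (Redis 6.2+), assuming HeliosDB supports it; the 60-second idle threshold is illustrative:

```python
# Claim entries that have been pending for over 60 seconds
result = client.xautoclaim('events', 'mygroup', 'consumer1',
                           min_idle_time=60000,  # milliseconds
                           start_id='0-0')
next_id, claimed = result[0], result[1]  # Redis 7 appends a list of deleted IDs

for msg_id, data in claimed:
    # ... reprocess, then acknowledge ...
    client.xack('events', 'mygroup', msg_id)
```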
## Transactions

### Basic Transaction

```python
import time

# Queue commands, then execute them atomically
pipe = client.pipeline()
pipe.incr('counter')
pipe.hset('stats', 'last_update', time.time())
pipe.lpush('history', 'update')
results = pipe.execute()
```
### Optimistic Locking

```python
with client.pipeline() as pipe:
    while True:
        try:
            pipe.watch('balance')
            # After watch(), the pipeline executes commands immediately
            balance = int(pipe.get('balance') or 0)
            if balance >= 100:
                pipe.multi()
                pipe.decrby('balance', 100)
                pipe.execute()
                break
            else:
                pipe.unwatch()
                raise Exception('Insufficient balance')
        except redis.WatchError:
            continue  # Watched key changed under us; retry
```
## Lua Scripting

### Basic Script

```python
# Fixed-window rate limiter script
# (note: EXPIRE here refreshes the window on every allowed call)
rate_limit_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = tonumber(redis.call('GET', key) or 0)
if current < limit then
    redis.call('INCR', key)
    redis.call('EXPIRE', key, window)
    return 1
else
    return 0
end
"""

script = client.register_script(rate_limit_script)
result = script(keys=['rate:user:123'], args=[100, 60])  # 1 = allowed, 0 = rejected
```
## Real-World Patterns

### Rate Limiter

```python
class RateLimiter:
    def __init__(self, client, limit=100, window=60):
        self.client = client
        self.limit = limit
        self.window = window

    def is_allowed(self, user_id):
        key = f'rate:{user_id}'
        current = self.client.incr(key)
        if current == 1:
            # First hit in this window: start the expiry clock
            self.client.expire(key, self.window)
        return current <= self.limit

limiter = RateLimiter(client, limit=100, window=60)
if limiter.is_allowed('user:123'):
    # Process request
    pass
```
### Distributed Lock

```python
import uuid

class DistributedLock:
    def __init__(self, client, name, timeout=10):
        self.client = client
        self.name = f'lock:{name}'
        self.token = str(uuid.uuid4())
        self.timeout = timeout

    def acquire(self):
        # NX ensures a single holder; EX guards against crashed holders
        return self.client.set(
            self.name, self.token,
            nx=True, ex=self.timeout
        )

    def release(self):
        # Note: this check-then-delete is not atomic; see the Lua variant below
        if self.client.get(self.name) == self.token:
            self.client.delete(self.name)

    def __enter__(self):
        if not self.acquire():
            raise Exception('Could not acquire lock')
        return self

    def __exit__(self, *args):
        self.release()

with DistributedLock(client, 'critical_resource'):
    # Critical section
    pass
```
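The `release()` above can race: the lock could expire and be re-acquired by another client between the GET and the DELETE. A compare-and-delete Lua script closes that gap; a minimal sketch:

```python
RELEASE_SCRIPT = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
else
    return 0
end
"""
release_script = client.register_script(RELEASE_SCRIPT)

lock = DistributedLock(client, 'critical_resource')
if lock.acquire():
    try:
        pass  # critical section
    finally:
        # Atomically delete the lock only if we still own it
        release_script(keys=[lock.name], args=[lock.token])
```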
### Session Manager

```python
import json
import time
import uuid

class SessionManager:
    def __init__(self, client, ttl=3600):
        self.client = client
        self.ttl = ttl

    def create(self, user_id, data):
        session_id = str(uuid.uuid4())
        session_key = f'session:{session_id}'
        self.client.hset(session_key, mapping={
            'user_id': user_id,
            'created': time.time(),
            'data': json.dumps(data)
        })
        self.client.expire(session_key, self.ttl)
        # Index sessions by user for bulk lookup/invalidation
        self.client.sadd(f'user_sessions:{user_id}', session_id)
        return session_id

    def get(self, session_id):
        session_key = f'session:{session_id}'
        data = self.client.hgetall(session_key)
        if data:
            data['data'] = json.loads(data.get('data', '{}'))
        return data

    def destroy(self, session_id):
        session = self.get(session_id)
        if session:
            user_id = session['user_id']
            self.client.delete(f'session:{session_id}')
            self.client.srem(f'user_sessions:{user_id}', session_id)
```
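A quick usage sketch (the user ID and payload are illustrative):

```python
sessions = SessionManager(client, ttl=3600)

sid = sessions.create('user:1', {'theme': 'dark'})
session = sessions.get(sid)  # {'user_id': 'user:1', ..., 'data': {'theme': 'dark'}}
sessions.destroy(sid)        # removes the hash and the user index entry
```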
### Job Queue

```python
import json
import time
import uuid

class JobQueue:
    def __init__(self, client, name):
        self.client = client
        self.name = name
        self.pending = f'queue:{name}:pending'
        self.processing = f'queue:{name}:processing'
        self.failed = f'queue:{name}:failed'

    def enqueue(self, job_data, priority=0):
        job_id = str(uuid.uuid4())
        job = {
            'id': job_id,
            'data': job_data,
            'created': time.time(),
            'attempts': 0
        }
        # Lower score pops first, so the score doubles as priority
        self.client.zadd(self.pending, {json.dumps(job): priority})
        return job_id

    def dequeue(self, timeout=0):
        # Pop the lowest-scored job and move it to the processing hash
        result = self.client.bzpopmin(self.pending, timeout=timeout)
        if result:
            job = json.loads(result[1])
            job['attempts'] += 1
            self.client.hset(self.processing, job['id'], json.dumps(job))
            return job
        return None

    def complete(self, job_id):
        self.client.hdel(self.processing, job_id)

    def fail(self, job_id, max_retries=3):
        job_json = self.client.hget(self.processing, job_id)
        if job_json:
            job = json.loads(job_json)
            self.client.hdel(self.processing, job_id)
            if job['attempts'] < max_retries:
                # Re-queue with a timestamp score, which sorts it
                # behind priority-scored jobs
                self.client.zadd(
                    self.pending,
                    {json.dumps(job): time.time() + 60}
                )
            else:
                # Park in the failed queue for inspection
                self.client.lpush(self.failed, json.dumps(job))
```
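End-to-end usage, sketched (the payload is illustrative):

```python
queue = JobQueue(client, 'emails')

job_id = queue.enqueue({'to': 'alice@example.com'}, priority=1)
job = queue.dequeue(timeout=5)
if job:
    try:
        # ... send the email ...
        queue.complete(job['id'])
    except Exception:
        queue.fail(job['id'])  # retried up to max_retries, then parked in failed
```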
## Geospatial Examples

### Location-Based Services

```python
# Add locations (GEOADD takes flat lon, lat, name triples in redis-py)
client.geoadd('restaurants', [
    -122.4194, 37.7749, 'sf_diner',
    -122.4094, 37.7849, 'sf_cafe',
    -122.4294, 37.7649, 'sf_bistro'
])

# Find nearby
nearby = client.georadius(
    'restaurants',
    longitude=-122.4194,
    latitude=37.7749,
    radius=5,
    unit='km',
    withdist=True
)

# Get distance between two members
distance = client.geodist('restaurants', 'sf_diner', 'sf_cafe', unit='km')
```
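GEORADIUS is deprecated on Redis 6.2+ in favor of GEOSEARCH; if HeliosDB follows suit, the equivalent query looks like this (a sketch, assuming `geosearch` support):

```python
# The same 5 km query via GEOSEARCH
nearby = client.geosearch(
    'restaurants',
    longitude=-122.4194,
    latitude=37.7749,
    radius=5,
    unit='km',
    withdist=True
)
```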
## HyperLogLog Examples

### Unique Visitors

```python
# Add visitors (duplicates are ignored)
for user_id in ['user:1', 'user:2', 'user:3', 'user:1']:  # user:1 repeats
    client.pfadd('unique_visitors:today', user_id)

# Approximate unique count
count = client.pfcount('unique_visitors:today')  # 3

# Merge multiple days into a weekly estimate
client.pfmerge(
    'unique_visitors:week',
    'unique_visitors:mon',
    'unique_visitors:tue',
    'unique_visitors:wed'
)
```
## Pipelining for Performance

```python
import time

# Batch writes without MULTI/EXEC overhead
pipe = client.pipeline(transaction=False)
for i in range(1000):
    pipe.set(f'key:{i}', f'value:{i}')
results = pipe.execute()

# Mixed operations in one round trip
pipe = client.pipeline()
pipe.incr('counter')
pipe.lpush('log', 'entry')
pipe.hset('stats', 'last', time.time())
pipe.expire('temp_key', 300)
results = pipe.execute()
```
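For very large batches, flushing the pipeline in chunks keeps memory bounded on both client and server; a minimal sketch (the chunk size of 500 is illustrative):

```python
def pipelined_set(client, items, chunk_size=500):
    """Write (key, value) pairs in fixed-size pipeline batches."""
    pipe = client.pipeline(transaction=False)
    for i, (key, value) in enumerate(items, start=1):
        pipe.set(key, value)
        if i % chunk_size == 0:
            pipe.execute()  # flush a full chunk
    pipe.execute()  # flush the remainder

pipelined_set(client, ((f'key:{i}', f'value:{i}') for i in range(10_000)))
```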
Related: [README.md](README.md) | [CONFIGURATION.md](CONFIGURATION.md) | [COMPATIBILITY.md](COMPATIBILITY.md)

Last Updated: December 2025