Redis 企业级应用

企业级 Redis 应用实战 🚀

1.1 延时任务队列(Delay Queue)

🧩 应用场景:

  • 电商:订单支付超时自动取消、逾期提醒
  • 通知系统:发送延时消息
  • 缓存失效处理补偿任务等

⚙️ 原理设计:

  • 利用 ZSET(有序集合),score 存储任务执行时间戳
  • 通过 ZRANGEBYSCORE(min, now) 拉取到期任务
  • 消费后 ZREM 删除任务
  • 批量查询 + 限流控制 + 拆分槽位优化轮询性能

✅ Python 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time, redis

r = redis.Redis(decode_responses=True)
DELAY_QUEUE = "delay:zset"

# 添加延时任务
def enqueue(task_id, delay_seconds):
score = int(time.time()) + delay_seconds
r.zadd(DELAY_QUEUE, {task_id: score})
print(f"[加入延时任务] {task_id}, 执行时间戳 {score}")

# 拉取并消费延迟任务
def process_due(max_items=10):
now = int(time.time())
tasks = r.zrangebyscore(DELAY_QUEUE, 0, now, 0, max_items)
for task in tasks:
print(f"[处理任务] {task}")
r.zrem(DELAY_QUEUE, task)

# 企业优化建议:
# - 分片 ZSET 降低扫描压力
# - 多进程加锁竞抢任务
# - 时间轮结构或 Redis Streams 延时代替

💡 Tips:避免轮询死循环,使用 LIMIT + sleepBLPOP 混合执行策略 ([MoldStud][1], [Medium][2], [Medium][3])。


1.2 分布式锁(Distributed Lock)

🧠 为什么需要:

  • 分布式环境中防止并发访问相同资源,如订单幂等操作、资金扣款等。

✅ 单节点锁方案(推荐用于效率优化类场景)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
lock_key = "lock:order:123"
lock_val = str(uuid.uuid4())

# 获取锁
r.set(lock_key, lock_val, nx=True, ex=5)

# 释放锁,确保只有持有者能删
unlock_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
r.eval(unlock_script, 1, lock_key, lock_val)

🔍 RedLock 算法(多节点一致性方案)

  • 在多个独立主节点上依次申请锁
  • 若成功获得多数锁且总耗时小于 TTL,则认为获取成功
  • 功能更强但复杂度高,依赖同步时间与网络一致性,适合强一致场景

⚠️ 注意事项与风险

问题 说明
长事务 TTL 设置需略大于业务处理时间
死锁 使用 Lua 解锁,避免误删其他客户端锁
Redlock 风险 Timing assumptions 不可靠时可能失效 ([martin.kleppmann.com][4])

1.3 Redis 做消息队列(MQ)

📥 List + 阻塞弹出(BLPOP/RPOP)

  • 简单场景适用,单消费者或少量消费者
  • 支持优先队列结构也可用多个 list 模拟多个优先级
1
2
3
4
5
6
7
8
QUEUE = "mq:list"
def produce(msg):
r.lpush(QUEUE, msg)
def consume():
while True:
item = r.brpop(QUEUE, timeout=5)
if item:
print("处理:", item[1])

🔄 Redis Stream 消费组(>=5.0)

  • 支持持久消息、消费确认、失败重试、多个消费者组
  • 经济替代 Kafka 的高性能场景
1
2
3
r.xadd("stream", {"event": "login", "user": "alice"})
r.xgroup_create("stream", "grp", mkstream=True)
r.xreadgroup("grp", "consumer1", {"stream": ">"}, count=1, block=2000)

🔍 Stream 模拟 Kafka 特性 + Python 示例说明场景 。


1.4 实时统计系统

🎯 UV/PV 去重统计(HyperLogLog)

  • UV(独立访客)去重能力强,内存仅 ~12KB
1
2
r.pfadd("uv:202507", "user123")
print("UV 估计值:", r.pfcount("uv:202507"))

🧮 在线用户布阵(BitMap)

  • 高效存储用户状态,支持快速统计
1
2
r.setbit("online", user_id, 1)
r.bitcount("online")

✅ 总结与建议

  • 使用前推荐评估业务严苛程度选锁策略
  • 延时队列尽量配合分布式锁避免重复处理
  • 消息队列根据消费模型选择 List 或 Stream
  • 统计类任务推荐 HyperLogLog 和 BitMap 存储结构
  • 强调监控锁状态与延时队列处理效率(Prometheus + Redis Insight)

Redis 企业级应用
https://dreamshao.github.io/2025/07/16/redis应用实战/
作者
Yun Shao
发布于
2025年7月16日
许可协议