Redis高级数据结构实战:从缓存到分布式计算

一、HyperLogLog:亿级UV统计

统计每天的独立用户数(UV),如果用Set存储用户ID,1亿用户需要约800MB。HyperLogLog用12KB内存就能统计,误差率约0.81%:

import redis

r = redis.Redis()

# 用户每次访问时记录
def record_visit(user_id: str, page: str, date: str):
    key = f"uv:{page}:{date}"
    r.pfadd(key, user_id)  # 自动去重

# 统计当天UV
def get_uv(page: str, date: str) -> int:
    return r.pfcount(f"uv:{page}:{date}")

# 合并多天UV(跨天统计)
def get_weekly_uv(page: str, start_date: str, end_date: str) -> int:
    keys = [f"uv:{page}:{date}" for date in date_range(start_date, end_date)]
    dest_key = f"uv:{page}:weekly"
    r.pfmerge(dest_key, *keys)
    return r.pfcount(dest_key)

# 压测:100万用户,12KB内存 vs Set的800MB
import time
pipe = r.pipeline()
for i in range(1_000_000):
    pipe.pfadd("test_hll", f"user_{i}")
pipe.execute()
print(f"HLL统计结果: {r.pfcount('test_hll')}")  # 约1000000,误差<1%

二、Sorted Set:实时排行榜

# 游戏积分排行榜
class LeaderBoard:
    def __init__(self, name: str):
        self.key = f"leaderboard:{name}"

    def add_score(self, user_id: str, score: float):
        """添加或更新用户分数"""
        self.r.zadd(self.key, {user_id: score})

    def increment_score(self, user_id: str, delta: float) -> float:
        """原子性地增加分数"""
        return self.r.zincrby(self.key, delta, user_id)

    def get_rank(self, user_id: str) -> int:
        """获取排名(从0开始)"""
        rank = self.r.zrevrank(self.key, user_id)
        return rank + 1 if rank is not None else None

    def get_top_n(self, n: int = 10) -> list:
        """获取Top N,带分数"""
        results = self.r.zrevrange(self.key, 0, n-1, withscores=True)
        return [{"user_id": uid.decode(), "score": score, "rank": i+1}
                for i, (uid, score) in enumerate(results)]

    def get_around_me(self, user_id: str, range_count: int = 5) -> list:
        """获取我附近的排名(上下各N名)"""
        rank = self.r.zrevrank(self.key, user_id)
        start = max(0, rank - range_count)
        stop = rank + range_count
        results = self.r.zrevrange(self.key, start, stop, withscores=True)
        return results

三、Bitmap:用户签到统计

from datetime import date, timedelta

class CheckInSystem:
    def check_in(self, user_id: int, check_date: date = None):
        """用户签到"""
        if check_date is None:
            check_date = date.today()

        # key: checkin:YYYYMM,bit位置=用户ID
        key = f"checkin:{check_date.strftime('%Y%m')}"
        day = check_date.day

        # SETBIT key offset value
        r.setbit(key, user_id * 32 + day, 1)

    def get_monthly_checkin(self, user_id: int, year: int, month: int) -> list:
        """获取某月的签到记录"""
        key = f"checkin:{year:04d}{month:02d}"
        checkins = []
        days_in_month = 31

        for day in range(1, days_in_month + 1):
            offset = user_id * 32 + day
            if r.getbit(key, offset):
                checkins.append(day)
        return checkins

    def get_continuous_checkin_days(self, user_id: int) -> int:
        """计算当前连续签到天数"""
        today = date.today()
        continuous = 0
        current = today

        while True:
            key = f"checkin:{current.strftime('%Y%m')}"
            if r.getbit(key, user_id * 32 + current.day):
                continuous += 1
                current -= timedelta(days=1)
            else:
                break
        return continuous

四、Stream:轻量级消息队列

# Stream比List更适合做消息队列:
# - 支持消费者组(多消费者协作,不重复消费)
# - 消息持久化,消费失败可重试
# - 支持消息确认(ACK)

# 生产者
def publish_order_event(order_id: str, event_type: str, data: dict):
    r.xadd(
        "orders:events",      # stream名
        {                      # 消息内容
            "order_id": order_id,
            "event_type": event_type,
            "data": json.dumps(data)
        },
        maxlen=10000           # 最多保留10000条消息
    )

# 消费者组(初始化一次)
r.xgroup_create("orders:events", "order-processor", id="0", mkstream=True)

# 消费者
def consume_orders():
    consumer_name = f"worker-{os.getpid()}"

    while True:
        # 读取未消费的消息,阻塞等待最多1秒
        messages = r.xreadgroup(
            groupname="order-processor",
            consumername=consumer_name,
            streams={"orders:events": ">"},  # ">" = 只读新消息
            count=10,
            block=1000
        )

        if not messages:
            continue

        for stream, msgs in messages:
            for msg_id, data in msgs:
                try:
                    process_order_event(data)
                    # 处理成功,确认消息
                    r.xack("orders:events", "order-processor", msg_id)
                except Exception as e:
                    # 处理失败,不ACK,消息会留在PEL(待确认列表)中
                    log.error(f"处理消息{msg_id}失败: {e}")

# 处理超时未确认的消息(重新分配给其他消费者)
def reprocess_stuck_messages():
    stuck = r.xpending_range("orders:events", "order-processor",
                             min="-", max="+",
                             count=100,
                             idle=60000)  # 超过60秒未ACK
    for msg in stuck:
        r.xclaim("orders:events", "order-processor",
                "recovery-worker", 60000, [msg["message_id"]])

五、Lua脚本:原子操作

-- 秒杀库存扣减(原子操作,防超卖)
local key = KEYS[1]          -- 商品库存key
local user_key = KEYS[2]     -- 用户购买记录key
local user_id = ARGV[1]      -- 用户ID
local quantity = tonumber(ARGV[2])  -- 购买数量

-- 检查用户是否已购买
if redis.call('SISMEMBER', user_key, user_id) == 1 then
    return -1  -- 已购买,不能重复
end

-- 检查库存
local stock = tonumber(redis.call('GET', key))
if stock == nil or stock < quantity then
    return 0  -- 库存不足
end

-- 扣减库存并记录用户
redis.call('DECRBY', key, quantity)
redis.call('SADD', user_key, user_id)
return 1  -- 成功
# Python调用Lua脚本
script = r.register_script(lua_script)
result = script(keys=[f"stock:{product_id}", f"buyers:{product_id}"],
                args=[user_id, quantity])

Redis不只是缓存,合理使用高级数据结构,很多业务功能可以在Redis层直接完成,避免复杂的数据库操作。