1.登录与Cookie缓存

使用一个hash存储登录cookie令牌与已经登录用户之间的映射,检查用户是否已经登录,返回ID

def check_token(conn, token):
    return conn.hget('login:', token)#hget 获取哈希值key的value

更新令牌

def update_token(conn, token, user, item=None):
    # 获取当前时间戳。
    timestamp = time.time()
    # 维持令牌与已登录用户之间的映射。hset设置哈希值
    conn.hset('login:', token, user)
    # 记录令牌最后一次出现的时间。zadd向有序集合添加成员
    conn.zadd('recent:', token, timestamp)
    if item:
        # 记录用户浏览过的商品。
        conn.zadd('viewed:' + token, item, timestamp)
        # 移除旧的记录,只保留用户最近浏览过的25个商品。zremrangebyrank移除有序集和中给定排名区间的所有成员
        conn.zremrangebyrank('viewed:' + token, 0, -26)

清理旧会话

def clean_sessions(conn):
    while not QUIT:
        # 找出目前已有令牌的数量。zcard获取有序集和的成员数
        size = conn.zcard('recent:')
        # 令牌数量未超过限制,休眠并在之后重新检查。
        if size <= LIMIT:
            time.sleep(1)
            continue

        #  获取需要移除的令牌ID。
        end_index = min(size - LIMIT, 100)
        #zrange 通过索引区间返回有序集合指定区间成员
        tokens = conn.zrange('recent:', 0, end_index-1)

        # 为那些将要被删除的令牌构建键名。
        session_keys = []
        for token in tokens:
            session_keys.append('viewed:' + token)

        # 移除最旧的那些令牌。
        conn.delete(*session_keys)
        #hdel删除哈希字段
        conn.hdel('login:', *tokens)
        #zrem移除有序集合中给定的字典区间的所有成员
        conn.zrem('recent:', *tokens)

2 使用Redis实现购物车

使用cookie实现的购物车

购物车的定义:商品ID 商品订购数量

更新购物车:(增加&减少)

def add_to_cart(conn, session, item, count):
    if count <= 0:
        # 从购物车里面移除指定的商品。
        conn.hrem('cart:' + session, item) 
    else:
        # 将指定的商品添加到购物车。
        conn.hset('cart:' + session, item, count) 

清除会话的购物车

def clean_full_sessions(conn):
    while not QUIT:
        size = conn.zcard('recent:')
        if size <= LIMIT:
            time.sleep(1)
            continue

        end_index = min(size - LIMIT, 100)
        sessions = conn.zrange('recent:', 0, end_index-1)

        session_keys = []
        for sess in sessions:
            session_keys.append('viewed:' + sess)
            session_keys.append('cart:' + sess)   # 新增加的这行代码用于删除旧会话对应用户的购物车。

        conn.delete(*session_keys)
        conn.hdel('login:', *sessions)
        conn.zrem('recent:', *sessions)

3 网页缓存

创造一个层来调用Redis缓存函数:对于一个不能缓存的请求,函数直接生成并返回页面;对于可以缓存的请求,函数首先会尝试从缓存中取出并返回被缓存的页面,如果缓存界面不存在,函数生成页面并在Redis中缓存5分钟,最后将页面返回给函数调用者。

def cache_request(conn, request, callback):
    # 对于不能被缓存的请求,直接调用回调函数。
    if not can_cache(conn, request):
        return callback(request)

    # 将请求转换成一个简单的字符串键,方便之后进行查找。
    page_key = 'cache:' + hash_request(request) 
    # 尝试查找被缓存的页面。get获取字符串
    content = conn.get(page_key)

    if not content:
        # 如果页面还没有被缓存,那么生成页面。
        content = callback(request)
        # 将新生成的页面放到缓存里面。 setex将值 value 关联到 key ,并将 key 的生存时间设为 seconds !!!生存时间
        conn.setex(page_key, content, 300)

    # 返回页面。
    return content

4 数据行缓存

编写一个持续运行的守护进程函数,让这个函数将指定的数据行缓存到Redis中,不定期的对这些缓存进行更新。缓存函数将数据行编码为JSON

两个有序集合记录在何时对缓存进行更新:第一个有序集和为调度有序集合,它的成员为数据行的行ID,score是时间戳;第二个有序集合为延时有序集合,成员是行ID,score是指定数据行的缓存需要每隔多久更新。

(Redis不支持嵌套多个结构,就像hash里面不能有列表)

def schedule_row_cache(conn, row_id, delay):
    # 先设置数据行的延迟值。
    conn.zadd('delay:', row_id, delay) 
    # 立即缓存数据行。
    conn.zadd('schedule:', row_id, time.time()) 
def cache_rows(conn):
    while not QUIT:
        # 尝试获取下一个需要被缓存的数据行以及该行的调度时间戳,
        # 命令会返回一个包含零个或一个元组(tuple)的列表。
        next = conn.zrange('schedule:', 0, 0, withscores=True) 
        now = time.time()
        if not next or next[0][1] > now:
            # 暂时没有行需要被缓存,休眠50毫秒后重试。
            time.sleep(.05) 
            continue

        row_id = next[0][0]
        # 获取下一次调度前的延迟时间。
        delay = conn.zscore('delay:', row_id)
        if delay <= 0:
            # 不必再缓存这个行,将它从缓存中移除。
            conn.zrem('delay:', row_id) 
            conn.zrem('schedule:', row_id)
            conn.delete('inv:' + row_id)
            continue

        # 读取数据行。
        row = Inventory.get(row_id)
        # 更新调度时间并设置缓存值。
        conn.zadd('schedule:', row_id, now + delay)         
        conn.set('inv:' + row_id, json.dumps(row.to_dict())) 

5 网页分析

贸然缓存所有数据,内存不够

修改update_token

新添加的代码记录了所有商品浏览次数,并根据浏览次数排序

# 代码清单 2-9
# <start id="_1311_14471_8298"/>
def update_token(conn, token, user, item=None):
    timestamp = time.time()
    conn.hset('login:', token, user)
    conn.zadd('recent:', token, timestamp)
    if item:
        conn.zadd('viewed:' + token, item, timestamp)
        conn.zremrangebyrank('viewed:' + token, 0, -26)
        conn.zincrby('viewed:', item, -1)                   # 这行代码是新添加的。
# <end id="_1311_14471_8298"/>


# 代码清单 2-10
# <start id="_1311_14471_8288"/>
def rescale_viewed(conn):
    while not QUIT:
        # 删除所有排名在20 000名之后的商品。
        conn.zremrangebyrank('viewed:', 20000, -1)
        # 将浏览次数降低为原来的一半
        #zinterstore计算给定有序集并将结果存储
        conn.zinterstore('viewed:', {'viewed:': .5}) 
        # 5分钟之后再执行这个操作。
        time.sleep(300) 
# <end id="_1311_14471_8288"/>


# 代码清单 2-11
# <start id="_1311_14471_8289"/>
def can_cache(conn, request):
    # 尝试从页面里面取出商品ID。
    item_id = extract_item_id(request)
    # 检查这个页面能否被缓存以及这个页面是否为商品页面。
    if not item_id or is_dynamic(request):
        return False
    # 取得商品的浏览次数排名。
    rank = conn.zrank('viewed:', item_id)
    # 根据商品的浏览次数排名来判断是否需要缓存这个页面。
    return rank is not None and rank < 10000 

测试完整代码

# coding: utf-8

import json
import threading
import time
import unittest
import urlparse
import uuid

QUIT = False

# 代码清单 2-1
# <start id="_1311_14471_8266"/>
def check_token(conn, token):
    return conn.hget('login:', token)   # 尝试获取并返回令牌对应的用户。
# <end id="_1311_14471_8266"/>


# 代码清单 2-2
# <start id="_1311_14471_8265"/>
def update_token(conn, token, user, item=None):
    # 获取当前时间戳。
    timestamp = time.time()
    # 维持令牌与已登录用户之间的映射。
    conn.hset('login:', token, user)
    # 记录令牌最后一次出现的时间。
    conn.zadd('recent:', token, timestamp)
    if item:
        # 记录用户浏览过的商品。
        conn.zadd('viewed:' + token, item, timestamp)
        # 移除旧的记录,只保留用户最近浏览过的25个商品。
        conn.zremrangebyrank('viewed:' + token, 0, -26)
# <end id="_1311_14471_8265"/>


# 代码清单 2-3
# <start id="_1311_14471_8270"/>
QUIT = False
LIMIT = 10000000

def clean_sessions(conn):
    while not QUIT:
        # 找出目前已有令牌的数量。
        size = conn.zcard('recent:')
        # 令牌数量未超过限制,休眠并在之后重新检查。
        if size <= LIMIT:
            time.sleep(1)
            continue

        #  获取需要移除的令牌ID。
        end_index = min(size - LIMIT, 100)
        tokens = conn.zrange('recent:', 0, end_index-1)

        # 为那些将要被删除的令牌构建键名。
        session_keys = []
        for token in tokens:
            session_keys.append('viewed:' + token)

        # 移除最旧的那些令牌。
        conn.delete(*session_keys)
        conn.hdel('login:', *tokens)
        conn.zrem('recent:', *tokens)
# <end id="_1311_14471_8270"/>


# 代码清单 2-4
# <start id="_1311_14471_8279"/>
def add_to_cart(conn, session, item, count):
    if count <= 0:
        # 从购物车里面移除指定的商品。
        conn.hrem('cart:' + session, item) 
    else:
        # 将指定的商品添加到购物车。
        conn.hset('cart:' + session, item, count) 
# <end id="_1311_14471_8279"/>


# 代码清单 2-5
# <start id="_1311_14471_8271"/>
def clean_full_sessions(conn):
    while not QUIT:
        size = conn.zcard('recent:')
        if size <= LIMIT:
            time.sleep(1)
            continue

        end_index = min(size - LIMIT, 100)
        sessions = conn.zrange('recent:', 0, end_index-1)

        session_keys = []
        for sess in sessions:
            session_keys.append('viewed:' + sess)
            session_keys.append('cart:' + sess)   # 新增加的这行代码用于删除旧会话对应用户的购物车。

        conn.delete(*session_keys)
        conn.hdel('login:', *sessions)
        conn.zrem('recent:', *sessions)
# <end id="_1311_14471_8271"/>


# 代码清单 2-6
# <start id="_1311_14471_8291"/>
def cache_request(conn, request, callback):
    # 对于不能被缓存的请求,直接调用回调函数。
    if not can_cache(conn, request):
        return callback(request)

    # 将请求转换成一个简单的字符串键,方便之后进行查找。
    page_key = 'cache:' + hash_request(request) 
    # 尝试查找被缓存的页面。
    content = conn.get(page_key)

    if not content:
        # 如果页面还没有被缓存,那么生成页面。
        content = callback(request)
        # 将新生成的页面放到缓存里面。
        conn.setex(page_key, content, 300)

    # 返回页面。
    return content
# <end id="_1311_14471_8291"/>


# 代码清单 2-7
# <start id="_1311_14471_8287"/>
def schedule_row_cache(conn, row_id, delay):
    # 先设置数据行的延迟值。
    conn.zadd('delay:', row_id, delay) 
    # 立即缓存数据行。
    conn.zadd('schedule:', row_id, time.time()) 
# <end id="_1311_14471_8287"/>


# 代码清单 2-8
# <start id="_1311_14471_8292"/>
def cache_rows(conn):
    while not QUIT:
        # 尝试获取下一个需要被缓存的数据行以及该行的调度时间戳,
        # 命令会返回一个包含零个或一个元组(tuple)的列表。
        next = conn.zrange('schedule:', 0, 0, withscores=True) 
        now = time.time()
        if not next or next[0][1] > now:
            # 暂时没有行需要被缓存,休眠50毫秒后重试。
            time.sleep(.05) 
            continue

        row_id = next[0][0]
        # 获取下一次调度前的延迟时间。
        delay = conn.zscore('delay:', row_id)
        if delay <= 0:
            # 不必再缓存这个行,将它从缓存中移除。
            conn.zrem('delay:', row_id) 
            conn.zrem('schedule:', row_id)
            conn.delete('inv:' + row_id)
            continue

        # 读取数据行。
        row = Inventory.get(row_id)
        # 更新调度时间并设置缓存值。
        conn.zadd('schedule:', row_id, now + delay)         
        conn.set('inv:' + row_id, json.dumps(row.to_dict())) 
# <end id="_1311_14471_8292"/>


# 代码清单 2-9
# <start id="_1311_14471_8298"/>
def update_token(conn, token, user, item=None):
    timestamp = time.time()
    conn.hset('login:', token, user)
    conn.zadd('recent:', token, timestamp)
    if item:
        conn.zadd('viewed:' + token, item, timestamp)
        conn.zremrangebyrank('viewed:' + token, 0, -26)
        conn.zincrby('viewed:', item, -1)                   # 这行代码是新添加的。
# <end id="_1311_14471_8298"/>


# 代码清单 2-10
# <start id="_1311_14471_8288"/>
def rescale_viewed(conn):
    while not QUIT:
        # 删除所有排名在20 000名之后的商品。
        conn.zremrangebyrank('viewed:', 20000, -1)
        # 将浏览次数降低为原来的一半
        conn.zinterstore('viewed:', {'viewed:': .5}) 
        # 5分钟之后再执行这个操作。
        time.sleep(300) 
# <end id="_1311_14471_8288"/>


# 代码清单 2-11
# <start id="_1311_14471_8289"/>
def can_cache(conn, request):
    # 尝试从页面里面取出商品ID。
    item_id = extract_item_id(request)
    # 检查这个页面能否被缓存以及这个页面是否为商品页面。
    if not item_id or is_dynamic(request):
        return False
    # 取得商品的浏览次数排名。
    rank = conn.zrank('viewed:', item_id)
    # 根据商品的浏览次数排名来判断是否需要缓存这个页面。
    return rank is not None and rank < 10000 
# <end id="_1311_14471_8289"/>


#--------------- 以下是用于测试代码的辅助函数 --------------------------------

def extract_item_id(request):
    parsed = urlparse.urlparse(request)
    query = urlparse.parse_qs(parsed.query)
    return (query.get('item') or [None])[0]

def is_dynamic(request):
    parsed = urlparse.urlparse(request)
    query = urlparse.parse_qs(parsed.query)
    return '_' in query

def hash_request(request):
    return str(hash(request))

class Inventory(object):
    def __init__(self, id):
        self.id = id

    @classmethod
    def get(cls, id):
        return Inventory(id)

    def to_dict(self):
        return {'id':self.id, 'data':'data to cache...', 'cached':time.time()}

class TestCh02(unittest.TestCase):
    def setUp(self):
        import redis
        self.conn = redis.Redis(db=15)

    def tearDown(self):
        conn = self.conn
        to_del = (
            conn.keys('login:*') + conn.keys('recent:*') + conn.keys('viewed:*') +
            conn.keys('cart:*') + conn.keys('cache:*') + conn.keys('delay:*') + 
            conn.keys('schedule:*') + conn.keys('inv:*'))
        if to_del:
            self.conn.delete(*to_del)
        del self.conn
        global QUIT, LIMIT
        QUIT = False
        LIMIT = 10000000
        print
        print

    def test_login_cookies(self):
        conn = self.conn
        global LIMIT, QUIT
        token = str(uuid.uuid4())

        update_token(conn, token, 'username', 'itemX')
        print "We just logged-in/updated token:", token
        print "For user:", 'username'
        print

        print "What username do we get when we look-up that token?"
        r = check_token(conn, token)
        print r
        print
        self.assertTrue(r)


        print "Let's drop the maximum number of cookies to 0 to clean them out"
        print "We will start a thread to do the cleaning, while we stop it later"

        LIMIT = 0
        t = threading.Thread(target=clean_sessions, args=(conn,))
        t.setDaemon(1) # to make sure it dies if we ctrl+C quit
        t.start()
        time.sleep(1)
        QUIT = True
        time.sleep(2)
        if t.isAlive():
            raise Exception("The clean sessions thread is still alive?!?")

        s = conn.hlen('login:')
        print "The current number of sessions still available is:", s
        self.assertFalse(s)

    def test_shoppping_cart_cookies(self):
        conn = self.conn
        global LIMIT, QUIT
        token = str(uuid.uuid4())

        print "We'll refresh our session..."
        update_token(conn, token, 'username', 'itemX')
        print "And add an item to the shopping cart"
        add_to_cart(conn, token, "itemY", 3)
        r = conn.hgetall('cart:' + token)
        print "Our shopping cart currently has:", r
        print

        self.assertTrue(len(r) >= 1)

        print "Let's clean out our sessions and carts"
        LIMIT = 0
        t = threading.Thread(target=clean_full_sessions, args=(conn,))
        t.setDaemon(1) # to make sure it dies if we ctrl+C quit
        t.start()
        time.sleep(1)
        QUIT = True
        time.sleep(2)
        if t.isAlive():
            raise Exception("The clean sessions thread is still alive?!?")

        r = conn.hgetall('cart:' + token)
        print "Our shopping cart now contains:", r

        self.assertFalse(r)

    def test_cache_request(self):
        conn = self.conn
        token = str(uuid.uuid4())

        def callback(request):
            return "content for " + request

        update_token(conn, token, 'username', 'itemX')
        url = 'http://test.com/?item=itemX'
        print "We are going to cache a simple request against", url
        result = cache_request(conn, url, callback)
        print "We got initial content:", repr(result)
        print

        self.assertTrue(result)

        print "To test that we've cached the request, we'll pass a bad callback"
        result2 = cache_request(conn, url, None)
        print "We ended up getting the same response!", repr(result2)

        self.assertEquals(result, result2)

        self.assertFalse(can_cache(conn, 'http://test.com/'))
        self.assertFalse(can_cache(conn, 'http://test.com/?item=itemX&_=1234536'))

    def test_cache_rows(self):
        import pprint
        conn = self.conn
        global QUIT
        
        print "First, let's schedule caching of itemX every 5 seconds"
        schedule_row_cache(conn, 'itemX', 5)
        print "Our schedule looks like:"
        s = conn.zrange('schedule:', 0, -1, withscores=True)
        pprint.pprint(s)
        self.assertTrue(s)

        print "We'll start a caching thread that will cache the data..."
        t = threading.Thread(target=cache_rows, args=(conn,))
        t.setDaemon(1)
        t.start()

        time.sleep(1)
        print "Our cached data looks like:"
        r = conn.get('inv:itemX')
        print repr(r)
        self.assertTrue(r)
        print
        print "We'll check again in 5 seconds..."
        time.sleep(5)
        print "Notice that the data has changed..."
        r2 = conn.get('inv:itemX')
        print repr(r2)
        print
        self.assertTrue(r2)
        self.assertTrue(r != r2)

        print "Let's force un-caching"
        schedule_row_cache(conn, 'itemX', -1)
        time.sleep(1)
        r = conn.get('inv:itemX')
        print "The cache was cleared?", not r
        print
        self.assertFalse(r)

        QUIT = True
        time.sleep(2)
        if t.isAlive():
            raise Exception("The database caching thread is still alive?!?")

    # We aren't going to bother with the top 10k requests are cached, as
    # we already tested it as part of the cached requests test.

if __name__ == '__main__':
    unittest.main()