import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.*;
/**
* Redis实战 - Chapter 02
* 使用 Redis 构建 Web 应用
*/
public class Chapter02 {
public static void main(String[] args) throws InterruptedException {
new Chapter02().run();
}
public void run() throws InterruptedException {
Jedis conn = new Jedis("localhost");
conn.select(15);
testLoginCookies(conn);
testShopppingCartCookies(conn);
testCacheRows(conn);
testCacheRequest(conn);
}
/**
* updateToken - checkToken - CleanSessionsThread 验证
* @param conn -
* @throws InterruptedException -
*/
public void testLoginCookies(Jedis conn) throws InterruptedException {
System.out.println("\n----- testLoginCookies -----");
String token = UUID.randomUUID().toString();
// 更新令牌
updateToken(conn, token, "username", "itemX");
System.out.println("We just logged-in/updated token: " + token);
System.out.println("For user: 'username'");
System.out.println();
// 验证令牌,返回用户名/Id
System.out.println("What username do we get when we look-up that token?");
String r = checkToken(conn, token);
System.out.println(r);
System.out.println();
assert r != null;
System.out.println("Let's drop the maximum number of cookies to 0 to clean them out");
System.out.println("We will start a thread to do the cleaning, while we stop it later");
// 清空所有会话信息
CleanSessionsThread thread = new CleanSessionsThread(0);
thread.start();
Thread.sleep(1000);
thread.quit();
Thread.sleep(2000);
if (thread.isAlive()){
throw new RuntimeException("The clean sessions thread is still alive?!?");
}
long s = conn.hlen("login:");
System.out.println("The current number of sessions still available is: " + s);
assert s == 0;
}
/**
* updateToken - addToCart - CleanFullSessionsThread 验证
* @param conn -
* @throws InterruptedException -
*/
public void testShopppingCartCookies(Jedis conn) throws InterruptedException {
System.out.println("\n----- testShopppingCartCookies -----");
String token = UUID.randomUUID().toString();
// 更新令牌,更新最近用户登录列表,更新该用户浏览商品信息列表,更新某商品浏览量
System.out.println("We'll refresh our session...");
updateToken(conn, token, "username", "itemX");
// 添加3件 itemY 到购物车
System.out.println("And add an item to the shopping cart");
addToCart(conn, token, "itemY", 3);
Map<String,String> r = conn.hgetAll("cart:" + token);
System.out.println("Our shopping cart currently has:");
for (Map.Entry<String,String> entry : r.entrySet()){
// 打印商品 id & count
System.out.println(" " + entry.getKey() + ": " + entry.getValue());
}
System.out.println();
assert r.size() >= 1;
// 清空会话信息
System.out.println("Let's clean out our sessions and carts");
CleanFullSessionsThread thread = new CleanFullSessionsThread(0);
thread.start();
Thread.sleep(1000);
thread.quit();
Thread.sleep(2000);
if (thread.isAlive()){
throw new RuntimeException("The clean sessions thread is still alive?!?");
}
r = conn.hgetAll("cart:" + token);
System.out.println("Our shopping cart now contains:");
for (Map.Entry<String,String> entry : r.entrySet()){
System.out.println(" " + entry.getKey() + ": " + entry.getValue());
}
assert r.size() == 0;
}
/**
* scheduleRowCache - CacheRowsThread 验证
* @param conn -
* @throws InterruptedException -
*/
public void testCacheRows(Jedis conn) throws InterruptedException {
System.out.println("\n----- testCacheRows -----");
// 设置 itemX 需要每 5 秒刷新一次
System.out.println("First, let's schedule caching of itemX every 5 seconds");
scheduleRowCache(conn, "itemX", 5);
System.out.println("Our schedule looks like:");
Set<Tuple> s = conn.zrangeWithScores("schedule:", 0, -1);
for (Tuple tuple : s){
System.out.println(" " + tuple.getElement() + ", " + tuple.getScore());
}
assert s.size() != 0;
// 定期缓存 itemX 的商品信息
System.out.println("We'll start a caching thread that will cache the data...");
CacheRowsThread thread = new CacheRowsThread();
thread.start();
Thread.sleep(1000);
System.out.println("Our cached data looks like:");
String r = conn.get("inv:itemX"); // 打印 itemX 详情信息
System.out.println(r);
assert r != null;
System.out.println();
System.out.println("We'll check again in 5 seconds...");
Thread.sleep(5000);
System.out.println("Notice that the data has changed...");
String r2 = conn.get("inv:itemX");
System.out.println(r2);
System.out.println();
assert r2 != null;
assert !r.equals(r2); // 两次 itemX 详细信息的 time 肯定不同
// delay 设置为 -1,表示之后不再缓存该数据,清空对应 schedule & delay 关于该商品的信息
System.out.println("Let's force un-caching");
scheduleRowCache(conn, "itemX", -1);
Thread.sleep(1000);
r = conn.get("inv:itemX");
System.out.println("The cache was cleared? " + (r == null));
assert r == null;
thread.quit();
Thread.sleep(2000);
if (thread.isAlive()){
throw new RuntimeException("The database caching thread is still alive?!?");
}
}
/**
* callback - updateToken - cacheRequest - canCache 验证
* @param conn -
*/
public void testCacheRequest(Jedis conn) {
System.out.println("\n----- testCacheRequest -----");
String token = UUID.randomUUID().toString();
// 打印 request 信息
Callback callback = new Callback(){
@Override
public String call(String request){
return "content for " + request;
}
};
// 更新令牌,更新最近用户登录列表,更新该用户浏览商品信息列表,更新某商品浏览量
updateToken(conn, token, "username", "itemX");
String url = "http://test.com/?item=itemX";
System.out.println("We are going to cache a simple request against " + url);
// 存储 url 请求信息
String result = cacheRequest(conn, url, callback);
System.out.println("We got initial content:\n" + result);
System.out.println();
assert result != null;
// 即使 callback == null 但是之前缓存过该request,所以仍会返回之前缓存的数据
System.out.println("To test that we've cached the request, we'll pass a bad callback");
String result2 = cacheRequest(conn, url, null);
System.out.println("We ended up getting the same response!\n" + result2);
assert result.equals(result2);
// url 中没有 item关键字,验证不通过
assert !canCache(conn, "http://test.com/");
// url 中有 '_',验证不通过
assert !canCache(conn, "http://test.com/?item=itemX&_=1234536");
}
/**
* 验证令牌
* @param conn - redis
* @param token -
* @return - userId
*/
public String checkToken(Jedis conn, String token) {
return conn.hget("login:", token);
}
/**
* 更新令牌,更新最近用户登录,更新该用户浏览商品信息,更新某商品浏览量
* hash - obj[login:] key[token] val[userId] -- 存储登录对象,存放不同令牌对应下的用户名
* zset - key[recent:] score[timestamp] val[token] -- 存储排行信息,最近用户登录令牌
* zset - key[viewed:token] socre[timestamp] val[item] -- 排行,存储某位用户最近浏览商品,并进行修剪,保留25条记录
* zset - key[viewed:] score[-1] val[item] -- 排行,记录所有用户对该商品的浏览量,浏览量-1,保证浏览量越少,排名越前
* @param conn - redis
* @param token -
* @param user - userId
* @param item - 商品
*/
public void updateToken(Jedis conn, String token, String user, String item) {
long timestamp = System.currentTimeMillis() / 1000;
conn.hset("login:", token, user);
conn.zadd("recent:", timestamp, token);
if (item != null) {
conn.zadd("viewed:" + token, timestamp, item);
conn.zremrangeByRank("viewed:" + token, 0, -26);
conn.zincrby("viewed:", -1, item);
}
}
/**
* 加入购物车
* hash - obj[cart:token] key[item] val[count] -- 存储某位用户购物车中某件商品的数量
* @param conn -
* @param session - token
* @param item - 商品ID
* @param count - 商品数量
*/
public void addToCart(Jedis conn, String session, String item, int count) {
if (count <= 0) {
conn.hdel("cart:" + session, item);
} else {
conn.hset("cart:" + session, item, String.valueOf(count));
}
}
/**
* 定时缓存数据行
* zset - key[delay:] score[delay] val[rowId] -- 排行,存储某件商品的定时刷新间隔时间
* zset - key[schedule:] score[current] val[rowId] -- 排行,存储某件商品加入定时时间
* @param conn -
* @param rowId -
* @param delay -
*/
public void scheduleRowCache(Jedis conn, String rowId, int delay) {
conn.zadd("delay:", delay, rowId);
conn.zadd("schedule:", System.currentTimeMillis() / 1000, rowId);
}
/**
* 缓存 排行前10000的商品 的 url 请求信息
* set - key[cache:hashcode] expire[300] val[content] -- 存储请求信息
* @param conn -
* @param request - url
* @param callback -
* @return - 请求内容
*/
public String cacheRequest(Jedis conn, String request, Callback callback) {
if (!canCache(conn, request)){
return callback != null ? callback.call(request) : null;
}
// url 请求下的商品需要被缓存
String pageKey = "cache:" + hashRequest(request);
String content = conn.get(pageKey);
// 该 url 类型的请求之前没有缓存过
if (content == null && callback != null){
content = callback.call(request);
conn.setex(pageKey, 300, content);
}
return content;
}
/**
* 只有排名前 10000 的商品才需要缓存
* @param conn -
* @param request - url
* @return - boolean
*/
public boolean canCache(Jedis conn, String request) {
try {
URL url = new URL(request);
HashMap<String,String> params = new HashMap<String,String>(16);
if (url.getQuery() != null){
for (String param : url.getQuery().split("&")){
String[] pair = param.split("=", 2); // 分割成两份
params.put(pair[0], pair.length == 2 ? pair[1] : null);
}
}
// 获取商品id,不能为空且 url 不能包含 '_'
String itemId = extractItemId(params);
if (itemId == null || isDynamic(params)) {
return false;
}
// 获取当前商品排名
Long rank = conn.zrank("viewed:", itemId);
return rank != null && rank < 10000;
}catch(MalformedURLException mue){
return false;
}
}
/**
* 判断 url 请求是否包含 '_'
* @param params - map
* @return - boolean
*/
public boolean isDynamic(Map<String,String> params) {
return params.containsKey("_");
}
public String extractItemId(Map<String,String> params) {
return params.get("item");
}
public String hashRequest(String request) {
return String.valueOf(request.hashCode());
}
/**
* 接口类,提供回调方法
*/
public interface Callback {
public String call(String request);
}
/**
* 清理超过 limit 数量的旧的用户信息
*/
public static class CleanSessionsThread extends Thread {
private final Jedis conn;
private final int limit;
private boolean quit;
public CleanSessionsThread(int limit) {
this.conn = new Jedis("localhost");
this.conn.select(15);
this.limit = limit;
}
public void quit() {
quit = true;
}
@Override
public void run() {
// quit == false 成立
while (!quit) {
long size = conn.zcard("recent:");
if (size <= limit){
try {
sleep(1000);
}catch(InterruptedException ie){
Thread.currentThread().interrupt();
}
continue;
}
long endIndex = Math.min(size - limit, 100);
Set<String> tokenSet = conn.zrange("recent:", 0, endIndex - 1);
String[] tokens = tokenSet.toArray(new String[0]);
ArrayList<String> sessionKeys = new ArrayList<String>();
for (String token : tokens) {
sessionKeys.add("viewed:" + token);
}
// 删除 zset - key[viewed:token] socre[timestamp] val[item] -- 排行,存储某位用户最近浏览商品
conn.del(sessionKeys.toArray(new String[0]));
// 删除 hash - obj[login:] key[token] val[userId] -- 存储登录对象,存放不同令牌对应下的用户名
conn.hdel("login:", tokens);
// 删除 zset - key[recent:] score[timestamp] val[token] -- 存储排行信息,最近用户登录令牌
conn.zrem("recent:", tokens);
}
}
}
public static class CleanFullSessionsThread extends Thread {
private final Jedis conn;
private final int limit;
private boolean quit;
public CleanFullSessionsThread(int limit) {
this.conn = new Jedis("localhost");
this.conn.select(15);
this.limit = limit;
}
public void quit() {
quit = true;
}
@Override
public void run() {
while (!quit) {
long size = conn.zcard("recent:");
if (size <= limit){
try {
sleep(1000);
}catch(InterruptedException ie){
Thread.currentThread().interrupt();
}
continue;
}
long endIndex = Math.min(size - limit, 100);
Set<String> sessionSet = conn.zrange("recent:", 0, endIndex - 1);
String[] sessions = sessionSet.toArray(new String[0]);
ArrayList<String> sessionKeys = new ArrayList<String>();
for (String sess : sessions) {
sessionKeys.add("viewed:" + sess);
sessionKeys.add("cart:" + sess);
}
conn.del(sessionKeys.toArray(new String[0]));
conn.hdel("login:", sessions);
conn.zrem("recent:", sessions);
}
}
}
/**
* 定期缓存需要缓存的商品信息
* set - key[inv:rowId] val[jsonObj[rowId]] -- 存储某件商品的详情信息
*/
public static class CacheRowsThread extends Thread {
private final Jedis conn;
private boolean quit;
public CacheRowsThread() {
this.conn = new Jedis("localhost");
this.conn.select(15);
}
public void quit() {
quit = true;
}
@Override
public void run() {
Gson gson = new Gson();
while (!quit){
Set<Tuple> range = conn.zrangeWithScores("schedule:", 0, 0);
Tuple next = range.size() > 0 ? range.iterator().next() : null;
long now = System.currentTimeMillis() / 1000;
// 未到缓存当前最应该缓存商品的缓存时间 - 等待
if (next == null || next.getScore() > now){
try {
sleep(50);
}catch(InterruptedException ie){
Thread.currentThread().interrupt();
}
continue;
}
// 获取要更新商品的ID
String rowId = next.getElement();
// 判断是否要更新
double delay = conn.zscore("delay:", rowId);
// 不更新则删除
if (delay <= 0) {
conn.zrem("delay:", rowId);
conn.zrem("schedule:", rowId);
conn.del("inv:" + rowId);
// 跳出当前循环
continue;
}
// 获取商品详情,并加到 redis中
Inventory row = Inventory.get(rowId);
conn.zadd("schedule:", now + delay, rowId);
conn.set("inv:" + rowId, gson.toJson(row));
}
}
}
/**
* 根据商品ID返回商品详情,包括 ID,data 和 time
* 每次调用此函数获取的某商品 time 肯定不同,因此返回的两次结果也定不相等
*/
public static class Inventory {
private String id;
private String data;
private long time;
private Inventory (String id) {
this.id = id;
this.data = "data to cache...";
this.time = System.currentTimeMillis() / 1000;
}
public static Inventory get(String id) {
return new Inventory(id);
}
}
}