文章目录
Spring - 数据库读写分离
一般情况下应用程序对数据库操作都是读多写少,造成数据库读取数据时压力比较大,那么读写分离、数据库集群等解决方案就出现了。一个作为负责写数据的主库,称为写库,其它都作为负责读取数据的从库,称之为读库。与此同时我们需要保证以下几点:
- 读库与写库数据一致,可通过主从同步实现。
- 写数据只能通过写库(主库),读数据只能通过读库(从库)。
1.读写分离解决方案
1.1 应用层解决
第一种方式是从应用层去实现读写分离的策略,通过程序去控制对应操作数据源。
优点:
- 多数据源更灵活,由程序完成;
- 不需要引入中间件,减少学习和搭建成本;
- 理论上支持任何数据库;
缺点:
- 由开发独立完成,运维不可管控;
- 无法动态变更数据源;
![]()
1.2 中间件解决
第二种方式是利用中间件去实现读写分离的策略,通过中间件代理去分配操作数据源。
优点:
- 程序无法任何改动,开发无感知;
- 动态变更数据源不需要重启程序;
缺点:
- 由于依赖于中间件,会导致切换数据库困难;
- 由于利用中间件做代理,故性能有所下降;
![]()
2.Mysql主从配置
2.1 Mysql主从复制原理
我们在使用读写分离的前提下需要保证主从数据的一致性,所以主从数据同步是读写分离的前提。这里我们先来了解一下Mysql主从复制的原理,这里借用网上一张图片可以让我们更容易明白。
- master将数据改变记录到二进制日志(binarylog)中,也即是配置文件log-bin指定的文件(这些记录叫做二进制日志事件,binary log events)
- slave将master的binary logevents拷贝到它的中继日志(relay log)
- slave重做中继日志中的事件,将改变反映它自己的数据(数据重演)
2.2 Master主库配置
首先我们需要打开mysql目录下的
my.cnf
配置文件,在[mysqld]
下添加或修改以下配置#指定主库serverid server-id=1 #打开二进制日志 log-bin= master-bin #指定同步的数据库,若不指定则同步全部数据库(根据具体需求而定) binlog-do-db=my_test_db
配置完成保存后,重启mysql服务,进入mysql执行
show master status
命令,可以看到以下信息,主要。
记录好以上File
以及Position
信息后,我们在主库上创建一个用于数据同步的用户。create user u_replication identified by 'replication123456'; grant replication slave on *.* to 'u_replication'@'%'; flush privileges;
2.3 Slave从库配置
同样我们打开从库机器上mysql目录下的
my.cnf
,在[mysqld]
下添加或修改以下配置。#指定从库serverid,不可重复 server-id=2
配置完成后,登录mysql执行以下命令
change master to master_host='127.0.0.1',//主库服务器ip master_user='u_replication ',//主库数据同步用户 master_password='replication123456', master_port=3306, master_log_file='master-bin.000002',//File master_log_pos=5891;//Position
执行成功后,执行以下命令启动slave同步并可查看状态
# 启动slave同步 start slave; # 查看同步状态 show slave status \G;
图中两项若都为Yes,则表明主从配置启动成功。在以上过程中可能由于环境不同造成一些问题,最基本的是要保证mysql主从服务的数据库版本保持一致以及server_id唯一,另外比较常见的错误是由于复制的UUID重复造成的。
Fatal error:The slave I/O thread stops because master and slave have equal MySQL server UUIDS; these UUIDs must be different for replication to work.
出现以上问题可以尝试通过以下几种方案解决:
- 查看server_id是否相同
show variables like 'server_id';
- 查看auto.cnf文件
show variables like 'server_uuid';
cat /var/lib/mysql/auto.cnf
以下命令在从服务器执行(可通过find / -name "auto.cnf"查找文件)
mv /var/lib/mysql/auto.cnf /var/lib/mysql/auto.cnf.bk
systemctl restart mysql
以上完成之后即可验证主从同步是否成功。
3.应用层实现读写分离
应用层实现读写分离的方式有很多种,这里主要介绍一下我所了解的三种:多数据源配置、自定义Mybatis插件、AOP动态切换。这里先来介绍下这三种方案的数据源变更策略。
- 多数据源配置:将读库写库作为多种数据源看待,然后分别映射不同文件操作数据库。
- 自定义Mybatis插件:根据
事务策略
+SQL命令类型
变更数据源。- AOP动态切换:根据
事务策略
+操作方法名前缀
变更数据源。
3.1 多数据源配置
这种方式从编码层面难度比较低,只需要在mybatis配置文件中将主库和从库作为两个数据源去负责映射不同目录下的mapper文件即可。这种方式比较明显的一个缺点就是,同一个实体类操作
mapper
和对应sql的xml
会有两份,分别是读和写操作,如果这些文件是由mybatis-generator
工具生成则需要自己手动拆分开,比较繁琐。
spring-mybatis.xml
<bean id="dataSourceWrite" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
<!-- 基本属性driverClassName、 url、user、password -->
<property name="driverClassName" value="${db.driver}" />
<property name="url" value="${db.master.url}" />
<property name="username" value="${db.master.username}" />
<property name="password" value="${db.master.password}" />
<!-- 配置初始化大小、最小、最大 -->
<!-- 通常来说,只需要修改initialSize、minIdle、maxActive -->
<property name="initialSize" value="${db.master.initialSize}" />
<property name="minIdle" value="${db.master.minIdle}" />
<property name="maxActive" value="${db.master.maxActive}" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="${db.master.maxWait}" />
<!-- 默认值是 true ,当从连接池取连接时,验证这个连接是否有效 -->
<property name="testOnBorrow" value="true" />
<!-- 指明连接是否被空闲连接回收器(如果有)进行检验.如果检测失败,则连接将被从池中去除. 注意: 设置为true后如果要生效,validationQuery参数必须设置为非空字符串 -->
<property name="testWhileIdle" value="true" />
<!-- 默认值是 flase, 当从把该连接放回到连接池的时,验证这个连接是否有效 -->
<property name="testOnReturn" value="false" />
<!--用来验证从连接池取出的连接,在将连接返回给调用者之前.如果指定,则查询必须是一个SQL SELECT并且必须返回至少一行记录-->
<property name="validationQuery" value="SELECT 'x'" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="30000" />
<property name="removeAbandoned" value="true" />
<property name="removeAbandonedTimeout" value="180" />
<!-- 关闭abanded连接时输出错误日志 -->
<property name="logAbandoned" value="${db.master.logAbandoned}" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 解密密码必须要配置的项 -->
<!-- <property name="filters" value="config" /> <property name="connectionProperties" value="config.decrypt=true" /> -->
</bean>
<bean id="dataSourceRead" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
<!-- 基本属性driverClassName、 url、user、password -->
<property name="driverClassName" value="${db.driver}" />
<property name="url" value="${db.slave.url}" />
<property name="username" value="${db.slave.username}" />
<property name="password" value="${db.slave.password}" />
<!-- 配置初始化大小、最小、最大 -->
<!-- 通常来说,只需要修改initialSize、minIdle、maxActive -->
<property name="initialSize" value="${db.slave.initialSize}" />
<property name="minIdle" value="${db.slave.minIdle}" />
<property name="maxActive" value="${db.slave.maxActive}" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="${db.slave.maxWait}" />
<!-- 默认值是 true ,当从连接池取连接时,验证这个连接是否有效 -->
<property name="testOnBorrow" value="true" />
<!-- 指明连接是否被空闲连接回收器(如果有)进行检验.如果检测失败,则连接将被从池中去除. 注意: 设置为true后如果要生效,validationQuery参数必须设置为非空字符串 -->
<property name="testWhileIdle" value="true" />
<!-- 默认值是 flase, 当从把该连接放回到连接池的时,验证这个连接是否有效 -->
<property name="testOnReturn" value="false" />
<!--用来验证从连接池取出的连接,在将连接返回给调用者之前.如果指定,则查询必须是一个SQL SELECT并且必须返回至少一行记录-->
<property name="validationQuery" value="SELECT 1 " />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="30000" />
<!-- 超过时间限制是否回收 -->
<property name="removeAbandoned" value="true" />
<!-- 超时时间;单位为秒。180秒=3分钟 -->
<property name="removeAbandonedTimeout" value="180" />
<!-- 关闭abanded连接时输出错误日志 -->
<property name="logAbandoned" value="${db.slave.logAbandoned}" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 解密密码必须要配置的项 -->
<!-- <property name="filters" value="config" /> <property name="connectionProperties" value="config.decrypt=true" /> -->
</bean>
<!-- spring和MyBatis完美整合,不需要mybatis的配置映射文件 -->
<bean id="sqlSessionFactoryWrite" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSourceWrite" />
<!-- 自动扫描mapping.xml文件 -->
<property name="mapperLocations" value="classpath:mapper/write/*.xml"></property>
</bean>
<!-- spring和MyBatis完美整合,不需要mybatis的配置映射文件 -->
<bean id="sqlSessionFactoryRead" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSourceRead" />
<!-- 自动扫描mapping.xml文件 -->
<property name="mapperLocations" value="classpath:mapper/read/*.xml"></property>
</bean>
<!-- DAO接口所在包名,Spring会自动查找其下的类 -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="com.ithzk.rws.dao.write" />
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryWrite"></property>
</bean>
<!-- DAO接口所在包名,Spring会自动查找其下的类 -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="com.ithzk.rws.dao.read" />
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryRead"></property>
</bean>
<!-- (事务管理)transaction manager, use JtaTransactionManager for global tx -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSourceRead" />
</bean>
<!-- 配置事务属性 -->
<tx:advice id="txAdvice" transaction-manager="transactionManager" >
<tx:attributes>
<tx:method name="save*" propagation="REQUIRED" />
<tx:method name="update*" propagation="REQUIRED" />
<tx:method name="remove*" propagation="REQUIRED" />
<tx:method name="get*" read-only="true" />
<tx:method name="list*" read-only="true" />
<tx:method name="count*" read-only="true" />
</tx:attributes>
</tx:advice>
<!-- 配置事务的切点,并把事务切点和事务属性不关联起来 -->
<aop:config >
<aop:pointcut expression="execution(* com.ithzk.rws..service.impl.*.*(..))" id="txPointCut" />
<aop:advisor advice-ref="txAdvice" pointcut-ref="txPointCut" order="2"/>
</aop:config>
3.2 自定义Mybaits插件
自定义插件的方式主要是通过
Mybatis
框架提供的Interceptor
***去动态控制操作的目标数据源,在设计这种解决方案前我们需要了解Mybatis
框架提供的几个类以及其中的属性:AbstractRoutingDataSource
、LazyConnectionDataSourceProxy
。看过相关源码的同学应该很容易就发现其实不管是多数据源的配置还是动态切换本质都和targetDataSources
这个集合有关,其实本身Mybatis就会根据情况去使用不用的目标数据源,从而达到想要的效果。
这里由于涉及数据库配置,顺便带过一下加解密配置相关工具类。
DesUtils.java
package com.ithzk.rws.utils;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.DESKeySpec;
import java.security.SecureRandom;
/** * @author hzk * @date 2019/3/26 */
public class DesUtils {
private static final String DES = "DES";
private static final String KEY = "4YztMHI7PsT4rLZN";
// private static final String KEY = "rws";
private DesUtils() {}
private static byte[] encrypt(byte[] src, byte[] key) throws Exception {
SecureRandom sr = new SecureRandom();
DESKeySpec dks = new DESKeySpec(key);
SecretKeyFactory keyFactory = SecretKeyFactory.getInstance(DES);
SecretKey secretKey = keyFactory.generateSecret(dks);
Cipher cipher = Cipher.getInstance(DES);
cipher.init(Cipher.ENCRYPT_MODE, secretKey, sr);
return cipher.doFinal(src);
}
private static byte[] decrypt(byte[] src, byte[] key) throws Exception {
SecureRandom sr = new SecureRandom();
DESKeySpec dks = new DESKeySpec(key);
SecretKeyFactory keyFactory = SecretKeyFactory.getInstance(DES);
SecretKey secretKey = keyFactory.generateSecret(dks);
Cipher cipher = Cipher.getInstance(DES);
cipher.init(Cipher.DECRYPT_MODE, secretKey, sr);
return cipher.doFinal(src);
}
private static String byte2hex(byte[] b) {
String hs = "";
String temp = "";
for (int n = 0; n < b.length; n++) {
temp = (java.lang.Integer.toHexString(b[n] & 0XFF));
if (temp.length() == 1){
hs = hs + "0" + temp;
}
else{
hs = hs + temp;
}
}
return hs.toUpperCase();
}
private static byte[] hex2byte(byte[] b) {
if ((b.length % 2) != 0)
throw new IllegalArgumentException("length not even");
byte[] b2 = new byte[b.length / 2];
for (int n = 0; n < b.length; n += 2) {
String item = new String(b, n, 2);
b2[n / 2] = (byte) Integer.parseInt(item, 16);
}
return b2;
}
private static String decode(String src, String key) {
String decryptStr = "";
try {
byte[] decrypt = decrypt(hex2byte(src.getBytes()), key.getBytes());
decryptStr = new String(decrypt);
} catch (Exception e) {
e.printStackTrace();
}
return decryptStr;
}
private static String encode(String src, String key){
byte[] bytes = null;
String encryptStr = "";
try {
bytes = encrypt(src.getBytes(), key.getBytes());
} catch (Exception ex) {
ex.printStackTrace();
}
if (bytes != null)
encryptStr = byte2hex(bytes);
return encryptStr;
}
/** * 解密 */
public static String decode(String src) {
return decode(src, KEY);
}
/** * 加密 */
public static String encode(String src) {
return encode(src, KEY);
}
public static void main(String[] args){
System.out.println(encode("root"));
System.out.println(encode("123"));
}
}
EncryptPropertyPlaceholderConfigurer.java
package com.ithzk.rws.utils;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
/** * @author hzk * @date 2019/3/26 */
public class EncryptPropertyPlaceholderConfigurer extends PropertyPlaceholderConfigurer{
private String[] encryptPropNames = {"db.master.username","db.slave.username","db.master.password","db.slave.password"};
@Override
protected String convertProperty(String propertyName, String propertyValue) {
if(isEncryptProp(propertyName)){
return DesUtils.decode(propertyValue);
}else{
return propertyValue;
}
}
private boolean isEncryptProp(String propertyName){
for (String encryptPropName:encryptPropNames) {
if(encryptPropName.equals(propertyName)){
return true;
}
}
return false;
}
}
我们需要实现动态切换数据源,需要通过
AbstractRoutingDataSource
类来改变当前操作路由键,由于我们需要考虑到线程安全所以在操作路由键的时候需要通过ThreadLocal
保证线程安全。
DynamicDataSourceHolder.java
package com.ithzk.rws.utils.dynamic;
/** * @author hzk * @date 2019/3/26 */
public class DynamicDataSourceHolder {
public static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
public static final String DB_MASTER = "master";
public static final String DB_SLAVE = "slave";
/** * 获取路由Key * @return */
public static String getRouteKey(){
String routeKey = contextHolder.get();
if(null == routeKey){
routeKey = DB_MASTER;
}
return routeKey;
}
/** * 设置路由Key */
public static void setRouteKey(String routeKey){
contextHolder.set(routeKey);
System.out.println("切换到数据源:"+routeKey);
}
}
DynamicDataSource.java
package com.ithzk.rws.utils.dynamic;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
/** * 通过AbstractRoutingDataSource实现动态切换数据源,需重写determineCurrentLookupKey方法 * 由于DynamicDataSource是单例的,线程不安全的,所以采用ThreadLocal保证线程安全,由DynamicDataSourceHolder完成。 * @author hzk * @date 2019/3/26 */
public class DynamicDataSource extends AbstractRoutingDataSource {
/** * 在spring容器中查询对应key来应用为数据源 * @return */
@Override
protected Object determineCurrentLookupKey() {
return DynamicDataSourceHolder.getRouteKey();
}
}
上面这些类已经可以让我们自定义去设置所需操作的目标数据源,这里我们只需在最后一步提供一个目标数据源设置插件即可达到我们要实现的读写分离的目的。这里我们需要依赖Mybatis提供的
Interceptor
***去对SQL操作进行拦截处理。根据相对应的SQL操作类型改变目标数据源。
DynamicDataSourcePlugin.java
package com.ithzk.rws.utils.dynamic;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.executor.keygen.SelectKeyGenerator;
import org.apache.ibatis.executor.resultset.ResultSetHandler;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.util.Properties;
/** * 读写分离路由插件 * @author hzk * @date 2019/3/26 */
@Intercepts(
//update 增删改 query 查询
{@Signature(type = Executor.class,method = "update",args = {MappedStatement.class,Object.class}),
@Signature(type = Executor.class,method = "query",args = {MappedStatement.class,Object.class, RowBounds.class, ResultHandler.class})
}
)
public class DynamicDataSourcePlugin implements Interceptor {
@Override
public Object intercept(Invocation invocation) throws Throwable {
//判断操作是否存在事务
boolean active = TransactionSynchronizationManager.isActualTransactionActive();
//默认让routeKey为MASTER
String routeKey = DynamicDataSourceHolder.DB_MASTER;
//第一个参数为MappedStatement对象,第二参数为传入的参数
Object[] args = invocation.getArgs();
MappedStatement mappedStatement = (MappedStatement) args[0];
if(active){
//带事务操作操作主库
routeKey = DynamicDataSourceHolder.DB_MASTER;
}else{
//判断读方法
if(mappedStatement.getSqlCommandType().equals(SqlCommandType.SELECT)){
//如果使用select_last_insert_id函数
if(mappedStatement.getId().contains(SelectKeyGenerator.SELECT_KEY_SUFFIX)){
routeKey = DynamicDataSourceHolder.DB_MASTER;
}else{
routeKey = DynamicDataSourceHolder.DB_SLAVE;
}
}
}
//设置确定的路由Key
DynamicDataSourceHolder.setRouteKey(routeKey);
System.out.println("使用["+invocation.getMethod().getName()+"]方法,使用["+routeKey+"]策略,执行的SQL命令["+mappedStatement.getSqlCommandType().name()+"]");
return invocation.proceed();
}
@Override
public Object plugin(Object target) {
return Plugin.wrap(target,this);
}
@Override
public void setProperties(Properties properties) {
}
}
上面几步完成之后,其实我们离读写分离只差一步,那就是在配置文件中配置好这些。这里需要注意的是,我们这里的
DynamicDataSourcePlugin
只做了一些简单的逻辑判断,各位需要根据实际情况去完善自己的逻辑代码。
spring-mybatis.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd ">
<context:component-scan base-package="com.ithzk.rws">
<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller" />
</context:component-scan>
<!-- 配置数据库相关参数properties属性-->
<bean class="com.ithzk.rws.utils.EncryptPropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:jdbc.properties</value>
</list>
</property>
</bean>
<!-- 数据库连接池-->
<bean id="abstractDataSource" class="com.alibaba.druid.pool.DruidDataSource" abstract="true" init-method="init" destroy-method="close">
<!-- 配置初始化大小、最小、最大 -->
<!-- 通常来说,只需要修改initialSize、minIdle、maxActive -->
<property name="initialSize" value="${db.master.initialSize}" />
<property name="minIdle" value="${db.master.minIdle}" />
<property name="maxActive" value="${db.master.maxActive}" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="${db.master.maxWait}" />
<!-- 默认值是 true ,当从连接池取连接时,验证这个连接是否有效 -->
<property name="testOnBorrow" value="true" />
<!-- 指明连接是否被空闲连接回收器(如果有)进行检验.如果检测失败,则连接将被从池中去除. 注意: 设置为true后如果要生效,validationQuery参数必须设置为非空字符串 -->
<property name="testWhileIdle" value="true" />
<!-- 默认值是 flase, 当从把该连接放回到连接池的时,验证这个连接是否有效 -->
<property name="testOnReturn" value="false" />
<!--用来验证从连接池取出的连接,在将连接返回给调用者之前.如果指定,则查询必须是一个SQL SELECT并且必须返回至少一行记录-->
<property name="validationQuery" value="SELECT 'x'" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="30000" />
<property name="removeAbandoned" value="true" />
<property name="removeAbandonedTimeout" value="180" />
<!-- 关闭abanded连接时输出错误日志 -->
<property name="logAbandoned" value="${db.master.logAbandoned}" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<property name="poolPreparedStatements" value="true" />
<property name="maxPoolPreparedStatementPerConnectionSize" value="50" />
<property name="filters" value="stat" />
</bean>
<!-- 主库 -->
<bean id="master" parent="abstractDataSource">
<!-- 基本属性driverClassName、 url、user、password -->
<property name="driverClassName" value="${db.driver}" />
<property name="url" value="${db.master.url}" />
<property name="username" value="${db.master.username}" />
<property name="password" value="${db.master.password}" />
</bean>
<!-- 从库 -->
<bean id="slave" parent="abstractDataSource">
<!-- 基本属性driverClassName、 url、user、password -->
<property name="driverClassName" value="${db.driver}" />
<property name="url" value="${db.slave.url}" />
<property name="username" value="${db.slave.username}" />
<property name="password" value="${db.slave.password}" />
</bean>
<!-- 配置动态路由 -->
<bean id="dynamicDataSourceRouting" class="com.ithzk.rws.utils.dynamic.DynamicDataSource">
<property name="targetDataSources">
<map>
<entry key="master" value-ref="master"/>
<entry key="slave" value-ref="slave"/>
</map>
</property>
<!-- 设置默认的数据源 -->
<property name="defaultTargetDataSource" ref="master"/>
</bean>
<!-- 配置数据源 -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy">
<property name="targetDataSource" ref="dynamicDataSourceRouting"/>
</bean>
<bean id="sqlSessionFactoryWrite" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource" />
<!-- 配置mybatis全局配置文件 -->
<property name="configLocation" value="classpath:mybatis-config.xml" />
<!-- 自动扫描mapping.xml文件 -->
<property name="mapperLocations" value="classpath:mapper/*.xml"/>
</bean>
<!-- 动态扫描Dao -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="com.ithzk.rws.dao"/>
</bean>
<!-- (事务管理)transaction manager, use JtaTransactionManager for global tx -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<!-- 配置事务属性 -->
<tx:advice id="txAdvice" transaction-manager="transactionManager" >
<tx:attributes>
<!--定义查询方法都是只读的 -->
<tx:method name="get*" read-only="true" />
<tx:method name="list*" read-only="true" />
<tx:method name="count*" read-only="true" />
<tx:method name="query*" read-only="true" />
<tx:method name="find*" read-only="true" />
<tx:method name="select*" read-only="true" />
<!-- 主库执行操作,事务传播行为定义为默认行为 -->
<tx:method name="save*" propagation="REQUIRED" />
<tx:method name="update*" propagation="REQUIRED" />
<tx:method name="remove*" propagation="REQUIRED" />
<tx:method name="delete*" propagation="REQUIRED" />
<tx:method name="insert*" propagation="REQUIRED" />
<!--其他方法使用默认事务策略 -->
<tx:method name="*" />
</tx:attributes>
</tx:advice>
<!-- 配置事务的切点,并把事务切点和事务属性不关联起来 -->
<aop:config >
<aop:pointcut id="txPointCut" expression="execution(* com.ithzk.rws.service.impl.*.*(..))" />
<!-- 应用事务策略到Service切面 -->
<aop:advisor advice-ref="txAdvice" pointcut-ref="txPointCut" order="2"/>
</aop:config>
</beans>
mybatis-config.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTO Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<!-- 配置全局属性 -->
<settings>
<!-- 使用jdbc的generatedKeys获取数据库自增主键 -->
<setting name="useGeneratedKeys" value="true"/>
<!-- 使用列别名替换列名 默认true -->
<setting name="useColumnLabel" value="true"/>
<!-- 开启驼峰命名转换 Table{create_time} -> Entity{createTime} -->
<setting name="mapUnderscoreToCamelCase" value="true"/>
<!-- 打印sql语句 -->
<setting name="logImpl" value="STDOUT_LOGGING"/>
</settings>
<!-- 配置路由插件 -->
<plugins>
<plugin interceptor="com.ithzk.rws.utils.dynamic.DynamicDataSourcePlugin"/>
</plugins>
</configuration>
3.3 AOP动态切换
上面第二种方法我们借助了
Mybaits
框架提供的插件去变更目标数据源,其实我们通过Spring
的AOP
我们也可以达到这种效果,这里我们利用AOP
根据事务策略的配置去改变目标数据源,大家看看其中的区别。这里改动不多,唯一的区别就是将插件变更为了AOP操作。
DataSourceAspect.java(替代DynamicDataSourcePlugin)
package com.ithzk.rws.utils.aop;
import com.ithzk.rws.utils.dynamic.DynamicDataSourceHolder;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.JoinPoint;
import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttributeSource;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/** * 如果事务管理中配置了事务策略,则采用配置的事务策略中的标记了ReadOnly的方法是用Slave,其它使用Master。 * 如果没有配置事务管理的策略,则采用方法名匹配的原则,以query、find、get开头方法用Slave,其它用Master。 * @author hzk * @date 2019/3/27 */
public class DataSourceAspect {
private List<String> slaveMethodPattern = new ArrayList<String>();
private static final String[] defaultSlaveMethodStart = new String[]{ "query", "find", "get","select","list","count","select" };
private String[] slaveMethodStart;
/** * 读取事务管理中的策略 * * @param txAdvice * @throws Exception */
@SuppressWarnings("unchecked")
public void setTxAdvice(TransactionInterceptor txAdvice) throws Exception {
if (txAdvice == null) {
//未配置事务管理策略
return;
}
//获取到策略配置信息
TransactionAttributeSource transactionAttributeSource = txAdvice.getTransactionAttributeSource();
if (!(transactionAttributeSource instanceof NameMatchTransactionAttributeSource)) {
return;
}
//使用反射技术获取到NameMatchTransactionAttributeSource对象中的nameMap属性值
NameMatchTransactionAttributeSource matchTransactionAttributeSource = (NameMatchTransactionAttributeSource) transactionAttributeSource;
Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap");
//设置该字段可访问(穿透)
nameMapField.setAccessible(true);
Map<String, TransactionAttribute> map = (Map<String, TransactionAttribute>) nameMapField.get(matchTransactionAttributeSource);
for (Map.Entry<String, TransactionAttribute> entry : map.entrySet()) {
//ReadOnly只读策略加入到slaveMethodPattern
if (!entry.getValue().isReadOnly()) {
continue;
}
slaveMethodPattern.add(entry.getKey());
}
}
/** * 在进入Service方法之前执行 * @param point 切面对象 */
public void before(JoinPoint point) {
// 获取到当前执行的方法名
String methodName = point.getSignature().getName();
boolean isSlave = false;
if (slaveMethodPattern.isEmpty()) {
//当前Spring容器中没有配置事务策略,采用方法名匹配方式
isSlave = isSlave(methodName);
} else {
// 使用策略规则匹配
for (String mappedName : slaveMethodPattern) {
if (isMatch(methodName, mappedName)) {
isSlave = true;
break;
}
}
}
if (isSlave) {
// 标记为读库
DynamicDataSourceHolder.setRouteKey(DynamicDataSourceHolder.DB_MASTER);
} else {
// 标记为写库
DynamicDataSourceHolder.setRouteKey(DynamicDataSourceHolder.DB_SLAVE);
}
}
/** * 判断是否为读库 * * @param methodName * @return */
private Boolean isSlave(String methodName) {
// 方法名以query、find、get开头的方法名走从库
return StringUtils.startsWithAny(methodName, getSlaveMethodStart());
}
/** * 通配符匹配 * * Return if the given method name matches the mapped name. * <p> * The default implementation checks for "xxx*", "*xxx" and "*xxx*" matches, as well as direct * equality. Can be overridden in subclasses. * * @param methodName the method name of the class * @param mappedName the name in the descriptor * @return if the names match * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */
protected boolean isMatch(String methodName, String mappedName) {
return PatternMatchUtils.simpleMatch(mappedName, methodName);
}
/** * 用户指定slave的方法名前缀 * @param slaveMethodStart */
public void setSlaveMethodStart(String[] slaveMethodStart) {
this.slaveMethodStart = slaveMethodStart;
}
public String[] getSlaveMethodStart() {
if(this.slaveMethodStart == null){
// 没有指定,使用默认
return defaultSlaveMethodStart;
}
return slaveMethodStart;
}
}
这里配置文件需要修改的只有aop配置这方面,将我们的
DataSourceAspect
配置在切面管理器中,并且可以将mybatis-config.xml
中配置路由插件去除。
spring-mybatis.xml
<!-- 配置事务的切点,并把事务切点和事务属性不关联起来 -->
<aop:config >
<aop:pointcut id="txPointCut" expression="execution(* com.ithzk.rws.service.impl.*.*(..))" />
<!-- 应用事务策略到Service切面 -->
<aop:advisor advice-ref="txAdvice" pointcut-ref="txPointCut" order="2"/>
<!-- 将切面应用到自定义的切面处理器上,-9999保证该切面优先级最高执行 -->
<aop:aspect ref="dataSourceAspect" order="-9999">
<aop:before method="before" pointcut-ref="txPointCut" />
</aop:aspect>
</aop:config>
<!-- 定义AOP切面处理器 -->
<bean id="dataSourceAspect" class="com.ithzk.rws.utils.aop.DataSourceAspect">
<!-- 指定事务策略 -->
<property name="txAdvice" ref="txAdvice"/>
<property name="slaveMethodStart" value="query,find,get,select,list,count,select"/>
</bean>
3.4 一主多从
当我们数据访问量不断增大时,我们可能会选择使用更多的从库去减缓数据库查询的压力,避免单台服务宕机造成的不必要损失。这时我们就会采取一主多从的策略,那么我们应用层也需要作出对应的改变,以
3.2
和3.3
为例,我们只需要修改我们自定义的DynamicDataSource
类中路由Key的选取方式即可。
DynamicDataSource.java
package com.ithzk.rws.utils.dynamic;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.util.ReflectionUtils;
import javax.sql.DataSource;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/** * 通过AbstractRoutingDataSource实现动态切换数据源,需重写determineCurrentLookupKey方法 * 由于DynamicDataSource是单例的,线程不安全的,所以采用ThreadLocal保证线程安全,由DynamicDataSourceHolder完成。 * @author hzk * @date 2019/3/26 */
public class DynamicDataSource extends AbstractRoutingDataSource {
private static final int TURN_MAX_COUNT = 888;
private Integer slaveCount;
/** * 轮询计数,初始为-1,AtomicInteger是线程安全的 */
private AtomicInteger counter = new AtomicInteger(-1);
/** * 读库路由键仓库 */
private List<Object> slaveDataSources = new ArrayList<Object>(0);
@Override
protected Object determineCurrentLookupKey() {
if (DynamicDataSourceHolder.DB_MASTER.equals(DynamicDataSourceHolder.getRouteKey())) {
Object key = DynamicDataSourceHolder.getRouteKey();
System.out.println("当前数据源为: " + key);
return key;
}
Object key = getSlaveKey();
System.out.println("当前数据源为: " + key);
return key;
}
/** * 初始化读库路由键仓库 */
@SuppressWarnings("unchecked")
@Override
public void afterPropertiesSet() {
super.afterPropertiesSet();
// 反射获取父类AbstractRoutingDataSource中私有属性resolvedDataSources
Field field = ReflectionUtils.findField(AbstractRoutingDataSource.class, "resolvedDataSources");
field.setAccessible(true);
try {
Map<Object, DataSource> resolvedDataSources = (Map<Object, DataSource>) field.get(this);
//数据源总数 = 读库数量 + 写库数量(这里一主多从 写库数量即为1)
this.slaveCount = resolvedDataSources.size() - 1;
for (Map.Entry<Object, DataSource> entry : resolvedDataSources.entrySet()) {
if (DynamicDataSourceHolder.DB_MASTER.equals(entry.getKey())) {
continue;
}
slaveDataSources.add(entry.getKey());
}
} catch (Exception e) {
System.out.println("DynamicDataSource -> afterPropertiesSet Exception:"+e);
}
}
/** * 轮询算法实现 * @return 从库路由键 */
private Object getSlaveKey() {
// 获取偏移量
Integer index = counter.incrementAndGet() % slaveCount;
// 固定偏移量范围避免数值越界
if (counter.get() > TURN_MAX_COUNT) {
// 重置偏移量
counter.set(-1);
}
return slaveDataSources.get(index);
}
}
3.5 总结
以上三种方式都有自己的优缺点,相互之间存在差异化,具体如何选择大家根据自己的需求决定。三种方法看似不同,其实本质都是一样,对源码认真剖析定能知其一二。我自己在这条道路上也还有很多需要努力学习的,我的所有文章目的都是出于学习分享以及记录,如果有不正确的地方还希望大家见谅并指出。
【该章节github地址】