太久没进行分享感觉少了点东西,所以现在给自己定一个目标就是从八月份开始每个月至少分享两篇技术博客,沉淀太久了,都忘记当初为啥要看源码,为啥要深入学习技术了。回归正题,本文分享一些关于Mysql如何解决多事务并发的问题和Spring源码是怎么控制事务以及一些事务失效的场景。

分享内容

  • Mysql事务隔离机制
  • 锁机制
  • MVCC多版本并发控制隔离机制
  • Spring事务应用和源码分析
  • 事务失效问题

一、Mysql事务

数据库的多事务并发问题,为了解决多事务并发问题,数据库设计了事务隔离机制、锁机制、MVCC多版本并发控制隔离机制,用一整套机制来解决多事务并发问题

1.1事务的属性

事务(Transaction)是数据库系统中一系列操作的一个逻辑单元,所有操作要么全部成功要么全部失败。

事务是区分文件存储系统与Nosql数据库重要特性之一,其存在的意义是为了保证即使在并发情况下也能正确的执行crud操作。怎样才算是正确的呢?这时提出了事务需要保证的四个特性即ACID:

  • 原子性(Atomicity) :事务是一个原子操作单元,其对数据的修改,要么全都执行,要么全都不执行。
  • 一致性(Consistent) :在事务开始和完成时,数据都必须保持一致状态。这意味着所有相关的数据规则都必须应用于事务的修改,以保持数据的完整性。
  • 隔离性(Isolation) :数据库系统提供一定的隔离机制,保证事务在不受外部并发操作影响的”独立“环境执行。这意味着事务处理过程中的中间状态对外部是不可见的,反之亦然。
  • 持久性(Durable) :事务完成之后,它对于数据的修改是永久性的,即使出现系统故障也能够保持。

1.2事务的隔离级别

在高并发的情况下,要完全保证其ACID特性是非常困难的,除非把所有的事务串行化执行,但带来的负面的影响将是性能大打折扣。很多时候我们有些业务对事务的要求是不一样的,所以数据库中设计了四种隔离级别,供用户基于业务进行选择。

数据库默认隔离级别:

Oracle中默认级别是 Read committed

Mysql 中默认级别 Repeatable read

#查看mysql 的默认隔离级别
SELECT @@tx_isolation

#设置为读未提交
set tx_isolation='read-uncommitted';  

#设置为读已提交
set tx_isolation='read-committed';  

#设置为可重复读
set tx_isolation='REPEATABLE-READ';   

#设置为串行化
set tx_isolation='SERIALIZABLE';

并发事务处理带来的问题

  • 更新丢失(Lost Update)或脏写

当两个或多个事务选择同一行,然后基于最初选定的值更新该行时,由于每个事务都不知道其他事务的存在,就会发生丢失更新问题–最后的更新覆盖了由其他事务所做的更新

  • 脏读(Dirty Reads)

一个事务正在对一条记录做修改,在这个事务完成并提交前,这条记录的数据就处于不一致的状态;这时,另一个事务也来读取同一条记录,如果不加控制,第二个事务读取了这些“脏”数据,并据此作进一步的处理,就会产生未提交的数据依赖关系。这种现象被形象<typo id="typo-1445" data-origin="的" ignoretag="true">的</typo>叫做“脏读”。

简短总结:事务A读取到了事务B已经修改但尚未提交的数据,还在这个数据基础上做了操作。此时,如果B事务回滚,A读取的数据无效,不符合一致性要求。

  • 不可重读(Non-Repeatable Reads)

一个事务在读取某些数据后的某个时间,再次读取以前读过的数据,却发现其读出的数据已经发生了改变、或某些记录已经被删除了!这种现象就叫做“不可重复读”。

简短总结:事务A内部的相同查询语句在不同时刻读出的结果不一致,不符合隔离性

  • 幻读(Phantom Reads)

一个事务按相同的查询条件重新读取以前检索过的数据,却发现其他事务插入了满足其查询条件的新数据,这种现象就称为“幻读”。

简短总结:事务A读取到了事务B提交的新增数据,不符合隔离性

image

具体的隔离级别的实战演示省略了

1.3锁机制

锁是计算机协调多个进程或线程并发访问某一资源的机制。

在数据库中,除了传统的计算资源(如CPU、RAM、I/O等)的争用以外,数据也是一种供需要用户共享的资源。如何保证数据并发访问的一致性、有效性是所有数据库必须解决的一个问题,锁冲突也是影响数据库并发访问性能的一个重要因素。

锁分类

  • 从性能上分为乐观锁(用版本对比来实现)和悲观锁
  • 从对数据库操作的类型分,分为读锁和写锁(都属于悲观锁)

读锁(共享锁,S锁(Shared)):针对同一份数据,多个读操作可以同时进行而不会互相影响

写锁(排它锁,X锁(eXclusive)):当前写操作没有完成前,它会阻断其他写锁和读锁

  • 从对数据操作的<typo id="typo-2125" data-origin="粒度" ignoretag="true">粒度</typo>分,分为表锁和行锁

表锁

每次操作锁住整张表。开销小,加锁快;不会出现死锁;锁定<typo id="typo-2170" data-origin="粒度" ignoretag="true">粒度</typo>大,发生锁冲突的概率最高,并发度最低;一般用在整表数据迁移的场景。

1.4WAL原则

InnoDB的ARIES三原则Write Ahead Logging(WAL):

  • 日志成功写入后事务就不会丢失,后续由checkpoint机制来保证磁盘物理文件与redo log达到一致性;
  • 利用redo log来记录变更后的数据,即redo里记录事务数据变更后的值;
  • 利用undo log来记录变更前的数据,即undo里记录事务数据变更前的值,用于回滚和其他事务多版本读。

1.5并发事务控制

  • 单版本控制-锁 锁用独占的方式来保证在只有一个版本的情况下事务之间相互隔离。 在 MySQL 事务中,锁的实现与隔离级别有关系,在 RR(Repeatable Read)隔离级别下,MySQL 为了解决幻读的问题,以牺牲并行度为代价,通过 Gap 锁来防止数据的写入,而这种锁,因为其并行度不够,冲突很多,经常会引起死锁。现在流行的 Row 模式可以避免很多冲突甚至死锁问题,所以推荐默认使用 Row + RC(Read Committed)模式的隔离级别,可以很大程度上提高数据库的读写并行度。
  • MVCC多版本并发控制隔离机制 Mysql在可重复读隔离级别下如何保证事务较高的隔离性,同样的sql查询语句在一个事务里多次执行查询结果相同,就算其它事务对数据有修改也不会影响当前事务sql语句的查询结果。 这个隔离性就是靠MVCC(Multi-Version Concurrency Control)机制来保证的,对一行数据的读和写两个操作默认是不会通过加锁互斥来保证隔离性,避免了频繁加锁互斥,而在串行化隔离级别为了保证较高的隔离性是通过将所有操作加锁互斥来实现的。 Mysql在读已提交和可重复读隔离级别下都实现了MVCC机制。

MVCC机制的实现就是通过read-view机制与undo版本链比对机制,使得不同的事务会根据数据版本链对比规则读取同一条数据在版本链上的不同版本数据。

undo日志版本链

对于使用InnoDB存储引擎的表来说,它的聚簇索引记录中都包含两个必要的隐藏列(row_id并不是必要的,我们创建的表中有主键或者非NULL唯一键时都不会包含row_id列):

  • trx_id:每次对某条记录进行改动时,都会把对应的事务id赋值给trx_id隐藏列。
  • roll_pointer:每次对某条记录进行改动时,这个隐藏列会存一个指针,可以通过这个指针找到该记录修改前的信息。

read view机制详解

对于使用READ UNCOMMITTED隔离级别的事务来说,直接读取记录的最新版本就好了,对于使用SERIALIZABLE隔离级别的事务来说,使用加锁的方式来访问记录。对于使用READ COMMITTED和REPEATABLE READ隔离级别的事务来说,就需要用到我们上边所说的版本链了,核心问题就是:需要判断一下版本链中的哪个版本是当前事务可见的。

ReadView中主要包含4个比较重要的内容:

  1. m_ids:表示在生成ReadView时当前系统中活跃的读写事务的事务id列表。
  2. min_trx_id:表示在生成ReadView时当前系统中活跃的读写事务中最小的事务id,也就是m_ids中的最小值。
  3. max_trx_id:表示生成ReadView时系统中应该分配给下一个事务的id值。
  4. creator_trx_id:表示生成该ReadView的事务的事务id。

注意max_trx_id并不是m_ids中的最大值,事务id是递增分配的。比方说现在有id为1,2,3这三个事务,之后id为3的事务提交了。那么一个新的读事务在生成ReadView时,m_ids就包括1和2,min_trx_id的值就是1,max_trx_id的值就是4。

有了这个ReadView,这样在访问某条记录时,只需要按照下边的步骤判断记录的某个版本是否可见:

  • 如果被访问版本的trx_id属性值与ReadView中的creator_trx_id值相同,意味着当前事务在访问它自己修改过的记录,所以该版本可以被当前事务访问。
  • 如果被访问版本的trx_id属性值小于ReadView中的min_trx_id值,表明生成该版本的事务在当前事务生成ReadView前已经提交,所以该版本可以被当前事务访问。
  • 如果被访问版本的trx_id属性值大于ReadView中的max_trx_id值,表明生成该版本的事务在当前事务生成ReadView后才开启,所以该版本不可以被当前事务访问。
  • 如果被访问版本的trx_id属性值在ReadView的min_trx_id和max_trx_id之间,那就需要判断一下trx_id属性值是不是在m_ids列表中,如果在,说明创建ReadView时生成该版本的事务还是活跃的,该版本不可以被访问;如果不在,说明创建ReadView时生成该版本的事务已经被提交,该版本可以被访问。

READ COMMITTED的实现方式

每次读取数据前都生成一个ReadView

REPEATABLE READ实现方式

在第一次读取数据时生成一个ReadView

题记:关于undo日志版本链与read view机制细节还需要探讨一下

二、Spring事务

2.1Spring 事务相关API

断点跟spring源码后在processon上画关于spring事务执行的流程图

www.processon.com/view/5eea03

Spring 事务是在数据库事务的基础上进行封装扩展 其主要特性如下:

  • 支持原有的数据库事务的隔离级别,加入了事务传播的概念
  • 提供多个事务的合并或隔离的功能
  • 提供声明式事务,让业务代码与事务分离,事务变得更易用 (AOP)

大致描述一下Spring 提供了事务相关接口:

TransactionDefinition

事务定义 : 事务的隔离级别 事务的传播行为

public interface TransactionDefinition {

    int PROPAGATION_REQUIRED = 0;

    int PROPAGATION_SUPPORTS = 1;

    int PROPAGATION_MANDATORY = 2;

    int PROPAGATION_REQUIRES_NEW = 3;

    int PROPAGATION_NOT_SUPPORTED = 4;

    int PROPAGATION_NEVER = 5;

    int PROPAGATION_NESTED = 6;

    int ISOLATION_DEFAULT = -1;

    int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;

    int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;

    int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;

    int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;

    int TIMEOUT_DEFAULT = -1;

    int getPropagationBehavior();

    int getIsolationLevel();

    int getTimeout();

    boolean isReadOnly();

    String getName();

}

TransactionAttribute

事务属性,实现了对回滚规则的扩展(处理异常)

public interface TransactionAttribute extends TransactionDefinition {

    String getQualifier();

    boolean rollbackOn(Throwable ex);

}
image

PlatformTransactionManager

平台事务管理器

public interface PlatformTransactionManager {

    TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException;

    void commit(TransactionStatus status) throws TransactionException;

    void rollback(TransactionStatus status) throws TransactionException;

}
image

TransactionStatus

事务运行时状态

public interface TransactionStatus extends SavepointManager, Flushable {

    boolean isNewTransaction();

    boolean hasSavepoint();

    void setRollbackOnly();

    boolean isRollbackOnly();

    void flush();

    boolean isCompleted();

}

TransactionInterceptor

事务拦截器,实现了MethodInterceptor

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {

    /**
     * 创建新的TransactionInterceptor
     * 需要设置事务管理器和事务属性。
     */
    public TransactionInterceptor() {
    }

    public TransactionInterceptor(PlatformTransactionManager ptm, Properties attributes) {
        setTransactionManager(ptm);
        setTransactionAttributes(attributes);
    }

    public TransactionInterceptor(PlatformTransactionManager ptm, TransactionAttributeSource tas) {
        setTransactionManager(ptm);
        setTransactionAttributeSource(tas);
    }

    @Override
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    }

    private void writeObject(ObjectOutputStream oos) throws IOException {

        oos.defaultWriteObject();

        oos.writeObject(getTransactionManagerBeanName());
        oos.writeObject(getTransactionManager());
        oos.writeObject(getTransactionAttributeSource());
        oos.writeObject(getBeanFactory());
    }

    private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {

        ois.defaultReadObject();

        setTransactionManagerBeanName((String) ois.readObject());
        setTransactionManager((PlatformTransactionManager) ois.readObject());
        setTransactionAttributeSource((TransactionAttributeSource) ois.readObject());
        setBeanFactory((BeanFactory) ois.readObject());
    }

}

找事务拦截器 核心:TransactionInterceptor#invoke

TransactionAspectSupport

事务<typo id="typo-7873" data-origin="切面" ignoretag="true">切面</typo>支持, 内部类TransactionInfo封装了事务相关属性

TransactionAspectSupport.TransactionInfo

protected final class TransactionInfo {
    @Nullable
    private final PlatformTransactionManager transactionManager;

    @Nullable
    private final TransactionAttribute transactionAttribute;

    private final String joinpointIdentification;

    @Nullable
    private TransactionStatus transactionStatus;

    @Nullable
    private TransactionInfo oldTransactionInfo;

2.2编程式事务和声明式事务

编程式事务

演示代码:

public class SpringTransactionExample {
    private static String url = "jdbc:mysql://127.0.0.1:3306/test";
    private static String user = "root";
    private static String password = "root";

    public static void main(String[] args) {

        // 获取数据源
        final DataSource ds = new DriverManagerDataSource(url, user, password);
        // 编程式事务
        final TransactionTemplate template = new TransactionTemplate();
        // 设置事务管理器
        template.setTransactionManager(new DataSourceTransactionManager(ds));

        template.execute(new TransactionCallback<Object>() {
            @Override
            public Object doInTransaction(TransactionStatus status) {
                Connection conn = DataSourceUtils.getConnection(ds);
                Object savePoint = null;
                try {
                    {
                        // 插入
                        PreparedStatement prepare = conn.
                                prepareStatement(
                                "insert INTO account (accountName,user,money) VALUES (?,?,?)");
                        prepare.setString(1, "111");
                        prepare.setString(2, "aaa");
                        prepare.setInt(3, 10000);
                        prepare.executeUpdate();
                    }
                    // 设置保存点
                   savePoint = status.createSavepoint();
                    {
                        // 插入
                        PreparedStatement prepare = conn.
                                prepareStatement(
                                "insert INTO account (accountName,user,money) VALUES (?,?,?)");
                        prepare.setString(1, "222");
                        prepare.setString(2, "bbb");
                        prepare.setInt(3, 10000);
                        prepare.executeUpdate();
                    }
                    {
                        // 更新
                        PreparedStatement prepare = conn.prepareStatement(
                                "UPDATE account SET money= money+100 where user=?");
                        prepare.setString(1, "aaa");
                        prepare.executeUpdate();
                        //int i=1/0;

                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    System.out.println("更新失败");
                    if (savePoint != null) {
                        status.rollbackToSavepoint(savePoint);
                    } else {
                        status.setRollbackOnly();
                    }
                }
                return null;
            }
        });
    }

}

声明式事务

@Transactional

<!-- 开启事务控制的注解支持 -->  
<tx:annotation-driven transaction-manager="txManager"/>

事务注解配置,作用于类,方法上

|

属性名

|

说明

|
|

name

|

<typo id="typo-11218" data-origin="当在" ignoretag="true">当在</typo>配置文件中有多个 TransactionManager , 可以用该属性指定选择哪个事务管理器。

|
|

propagation

|

事务的传播行为,默认值为 REQUIRED。

|
|

isolation

|

事务的隔离度,默认值采用 DEFAULT。

|
|

timeout

|

事务的超时时间,默认值为-1。如果超过该时间限制但事务还没有完成,则自动回滚事务。

|
|

read-only

|

指定事务是否为只读事务,默认值为 false;为了忽略那些不需要事务的方法,比如读取数据,可以设置 read-only 为 true。

|
|

rollback-for

|

用于指定能够触发事务回滚的异常类型,如果有多个异常类型需要指定,各类型之间可以通过逗号分隔。

|
|

no-rollback- for

|

抛出 no-rollback-for 指定的异常类型,不回滚事务。

|

Java Configuration

@EnableTransactionManagement

利用TransactionManagementConfigurationSelector向容器中注册两个组件

  • AutoProxyRegistrar 给容器中注册一个 InfrastructureAdvisorAutoProxyCreator 的后置处理器,返回一个代理对象(增强器),代理对象执行方法利用拦截器链进行调用;
  • ProxyTransactionManagementConfiguration 是一个@Configuration 给容器中注册事务增强器transactionAdvisor; 复制代码 AnnotationTransactionAttributeSource解析事务注解 复制代码 事务拦截器transactionInterceptor 复制代码
@Configuration
@ComponentScan("com.xuchang")
@EnableTransactionManagement
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class AppConfig {

    @Bean
    public DataSource dataSource(){
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource.setUrl("jdbc:mysql://localhost:3306/test?characterEncoding=utf8");
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        return dataSource;
    }

    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
}

2.3事务失效问题

  • Bean是否是代理对象
  • 入口函数是否是public的
  • 数据库是否支持事务(Mysql的MyIsam不支持事务) ,行锁才支持事务
  • 切点是否配置正确
  • 内部方法间调用导致事务失效 因为this不是代理对象,可以配置 expose-proxy="true" ,就可以通过AopContext.currentProxy()获取 到当前类的代理对象。 <aop:aspectj-autoproxy expose-proxy="true"/> 复制代码 @EnableAspectJAutoProxy(exposeProxy = true) 复制代码 也可以注入当前bean
  • 异常类型是否配置正确 默认只支持 RuntimeException和Error ,不支持检查异常 想要支持检查异常需配置rollbackFor @Transactional(rollbackFor = Exception.class) 复制代码 异常体系:
image

源码分析

#找事务拦截器
TransactionInterceptor#invoke
# 事务相关的调用
TransactionAspectSupport#invokeWithinTransaction
#异常回滚的逻辑
TransactionAspectSupport#completeTransactionAfterThrowing

#异常回滚
txInfo.transactionAttribute.rollbackOn(ex)

#可以设置异常回滚规则  
RuleBasedTransactionAttribute#rollbackOn

# 默认的异常回滚规则
DefaultTransactionAttribute#rollbackOn
public boolean rollbackOn(Throwable ex) {
    return (ex instanceof RuntimeException || ex instanceof Error);
}

2.4事务的传播机制

spring在TransactionDefinition接口中定义了七个事务传播行为:

  • propagation_requierd:如果当前没有事务,就新建一个事务,如果已存在一个事务中,加入到这个事务中,这是最常见的选择。
  • propagation_supports:支持当前事务,如果没有当前事务,就以非事务方法执行。
  • propagation_mandatory:使用当前事务,如果没有当前事务,就抛出异常。
  • propagation_required_new:新建事务,如果当前存在事务,把当前事务挂起。
  • propagation_not_supported:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
  • propagation_never:以非事务方式执行操作,如果当前事务存在则抛出异常。
  • propagation_nested:如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与propagation_required类似的操作

常用事务传播机制:

  • PROPAGATION_REQUIRED 这个也是默认的传播机制;
  • PROPAGATION_REQUIRES_NEW 总是新启一个事务,这个传播机制适用于不受父方法事务影响的操作,比如某些业务场景下需要记录业务日志,用于异步反查,那么不管主体业务逻辑是否完成,日志都需要记录下来,不能因为主体业务逻辑报错而丢失日志;
  • PROPAGATION_NOT_SUPPORTED 可以用于发送提示消息,站内信、短信、邮件提示等。不属于并且不应当影响主体业务逻辑,即使发送失败也不应该对主体业务逻辑回滚。

2.5源码分析

#找事务拦截器
TransactionInterceptor#invoke
# 事务相关的调用
>TransactionAspectSupport#invokeWithinTransaction

#返回事务信息 TransactionInfo
>TransactionAspectSupport#createTransactionIfNecessary

# 返回 TransactionStatus  包含事务传播属性的逻辑
>AbstractPlatformTransactionManager#getTransaction

断点跟spring源码后在processon上画关于spring事务执行的流程图

www.processon.com/view/5eea03