技术交流QQ群:1027579432,欢迎你的加入!

0.概述

  • 当神经网络来处理大量的输入信息时,也可以借助人脑的注意力机制,只选择一些关键的信息输入进行处理,用来提高神经网络的效率。在目前的神经网络模型中,可以将max pooling和gating机制近似地看作是自下而上的基于显著性的注意力机制。此外,自上而下的聚焦式注意力也是一种有效的信息选择方法。例如:给定一篇很长的文章,然后就此文章的内容进行提问,提出的问题只和文章中某个段落中的一两个句子相关,其余都无关的。为了减小神经网络的计算代价,只需要把相关的片段挑选出来让后续的神经网络来处理,而不需要把所有文章内容都输入到神经网络中。

1.Attention机制基础知识

  • 表示N组输入信息,其中每个向量都表示一组输入信息。为了节省计算资源,不需要将所有的信息都输入到神经网络中,只需要从X中选择一些和任务相关的信息。注意力机制的计算可以分为两步:
    • (1)在所有输入信息上计算注意力分布;
    • (2)根据注意力分布来计算输入信息的加权平均
  • 1.1 注意力分布
    • 为了从N个输入向量中选择出与某个特定任务相关的信息,需要引入一个和任务相关的表示,称为查询向量q,并通过一个打分函数来计算每个输入向量和查询向量之间的相关性。
    • 给定一个和任务相关的查询向量q,用注意力变量来表示被选择信息的索引位置,即z=i表示选择了第i个输入向量。为了方便计算,下面首先介绍Soft Attention注意力机制。首先计算在给定q和X下,选择第i个输入向量的概率

      其中称为注意力分布,是注意力打分函数,可以使用下面的几种方法来计算:
      • 加性模型
      • 点积模型
      • 缩放点积模型
      • 双线性模型
    • 上式中W、U、v是可学习的参数,d是输入向量的维度。理论上,加性模型和点积模型的复杂度差不多,但是点积模型在实现上可以更好地利用矩阵乘积,从而计算效率更高。但当输入向量的维度d比较高,点积模型的值通常有较大的方差,从而导致softmax函数的梯度比较小。因此,缩放点积模型可以很好地解决这个问题。双线性模型可以看做是一种泛化的点积模型。假设 ,则双线性模型可以写为即分别对x和q进行线性变换后计算点积。相比点积模型,双线性模型在计算相似度时引入了非对称性。
  • 1.2 加权平均
    • 注意力分布可以解释为在给定任务相关的查询q时,第i个输入向量受注意的程度。下面采用一种软性的信息选择机制对输入信息进行汇总。

      上式称为软注意力机制(Soft Attention Mechanism)。下图给出了软注意力机制的示例图:
      软注意力机制

2.其他类型的注意力机制

  • 2.1 硬注意力机制
    • 上面的公式提到的是软注意力机制,其选择的信息是所有输入向量在注意力分布下的期望。此外还有一种注意力是只关注到某一个输入向量,叫做硬注意力机制(Hard Attention Mechanism)。硬注意力机制有两种方法可以实现:
      • (1)选择最高概率的一个输入向量,即

        其中j为概率最大的输入向量的下标,即
      • (2)通过在注意力分布上随机采样的方式实现
    • 硬注意力的一个缺点是基于最大采样或随机采样的方式来选择信息。因此最终的损失函数与注意力分布之间的函数关系不可导,因此无法使用反向传播算法进行训练。为了使用反向传播算法进行训练,一般使用软注意力机制。
  • 2.2 键值对注意力
    • 可以使用键值对格式来表示输入信息,其中键用来计算注意力分布,值用来计算聚合信息。用来表示N组输入信息,给定任务相关的查询向量q时,注意力函数为:

      其中是打分函数,1.2节的图中给出了键值对注意力机制的示意图。当K=V时,键值对模式等于普通模式的注意力机制。
  • 2.3 多头注意力
    • 多头注意力(Multi-head Attention)是利用多个查询 来平行计算从输入信息中选取多组信息。每个注意 力关注输入信息的不同部分。

      其中⊕表示向量拼接。
  • 2.4 自注意力模型(Self Attention)
    • 当使用神经网络来处理一个变化长度的向量序列时,通过可以使用卷积网络或循环网络进行编码来得到一个相同长度的输出向量序列,如下图所示:


      基于卷积神经网络和循环神经网络的变长序列编码
    • 基于卷积或循环网络的序列编码都是可以看做是一种局部的编码方式,只建模了输入信息的局部依赖关系。虽然循环网络理论上可以建立长距离依赖关系,但是由于信息传递的容量以及梯度消失问题,实际上也只能建立短距离依赖关系。
    • 如果要建立输入序列之间的长距离依赖关系,可以使用以下两种方法:一种方法是增加网络的层数,通过一个深层网络来获取远距离的信息交互;另一种方法是使用全连接网络。全连接网络是一种非常直接的建模远距离依赖的模型,但是无法处理变长的输入序列。不同的输入长度,其连接权重的大小也是不同的。这时,就可以利用注意力机制来“动态”地生成不同连接的权重,这就是自注意力模型(Self-Attention Model)
    • 假设输入序列为,输出序列为,则可以通过线性变换得到三组向量序列:

      其中,Q、K、V分别为查询向量序列,键向量序列、值向量序列,分别表示可学习的参数矩阵。根据,可以得到输出向量

      其中,为输出和输入向量序列的位置,连接权重由注意力机制动态生成。
      如果使用缩放点积来作为注意力打分函数,输出向量序列可以写为:

      其中softmax函数为按列进行归一化的函数。
    • 下图给出了全连接模型和自注意力模型的对比,其中实线表示可学习的权重,虚线表示动态生成的权重。由于自注意力模型的权重是动态生成的,因此可以处理变长的信息序列。


      全连接模型与自注意力模型
    • 自注意力模型可以作为神经网络中的一层来使用,既可以用来替换卷积层和循环层,也可以和它们一起交替使用(例如输入向量X可以是卷积层或循环层的输出)。自注意模型计算的权重只依赖于的相关性,从而忽略了输入信息的位置信息。因此,在单独使用时,自注意模型一般需要加入位置编码信息来进行修正。

3.实战------以Seq2Seq网络进行法语到英语的翻译为例进行说明

  • 利用机器翻译中的经典网络结构Seq2Seq(具体结构见参考资料中的文献),其中包含Encoder编码网络将输入的法语句子进行编码,然后输入到Decoder解码网络进行解码,输出期望得到的英文句子。整个网络的结构如下:


    Seq2Seq网络结构

    Encoder部分

    Decoder部分

    注意力权重可视化
增加坐标轴的注意力权重可视化1
增加坐标轴的注意力权重可视化2

增加坐标轴的注意力权重可视化3
增加坐标轴的注意力权重可视化4
from __future__ import unicode_literals, print_function, division

from io import open
import unicodedata
import string
import re
import random
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 将法语翻译成英语


SOS_token = 0  # 开始的标注
EOS_token = 1  # 结束的标注

# 辅助类


class Lang:
    def __init__(self, name):
        self.name = name
        self.word2index = {}   # word---->index
        self.index2word = {0: "SOS", 1: "EOS"}  # index---->word
        self.word2count = {}   # 稍后用来替换稀有单词,统计每个单词出现的次数
        self.n_words = 2  # 统计单词总数
    
    def addSentence(self, sentence):
        for word in sentence.split(" "):
            self.addWord(word)
    
    def addWord(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1
 # Turn a Unicode string to plain ASCII
def unicodeToAscii(s):
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
    )

# 小写,修剪和删除非字母字符
def normalizeString(s):
    s = unicodeToAscii(s.lower().strip())
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
    return s
# 加载文件
def readLangs(lang1, lang2, reverse=False):
    print("Reading lines.......")
    # 读取文件并进行划分成行
    lines = open(r"E://DeepLearning//jupyter_code//dataset//corpus//translation_data//%s-%s.txt" % (lang1, lang2), encoding='utf-8').\
                read().strip().split("\n")
    
    # 将每行切成一组pairs
    pairs = [[normalizeString(s) for s in l.split("\t")] for l in lines]
    # 将其他语言翻译成英语
    if reverse:
        pairs = [list(reversed(p)) for p in pairs]
        input_lang = Lang(lang2)
        output_lang = Lang(lang1)
    else:
        input_lang = Lang(lang1)  
        output_lang = Lang(lang2)   
        
    return input_lang, output_lang, pairs

# 由于有很多例句,为了能快速训练,我们会将数据集修剪成相对简短的句子。这里最大长度是10个单词(包括结束标点符号)

MAX_LENGTH = 10

# 英语前缀
eng_prefixes = (
    "i am ", "i m ",
    "he is", "he s ",
    "she is", "she s ",
    "you are", "you re ",
    "we are", "we re ",
    "they are", "they re "
)


def filterPair(p):
    return len(p[0].split(' ')) < MAX_LENGTH and \
        len(p[1].split(' ')) < MAX_LENGTH and \
        p[1].startswith(eng_prefixes)


def filterPairs(pairs):
    return [pair for pair in pairs if filterPair(pair)]
def prepareData(lang1, lang2, reverse=False):
    input_lang, output_lang, pairs = readLangs(lang1, lang2, reverse)
    print("Read %s sentence pairs" % len(pairs))
    pairs = filterPairs(pairs)
    print("Trimmed to %s sentence pairs" % len(pairs))
    print("Counting words...")
    for pair in pairs:
        input_lang.addSentence(pair[0])
        output_lang.addSentence(pair[1])
    print("Counted words:")
    print(input_lang.name, input_lang.n_words)
    print(output_lang.name, output_lang.n_words)
    return input_lang, output_lang, pairs


input_lang, output_lang, pairs = prepareData('eng', 'fra', True)    
# print("pairs:\n", pairs)  pairs = [法语,英语]
print(random.choice(pairs))


# Encoder 部分


class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size   # 隐藏状态a的大小
        
        self.embedding = nn.Embedding(input_size, hidden_size)  # 词嵌入层
        self.gru = nn.GRU(hidden_size, hidden_size)   # 多层的GRU
        
    def forward(self, input, hidden):
        embedded = self.embedding(input).view(1, 1, -1)
        output = embedded
        output, hidden = self.gru(output, hidden)
        return output, hidden
    
    def initHidden(self):
        return torch.zeros(1,1, self.hidden_size, device=device)


# Decoder部分


class DecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(DecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)
        
    def forward(self, input, hidden):
        output = self.embedding(input).view(1, 1, -1)
        output = F.relu(output)
        output, hidden = self.gru(output, hidden)
        output = self.softmax(self.out(output[0]))
        return output, hidden
    
    def initHidden(self):
        return torch.zeros(1,1,self.hidden_size, device=device)

# Attention 部分
class AttnDecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH):
        super(AttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length
        
        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        self.attn = nn.Linear(self.hidden_size*2, self.max_length)
        self.attn_combine = nn.Linear(self.hidden_size*2, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)
    
    def forward(self, input, hidden, encoder_outputs):
        embedded = self.embedding(input).view(1, 1, -1)
        embedded = self.dropout(embedded)
        
        attn_weights = F.softmax(self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)  # 注意力权重
        attn_applied = torch.bmm(attn_weights.unsqueeze(0), encoder_outputs.unsqueeze(0))  # 两个batch之间的矩阵乘法
        
        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_combine(output).unsqueeze(0)
        
        output = F.relu(output)
        output, hidden = self.gru(output, hidden)
        
        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights
    
    # 隐状态初始化
    def initHidden(self):
        return torch.zeros(1, 1, self.hidden_size, device=device)

# 训练模型

# 准备训练数据

def indexesFromSentence(lang, sentence):
    return [lang.word2index[word] for word in sentence.split(" ")]

def tensorFromSentence(lang, sentence):
    indexes = indexesFromSentence(lang, sentence)
    indexes.append(EOS_token)   # EOS作为encoder编码器网络的结束标志,  SOS作为Decoder解码器网络的开始标志
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

def tensorsFromPair(pair): 
    input_tensor = tensorFromSentence(input_lang, pair[0])   # pair[0]是法语
    targe_tensor = tensorFromSentence(output_lang, pair[1])  # pair[1]是英语
    return (input_tensor, targe_tensor)


# 开始训练

# “tearcher_forcing_ratio将上一时刻的真实目标输出当作下一个时刻的Encoder网络的输入,而不是使用Encoder网络的上一时刻的预测输出作为下一时刻的输入。
tearcher_forcing_ratio = 0.5  

def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH):
    encoder_hidden = encoder.initHidden()
    
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()
    
    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)
    
    encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
    
    loss = 0
    
    # encoder部分
    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(input_tensor[ei], encoder_hidden)
        encoder_outputs[ei] = encoder_output[0, 0]
    
    # decoder部分
    decoder_input = torch.tensor([[SOS_token]], device=device)
    
    decoder_hidden = encoder_hidden
    
    use_teacher_foring = True if random.random() < tearcher_forcing_ratio else False
    
    # using teacher forcing
    if use_teacher_foring:
        for di in range(target_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(decoder_input, decoder_hidden, encoder_outputs)
            loss += criterion(decoder_output, target_tensor[di])
            decoder_input = target_tensor[di]  
            
    # 不使用teacher forcing,使用上一时刻的输出作为下一时刻的输入        
    else:
        for di in range(target_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(decoder_input, decoder_hidden, encoder_outputs)
            topv, topi = decoder_output.topk(1)
            decoder_input = topi.squeeze().detach()  
            
            loss += criterion(decoder_output, target_tensor[di])
            
            if decoder_input.item() == EOS_token:
                break
    loss.backward()
    
    encoder_optimizer.step()
    decoder_optimizer.step()
    
    return loss.item() / target_length


# 辅助函数------记录时间

import time
import math

def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return "%dm %ds" % (m, s)

def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return "%s (- %s)" % (asMinutes(s), asMinutes(rs))


# 整个训练过程如下:
 # 开启定时器
 # 初始化优化器和loss函数
 # 创建training pairs
 # 开始训练并绘图

def trainIters(encoder, decoder, n_iters, print_every=1000, plot_every=100, learning_rate=0.01):
    start = time.time()  # 开启定时器
    plot_losses = []
    print_loss_total = 0  # Reset every print_every
    plot_loss_total = 0  # Reset every plot_every

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)  # 定义优化算法
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
    training_pairs = [tensorsFromPair(random.choice(pairs))  # 创建training pairs
                      for i in range(n_iters)]
    criterion = nn.NLLLoss()  # 定义损失函数

    for iter in range(1, n_iters + 1):
        training_pair = training_pairs[iter - 1]
        input_tensor = training_pair[0]
        target_tensor = training_pair[1]

        loss = train(input_tensor, target_tensor, encoder,
                     decoder, encoder_optimizer, decoder_optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss

        if iter % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print('%s (%d %d%%) %.4f' % (timeS***art, iter / n_iters),
                                         iter, iter / n_iters * 100, print_loss_avg))

        if iter % plot_every == 0:
            plot_loss_avg = plot_loss_total / plot_every
            plot_losses.append(plot_loss_avg)
            plot_loss_total = 0

    showPlot(plot_losses)


# 绘制loss曲线

import matplotlib.pyplot as plt
plt.switch_backend('agg')
import matplotlib.ticker as ticker
import numpy as np

%matplotlib inline

def showPlot(points):
    plt.figure()
    fig, ax = plt.subplots()
    # this locator puts ticks at regular intervals
    loc = ticker.MultipleLocator(base=0.2)
    ax.yaxis.set_major_locator(loc)
    plt.plot(points)


# 测试阶段--------测试阶段整体与训练阶段类似,但是测试阶段,不用给出target_tensor,只是将decoder网络上一时刻的预测值作为下一时刻的输入值
# 当预测值是EOS时,则停止预测

def evaluate(encoder, decoder, sentence, max_length = MAX_LENGTH):
     with torch.no_grad():
        input_tensor = tensorFromSentence(input_lang, sentence)
        input_length = input_tensor.size()[0]
        encoder_hidden = encoder.initHidden()

        encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)

        # encoder部分
        for ei in range(input_length):
            encoder_output, encoder_hidden = encoder(input_tensor[ei],
                                                     encoder_hidden)
            encoder_outputs[ei] += encoder_output[0, 0]

        decoder_input = torch.tensor([[SOS_token]], device=device)  # SOS

        decoder_hidden = encoder_hidden

        decoded_words = []
        decoder_attentions = torch.zeros(max_length, max_length)
        
        # decoder部分
        for di in range(max_length):
            decoder_output, decoder_hidden, decoder_attention = decoder(
                decoder_input, decoder_hidden, encoder_outputs)
            decoder_attentions[di] = decoder_attention.data
            topv, topi = decoder_output.data.topk(1)
            if topi.item() == EOS_token:  # 结束时的条件
                decoded_words.append('<EOS>')
                break
            else:
                decoded_words.append(output_lang.index2word[topi.item()])

            decoder_input = topi.squeeze().detach()

        return decoded_words, decoder_attentions[:di + 1]

# 随机地从训练集中选择pairs,然后在测试集上进行评估

def evaluateRandomly(encoder, decoder, n=10):
    for i in range(n):
        pair = random.choice(pairs)
        print('输入:>', pair[0])
        print('目标:=', pair[1])
        output_words, attentions = evaluate(encoder, decoder, pair[0])
        output_sentence = ' '.join(output_words)
        print('预测:<', output_sentence)
        print('')

# 正式训练开始运行

hidden_size = 256
encoder1 = EncoderRNN(input_lang.n_words, hidden_size).to(device)
attn_decoder1 = AttnDecoderRNN(hidden_size, output_lang.n_words, dropout_p=0.1).to(device)

trainIters(encoder1, attn_decoder1, 75000, print_every=5000)

evaluateRandomly(encoder1, attn_decoder1)

# 注意力可视化

output_words, attentions = evaluate(
    encoder1, attn_decoder1, "je suis trop froid .")
plt.matshow(attentions.numpy());

# 增加坐标轴,更加清楚的可视化

def showAttention(input_sentence, output_words, attentions):
    # Set up figure with colorbar
    fig = plt.figure()
    ax = fig.add_subplot(111)
    cax = ax.matshow(attentions.numpy(), cmap='bone')
    fig.colorbar(cax)

    # Set up axes
    ax.set_xticklabels([''] + input_sentence.split(' ') +
                       ['<EOS>'], rotation=90)
    ax.set_yticklabels([''] + output_words)

    # Show label at every tick
    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

    plt.show()


def evaluateAndShowAttention(input_sentence):
    output_words, attentions = evaluate(
        encoder1, attn_decoder1, input_sentence)
    print('input =', input_sentence)
    print('output =', ' '.join(output_words))
    showAttention(input_sentence, output_words, attentions)

4.参考资料