自动语音识别

目录

自动语音识别

介绍几个前导知识:

了解数据集

代码实现+讲解


首先我们看一下WaveNet的网络结构:

         我大概描述一下这个网络的结构: 首先输入数据,这里我们输入的是音频的mfcc特征(不懂没关系,等会说) , 接着进行一个一维的空洞卷积,然后进入到残差块中,残差块是这样的结构:将进来的数据再进行一次空洞卷积,分两路,一路是用tanh()做激活函数,一路是用sigmoid做激活函数,最后又将两路合并,合并完成后,我们在经过一个一维的空洞卷积,这里得到的输出,我们又会进行两路处理,一路是进入下一次的残差块,一路是往右边发展,经过一个relu激活,一个一维空洞卷积,一个relu,一个一维空洞卷积,再接着softmax ,最后得到输出。。说道这里,大概好多人都有点晕。不要紧,接着往下看。一步一步解开谜团。

介绍几个前导知识:

概念一:空洞卷积是什么?

       很可惜,csdn放不了动图。。。。

            如上图,这是一个二维的3x3的空洞卷积(这里只看深蓝色的框) ,深蓝色和深蓝色框之间间隔了一个浅蓝色的框,这个浅蓝色的框就是空出来的,空洞的名字就是从这里来的,也就是卷积的时候,卷积的范围变大了,但是对于部分像素,我们不参与计算(这个图中就是5x5格子中浅蓝色的那些) 。 这里你可以把这个空洞卷积想象成咱们的正常卷积,如果是正常的卷积,这个卷积核就是5x5的。只不顾部分权重是零而已。  水平有限,只能说道这里。。

概念二:一维卷积

      我们在图像处理中经常采用的都是二维卷积,对于一维卷积感觉好默认。下面就简要的说明一下

假设我们有一个待卷积序列(左边) 。 有一个1x5的卷积核。 如果我们的步长为1,则卷积的过程是:

了解数据集

数据源:下载数据集    这里如果下载不来,可以评论练习我

     首先我们先了解数据:    .wav文件是音频, .wav.trn是音频所对应的文本文件。。总共有13388条数据

代码实现+讲解

1:加载所需的模块:

from keras.models import Model
from keras.layers import Input, Activation, Conv1D, Lambda, Add, Multiply, BatchNormalization
from keras.optimizers import Adam, SGD
from keras import backend as K
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import random
import pickle
import glob
from tqdm import tqdm
from python_speech_features import mfcc
import scipy.io.wavfile as wav
import librosa
from IPython.display import Audio

2:读取文本文件,去掉一些换行符号等。。最后将每个文本中的中文保存到texts列表中,接着将每个音频的路径保存到paths中。见下面代码。注意看注释

# 读取语音所对应的文本数据
text_paths = glob.glob('data/*.trn')
total = len(text_paths)   # 统计文本的个数
print(total)

with open(text_paths[0], 'r', encoding='utf8') as fr:
    lines = fr.readlines()
    print(lines)    # 查看第一条文本的内容


# 提取文本标注和语音文件路径,保留中文并去掉空格
texts = []   # 放置文本
paths = []   # 存的是每个对应音频的路径
for path in text_paths:
    with open(path, 'r', encoding='utf8') as fr:
        lines = fr.readlines()
        line = lines[0].strip('\n').replace(' ', '')   # 因为我们原始数据中还包含有拼音,我们要将其去除掉
        texts.append(line)
        paths.append(path.rstrip('.trn'))   # 去掉.trn就是音频文件

print(paths[0], texts[0])   # 文本所在路径 以及所对应的文本

3:处理音频文件, 首先我们删除音频开始和结束部分中的静音部分。然后将计算各个音频所对应的的mfcc特征。这里mfcc你就可以理解为是一段很短音频的数字描述。 

# MFCC特征保留13维,定义加载语音文件并去掉两端静音的函数, 以及可视化语音文件的函数
mfcc_dim = 13


def load_and_trim(path):
    # 去掉两端的静音
    audio, sr = librosa.load(path)   # 加载音频
    energy = librosa.feature.rmse(audio)   # 计算能量
    frames = np.nonzero(energy >= np.max(energy) / 5)   # 如果能量大则保留 说明这段不是两端的静音
    indices = librosa.core.frames_to_samples(frames)[1]
    audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]

    return audio, sr    # 去掉两端静音的音频和采样频率


def visualize(index):
    # 可视化
    path = paths[index]  # 获取某个音频
    text = texts[index]  # 获得所对应的文字
    print('Audio Text:', text)

    audio, sr = load_and_trim(path)
    plt.figure(figsize=(12, 3))
    plt.plot(np.arange(len(audio)), audio)
    plt.title('Raw Audio Signal')
    plt.xlabel('Time')
    plt.ylabel('Audio Amplitude')  # 音频高度
    plt.show()

    feature = mfcc(audio, sr, numcep=mfcc_dim, nfft=551)   # 计算mfcc特征
    print('Shape of MFCC:', feature.shape)

    fig = plt.figure(figsize=(12, 5))
    ax = fig.add_subplot(111)
    im = ax.imshow(feature, cmap=plt.cm.jet, aspect='auto')
    plt.title('Normalized MFCC')
    plt.ylabel('Time')
    plt.xlabel('MFCC Coefficient')
    plt.colorbar(im, cax=make_axes_locatable(ax).append_axes('right', size='5%', pad=0.05))
    ax.set_xticks(np.arange(0, 13, 2), minor=False);
    plt.show()

    return path


Audio(visualize(0))   # 只是用ipython时,让其显示在notebook中,不想在ipython中显示,直接visualize(0)就行了,我们只看一下第一节音频的特征


# 获取全部语音文件对应的MFCC特征
features = []    # 存取所有音频的mfcc特征
for i in tqdm(range(total)):
    path = paths[i]
    audio, sr = load_and_trim(path)    # 加载全部的语音
    features.append(mfcc(audio, sr, numcep=mfcc_dim, nfft=551))  # nfft每个多少采一下
    # 记住,这里的features是 [ []    ]
print(len(features), features[0].shape)  # 输出13388, (444, 13)
# 也就是说有13388段音频,,  (444, 13) 代表的是将第一段音频被分为444个小段,每个小段有13个mfcc特征

   这里我多啰嗦一句,我们数据集中一段音频可以切分成好多段短的音频,所以一段音频最后结构就变成了:[切分的段数, 13维特征]

 4:我们对mfcc特征进行归一化, 并将音频所对应的文字与数字进行映射,因为电脑不认识汉字,只认识数字

# 将MFCC特征进行归一化
samples = random.sample(features, 100)  # 这里是随机选了100段音频,将它们拼接起来,求均值,和方差。
samples = np.vstack(samples)

mfcc_mean = np.mean(samples, axis=0)
mfcc_std = np.std(samples, axis=0)
print(mfcc_mean)
print(mfcc_std)

# 运用上面的均值和方差对整体音频进行处理
features = [(feature - mfcc_mean) / (mfcc_std + 1e-14) for feature in features]


# 建立字典,共2883个不同的字
chars = {}
for text in texts:
    for c in text:
        chars[c] = chars.get(c, 0) + 1

chars = sorted(chars.items(), key=lambda x: x[1], reverse=True)
chars = [char[0] for char in chars]
print(len(chars), chars[:100])   # 输出2883  则有2993个汉字

char2id = {c: i for i, c in enumerate(chars)}
id2char = {i: c for i, c in enumerate(chars)}

5: 准备输入数据和输出数据(标签,也就是对应的汉字),我们在模型训练的时候,采用generator函数生成批数据往模型中送,代码如下:

# total是所有音频的个数,也可以说是所有文本的条数。
# 划分训练数据和测试数据 定义产生批数据的函数
data_index = np.arange(total)
np.random.shuffle(data_index)   # 将索引打乱
train_size = int(0.9 * total)   # 训练数据占90%
test_size = total - train_size
train_index = data_index[:train_size]   # 切分出来训练数据的索引
test_index = data_index[train_size:]   # 切分出来测试数据的索引

X_train = [features[i] for i in train_index]   # 取出训练音频的mfcc特征
Y_train = [texts[i] for i in train_index]    # 取出训练的标签,也就是所对应的文本
X_test = [features[i] for i in test_index]   # 同理 取测试
Y_test = [texts[i] for i in test_index]   # 同理 取测试

batch_size = 16


def batch_generator(x, y, batch_size=batch_size):
    # 主要是产生批数据
    offset = 0
    while True:
        offset += batch_size

        if offset == batch_size or offset >= len(x):    # 当偏移量大于x的长度时,将序号进行打乱。再进行训练
            data_index = np.arange(len(x))
            np.random.shuffle(data_index)
            x = [x[i] for i in data_index]   # [ [[13维],[13维],[13维],[13维],[13维]...], [], []...]
            y = [y[i] for i in data_index]   # [ [              汉字                  ], [], []...]
            offset = batch_size

        X_data = x[offset - batch_size: offset]     # 取出一批语音所对应的mfcc值
        Y_data = y[offset - batch_size: offset]     # 取出语音所对应的汉字

        X_maxlen = max([X_data[i].shape[0] for i in range(batch_size)])   # 不同的语音段切分多少段不一定。这里获取切分最长的长度
        Y_maxlen = max([len(Y_data[i]) for i in range(batch_size)])    # 语音对应的汉字也是不固定的,这里获取汉字最多的长度

        X_batch = np.zeros([batch_size, X_maxlen, mfcc_dim])       # 输入的维度
        Y_batch = np.ones([batch_size, Y_maxlen]) * len(char2id)   # 输出的维度
        X_length = np.zeros([batch_size, 1], dtype='int32')     # 输入的批量数  这是为了算ctc损失所需要的
        Y_length = np.zeros([batch_size, 1], dtype='int32')     # 输出的批量数  这是为了算ctc损失所需要的

        for i in range(batch_size):
            X_length[i, 0] = X_data[i].shape[0]    # 将每一条语音切分的长度复制给X_length   这里再强调一下,这只是为了算ctc损失
            X_batch[i, :X_length[i, 0], :] = X_data[i]   # 将每条数据放入到X_batch  这是我们最终构造的输入

            Y_length[i, 0] = len(Y_data[i])   # 将每条语音所对应的汉字所具有的长度复制为Y_length 这里再强调一下,这只是为了算ctc损失
            Y_batch[i, :Y_length[i, 0]] = [char2id[c] for c in Y_data[i]]   # 将每条汉字转为id  放进Y_batch  这是我们最终构造的标签

        inputs = {'X': X_batch, 'Y': Y_batch, 'X_length': X_length, 'Y_length': Y_length}
        outputs = {'ctc': np.zeros([batch_size])}   # ctc损失

        yield (inputs, outputs)

6:定义wavenet网络。这里对应着最前面的那个图看

# 定义训练参数和模型结构并开始训练
epochs = 50
num_blocks = 3
filters = 128    # 128个卷积核

X = Input(shape=(None, mfcc_dim,), dtype='float32', name='X')   # 输入的音频的那个
Y = Input(shape=(None,), dtype='float32', name='Y')   # 输入的是所对应的汉字
X_length = Input(shape=(1,), dtype='int32', name='X_length')
Y_length = Input(shape=(1,), dtype='int32', name='Y_length')


def conv1d(inputs, filters, kernel_size, dilation_rate):
    # causal指定是因果空洞卷积   dilation_rate指的空洞数
    return Conv1D(filters=filters, kernel_size=kernel_size, strides=1, padding='causal', activation=None,
                  dilation_rate=dilation_rate)(inputs)


def batchnorm(inputs):
    return BatchNormalization()(inputs)


def activation(inputs, activation):
    return Activation(activation)(inputs)


def res_block(inputs, filters, kernel_size, dilation_rate):
    hf = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'tanh')
    hg = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'sigmoid')
    h0 = Multiply()([hf, hg])

    ha = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')
    hs = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')

    return Add()([ha, inputs]), hs


h0 = activation(batchnorm(conv1d(X, filters, 1, 1)), 'tanh')
shortcut = []
for i in range(num_blocks):
    for r in [1, 2, 4, 8, 16]:
        h0, s = res_block(h0, filters, 7, r)
        shortcut.append(s)

h1 = activation(Add()(shortcut), 'relu')
h1 = activation(batchnorm(conv1d(h1, filters, 1, 1)), 'relu')
Y_pred = activation(batchnorm(conv1d(h1, len(char2id) + 1, 1, 1)), 'softmax')
sub_model = Model(inputs=X, outputs=Y_pred)

7:定义损失函数,并进行模型的训练。注意这里的损失使用的ctc损失。这就是我们前面为什么要传x_length, y_length原因

def calc_ctc_loss(args):
    y, yp, ypl, yl = args
    return K.ctc_batch_cost(y, yp, ypl, yl)


ctc_loss = Lambda(calc_ctc_loss, output_shape=(1,), name='ctc')([Y, Y_pred, X_length, Y_length])
model = Model(inputs=[X, Y, X_length, Y_length], outputs=ctc_loss)
optimizer = SGD(lr=0.02, momentum=0.9, nesterov=True, clipnorm=5)
model.compile(loss={'ctc': lambda ctc_true, ctc_pred: ctc_pred}, optimizer=optimizer)

checkpointer = ModelCheckpoint(filepath='asr.h5', verbose=0)
lr_decay = ReduceLROnPlateau(monitor='loss', factor=0.2, patience=1, min_lr=0.000)

history = model.fit_generator(
    generator=batch_generator(X_train, Y_train),
    steps_per_epoch=len(X_train) // batch_size,
    epochs=epochs,
    validation_data=batch_generator(X_test, Y_test),
    validation_steps=len(X_test) // batch_size,
    callbacks=[checkpointer, lr_decay])


# 保存模型和字典
sub_model.save('asr.h5')
with open('dictionary.pkl', 'wb') as fw:
    pickle.dump([char2id, id2char, mfcc_mean, mfcc_std], fw)

 最终的损失:

           训练集的损失已经降到非常低了,但是测试集损失还在45左右。。最终就维持在这个状态。这是因为我们的训练数据太小的缘故。。

 模型训练就到这里,我们调用一下模型,随机选取一段语音进行测试:

# -*- coding: utf-8 -*-

from keras.models import load_model
from keras import backend as K
import numpy as np
import librosa
from python_speech_features import mfcc
import pickle
import glob

wavs = glob.glob('data/*.wav')
with open('dictionary.pkl', 'rb') as fr:
    [char2id, id2char, mfcc_mean, mfcc_std] = pickle.load(fr)

mfcc_dim = 13
model = load_model('asr.h5')

index = np.random.randint(len(wavs))
print(wavs[index])

audio, sr = librosa.load(wavs[index])
energy = librosa.feature.rmse(audio)
frames = np.nonzero(energy >= np.max(energy) / 5)
indices = librosa.core.frames_to_samples(frames)[1]
audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]
X_data = mfcc(audio, sr, numcep=mfcc_dim, nfft=551)
X_data = (X_data - mfcc_mean) / (mfcc_std + 1e-14)
print(X_data.shape)

with open(wavs[index] + '.trn', 'r', encoding='utf8') as fr:
    label = fr.readlines()[0]
    print(label)

pred = model.predict(np.expand_dims(X_data, axis=0))
pred_ids = K.eval(K.ctc_decode(pred, [X_data.shape[0]], greedy=False, beam_width=10, top_paths=1)[0][0])
pred_ids = pred_ids.flatten().tolist()
print(''.join([id2char[i] for i in pred_ids]))

   最终输出结果:

准确率很高,这是一条训练数据,所以拟合的比较好,因为我们训练集的损失已将快降到零了。。如果抽到测试集,就不一定这么好了。