Self Attention和Multi-Head Attention的原理和实现

引言

    使用深度学习做NLP的方法,一般是将单词转换为词向量序列,然后通过神经网络编码或者说提取这些词向量中的特征信息,继而根据不同任务进行不同的操作。提取特征的第一个方法是使用卷积神经网络,RNN结构简单,非常适合序列建模,但是缺点是无法并行运算,使得运算速度非常低。提取特征的第二个方法是使用卷积神经网络,但是效果并不是很好。第三个方法是使用注意力机制,注意力机制可以并行的提取序列特征。

    Attention机制本质上是人类视觉的注意力机制。人们视觉在感知东西的时候一般不会是一个场景从到头看到尾每次全部都看,而往往是根据需求观察注意特定的一部分。而且当人们发现一个场景经常在某部分出现自己想观察的东西时,人们会进行学习在将来再出现类似场景时把注意力放到该部分上。

    上一篇文章《基于Attention的自动标题生成》介绍了Attention机制如何应用到“编码-解码”架构中用于自然语言生成,那里面的Attention被称为“Encoder-Decoder Attention”,同时提到了Attention计算可以抽象为查询过程。简单的说,Self Attention就是Q、K、V均为同一个输入向量映射而来的Encoder-Decoder Attention,Multi-Head Attention同时计算多个Attention,并最终得到合并结果。


Self Attention原理

    self attention有什么优点呢,这里引用谷歌论文《Attention Is All You Need》里面说的,第一是计算复杂度小,第二是可以大量的并行计算,第三是可以更好的学习远距离依赖。Attention的计算公式如下:

0.png

    下面一步步分解self attention的计算过程(图来自https://jalammar.github.io/illustrated-transformer/):

  1. 输入单词表示向量,比如可以是词向量。

  2. 把输入向量映射到q、k、v三个变量,如下图:

    1.png

    比如上图X1和X2分别是Thinking和Machines这两个单词的词向量,q1和q2被称为查询向量,k称为键向量,v称为值向量。Wq,Wk,Wv都是随机初始化的映射矩阵。

  3. 计算Attention score,即某个单词的查询向量和各个单词对应的键向量的匹配度,匹配度可以通过加法或点积得到。图如下:

    2.png

  4. 减小score,并将score转换为权重。

    3.png

    其中dk是q k v的维度。score可以通过点积和加法得到,当dk较小时,这两种方法得到的结果很相似。但是点积的速度更快和省空间。但是当dk较大时,加法计算score优于点积结果没有除以dk^0.5的情况。原因可能是:the dot products grow large in magnitude, pushing the softmax function into regions where it has extremely small gradients。所以要先除以dk^0.5,再进行softmax。

  5. 权重乘以v,并求和。

    4.png

    最终的结果z就是x1这个单词的Attention向量。当同时计算所有单词的Attention时,图示如下:

    1. 将输入词向量转换为Q、K、V.

    5.png

    2. 直接计算Z

    6.png



Self Attention代码实现

    使用Keras自定义self attention层,代码如下:

from keras import initializers
from keras import activations
from keras import backend as K
from keras.engine.topology import Layer

class MySelfAttention(Layer):
    
    def __init__(self,output_dim,kernel_initializer='glorot_uniform',**kwargs):
        self.output_dim=output_dim
        self.kernel_initializer = initializers.get(kernel_initializer)
        super(MySelfAttention,self).__init__(**kwargs)
        
    def build(self,input_shape):
        self.W=self.add_weight(name='W',
             shape=(3,input_shape[2],self.output_dim),
             initializer=self.kernel_initializer,
             trainable=True)
        self.built = True
        
    def call(self,x):
        q=K.dot(x,self.W[0])
        k=K.dot(x,self.W[1])
        v=K.dot(x,self.W[2])
        #print('q_shape:'+str(q.shape))
        e=K.batch_dot(q,K.permute_dimensions(k,[0,2,1]))#把k转置,并与q点乘
        e=e/(self.output_dim**0.5)
        e=K.softmax(e)
        o=K.batch_dot(e,v)
        return o
        
    def compute_output_shape(self,input_shape):
        return (input_shape[0],input_shape[1],self.output_dim)




Multi-Head Attention原理

    不同的随机初始化映射矩阵Wq,Wk,Wv可以将输入向量映射到不同的子空间,这可以让模型从不同角度理解输入的序列。因此同时几个Attention的组合效果可能会优于单个Attenion,这种同时计算多个Attention的方法被称为Multi-Head Attention,或者多头注意力。

    每个“Head”都会产生一个输出向量z,但是我们一般只需要一个,因此还需要一个矩阵把多个合并的注意力向量映射为单个向量。图示如下:

   7.png



Multi-Head Attention代码实现

    还是使用Keras实现multi-head attention,代码如下:

from keras import initializers
from keras import activations
from keras import backend as K
from keras.engine.topology import Layer


class MyMultiHeadAttention(Layer):
    def __init__(self,output_dim,num_head,kernel_initializer='glorot_uniform',**kwargs):
        self.output_dim=output_dim
        self.num_head=num_head
        self.kernel_initializer = initializers.get(kernel_initializer)
        super(MyMultiHeadAttention,self).__init__(**kwargs)
        
    def build(self,input_shape):
        self.W=self.add_weight(name='W',
           shape=(self.num_head,3,input_shape[2],self.output_dim),
           initializer=self.kernel_initializer,
           trainable=True)
        self.Wo=self.add_weight(name='Wo',
           shape=(self.num_head*self.output_dim,self.output_dim),
           initializer=self.kernel_initializer,
           trainable=True)
        self.built = True
        
    def call(self,x):
        q=K.dot(x,self.W[0,0])
        k=K.dot(x,self.W[0,1])
        v=K.dot(x,self.W[0,2])
        e=K.batch_dot(q,K.permute_dimensions(k,[0,2,1]))#把k转置,并与q点乘
        e=e/(self.output_dim**0.5)
        e=K.softmax(e)
        outputs=K.batch_dot(e,v)
        for i in range(1,self.W.shape[0]):
            q=K.dot(x,self.W[i,0])
            k=K.dot(x,self.W[i,1])
            v=K.dot(x,self.W[i,2])
            #print('q_shape:'+str(q.shape))
            e=K.batch_dot(q,K.permute_dimensions(k,[0,2,1]))#把k转置,并与q点乘
            e=e/(self.output_dim**0.5)
            e=K.softmax(e)
            #print('e_shape:'+str(e.shape))
            o=K.batch_dot(e,v)
            outputs=K.concatenate([outputs,o])
        z=K.dot(outputs,self.Wo)
        return z
        
    def compute_output_shape(self,input_shape):
        return (input_shape[0],input_shape[1],self.output_dim)




新闻分类

    新闻分类应该是自然语言处理的一个很经典的任务,这里用Self Attention和Multi-Head Self Attention编码输入的新闻,并加上全连接网络进行新闻的分类。

    使用的数据集和上一篇博文一样,数据集存放的位置为百度网盘:链接:https://pan.baidu.com/s/1riEHnI7KW_1alVdXurF95Q 提取码:jhpi 。

    下面开始给出代码:

1. 读取数据集

data_file_path='D:/NLP/dataset/news/cnews.train.txt'
with open(data_file_path,'r',encoding='utf-8') as f:
    lines=f.readlines()
class_dict={'体育':0,'娱乐':1,'家居':2,'房产':3,'教育':4,'时尚':5,'时政':6,'游戏':7,'科技':8,'财经':9}
texts=[]
classes=[]
for line in lines:
    cls=line[:2]
    if(cls in class_dict):
        classes.append(class_dict[cls])
        texts.append(line[3:])
print(len(texts))
print(len(classes))


2.数据预处理

import jieba
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
import tqdm

inputTextList=[' '.join([w for w in jieba.cut(text)]) for text in tqdm.tqdm(texts)]
tokenizer=Tokenizer()
tokenizer.fit_on_texts(texts=inputTextList)
word_index=tokenizer.word_index
print(len(word_index))

上面的代码输出分词后的单词总量,有三十多万,但是我的显存告诉我,它带不动那么多单词,因此我这里设置使用前30000个单词。

#确定保留词的数量
MAX_WORDS=30000
tokenizer.num_words=MAX_WORDS
input_sequences=tokenizer.texts_to_sequences(texts=inputTextList)

输出一下语料的长度情况

texts_lens=[]
for line in tqdm.tqdm(input_sequences):
    texts_lens.append(len(line))
texts_lens.sort()
print('text_len_avg:%f'%(sum(texts_lens)/len(texts)))
print('text_len_middle:%f'%(texts_lens[int(len(texts)/2)]))
print('text_len_min:%f'%(texts_lens[0]))
print('text_len_max:%f'%(texts_lens[len(texts)-1]))

我的显存又告诉我,它快炸了,那我只好设置输入新闻的长度为200,并截断和padding数据。

import numpy as np
Tx=200
input_arr=[]
for line in tqdm.tqdm(input_sequences):
    slen=len(line)
    if(slen<Tx):
        newline=line+[0]*(Tx-slen)
        input_arr.append(newline)
    else:
        input_arr.append(line[:Tx])
input_arr=np.array(input_arr)
print(input_arr.shape)

标签one-hot一下

from keras.utils import to_categorical
labels=to_categorical(classes)
print(labels.shape)


3.定义模型

首先把词嵌入文件读取进来

with open(r'D:\NLP\wordvector\sgns.zhihu.word\sgns.zhihu.word', 'r',encoding='utf-8') as f:
    word_vec = {}
    for line in tqdm.tqdm(f):
        line = line.strip().split()
        curr_word = line[0]
        word_vec[curr_word] = np.array(line[1:], dtype=np.float64)

接着设置词嵌入层

from keras.layers import *
def pretrained_embedding_layer(word_vec, word_index):
    vocab_len = MAX_WORDS + 1        # Keras Embedding的API要求+1
    emb_dim = 300
    emb_matrix = np.zeros((vocab_len, emb_dim))
    
    # 用词向量填充embedding矩阵
    for word, index in word_index.items():
        vec = word_vec.get(word, np.zeros(emb_dim))
        if(index>=MAX_WORDS):
            break
        emb_matrix[index, :] = vec
    # 定义Embedding层,并指定不需要训练该层的权重
    embedding_layer = Embedding(vocab_len, emb_dim, trainable=False)
    embedding_layer.build((None,))# build
    embedding_layer.set_weights([emb_matrix])
    return embedding_layer
    
embedding_layer = pretrained_embedding_layer(word_vec, word_index)

然后定义Attention层

from keras import initializers
from keras import activations
from keras import backend as K
from keras.engine.topology import Layer


class MySelfAttention(Layer):
    
    def __init__(self,output_dim,kernel_initializer='glorot_uniform',**kwargs):
        self.output_dim=output_dim
        self.kernel_initializer = initializers.get(kernel_initializer)
        super(MySelfAttention,self).__init__(**kwargs)
        
    def build(self,input_shape):
        self.W=self.add_weight(name='W',
                                shape=(3,input_shape[2],self.output_dim),
                                initializer=self.kernel_initializer,
                                trainable=True)
        
        self.built = True
        
    def call(self,x):
        q=K.dot(x,self.W[0])
        k=K.dot(x,self.W[1])
        v=K.dot(x,self.W[2])
        #print('q_shape:'+str(q.shape))
        e=K.batch_dot(q,K.permute_dimensions(k,[0,2,1]))#把k转置,并与q点乘
        e=e/(self.output_dim**0.5)
        e=K.softmax(e)
        #print('e_shape:'+str(e.shape))
        o=K.batch_dot(e,v)
        #print('o_shape:'+str(o.shape))
        return o
        
    def compute_output_shape(self,input_shape):
        return (input_shape[0],input_shape[1],self.output_dim)

class MyMultiHeadAttention(Layer):
    def __init__(self,output_dim,num_head,kernel_initializer='glorot_uniform',**kwargs):
        self.output_dim=output_dim
        self.num_head=num_head
        self.kernel_initializer = initializers.get(kernel_initializer)
        super(MyMultiHeadAttention,self).__init__(**kwargs)
        
    def build(self,input_shape):
        self.W=self.add_weight(name='W',
                                shape=(self.num_head,3,input_shape[2],self.output_dim),
                                initializer=self.kernel_initializer,
                                trainable=True)
        self.Wo=self.add_weight(name='Wo',
                                shape=(self.num_head*self.output_dim,self.output_dim),
                                initializer=self.kernel_initializer,
                                trainable=True)
        self.built = True
        
    def call(self,x):
        q=K.dot(x,self.W[0,0])
        k=K.dot(x,self.W[0,1])
        v=K.dot(x,self.W[0,2])
        e=K.batch_dot(q,K.permute_dimensions(k,[0,2,1]))#把k转置,并与q点乘
        e=e/(self.output_dim**0.5)
        e=K.softmax(e)
        outputs=K.batch_dot(e,v)
        for i in range(1,self.W.shape[0]):
            q=K.dot(x,self.W[i,0])
            k=K.dot(x,self.W[i,1])
            v=K.dot(x,self.W[i,2])
            #print('q_shape:'+str(q.shape))
            e=K.batch_dot(q,K.permute_dimensions(k,[0,2,1]))#把k转置,并与q点乘
            e=e/(self.output_dim**0.5)
            e=K.softmax(e)
            #print('e_shape:'+str(e.shape))
            o=K.batch_dot(e,v)
            outputs=K.concatenate([outputs,o])
        z=K.dot(outputs,self.Wo)
        return z
        
    def compute_output_shape(self,input_shape):
        return (input_shape[0],input_shape[1],self.output_dim)

先来测试一下Self Attention层,定义模型

from keras.models import Sequential,Model
from keras.layers import Dense,SimpleRNN,Embedding,Flatten
#定义模型
seq=Input(shape=(Tx,))# 定义输入层
# Embedding层
embed = embedding_layer(seq)
att=MySelfAttention(128)(embed)
t=Flatten()(att)
t=Dense(256, activation = "relu")(t)
out=Dense(10, activation = "softmax")(t)
model = Model(seq, out)


4.训练模型

先编译模型

out = model.compile(optimizer='rmsprop',
                    metrics=['accuracy'],
                    loss='categorical_crossentropy')

然后打乱数据,并划分训练数据和验证数据。

permutation = np.random.permutation(input_arr.shape[0])
x = input_arr[permutation]
y = labels[permutation]
x_val=x[:2000]
y_val=y[:2000]
x_train=x[2000:]
y_train=y[2000:]

最后训练模型

history=model.fit(x_train, y_train, 
                  batch_size=32, 
                  epochs=5,
                  verbose=1,
                  validation_data=(x_val,y_val)
                 )

看一看训练结果

import matplotlib.pyplot as plt
%matplotlib inline

# 绘制训练 & 验证的准确率值
plt.plot(history.history['acc'])
plt.plot(history.history['val_acc'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()
# 绘制训练 & 验证的损失值
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()

下面的结果是self attention的

8.png

同理定义multi-head attention,训练结果如下:

9.png

由于只训练了5轮,对比并不十分明显。




参考文献

[1]robert_ai.自然语言处理中的自注意力机制(Self-attention Mechanism).https://www.cnblogs.com/robert-dlut/p/8638283.html. 2018-03-24

[2]Jay Alammar.The Illustrated Transformer.https://jalammar.github.io/illustrated-transformer/.2018-6-27



首页 所有文章 机器人 计算机视觉 自然语言处理 机器学习 编程随笔 关于