编码实现RNN以及LSTM

本次涉及代码较多,部分重复代码不再重复发布。重复的代码详见以下链接:

https://blog.csdn.net/qq_37402392/article/details/121468321?spm=1001.2014.3001.5501

bn_layers.py

cnn_layers.py

dropout_layers.py

layers.py

updater.py

以下是本次添加的内容。

captioning_trainer.py

训练器,与之前的训练器Trainer类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#-*- coding: utf-8 -*-
import numpy as np

from coco_utils import *
import updater

class CaptioningTrainer(object):
"""
CaptioningTrainer大部分内容和前面的Trainer相同
使用方法:
data = load_coco_data()
model = MyAwesomeModel(hidden_dim=100)
trainer = CaptioningTrainer(model, data,
update_rule='sgd',
updater_config={
'learning_rate': 1e-3,
},
lr_decay=0.95,
num_epochs=10, batch_size=100,
print_every=100)
trainer.train()
"""

def __init__(self, model, data, **kwargs):
"""
初始化CaptioningTrainer
所需参数:
- model: RNN模型
- data: coco数据集

可选参数:
- update_rule:更新规则,查看 updater.py.
默认为 'sgd'.
- updater_config: 更新器配置
- lr_decay:学习率衰减因子
- batch_size: 批量大小
- num_epochs: 迭代次数
- print_every:每训练多少次,打印训练结果
- verbose:是否打印训练中间结果
"""
self.model = model
self.data = data

self.update_rule = kwargs.pop('update_rule', 'sgd')
self.updater_config = kwargs.pop('updater_config', {})
self.lr_decay = kwargs.pop('lr_decay', 1.0)
self.batch_size = kwargs.pop('batch_size', 100)
self.num_epochs = kwargs.pop('num_epochs', 10)

self.print_every = kwargs.pop('print_every', 10)
self.verbose = kwargs.pop('verbose', True)

if len(kwargs) > 0:
extra = ', '.join('"%s"' % k for k in kwargs.keys())
raise ValueError('Unrecognized arguments %s' % extra)

if not hasattr(updater, self.update_rule):
raise ValueError('Invalid update_rule "%s"' % self.update_rule)
self.update_rule = getattr(updater, self.update_rule)

self._reset()


def _reset(self):
self.epoch = 0
self.best_val_acc = 0
self.best_params = {}
self.loss_history = []
self.train_acc_history = []
self.val_acc_history = []

self.updater_configs = {}
for p in self.model.params:
d = {k: v for k, v in self.updater_config.items()}
self.updater_configs[p] = d


def _step(self):
minibatch = sample_coco_minibatch(self.data,
batch_size=self.batch_size,
split='train')
captions, features, urls = minibatch

loss, grads = self.model.loss(features, captions)
self.loss_history.append(loss)

for p, w in self.model.params.items():
dw = grads[p]
config = self.updater_configs[p]
next_w, next_config = self.update_rule(w, dw, config)
self.model.params[p] = next_w
self.updater_configs[p] = next_config

def train(self):
num_train = self.data['train_captions'].shape[0]
iterations_per_epoch = max(num_train / self.batch_size, 1)
num_iterations = int(self.num_epochs * iterations_per_epoch)

for t in range(num_iterations):
self._step()


if self.verbose and t % self.print_every == 0:
print('(Iteration %d / %d) loss: %f' % (
t + 1, num_iterations, self.loss_history[-1]))

epoch_end = (t + 1) % iterations_per_epoch == 0
if epoch_end:
self.epoch += 1
for k in self.updater_configs:
self.updater_configs[k]['learning_rate'] *= self.lr_decay


coco_utils.py

对coco数据集进行处理,包括了读取数据文件、解码数据以及小批量读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import os, json
import numpy as np
import h5py


def load_coco_data(base_dir='datasets/coco_captioning', max_train=None,
pca_features=True):
'''
读取CoCo训练文件

Parameters
----------
base_dir : TYPE, optional
数据文件位置. The default is 'datasets/coco_captioning'.
max_train : TYPE, optional
是否对训练数据进行再抽样. The default is None.
pca_features : TYPE, optional
是否使用降维特征. The default is True.

Returns
-------
data : TYPE
读取的数据文件.

'''
# 保存数据
data = {}
# 获得文件
caption_file = os.path.join(base_dir, 'coco2014_captions.h5')
# 添加文件汇总的内容
with h5py.File(caption_file, 'r') as f:
for k, v in f.items():
data[k] = np.asarray(v)

# 是否使用降维特征
if pca_features:
train_feat_file = os.path.join(base_dir, 'train2014_vgg16_fc7_pca.h5')
else:
train_feat_file = os.path.join(base_dir, 'train2014_vgg16_fc7.h5')
with h5py.File(train_feat_file, 'r') as f:
data['train_features'] = np.asarray(f['features'])

if pca_features:
val_feat_file = os.path.join(base_dir, 'val2014_vgg16_fc7_pca.h5')
else:
val_feat_file = os.path.join(base_dir, 'val2014_vgg16_fc7.h5')
with h5py.File(val_feat_file, 'r') as f:
data['val_features'] = np.asarray(f['features'])

dict_file = os.path.join(base_dir, 'coco2014_vocab.json')
with open(dict_file, 'r') as f:
dict_data = json.load(f)
for k, v in dict_data.items():
data[k] = v

train_url_file = os.path.join(base_dir, 'train2014_urls.txt')
with open(train_url_file, 'r') as f:
train_urls = np.asarray([line.strip() for line in f])
data['train_urls'] = train_urls

val_url_file = os.path.join(base_dir, 'val2014_urls.txt')
with open(val_url_file, 'r') as f:
val_urls = np.asarray([line.strip() for line in f])
data['val_urls'] = val_urls

# 也许对训练数据进行再抽样
if max_train is not None:
num_train = data['train_captions'].shape[0]
mask = np.random.randint(num_train, size=max_train)
data['train_captions'] = data['train_captions'][mask]
data['train_image_idxs'] = data['train_image_idxs'][mask]

return data


def decode_captions(captions, idx_to_word):
singleton = False
if captions.ndim == 1:
singleton = True
captions = captions[None]
decoded = []
N, T = captions.shape
for i in range(N):
words = []
for t in range(T):
word = idx_to_word[captions[i, t]]
if word != '<NULL>':
words.append(word)
if word == '<END>':
break
decoded.append(' '.join(words))
if singleton:
decoded = decoded[0]
return decoded


def sample_coco_minibatch(data, batch_size=100, split='train'):
split_size = data['%s_captions' % split].shape[0]
mask = np.random.choice(split_size, batch_size)
captions = data['%s_captions' % split][mask]
image_idxs = data['%s_image_idxs' % split][mask]
image_features = data['%s_features' % split][image_idxs]
urls = data['%s_urls' % split][image_idxs]
return captions, image_features, urls

image_utils.py

通用的方法用于展示过程中的图片。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import urllib.request, os, tempfile

import numpy as np
from scipy.misc import imread

from cnn_layers import conv_forward_fast


"""
Utility functions used for viewing and processing images.
"""


def blur_image(X):
"""
A very gentle image blurring operation, to be used as a regularizer for image
generation.

Inputs:
- X: Image data of shape (N, 3, H, W)

Returns:
- X_blur: Blurred version of X, of shape (N, 3, H, W)
"""
w_blur = np.zeros((3, 3, 3, 3))
b_blur = np.zeros(3)
blur_param = {'stride': 1, 'pad': 1}
for i in range(3):
w_blur[i, i] = np.asarray([[1, 2, 1], [2, 188, 2], [1, 2, 1]], dtype=np.float32)
w_blur /= 200.0
return conv_forward_fast(X, w_blur, b_blur, blur_param)[0]


def preprocess_image(img, mean_img, mean='image'):
"""
Convert to float, transepose, and subtract mean pixel

Input:
- img: (H, W, 3)

Returns:
- (1, 3, H, 3)
"""
if mean == 'image':
mean = mean_img
elif mean == 'pixel':
mean = mean_img.mean(axis=(1, 2), keepdims=True)
elif mean == 'none':
mean = 0
else:
raise ValueError('mean must be image or pixel or none')
return img.astype(np.float32).transpose(2, 0, 1)[None] - mean


def deprocess_image(img, mean_img, mean='image', renorm=False):
"""
Add mean pixel, transpose, and convert to uint8

Input:
- (1, 3, H, W) or (3, H, W)

Returns:
- (H, W, 3)
"""
if mean == 'image':
mean = mean_img
elif mean == 'pixel':
mean = mean_img.mean(axis=(1, 2), keepdims=True)
elif mean == 'none':
mean = 0
else:
raise ValueError('mean must be image or pixel or none')
if img.ndim == 3:
img = img[None]
img = (img + mean)[0].transpose(1, 2, 0)
if renorm:
low, high = img.min(), img.max()
img = 255.0 * (img - low) / (high - low)
return img.astype(np.uint8)


def image_from_url(url):
"""
Read an image from a URL. Returns a numpy array with the pixel data.
We write the image to a temporary file then read it back. Kinda gross.
"""
try:
f = urllib.request.urlopen(url)
_, fname = tempfile.mkstemp()
with open(fname, 'wb') as ff:
ff.write(f.read())
img = imread(fname)
os.remove(fname)
return img
except urllib.request.URLError as e:
print('URL Error: ', e.reason, url)
except urllib.request.HTTPError as e:
print('HTTP Error: ', e.code, url)

rnn.py

图片说明任务RNN网络类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#-*- coding: utf-8 -*-
import numpy as np

from layers import *
from rnn_layers import *


class CaptioningRNN(object):
"""
处理图片说明任务RNN网络
注意:不使用正则化
"""

def __init__(self, word_to_idx, input_dim=512, wordvec_dim=128,
hidden_dim=128, cell_type='rnn'):
"""
初始化CaptioningRNN
Inputs:
- word_to_idx: 单词字典,用于查询单词索引对应的词向量
- input_dim: 输入图片数据维度
- wordvec_dim: 词向量维度.
- hidden_dim: RNN隐藏层维度.
- cell_type: 细胞类型; 'rnn' 或 'lstm'.
"""
# 参数检验
if cell_type not in {'rnn', 'lstm'}:
raise ValueError('Invalid cell_type "%s"' % cell_type)

# 初始化数据
self.cell_type = cell_type
self.word_to_idx = word_to_idx
self.idx_to_word = {i: w for w, i in word_to_idx.items()}
self.params = {}

vocab_size = len(word_to_idx)

self._null = word_to_idx['<NULL>']
self._start = word_to_idx.get('<START>', None)
self._end = word_to_idx.get('<END>', None)

# 初始化词向量
self.params['W_embed'] = np.random.randn(vocab_size, wordvec_dim)
self.params['W_embed'] /= 100

# 初始化 CNN -> 隐藏层参数,用于将图片特征提取到RNN中
self.params['W_proj'] = np.random.randn(input_dim, hidden_dim)
self.params['W_proj'] /= np.sqrt(input_dim)
self.params['b_proj'] = np.zeros(hidden_dim)

# 初始化RNN参数
dim_mul = {'lstm': 4, 'rnn': 1}[cell_type]
self.params['Wx'] = np.random.randn(wordvec_dim, dim_mul * hidden_dim)
self.params['Wx'] /= np.sqrt(wordvec_dim)
self.params['Wh'] = np.random.randn(hidden_dim, dim_mul * hidden_dim)
self.params['Wh'] /= np.sqrt(hidden_dim)
self.params['b'] = np.zeros(dim_mul * hidden_dim)

# 初始化输出层参数
self.params['W_vocab'] = np.random.randn(hidden_dim, vocab_size)
self.params['W_vocab'] /= np.sqrt(hidden_dim)
self.params['b_vocab'] = np.zeros(vocab_size)


def loss(self, features, captions):
"""
计算RNN或LSTM的损失值。
Inputs:
- features: 输入图片特征(N, D)。
- captions: 图像文字说明(N, T)。

Returns 元组:
- loss: 损失值。
- grads:梯度。
"""
#将文字切分为两段:captions_in除去最后一词用于RNN输入
#captions_out除去第一个单词,用于RNN输出配对
captions_in = captions[:, :-1]
captions_out = captions[:, 1:]

# 掩码
mask = (captions_out != self._null)

# 图像仿射转换矩阵
W_proj, b_proj = self.params['W_proj'], self.params['b_proj']

# 词嵌入矩阵
W_embed = self.params['W_embed']

# RNN参数
Wx, Wh, b = self.params['Wx'], self.params['Wh'], self.params['b']

# 隐藏层输出转化矩阵
W_vocab, b_vocab = self.params['W_vocab'], self.params['b_vocab']

loss, grads = 0.0, {}
############################################################################
# 任务:实现CaptioningRNN传播 #
# (1)使用仿射变换(features,W_proj,b_proj), #
# 将图片特征输入进隐藏层初始状态h0(N,H) #
# (2)使用词嵌入层将captions_in中的单词索引转换为词向量(N,T,W) #
# (3)使用RNN或LSTM处理词向量(N,T,H) #
# (4)使用时序仿射传播temporal_affine_forward计算各单词得分(N,T,V) #
# (5)使用temporal_softmax_loss计算损失值 #
############################################################################
# 1 使用仿射变换(features,W_proj,b_proj),将图片特征输入进隐藏层初始状态h0(N,H)
h0, cache_h0 = affine_forward(features, W_proj, b_proj)
# 2 使用词嵌入层将captions_in中的单词索引转换为词向量(N,T,W)
x, cache_embedding = word_embedding_forward(captions_in, W_embed)
# 3 使用RNN或LSTM处理词向量(N,T,H)
if self.cell_type == 'rnn':
out_h, cache_rnn = rnn_forward(x, h0, Wx, Wh, b)
elif self.cell_type == 'lstm':
out_h, cache_rnn = lstm_forward(x, h0, Wx, Wh, b)
else:
raise ValueError('Invalid cell_type "%s"' % self.cell_type)
# 4 使用时序仿射传播temporal_affine_forward计算各单词得分(N,T,V)
yHat, cache_out = temporal_affine_forward(out_h, W_vocab, b_vocab)
# 5 使用temporal_softmax_loss计算损失值
loss, dy = temporal_softmax_loss(yHat, captions_out, mask, verbose=False)
# 计算梯度
dout_h, dW_vocab, db_vocab = temporal_affine_backward(dy, cache_out)
# 输出层到隐藏层的反向传播
if self.cell_type == 'rnn':
dx, dh0, dWx, dWh, db = rnn_backward(dout_h, cache_rnn)
elif self.cell_type == 'lstm':
dx, dh0, dWx, dWh, db = lstm_backward(dout_h, cache_rnn)
else:
raise ValueError('Invalid cell_type "%s"' % self.cell_type)
# 隐藏层到隐藏层自身的反向传播
dW_embed = word_embedding_backward(dx, cache_embedding)
# 隐藏层到输入层的反向传播
dfeatures, dW_proj, db_proj = affine_backward(dh0, cache_h0)
# 记录梯度
grads['W_proj'] = dW_proj
grads['b_proj'] = db_proj
grads['W_embed'] = dW_embed
grads['Wx'] = dWx
grads['Wh'] = dWh
grads['b'] = db
grads['W_vocab'] = dW_vocab
grads['b_vocab'] = db_vocab
############################################################################
# 结束编码 #
############################################################################
return loss, grads


def sample(self, features, max_length=30):
"""
测试阶段的前向传播过程,采样一批图片说明作为输入
Inputs:
- features: 图片特征(N, D).
- max_length:生成说明文字的最大长度

Returns:
- captions: 说明文字的字典索引串(N, max_length)
"""
N = features.shape[0]
captions = self._null * np.ones((N, max_length), dtype=np.int32)

W_proj, b_proj = self.params['W_proj'], self.params['b_proj']
W_embed = self.params['W_embed']
Wx, Wh, b = self.params['Wx'], self.params['Wh'], self.params['b']
W_vocab, b_vocab = self.params['W_vocab'], self.params['b_vocab']

###########################################################################
# 任务:测试阶段前向传播 #
# 提示:(1)第一个单词应该是<START>标记,captions[:,0]=self._start #
# (2)当前单词输入为之前RNN的输出 #
# (3)前向传播过程为预测当前单词的下一个单词, #
# 你需要计算所有单词得分,然后选取最大得分作为预测单词 #
# (4)你无法使用rnn_forward 或 lstm_forward函数, #
# 你需要循环调用rnn_step_forward或lstm_step_forward函数 #
###########################################################################
# 获取数据
N, D = features.shape
affine_out, affine_cache = affine_forward(features, W_proj, b_proj)
prev_word_idx = [self._start]*N
prev_h = affine_out
prev_c = np.zeros(prev_h.shape)
# 1第一个单词应该是<START>标记
captions[:, 0] = self._start
for i in range(1, max_length):
# 2当前单词输入为之前RNN的输出
prev_word_embed = W_embed[prev_word_idx]
# 4循环调用rnn_step_forward或lstm_step_forward函数
if self.cell_type == 'rnn':
next_h, rnn_step_cache = rnn_step_forward(prev_word_embed, prev_h,
Wx, Wh, b)
elif self.cell_type == 'lstm':
next_h, next_c, lstm_step_cache = lstm_step_forward(prev_word_embed, prev_h,
prev_c, Wx, Wh, b)
prev_c = next_c
else:
raise ValueError('Invalid cell_type "%s"' % self.cell_type)
vocab_affine_out, vocab_affine_out_cache = affine_forward(next_h,
W_vocab, b_vocab)
# 3计算所有单词得分,然后选取最大得分作为预测单词
captions[:, i] = list(np.argmax(vocab_affine_out, axis=1))
prev_word_idx = captions[:, i]
prev_h = next_h
############################################################################
# 结束编码 #
############################################################################
return captions

rnn_layers.py

RNN隐藏层需要使用到的方法,包括了RNN、LSTM以及词嵌入的前向传播和反向传播。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
#-*- coding: utf-8 -*-
import numpy as np

def rnn_step_forward(x, prev_h, Wx, Wh, b):
"""
RNN单步前向传播,使用tanh激活单元
Inputs:
- x: 当前时间步数据输入(N, D).
- prev_h: 前一时间步隐藏层状态 (N, H)
- Wx: 输入层到隐藏层连接权重(D, H)
- Wh:隐藏层到隐藏层连接权重(H, H)
- b: 隐藏层偏置项(H,)

Returns 元组:
- next_h: 下一隐藏层状态(N, H)
- cache: 缓存
"""
next_h, cache = None, None
##############################################################################
# 任务:实现RNN单步前向传播 #
# 将输出值储存在next_h中, #
# 将反向传播时所需的各项缓存存放在cache中 #
##############################################################################
# 计算神经元输入
a = prev_h.dot(Wh)+x.dot(Wx)+b
# 神经元激活
next_h = np.tanh(a)
# 保留过程中的数据
cache = (x, prev_h, Wh, Wx, b, next_h)
##############################################################################
# 结束编码 #
##############################################################################
return next_h, cache


def rnn_step_backward(dnext_h, cache):
"""
RNN单步反向传播。
Inputs:
- dnext_h: 后一时间片段的梯度。
- cache: 前向传播时的缓存。

Returns 元组:
- dx: 数据梯度(N, D)。
- dprev_h: 前一时间片段梯度(N, H)。
- dWx: 输入层到隐藏层权重梯度(D,H)。
- dWh: 隐藏层到隐藏层权重梯度(H, H)。
- db: 偏置项梯度(H,)。
"""
dx, dprev_h, dWx, dWh, db = None, None, None, None, None
##############################################################################
# 任务:实现RNN单步反向传播 #
# 提示:tanh(x)梯度: 1 - tanh(x)*tanh(x) #
##############################################################################
# 获取缓存数据
x, prev_h, Wh, Wx, b, next_h = cache
# 根据链式求导法则依次计算各个变量的梯度
dscores = dnext_h*(1-next_h*next_h)
dWx = np.dot(x.T, dscores)
db = np.sum(dscores, axis=0)
dWh = np.dot(prev_h.T, dscores)
dx = np.dot(dscores, Wx.T)
dprev_h = np.dot(dscores, Wh.T)
##############################################################################
# 结束编码 #
##############################################################################
return dx, dprev_h, dWx, dWh, db


def rnn_forward(x, h0, Wx, Wh, b):
"""
RNN前向传播。
Inputs:
- x: 完整的时序数据 (N, T, D)。
- h0: 隐藏层初始化状态 (N, H)。
- Wx: 输入层到隐藏层权重 (D, H)。
- Wh: 隐藏层到隐藏层权重(H, H)。
- b: 偏置项(H,)。

Returns 元组:
- h: 所有时间步隐藏层状态(N, T, H)。
- cache: 反向传播所需的缓存。
"""
h, cache = None, None
##############################################################################
# 任务:实现RNN前向传播。 #
# 提示: 使用前面实现的rnn_step_forward 函数。 #
##############################################################################
# 获取数据维度
N, T, D = x.shape
(H, ) = b.shape
# 初始化h
h = np.zeros((N, T, H))
# 获取默认隐藏层状态
prev_h = h0
# 遍历所有时间
for t in range(T):
# 获取当前时间片段
xt = x[:, t, :]
# 计算每一个片段
next_h, _ = rnn_step_forward(xt, prev_h, Wx, Wh, b)
# 更新状态
prev_h = next_h
# 保留结果
h[:, t, :] = prev_h
# 数据缓存,
cache = (x, h0, Wh, Wx, b, h)
##############################################################################
# 结束编码 #
##############################################################################
return h, cache


def rnn_backward(dh, cache):
"""
RNN反向传播。
Inputs:
- dh: 隐藏层所有时间步梯度(N, T, H)。
Returns 元组:
- dx: 输入数据时序梯度(N, T, D)。
- dh0: 初始隐藏层梯度(N, H)。
- dWx: 输入层到隐藏层权重梯度(D, H)。
- dWh: 隐藏层到隐藏层权重梯度(H, H)。
- db: 偏置项梯度(H,)。
"""
dx, dh0, dWx, dWh, db = None, None, None, None, None
##############################################################################
# 任务:实现RNN反向传播。 #
# 提示:使用 rnn_step_backward函数。 #
##############################################################################
# 获取缓存数据
x, h0, Wh, Wx, b, h = cache
# 获取数据维度
N, T, H = dh.shape
_, _, D = x.shape
# 得到最后的细胞状态
next_h = h[:, T-1, :]
# 初始化
dprev_h = np.zeros((N, H))
dx = np.zeros((N, T, D))
dh0 = np.zeros((N, H))
dWx = np.zeros((D, H))
dWh = np.zeros((H, H))
db = np.zeros((H,))
# 遍历所有时间片段
for t in range(T):
# 当前处理的时间片段(从后往前)
t = T-1-t
# 获取对应的数据
xt = x[:, t, :]
# 最初时间片段的之前细胞状态默认为h0
if t == 0:
prev_h = h0
else:
prev_h = h[:, t-1, :]
# 获取缓存数据
step_cache = (xt, prev_h, Wh, Wx, b, next_h)
# 更新状态
next_h = prev_h
dnext_h = dh[:, t, :]+dprev_h
# 进行反向传播
dx[:, t, :], dprev_h, dWxt, dWht, dbt = rnn_step_backward(dnext_h, step_cache)
# 状态累加
dWx, dWh, db = dWx+dWxt, dWh+dWht, db+dbt
# 记录h0的梯度
dh0 = dprev_h
##############################################################################
# 结束编码 #
##############################################################################
return dx, dh0, dWx, dWh, db


def word_embedding_forward(x, W):
"""
词嵌入前向传播,将数据矩阵中的N条长度为T的词索引转化为词向量。
如:W[x[i,j]]表示第i条,第j时间步单词索引所对应的词向量。
Inputs:
- x: 整数型数组(N,T),N表示数据条数,T表示单条数据长度,
数组的每一元素存放着单词索引,取值范围[0,V)。
- W: 词向量矩阵(V,D)存放各单词对应的向量。

Returns 元组:
- out:输出词向量(N, T, D)。
- cache:反向传播时所需的缓存。
"""
out, cache = None, None
##############################################################################
# 任务:实现词嵌入前向传播。 #
##############################################################################
# 获取数据维度
N, T = x.shape
V, D = W.shape
# 初始化
out = np.zeros((N, T, D))
# 遍历所有数据
for i in range(N):
for j in range(T):
# 将其转化为词向量
out[i, j] = W[x[i, j]]
cache = (x, W.shape)
##############################################################################
# 结束编码 #
##############################################################################
return out, cache


def word_embedding_backward(dout, cache):
"""
词嵌入反向传播

Inputs:
- dout: 上层梯度 (N, T, D)
- cache:前向传播缓存

Returns:
- dW: 词嵌入矩阵梯度(V, D).
"""
dW = None
##############################################################################
# 任务:实现词嵌入反向传播 #
# 提示:你可以使用np.add.at函数 #
# 例如 np.add.at(a,[1,2],1)相当于a[1],a[2]分别加1 #
##############################################################################
x, W_shape = cache
dW = np.zeros(W_shape)
# np.add.at()是将传入的数组中制定下标位置的元素加上指定的值.
np.add.at(dW, x, dout)
##############################################################################
# 结束编码 #
##############################################################################
return dW


def sigmoid(x):
"""
数值稳定版本的sigmoid函数。
"""
pos_mask = (x >= 0)
neg_mask = (x < 0)
z = np.zeros_like(x)
z[pos_mask] = np.exp(-x[pos_mask])
z[neg_mask] = np.exp(x[neg_mask])
top = np.ones_like(x)
top[neg_mask] = z[neg_mask]
return top / (1 + z)


def lstm_step_forward(x, prev_h, prev_c, Wx, Wh, b):
"""
LSTM单步前向传播

Inputs:
- x: 输入数据 (N, D)
- prev_h: 前一隐藏层状态 (N, H)
- prev_c: 前一细胞状态(N, H)
- Wx: 输入层到隐藏层权重(D, 4H)
- Wh: 隐藏层到隐藏层权重 (H, 4H)
- b: 偏置项(4H,)

Returns 元组:
- next_h: 下一隐藏层状态(N, H)
- next_c: 下一细胞状态(N, H)
- cache: 反向传播所需的缓存
"""
next_h, next_c, cache = None, None, None
#############################################################################
# 任务:实现LSTM单步前向传播。 #
# 提示:稳定版本的sigmoid函数已经帮你实现,直接调用即可。 #
# tanh函数使用np.tanh。 #
#############################################################################
# 获取数据
N, D = x.shape
N, H = prev_h.shape
# 计算输入门、遗忘门、输出门
input_gate = sigmoid(np.dot(x, Wx[:, 0:H])+np.dot(prev_h, Wh[:, 0:H])+b[0:H])
forget_gate = sigmoid(np.dot(x, Wx[:, H:2*H])+np.dot(prev_h, Wh[:, H:2*H])
+b[H:2*H])
output_gate = sigmoid(np.dot(x, Wx[:, 2*H:3*H])+np.dot(prev_h, Wh[:, 2*H:3*H])
+b[2*H:3*H])
# 计算输出单元
input_data = np.tanh(np.dot(x, Wx[:, 3*H:4*H])+np.dot(prev_h, Wh[:, 3*H:4*H])
+b[3*H:4*H])
# 更新细胞记忆
next_c = forget_gate*prev_c+input_data*input_gate
# 计算细胞输出
next_scores_c = np.tanh(next_c)
next_h = output_gate*next_scores_c
cache = (x, Wx, Wh, b, input_data, input_gate, output_gate, forget_gate,
prev_h, prev_c, next_scores_c)
##############################################################################
# 结束编码 #
##############################################################################
return next_h, next_c, cache


def lstm_step_backward(dnext_h, dnext_c, cache):
"""
LSTM单步反向传播

Inputs:
- dnext_h: 下一隐藏层梯度 (N, H)
- dnext_c: 下一细胞梯度 (N, H)
- cache: 前向传播缓存

Returns 元组:
- dx: 输入数据梯度 (N, D)
- dprev_h: 前一隐藏层梯度 (N, H)
- dprev_c: 前一细胞梯度(N, H)
- dWx: 输入层到隐藏层梯度(D, 4H)
- dWh: 隐藏层到隐藏层梯度(H, 4H)
- db: 偏置梯度(4H,)
"""
dx, dprev_h, dc, dWx, dWh, db = None, None, None, None, None, None
#############################################################################
# 任务:实现LSTM单步反向传播 #
# 提示:sigmoid(x)函数梯度:sigmoid(x)*(1-sigmoid(x)) #
# tanh(x)函数梯度: 1-tanh(x)*tanh(x) #
#############################################################################
# 获取数据
x, Wx, Wh, b, input_data, input_gate, output_gate, forget_gate, prev_h,\
prev_c, next_scores_c = cache
N, D = x.shape
N, H = prev_h.shape
# 初始化变量
dWx = np.zeros((D, 4*H))
dxx = np.zeros((D, 4*H))
dWh = np.zeros((H, 4*H))
dhh = np.zeros((H, 4*H))
db = np.zeros(4*H)
dx = np.zeros((N, D))
dprev_h = np.zeros((N, H))
# 计算当前细胞的梯度
dc_tem = dnext_c+dnext_h*(1-next_scores_c**2)*output_gate
# 求解tanh层
dprev_c = forget_gate*dc_tem
dforget_gate = prev_c*dc_tem
dinput_gate = input_data*dc_tem
dinput = input_gate*dc_tem
doutput_gate = next_scores_c*dnext_h
# 求解sigmoid层
dscores_in_gate = input_gate*(1-input_gate)*dinput_gate
dscores_forget_gate = forget_gate*(1-forget_gate)*dforget_gate
dscores_out_gate = output_gate*(1-output_gate)*doutput_gate
dscores_in = (1-input_data**2)*dinput
da = np.hstack((dscores_in_gate, dscores_forget_gate, dscores_out_gate, dscores_in))
dWx = np.dot(x.T, da)
dWh = np.dot(prev_h.T, da)
db = np.sum(da, axis=0)
dx = np.dot(da, Wx.T)
dprev_h = np.dot(da, Wh.T)
##############################################################################
# 结束编码 #
##############################################################################

return dx, dprev_h, dprev_c, dWx, dWh, db


def lstm_forward(x, h0, Wx, Wh, b):
"""
LSTM前向传播
Inputs:
- x: 输入数据 (N, T, D)
- h0:初始化隐藏层状态(N, H)
- Wx: 输入层到隐藏层权重 (D, 4H)
- Wh: 隐藏层到隐藏层权重(H, 4H)
- b: 偏置项(4H,)

Returns 元组:
- h: 隐藏层所有状态 (N, T, H)
- cache: 用于反向传播的缓存
"""
h, cache = None, None
#############################################################################
# 任务: 实现完整的LSTM前向传播 #
#############################################################################
# 获取数据
N, T, D = x.shape
H = int(b.shape[0]/4)
# 初始化信息
h = np.zeros((N, T, H))
cache = {}
prev_h = h0
prev_c = np.zeros((N, H))
# 遍历所有时序数据
for t in range(T):
# 当前数据
xt = x[:, t, :]
# 进行单步LSTM前向传播
next_h, next_c, cache[t] = lstm_step_forward(xt, prev_h, prev_c, Wx, Wh, b)
# 更新状态
prev_h = next_h
prev_c = next_c
h[:, t, :] = prev_h
##############################################################################
# 结束编码 #
##############################################################################

return h, cache


def lstm_backward(dh, cache):
"""
LSTM反向传播
Inputs:
- dh: 各隐藏层梯度(N, T, H)
- cache: V前向传播缓存

Returns 元组:
- dx: 输入数据梯度 (N, T, D)
- dh0:初始隐藏层梯度(N, H)
- dWx: 输入层到隐藏层权重梯度 (D, 4H)
- dWh: 隐藏层到隐藏层权重梯度 (H, 4H)
- db: 偏置项梯度 (4H,)
"""
dx, dh0, dWx, dWh, db = None, None, None, None, None
#############################################################################
# 任务:实现完整的LSTM反向传播 #
#############################################################################
# 获取数据
N, T, H = dh.shape
# 从最后一条开始更新
x, Wx, Wh, b, input_data, input_gate, output_gate, forget_gate, prev_h, prev_c,\
next_scores_c = cache[T-1]
D = x.shape[1]
# 初始化
dprev_h = np.zeros((N, H))
dprev_c = np.zeros((N, H))
dx = np.zeros((N, T, D))
dh0 = np.zeros((N, H))
dWx = np.zeros((D, 4*H))
dWh = np.zeros((H, 4*H))
db = np.zeros((4*H,))
# 遍历所有数据
for t in range(T):
# 选择当前时间(从后向前)
t = T-1-t
# 获取数据
step_cache = cache[t]
dnext_h = dh[:, t, :]+dprev_h
dnext_c = dprev_c
# 进行单步反向传播计算
dx[:, t, :], dprev_h, dprev_c, dWxt, dWht, dbt = lstm_step_backward(dnext_h,
dnext_c, step_cache)
# 更新参数
dWx, dWh, db = dWx+dWxt, dWh+dWht, db+dbt
# 更新h0梯度
dh0 = dprev_h
##############################################################################
# 结束编码 #
##############################################################################

return dx, dh0, dWx, dWh, db


def temporal_affine_forward(x, w, b):
"""
时序隐藏层仿射传播:将隐藏层时序数据(N,T,D)重塑为(N*T,D),
完成前向传播后,再重塑回原型输出。

Inputs:
- x: 时序数据(N, T, D)。
- w: 权重(D, M)。
- b: 偏置(M,)。

Returns 元组:
- out: 输出(N, T, M)。
- cache: 反向传播缓存。
"""
N, T, D = x.shape
M = b.shape[0]
# Affine层
out = x.reshape(N * T, D).dot(w).reshape(N, T, M) + b
cache = x, w, b, out
return out, cache


def temporal_affine_backward(dout, cache):
"""
时序隐藏层仿射反向传播。

Input:
- dout:上层梯度 (N, T, M)。
- cache: 前向传播缓存。

Returns 元组:
- dx: 输入梯度(N, T, D)。
- dw: 权重梯度 (D, M)。
- db: 偏置项梯度 (M,)。
"""
x, w, b, out = cache
N, T, D = x.shape
M = b.shape[0]
# Affine层反向传播
dx = dout.reshape(N * T, M).dot(w.T).reshape(N, T, D)
dw = dout.reshape(N * T, M).T.dot(x.reshape(N * T, D)).T
db = dout.sum(axis=(0, 1))

return dx, dw, db


def temporal_softmax_loss(x, y, mask, verbose=False):
"""
时序版本的Softmax损失和原版本类似,只需将数据(N, T, V)重塑为(N*T,V)即可。
需要注意的是,对于NULL标记不计入损失值,因此,你需要加入掩码进行过滤。
Inputs:
- x: 输入数据得分(N, T, V)。
- y: 目标索引(N, T),其中0<= y[i, t] < V。
- mask: 过滤NULL标记的掩码。
Returns 元组:
- loss: 损失值。
- dx: x梯度。
"""
# 获取必备信息
N, T, V = x.shape

x_flat = x.reshape(N * T, V)
y_flat = y.reshape(N * T)
mask_flat = mask.reshape(N * T)

# 和原有softmax类似,不足的部分使用NULL补充,计算的时候过滤
probs = np.exp(x_flat - np.max(x_flat, axis=1, keepdims=True))
probs /= np.sum(probs, axis=1, keepdims=True)
loss = -np.sum(mask_flat * np.log(probs[np.arange(N * T), y_flat])) / N
dx_flat = probs.copy()
dx_flat[np.arange(N * T), y_flat] -= 1
dx_flat /= N
dx_flat *= mask_flat[:, None]

# 是否打印
if verbose:
print('dx_flat: ', dx_flat.shape)

dx = dx_flat.reshape(N, T, V)

return loss, dx



编码实现RNN以及LSTM
https://fulequn.github.io/2021/11/Article202111225/
作者
Fulequn
发布于
2021年11月22日
许可协议