Implementing a Convolutional Neural Network in Code

cnn_layers.py

Functions implementing the forward and backward passes of a convolutional neural network.

#-*- coding: utf-8 -*-
import numpy as np
from layers import *
from bn_layers import *


def conv_forward_naive(x, w, b, conv_param):
"""
卷积前向传播。
Input:
- x: 四维图片数据(N, C, H, W)分别表示(数量,色道,高,宽)
- w: 四维卷积核(F, C, HH, WW)分别表示(卷积核个数,色道,高,宽)
- b: 偏置项(F,)
- conv_param: 字典型参数表,其键值为:
- 'stride':跳跃数据卷积的跨幅数量
- 'pad':输入数据的零填充数量

Returns 元组型:
- out: 输出数据(N, F, H', W') ,其中 H' 和 W' 分别为:
H' = 1 + (H + 2 * pad - HH) / stride
W' = 1 + (W + 2 * pad - WW) / stride
- cache: (x, w, b, conv_param)
"""
out = None
#############################################################################
# 任务: 实现卷积层的前向传播 #
# 提示: 你可以使用np.pad函数进行零填充 #
#############################################################################
# 获取数据的各种数据量
# 数量,色道,高,宽
N, C, H, W = x.shape[0], x.shape[1], x.shape[2], x.shape[3]
# 卷积核个数,高,宽
F, HH, WW = w.shape[0], w.shape[2], w.shape[3]
# 输入数据的零填充数量
pad = conv_param['pad']
# 跳跃数据进行卷积的跨幅数量
stride = conv_param['stride']
# 进行填充
x_pad = np.pad(x, ((0,), (0,), (pad,), (pad,)), 'constant')

# 计算循环次数
Hhat = int(1 + (H + 2 * pad - HH) / stride)
What= int(1 + (W + 2 * pad - WW) / stride)
# 输出值
out = np.zeros([N,F,Hhat,What])
# 遍历所有数据的下层色道的高和宽
for n in range(N):
for f in range(F):
for i in range(Hhat):
for j in range(What):
xx =x_pad[n, :, i*stride:i*stride+HH, j*stride:j*stride+WW]
out[n,f,i,j] =np.sum(xx*w[f])+b[f]
#############################################################################
# 结束编码 #
#############################################################################
cache = (x, w, b, conv_param)
return out, cache


def conv_forward_fast(x, w, b, conv_param):
'''
卷积前向传播的快速版本

Parameters
----------
x : 四维图片数据(N, C, H, W)分别表示(数量,色道,高,宽)
w : 四维卷积核(F, C, HH, WW)分别表示(卷积核个数,色道,高,宽)
b : 偏置项(F,)
conv_param : 字典型参数表,其键值为:
- 'stride':跳跃数据卷积的跨幅数量
- 'pad':输入数据的零填充数量

Returns
-------
out : 输出数据(N, F, H', W') ,其中 H' 和 W' 分别为:
H' = 1 + (H + 2 * pad - HH) / stride
W' = 1 + (W + 2 * pad - WW) / stride
cache : (x, w, b, conv_param)

'''
N, C, H, W = x.shape
F, _, HH, WW = w.shape
stride, pad = conv_param['stride'], conv_param['pad']
assert (W + 2 * pad - WW) % stride == 0, '宽度异常'
assert (H + 2 * pad - HH) % stride == 0, '高度异常'
# 零填充
p = pad
x_padded = np.pad(x, ((0, 0), (0, 0), (p, p), (p, p)),
mode='constant')
# 计算输出维度
H += 2 * pad
W += 2 * pad
out_h = int((H - HH) / stride + 1)
out_w = int((W - WW) / stride + 1)
shape = (C, HH, WW, N, out_h, out_w)
strides = (H * W, W, 1, C * H * W, stride * W, stride)
strides = x.itemsize * np.array(strides)
x_stride = np.lib.stride_tricks.as_strided(x_padded,
shape=shape, strides=strides)
x_cols = np.ascontiguousarray(x_stride)
x_cols.shape = (C * HH * WW, N * out_h * out_w)
# 将所有卷积核重塑成一行
res = w.reshape(F, -1).dot(x_cols) + b.reshape(-1, 1)
# 重塑输出
res.shape = (F, N, out_h, out_w)
out = res.transpose(1, 0, 2, 3)
out = np.ascontiguousarray(out)
cache = (x, w, b, conv_param)
return out, cache



def conv_backward_naive1(dout, cache):
"""
卷积层反向传播显式循环版本

Inputs:
- dout:上层梯度.
- cache: 前向传播时的缓存元组 (x, w, b, conv_param)

Returns 元组:
- dx: x梯度
- dw: w梯度
- db: b梯度
"""
dx, dw, db = None, None, None
#############################################################################
# 任务 :实现卷积层反向传播 #
#############################################################################
x, w, b, conv_param = cache
P = conv_param['pad']
x_pad = np.pad(x,((0,),(0,),(P,),(P,)),'constant')
N, C, H, W = x.shape
F, C, HH, WW = w.shape
N, F, Hh, Hw = dout.shape
S = conv_param['stride']
dw = np.zeros((F, C, HH, WW))
for fprime in range(F):
for cprime in range(C):
for i in range(HH):
for j in range(WW):
sub_xpad =x_pad[:,cprime,i:i+Hh*S:S,j:j+Hw*S:S]
dw[fprime,cprime,i,j] = np.sum(
dout[:,fprime,:,:]*sub_xpad)


db = np.zeros((F))
for fprime in range(F):
db[fprime] = np.sum(dout[:,fprime,:,:])
dx = np.zeros((N, C, H, W))

for nprime in range(N):
for i in range(H):
for j in range(W):
for f in range(F):
for k in range(Hh):
for l in range(Hw):
mask1 = np.zeros_like(w[f,:,:,:])
mask2 = np.zeros_like(w[f,:,:,:])
if (i+P-k*S)<HH and (i+P-k*S)>= 0:
mask1[:,i+P-k*S,:] = 1.0
if (j+P-l* S) < WW and (j+P-l*S)>= 0:
mask2[:,:,j+P-l*S] = 1.0
w_masked=np.sum(w[f,:,:,:]*mask1*mask2,axis=(1,2))
dx[nprime,:,i,j] +=dout[nprime,f,k,l]*w_masked
#############################################################################
# 结束编码 #
#############################################################################
return dx, dw, db


def conv_backward_naive(dout, cache):
"""
卷积层反向传播

Inputs:
- dout:上层梯度.
- cache: 前向传播时的缓存元组 (x, w, b, conv_param)

Returns 元组:
- dx: x梯度
- dw: w梯度
- db: b梯度
"""
dx, dw, db = None, None, None
#############################################################################
# 任务 :实现卷积层反向传播 #
#############################################################################
x, w, b, conv_param = cache
# 初始化参数
N, C, H, W = x.shape
F, _, HH, WW = w.shape
stride, pad = conv_param['stride'], conv_param['pad']
# 计算循环次数
H_out = int(1+(H+2*pad-HH)/stride)
W_out = int(1+(W+2*pad-WW)/stride)
# 进行0填充
x_pad = np.pad(x,((0,), (0,), (pad,), (pad,)),
mode='constant', constant_values=0)
# 计算梯度
dx = np.zeros_like(x)
dx_pad = np.zeros_like(x_pad)
dw = np.zeros_like(w)
db = np.zeros_like(b)
# 进行求解
db = np.sum(dout, axis=(0, 2, 3))
x_pad = np.pad(x,((0,), (0,), (pad,), (pad,)),
mode='constant', constant_values=0)
for i in range(H_out):
for j in range(W_out):
x_pad_masked = x_pad[:, :, i*stride:i*stride+HH,
j*stride:j*stride+WW]
# 计算dw
for k in range(F):
dw[k, :, :, :] += np.sum(x_pad_masked*(dout[:, k, i, j])[:,
None, None, None], axis=0)

# 计算dx_pad
for n in range(N):
dx_pad[n, :, i*stride:i*stride+HH, j*stride:j*stride+WW] += \
np.sum((w[:, :, :, :]*(dout[n, :, i, j])[:, None, None, None]),
axis=0)

dx = dx_pad[:, :, pad:pad+H, pad:pad+W]
#############################################################################
# 结束编码 #
#############################################################################
return dx, dw, db


def max_pool_forward_naive(x, pool_param):
"""
最大池化前向传播

Inputs:
- x: 数据 (N, C, H, W)
- pool_param: 键值:
- 'pool_height': 池化高
- 'pool_width': 池化宽
- 'stride': 步幅

Returns 元组型:
- out: 输出数据
- cache: (x, pool_param)
"""
out = None
#############################################################################
# 任务: 实现最大池化操作的前向传播 #
#############################################################################
# 初始化参数
N, C, H, W = x.shape
HH = pool_param['pool_height']
WW = pool_param['pool_width']
stride = pool_param['stride']

# 计算循环次数
H_out = int((H-HH)/stride+1)
W_out = int((W-WW)/stride+1)
out = np.zeros((N, C, H_out, W_out))
for i in range(H_out):
for j in range(W_out):
# 先找到对应区域
x_masked = x[:, :, i*stride:i*stride+HH, j*stride:j*stride+WW]
# 选择其中最大的值
out[:, :, i, j] = np.max(x_masked, axis=(2,3))
#############################################################################
# 结束编码 #
#############################################################################
cache = (x, pool_param)
return out, cache


def max_pool_forward_fast(x, pool_param):
'''
最大池化前向传播的快速版本

Parameters
----------
x : 四维图片数据(N, C, H, W)分别表示(数量,色道,高,宽)
pool_param : 字典型参数表,其键值为:
- 'pool_height': 池化高
- 'pool_width': 池化宽
- 'stride': 步幅

Returns
-------
out : 输出数据
cache : (x, x_reshaped, out)
'''
# 初始化参数
N, C, H, W = x.shape
pool_height = pool_param['pool_height']
pool_width = pool_param['pool_width']
stride = pool_param['stride']

assert pool_height == pool_width == stride, 'Invalid pool params'
assert H % pool_height == 0
assert W % pool_height == 0

x_reshaped = x.reshape(N, C, int(H / pool_height), pool_height,
int(W / pool_width), pool_width)
out = x_reshaped.max(axis=3).max(axis=4)

cache = (x, x_reshaped, out)
return out, cache


def max_pool_backward_naive(dout, cache):
"""
最大池化反向传播.

Inputs:
- dout: 上层梯度
- cache: 缓存 (x, pool_param)
Returns:
- dx: x梯度
"""
dx = None
#############################################################################
# 任务:实现最大池化反向传播 #
#############################################################################
x, pool_param = cache
N, C, H, W = x.shape
HH = pool_param['pool_height']
WW = pool_param['pool_width']
stride = pool_param['stride']
H_out = int((H-HH)/stride+1)
W_out = int((W-WW)/stride+1)
dx = np.zeros_like(x)
for i in range(H_out):
for j in range(W_out):
x_masked = x[:, :, i*stride:i*stride+HH, j*stride:j*stride+WW]
max_x_masked = np.max(x_masked, axis=(2, 3))
temp_binary_mask = (x_masked == (max_x_masked)[:, :, None, None])
dx[:, :, i*stride:i*stride+HH, j*stride:j*stride+WW] += \
temp_binary_mask*(dout[:, :, i, j])[:, :, None, None]
#############################################################################
# 结束编码 #
#############################################################################
return dx


def max_pool_backward_fast(dout, cache):
x, x_reshaped, out = cache
dx_reshaped = np.zeros_like(x_reshaped)
out_newaxis = out[:, :, :, np.newaxis, :, np.newaxis]
mask = (x_reshaped == out_newaxis)
dout_newaxis = dout[:, :, :, np.newaxis, :, np.newaxis]
dout_broadcast, _ = np.broadcast_arrays(dout_newaxis, dx_reshaped)
dx_reshaped[mask] = dout_broadcast[mask]
dx_reshaped /= np.sum(mask, axis=(3, 5), keepdims=True)
dx = dx_reshaped.reshape(x.shape)
return dx


def spatial_batchnorm_forward(x, gamma, beta, bn_param):
"""
空间批量归一化前向传播

Inputs:
- x: 数据 (N, C, H, W)
- gamma: 缩放因子 (C,)
- beta: 偏移因子 (C,)
- bn_param: 参数字典:
- mode: 'train' or 'test';
- eps: 数值稳定常数
- momentum: 运行平均值衰减因子
- running_mean: 形状为(D,) 的运行均值
- running_var :形状为 (D,) 的运行方差

Returns 元组:
- out:输出 (N, C, H, W)
- cache: 用于反向传播的缓存
"""
out, cache = None, None
#############################################################################
# 任务:实现空间BN算法前向传播 #
# 提示:你只需要重塑数据,调用 batchnorm_forward函数即可 #
#############################################################################
N, C, H, W = x.shape
temp_output, cache = batchnorm_forward(
x.transpose(0, 3, 2, 1).reshape(N*H*W, C), gamma, beta, bn_param)
out = temp_output.reshape(N, W, H, C).transpose(0, 3, 2, 1)
#############################################################################
# 结束编码 #
#############################################################################

return out, cache


def spatial_batchnorm_backward(dout, cache):
"""
空间批量归一化反向传播

Inputs:
- dout: 上层梯度 (N, C, H, W)
- cache: 前向传播缓存

Returns 元组:
- dx:输入梯度 (N, C, H, W)
- dgamma: gamma梯度 (C,)
- dbeta: beta梯度 (C,)
"""
dx, dgamma, dbeta = None, None, None
#############################################################################
# 任务:实现空间BN算法反向传播 #
# 提示:你只需要重塑数据调用batchnorm_backward_alt函数即可 #
#############################################################################
N, C, H, W = dout.shape
dx_temp, dgamma, dbeta = batchnorm_backward_alt(
dout.transpose(0, 3 , 2, 1).reshape((N*H*W, C)), cache)
dx = dx_temp.reshape(N, W, H, C).transpose(0, 3, 2, 1)
#############################################################################
# 结束编码 #
#############################################################################
return dx, dgamma, dbeta


def conv_relu_forward(x, w, b, conv_param):
a, conv_cache = conv_forward_fast(x, w, b, conv_param)
out, relu_cache = relu_forward(a)
cache = (conv_cache, relu_cache)
return out, cache


def conv_relu_backward(dout, cache):
conv_cache, relu_cache = cache
da = relu_backward(dout, relu_cache)
dx, dw, db = conv_backward_naive(da, conv_cache)
return dx, dw, db


def conv_relu_pool_forward(x, w, b, conv_param, pool_param):
a, conv_cache = conv_forward_fast(x, w, b, conv_param)
s, relu_cache = relu_forward(a)
out, pool_cache = max_pool_forward_fast(s, pool_param)
cache = (conv_cache, relu_cache, pool_cache)
return out, cache


def conv_relu_pool_backward(dout, cache):
'''
完整卷积层的反向传播

Parameters
----------
dout : 上层梯度 (N, C, H, W)
cache : (conv_cache, relu_cache, pool_cache)

Returns
-------
dx : x的梯度
dw : w的梯度
db : b的梯度
'''
conv_cache, relu_cache, pool_cache = cache
ds = max_pool_backward_fast(dout, pool_cache)
da = relu_backward(ds, relu_cache)
dx, dw, db = conv_backward_naive(da, conv_cache)
return dx, dw, db
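
To sanity-check the naive convolution against the fast version, a minimal sketch like the following can be run (it assumes this file is importable as cnn_layers and uses small random tensors):

import numpy as np
from cnn_layers import conv_forward_naive, conv_forward_fast

x = np.random.randn(2, 3, 8, 8)               # (N, C, H, W)
w = np.random.randn(4, 3, 3, 3)               # (F, C, HH, WW)
b = np.random.randn(4)
conv_param = {'stride': 1, 'pad': 1}

out_naive, _ = conv_forward_naive(x, w, b, conv_param)
out_fast, _ = conv_forward_fast(x, w, b, conv_param)
print(out_naive.shape)                        # (2, 4, 8, 8): H' = 1 + (8 + 2*1 - 3)/1 = 8
print(np.max(np.abs(out_naive - out_fast)))   # should be close to 0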

layers.py

The forward- and backward-propagation code written earlier, plus the softmax loss function.

#-*- coding: utf-8 -*-
import numpy as np

def affine_forward(x, w, b):
"""
计算神经网络当前层的前馈传播。该方法计算在全连接情况下的得分函数
注:如果不理解affine仿射变换,简单的理解为在全连接情况下的得分函数即可

输入数据x的形状为(N, d_1, ..., d_k),其中N表示数据量,(d_1, ..., d_k)表示
每一通道的数据维度。如果是图片数据就为(长,宽,色道),数据的总维度就为
D = d_1 * ... * d_k,因此我们需要数据整合成完整的(N,D)形式再进行仿射变换。

Inputs:
- x: 输入数据,其形状为(N, d_1, ..., d_k)的numpy array
- w: 权重矩阵,其形状为(D,M)的numpy array,D表示输入数据维度,M表示输出数据维度
可以将D看成输入的神经元个数,M看成输出神经元个数
- b: 偏置向量,其形状为(M,)的numpy array

Returns 元组:
- out: 形状为(N, M)的输出结果
- cache: 将输入进行缓存(x, w, b)
"""
out = None
# 任务: 实现全连接前向传播
# 注:首先你需要将输入数据重塑成行。
N=x.shape[0]
x_new=x.reshape(N,-1)#将x重塑成2维向量
out=np.dot(x_new,w)+b
cache = (x, w, b)
return out, cache


def affine_backward(dout, cache):
"""
计算仿射层的反向传播.

Inputs:
- dout: 形状为(N, M)的上层梯度
- cache: 元组:
- x: (N, d_1, ... d_k)的输入数据
- w: 形状为(D, M)的权重矩阵

Returns 元组:
- dx: 输入数据x的梯度,其形状为(N, d1, ..., d_k)
- dw: 权重矩阵w的梯度,其形状为(D,M)
- db: 偏置项b的梯度,其形状为(M,)
"""
x, w, b = cache
dx, dw, db = None, None, None
# 注意:你需要将x重塑成(N,D)后才能计算各梯度,
# 算完梯度后你需要将dx重塑成与x相同的形状
db = np.sum(dout,axis=0)
xx= x.reshape(x.shape[0],-1)
dw = np.dot(xx.T,dout)
dx = np.dot(dout,w.T)
dx=np.reshape(dx,x.shape)
return dx, dw, db


def relu_forward(x):
"""
计算rectified linear units (ReLU)激活函数的前向传播,并保存相应缓存

Input:
- x: 输入数据

Returns 元组:
- out: 和输入数据x形状相同
- cache: x
"""
out = None
# 实现ReLU 的前向传播. #
out =np.maximum(0,x)
cache = x
return out, cache


def relu_backward(dout, cache):
"""
计算 rectified linear units (ReLUs)激活函数的反向传播.

Input:
- dout: 上层误差梯度
- cache: 输入 x,其形状应该和dout相同

Returns:
- dx: x的梯度
"""
dx, x = None, cache
# 实现 ReLU 反向传播.
dx=dout
dx[x<=0]=0
return dx

def affine_relu_forward(x, w, b):
"""
ReLU神经元前向传播

Inputs:
- x: 输入到 affine层的数据
- w, b: affine层的权重矩阵和偏置向量

Returns 元组:
- out: ReLU层的输出结果
- cache: 前向传播的缓存
"""
# 你需要调用affine_forward以及relu_forward函数,并将各自的缓存保存在cache中 #
a, fc_cache = affine_forward(x, w, b)
out, relu_cache = relu_forward(a)
cache = (fc_cache, relu_cache)
return out, cache


def affine_relu_backward(dout, cache):
"""
ReLU神经元的反向传播

Input:
- dout: 上层误差梯度
- cache: affine缓存,以及relu缓存

Returns:
- dx: 输入数据x的梯度
- dw: 权重矩阵w的梯度
- db: 偏置向量b的梯度
"""
fc_cache, relu_cache = cache
da = relu_backward(dout, relu_cache)
dx, dw, db = affine_backward(da, fc_cache)
return dx, dw, db




def softmax_loss(x, y):

probs = np.exp(x - np.max(x, axis=1, keepdims=True))
probs /= np.sum(probs, axis=1, keepdims=True)
N = x.shape[0]
loss = -np.sum(np.log(probs[np.arange(N), y])) / N
dx = probs.copy()
dx[np.arange(N), y] -= 1
dx /= N

return loss, dx
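
As a quick usage sketch (assuming layers.py is importable), the composite affine-ReLU layer and the softmax loss can be exercised on random data to confirm the expected shapes:

import numpy as np
from layers import affine_relu_forward, affine_relu_backward, softmax_loss

x = np.random.randn(4, 3, 8, 8)        # (N, d_1, ..., d_k), so D = 3*8*8 = 192
w = np.random.randn(192, 10)
b = np.zeros(10)

out, cache = affine_relu_forward(x, w, b)
print(out.shape)                        # (4, 10)

y = np.random.randint(10, size=4)
loss, dout = softmax_loss(out, y)
dx, dw, db = affine_relu_backward(dout, cache)
print(dx.shape, dw.shape, db.shape)     # (4, 3, 8, 8) (192, 10) (10,)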

trainer.py

Implementation of the decoupled trainer, which separates the training loop from the model.

#-*- coding: utf-8 -*-
import numpy as np

import updater


class Trainer(object):
"""
使用形式:

data = {
'X_train': # 训练数据
'y_train': # 训练类标
'X_val': # 验证数据
'y_val': # 验证类标
}
model = MyAwesomeModel(hidden_size=100, reg=10)
trainer = Trainer(model, data,
update_rule='sgd',
updater_config={
'learning_rate': 1e-3,
},
lr_decay=0.95,
num_epochs=10, batch_size=100,
print_every=100)
trainer.train()
"""

def __init__(self, model, data, **kwargs):
"""
构造一个新的Trainer实例
必须参数:
- model: 网络模型
- data: 数据字典,其中:
'X_train': 形状为(N_train, d_1, ..., d_k)训练数据
'X_val': 形状为(N_val, d_1, ..., d_k) 验证数据
'y_train': 形状为(N_train,) 训练数据类标
'y_val': 形状为(N_val,) 验证数据类标

可选参数:
- update_rule: 更新规则,其存放在updater.py文件中,默认选项为'sgd'。
- updater_config: 字典类型的,更新规则所对应的超参数配置,同见updater.py文件。
- lr_decay: 学习率衰减系数。
- batch_size: 批量数据大小
- num_epochs: 训练周期
- print_every: 整数型; 每迭代多少次进行打印一次中间结果
- verbose: 布尔型; 是否在训练期间打印中间结果
"""
self.model = model
self.X_train = data['X_train']
self.y_train = data['y_train']
self.X_val = data['X_val']
self.y_val = data['y_val']

# 弹出可选参数,进行相关配置
self.update_rule = kwargs.pop('update_rule', 'sgd')
self.updater_config = kwargs.pop('updater_config', {})
self.lr_decay = kwargs.pop('lr_decay', 1.0)
self.batch_size = kwargs.pop('batch_size', 100)
self.num_epochs = kwargs.pop('num_epochs', 10)

self.print_every = kwargs.pop('print_every', 10)
self.verbose = kwargs.pop('verbose', True)

# 若可选参数错误,抛出异常
if len(kwargs) > 0:
extra = ', '.join('"%s"' % k for k in kwargs.keys())
raise ValueError('Unrecognized arguments %s' % extra)


#确认updater中含有更新规则
if not hasattr(updater, self.update_rule):
raise ValueError('Invalid update_rule "%s"' % self.update_rule)
self.update_rule = getattr(updater, self.update_rule)

# 初始化相关变量
self.epoch = 0
self.best_val_acc = 0
self.best_params = {}
self.loss_history = []
self.train_acc_history = []
self.val_acc_history = []

# 对updater_config中的参数进行深拷贝
self.updater_configs = {}
for p in self.model.params:
d = {k: v for k, v in self.updater_config.items()}
self.updater_configs[p] = d


def _step(self):
"""
执行单步梯度更新
"""
# 采样批量数据
num_train = self.X_train.shape[0]
batch_mask = np.random.choice(num_train, self.batch_size)
X_batch = self.X_train[batch_mask]
y_batch = self.y_train[batch_mask]

# 计算损失及梯度
loss, grads = self.model.loss(X_batch, y_batch)
self.loss_history.append(loss)

# 更新参数
for p, w in self.model.params.items():
dw = grads[p]
config = self.updater_configs[p]
next_w, next_config = self.update_rule(w, dw, config)
self.model.params[p] = next_w
self.updater_configs[p] = next_config


def check_accuracy(self, X, y, num_samples=None, batch_size=100):
"""
根据提供的数据检验精度,若数据集过大,可进行采样测试。

Inputs:
- X: 形状为(N, d_1, ..., d_k)的数据
- y: 形状为 (N,)的数据类标
- num_samples: 采样次数
- batch_size:批量数据大小

Returns:
- acc: 测试数据正确率
"""

# 对数据进行采样
N = X.shape[0]
if num_samples is not None and N > num_samples:
mask = np.random.choice(N, num_samples)
N = num_samples
X = X[mask]
y = y[mask]

# 计算精度
num_batches = int(N / batch_size)
if N % batch_size != 0:
num_batches += 1
y_pred = []
for i in range(num_batches):
start = i * batch_size
end = (i + 1) * batch_size
scores = self.model.loss(X[start:end])
y_pred.append(np.argmax(scores, axis=1))
y_pred = np.hstack(y_pred)
acc = np.mean(y_pred == y)

return acc


def train(self):
"""
根据配置训练模型
"""
num_train = self.X_train.shape[0]
iterations_per_epoch = max(num_train // self.batch_size, 1)
num_iterations = int(self.num_epochs * iterations_per_epoch)

for t in range(num_iterations):
self._step()

# 打印损失值
if self.verbose and t % self.print_every == 0:
print('(迭代 %d / %d) 损失值: %f' % (
t + 1, num_iterations, self.loss_history[-1]))

# 更新学习率
epoch_end = (t + 1) % iterations_per_epoch == 0
if epoch_end:
self.epoch += 1
for k in self.updater_configs:
self.updater_configs[k]['learning_rate'] *= self.lr_decay


#在训练的开始,末尾,每一轮训练周期检验精度
first_it = (t == 0)
last_it = (t == num_iterations - 1)
if first_it or last_it or epoch_end:
train_acc = self.check_accuracy(self.X_train, self.y_train,
num_samples=1000)
val_acc = self.check_accuracy(self.X_val, self.y_val)
self.train_acc_history.append(train_acc)
self.val_acc_history.append(val_acc)

if self.verbose:
print('(周期 %d / %d) 训练精度: %f; 验证精度: %f' % (
self.epoch, self.num_epochs, train_acc, val_acc))

# 记录最佳模型
if val_acc > self.best_val_acc:
self.best_val_acc = val_acc
self.best_params = {}
for k, v in self.model.params.items():
self.best_params[k] = v.copy()

# 训练结束后返回最佳模型
self.model.params = self.best_params
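
A minimal usage sketch of the trainer (assumptions: synthetic random data, and a model exposing .params and .loss such as the ThreeLayerConvNet defined later in cnn.py):

import numpy as np
from trainer import Trainer
from cnn import ThreeLayerConvNet

data = {
    'X_train': np.random.randn(100, 3, 32, 32),
    'y_train': np.random.randint(10, size=100),
    'X_val': np.random.randn(20, 3, 32, 32),
    'y_val': np.random.randint(10, size=20),
}
model = ThreeLayerConvNet(weight_scale=1e-2)
trainer = Trainer(model, data,
                  update_rule='adam',
                  updater_config={'learning_rate': 1e-3},
                  lr_decay=0.95, num_epochs=1, batch_size=50,
                  print_every=1)
trainer.train()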


updater.py

The decoupled updater is responsible for updating the network weights. Each update rule takes the current weights w, the gradient dw of those weights, and the corresponding update configuration.
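
All the rules in this file share one calling convention; a minimal sketch with plain SGD (using the sgd function defined below) looks like this:

import numpy as np
from updater import sgd

w = np.array([1.0, 2.0])
dw = np.array([0.5, 0.5])
next_w, config = sgd(w, dw, {'learning_rate': 0.1})
print(next_w)   # [0.95 1.95]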

#-*- coding: utf-8 -*-
import numpy as np

"""
频繁使用在训练神经网络中的一阶梯度更新规则。每次更新接受当前的权重,
对应的梯度,以及相关配置进行权重更新。
def update(w, dw, config=None):
Inputs:
- w:当前权重.
- dw: 和权重形状相同的梯度.
- config: 字典型超参数配置,比如学习率,动量值等。如果更新规则需要用到缓存,
在配置中需要保存相应的缓存。

Returns:
- next_w: 更新后的权重.
- config: 更新规则相应的配置.
"""


def sgd(w, dw, config=None):
"""
随机梯度下降更新规则.

config 格式:
- learning_rate: 学习率.
"""
if config is None: config = {}
config.setdefault('learning_rate', 1e-2)

w -= config['learning_rate'] * dw
return w, config



def sgd_momentum(w, dw, config=None):
"""
动量随机梯度下降更新规则

config 格式:
- learning_rate: 学习率.
- momentum: [0,1]的动量,0表示不使用动量,退化为SGD
- velocity: 和w,dw同形的速度
"""
if config is None: config = {}
config.setdefault('learning_rate', 1e-2)
config.setdefault('momentum', 0.9)
v = config.setdefault('velocity', np.zeros_like(w))

next_w = None
v =config['momentum']*config['velocity'] - config['learning_rate'] * dw
next_w = w + v
config['velocity'] = v

return next_w, config



def rmsprop(w, dw, config=None):
"""
RMSProp 更新规则

config 格式:
- learning_rate: 学习率.
- decay_rate:用于衰减历史梯度值的衰减率,取值为[0,1]
- epsilon: 避免除零异常的小数.
- cache:历史梯度缓存.
"""
if config is None: config = {}
config.setdefault('learning_rate', 1e-2)
config.setdefault('decay_rate', 0.99)
config.setdefault('epsilon', 1e-8)
config.setdefault('cache', np.zeros_like(w))

next_w = None
config['cache'] = config['decay_rate'] * config['cache'] + (1 - config['decay_rate']) * dw**2
next_w = w - config['learning_rate'] * dw /(np.sqrt(config['cache'] + config['epsilon']))

return next_w, config


def adam(w, dw, config=None):
"""
使用Adam更新规则,包含偏差修正(bias correction)

config 格式:
- learning_rate: 学习率.
- beta1: 动量衰减率.
- beta2: 学习步长衰减率.
- epsilon: 防除0小数.
- m: 梯度.
- v: 梯度平方.
- t: 迭代次数.
"""
if config is None: config = {}
config.setdefault('learning_rate', 1e-3)
config.setdefault('beta1', 0.9)
config.setdefault('beta2', 0.999)
config.setdefault('epsilon', 1e-8)
config.setdefault('m', np.zeros_like(w))
config.setdefault('v', np.zeros_like(w))
config.setdefault('t', 0)

next_w = None
# 将更新后的权重存放在next_w中,记得将m,v,t存放在相应的config中
config['t'] += 1
beta1 = config['beta1']
beta2 = config['beta2']
epsilon = config['epsilon']
learning_rate = config['learning_rate']
config['m'] = beta1 * config['m'] + (1-beta1) * dw
config['v'] = beta2 * config['v'] + (1-beta2) * dw**2
mb = config['m']/(1 - beta1**config['t'])
vb = config['v']/(1 - beta2**config['t'])
next_w = w - learning_rate * mb / (np.sqrt(vb)+epsilon)
return next_w, config
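
Because each rule returns its config, per-parameter state survives between calls. A small sketch with sgd_momentum shows the velocity being carried in the config dictionary:

import numpy as np
from updater import sgd_momentum

w = np.ones((2, 2))
dw = np.full((2, 2), 0.5)
config = {'learning_rate': 0.1, 'momentum': 0.9}

w, config = sgd_momentum(w, dw, config)   # first step: velocity becomes -0.05
w, config = sgd_momentum(w, dw, config)   # second step reuses config['velocity']
print(config['velocity'])                 # [[-0.095 -0.095] [-0.095 -0.095]]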


bn_layers.py

Forward and backward passes of the batch-normalization (BN) algorithm.

#-*- coding: utf-8 -*-
import numpy as np
from layers import *
from dropout_layers import *

def batchnorm_forward(x, gamma, beta, bn_param):
"""

使用类似动量衰减的运行时平均,计算总体均值与方差,例如:

running_mean = momentum * running_mean + (1 - momentum) * sample_mean
running_var = momentum * running_var + (1 - momentum) * sample_var
Input:
- x: 数据(N, D)
- gamma: 缩放参数 (D,)
- beta: 平移参数 (D,)
- bn_param: 字典型,使用下列键值:
- mode: 'train' 或'test';
- eps: 保证数值稳定
- momentum: 运行时平均衰减因子
- running_mean: 形状为(D,)的运行时均值
- running_var : 形状为 (D,)的运行时方差

Returns 元组:
- out: 输出(N, D)
- cache: 用于反向传播的缓存
"""
mode = bn_param['mode']
eps = bn_param.get('eps', 1e-5)
momentum = bn_param.get('momentum', 0.9)

N, D = x.shape
running_mean = bn_param.get('running_mean', np.zeros(D, dtype=x.dtype))
running_var = bn_param.get('running_var', np.zeros(D, dtype=x.dtype))

out, cache = None, None
if mode == 'train':
# Forward pass
# Step 1 - shape of mu (D,)
mu = 1 / float(N) * np.sum(x, axis=0)
# Step 2 - shape of xmu (N,D)
xmu = x - mu
# Step 3 - shape of carre (N,D)
carre = xmu**2
# Step 4 - shape of var (D,)
var = 1 / float(N) * np.sum(carre, axis=0)
# Step 5 - Shape sqrtvar (D,)
sqrtvar = np.sqrt(var + eps)
# Step 6 - Shape invvar (D,)
invvar = 1. / sqrtvar
# Step 7 - Shape va2 (N,D)
va2 = xmu * invvar
# Step 8 - Shape va3 (N,D)
va3 = gamma * va2
# Step 9 - Shape out (N,D)
out = va3 + beta
running_mean = momentum * running_mean + (1.0 - momentum) * mu
running_var = momentum * running_var + (1.0 - momentum) * var
cache = (mu, xmu, carre, var, sqrtvar, invvar,va2, va3, gamma, beta, x, bn_param)
elif mode == 'test':
# 使用运行时均值与方差归一化数据
mu = running_mean
var = running_var
xhat = (x - mu) / np.sqrt(var + eps)
# 使用gamma和beta参数缩放,平移数据。
out = gamma * xhat + beta
cache = (mu, var, gamma, beta, bn_param)
else:
raise ValueError('无法识别的BN模式: "%s"' % mode)

# 更新运行时均值,方差
bn_param['running_mean'] = running_mean
bn_param['running_var'] = running_var

return out, cache


def batchnorm_backward(dout, cache):
"""
BN反向传播
Inputs:
- dout: 上层梯度 (N, D)
- cache: 前向传播时的缓存.

Returns 元组:
- dx: 数据梯度 (N, D)
- dgamma: gamma梯度 (D,)
- dbeta: beta梯度 (D,)
"""
dx, dgamma, dbeta = None, None, None

mu, xmu, carre, var, sqrtvar, invvar, va2, va3, gamma, beta, x, bn_param = cache
eps = bn_param.get('eps', 1e-5)
N, D = dout.shape
# Backprop Step 9
dva3 = dout
dbeta = np.sum(dout, axis=0)
# Backprop step 8
dva2 = gamma * dva3
dgamma = np.sum(va2 * dva3, axis=0)
# Backprop step 7
dxmu = invvar * dva2
dinvvar = np.sum(xmu * dva2, axis=0)
# Backprop step 6
dsqrtvar = -1. / (sqrtvar**2) * dinvvar
# Backprop step 5
dvar = 0.5 * (var + eps)**(-0.5) * dsqrtvar
# Backprop step 4
dcarre = 1 / float(N) * np.ones((carre.shape)) * dvar
# Backprop step 3
dxmu += 2 * xmu * dcarre
# Backprop step 2
dx = dxmu
dmu = - np.sum(dxmu, axis=0)
# Backprop step 1
dx += 1 / float(N) * np.ones((dxmu.shape)) * dmu

return dx, dgamma, dbeta


def batchnorm_backward_alt(dout, cache):
"""
可选的BN反向传播
"""
dx, dgamma, dbeta = None, None, None
mu, xmu, carre, var, sqrtvar, invvar, va2, va3, gamma, beta, x, bn_param = cache
eps = bn_param.get('eps', 1e-5)
N, D = dout.shape
dbeta = np.sum(dout, axis=0)
dgamma = np.sum((x - mu) * (var + eps)**(-1. / 2.) * dout, axis=0)
dx = (1./N) * gamma * (var + eps)**(-1./2.)*(N*dout-np.sum(
dout, axis=0)-(x-mu)*(var+eps)**(-1.0)*np.sum(dout*(x-mu),axis=0))

return dx, dgamma, dbeta


def affine_bn_relu_forward(x,w,b,gamma, beta,bn_param):
x_affine,cache_affine= affine_forward(x,w,b)
x_bn,cache_bn = batchnorm_forward(x_affine,gamma, beta,bn_param)
out,cache_relu = relu_forward(x_bn)
cache = (cache_affine,cache_bn,cache_relu)
return out,cache


def affine_bn_relu_backward(dout,cache):
cache_affine,cache_bn,cache_relu = cache
drelu = relu_backward(dout,cache_relu)
dbn,dgamma, dbeta= batchnorm_backward_alt(drelu,cache_bn)
dx,dw,db = affine_backward(dbn,cache_affine)
return dx,dw,db,dgamma,dbeta
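
A minimal sanity check (assuming bn_layers.py is importable): in train mode, with gamma = 1 and beta = 0, the output of batchnorm_forward should have per-feature mean close to 0 and standard deviation close to 1:

import numpy as np
from bn_layers import batchnorm_forward

x = 10 + 4 * np.random.randn(200, 5)    # (N, D) data with nonzero mean and variance
gamma, beta = np.ones(5), np.zeros(5)
out, _ = batchnorm_forward(x, gamma, beta, {'mode': 'train'})
print(out.mean(axis=0))                 # approximately 0 for every feature
print(out.std(axis=0))                  # approximately 1 for every feature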


dropout_layers.py

Contains the dropout forward and backward passes, along with composite layers that combine dropout with affine and ReLU layers.

#-*- coding: utf-8 -*-
import numpy as np
from layers import *


def dropout_forward(x, dropout_param):
"""
执行dropout前向传播
Inputs:
- x: 输入数据
- dropout_param: 字典类型,使用下列键值:
- p: dropout参数。每个神经元的激活概率p
- mode: 'test'或'train'. 训练模式使用dropout;测试模式仅仅返回输入值。
- seed: 随机数生成种子.

Outputs:
- out: 和输入数据相同形状
- cache:元组(dropout_param, mask).
训练模式时,掩码mask用于激活该层神经元,测试模式时不使用
"""
p, mode = dropout_param['p'], dropout_param['mode']
if 'seed' in dropout_param:
np.random.seed(dropout_param['seed'])

mask = None
out = None

if mode == 'train':
mask = (np.random.rand(*x.shape) < p)/p
out =x*mask
elif mode == 'test':
out = x

cache = (dropout_param, mask)
out = out.astype(x.dtype, copy=False)

return out, cache


def dropout_backward(dout, cache):
"""
dropout反向传播

Inputs:
- dout: 上层梯度
- cache: dropout_forward中的缓存(dropout_param, mask)。
"""
dropout_param, mask = cache
mode = dropout_param['mode']

dx = None
if mode == 'train':
dx =dout*mask
elif mode == 'test':
dx = dout
return dx

def affine_relu_dropout_forward(x,w,b,dropout_param):
"""
组合affine_relu_dropout前向传播
Inputs:
- x: 输入数据
- w: 权重参数
- b: 偏置项
- dropout_param: 字典类型,使用下列键值:
- p: dropout参数。每个神经元的激活概率p
- mode: 'test'或'train'. 训练模式使用dropout;测试模式仅仅返回输入值。
- seed: 随机数生成种子.

Outputs:
- out: 和输入数据相同形状
- cache:缓存包含(cache_affine,cache_relu,cache_dropout)
"""
out_dropout = None
cache =None
out_affine, cache_affine = affine_forward(x,w,b)
out_relu,cache_relu =relu_forward(out_affine)
out_dropout,cache_dropout =dropout_forward(out_relu,dropout_param)
cache = (cache_affine,cache_relu,cache_dropout)
return out_dropout,cache

def affine_relu_dropout_backward(dout,cache):
"""
affine_relu_dropout神经元的反向传播

Input:
- dout: 上层误差梯度
- cache: 缓存(cache_affine,cache_relu,cache_dropout)

Returns:
- dx: 输入数据x的梯度
- dw: 权重矩阵w的梯度
- db: 偏置向量b的梯度
"""
cache_affine,cache_relu,cache_dropout = cache
dx,dw,db=None,None,None
ddropout = dropout_backward(dout,cache_dropout)
drelu = relu_backward(ddropout,cache_relu)
dx,dw,db = affine_backward(drelu,cache_affine)
return dx,dw,db
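
The key property of this inverted-dropout implementation is that scaling the mask by p keeps the expected activation unchanged, so test mode can simply return the input. A minimal sketch:

import numpy as np
from dropout_layers import dropout_forward

x = np.ones((500, 500))
out_train, _ = dropout_forward(x, {'mode': 'train', 'p': 0.5, 'seed': 0})
out_test, _ = dropout_forward(x, {'mode': 'test', 'p': 0.5})
print(out_train.mean())   # ~ 1.0 (kept units are scaled by 1/p)
print(out_test.mean())    # exactly 1.0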

cnn.py

Next we implement a simple, shallow convolutional network consisting of one convolutional layer and two fully connected layers: input - conv - relu - 2x2 max pool - affine - relu - affine - softmax.
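
Before reading the code it helps to trace the default shapes: a 32x32 input convolved with a 7x7 filter at stride 1 and pad 3 stays 32x32, and the 2x2 max pool halves that to 16x16, which is where the (H/2)*(W/2)*num_filters input dimension of W2 comes from. A sketch of that arithmetic (the values mirror the defaults used below):

H = W = 32
filter_size, stride = 7, 1
pad = (filter_size - 1) // 2                          # 3
H_conv = 1 + (H + 2 * pad - filter_size) // stride    # 32
W_conv = 1 + (W + 2 * pad - filter_size) // stride    # 32
H_pool, W_pool = H_conv // 2, W_conv // 2             # 16, 16
num_filters = 32
print(H_pool * W_pool * num_filters)                  # 8192 rows in W2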

#-*- coding: utf-8 -*-
import sys, os
sys.path.append(os.path.realpath(os.path.dirname(os.path.realpath(__file__))))

import numpy as np
from cnn_layers import *

class ThreeLayerConvNet(object):
"""
conv - relu - 2x2 max pool - affine - relu - affine - softmax
"""

def __init__(self, input_dim=(3, 32, 32), num_filters=32, filter_size=7,
hidden_dim=100, num_classes=10, weight_scale=1e-3, reg=0.0,):
"""
初始化网络.

Inputs:
- input_dim: 输入数据形状 (C, H, W)
- num_filters: 卷积核个数
- filter_size: 卷积核尺寸
- hidden_dim: 全连接层隐藏层个数
- num_classes: 分类个数
- weight_scale: 权重规模(标准差)
- reg:权重衰减因子
"""
self.params = {}
self.reg = reg
############################################################################
# 任务:初始化权重参数 #
# 'W1'为卷积层参数,形状为(num_filters,C,filter_size,filter_size) #
# 'W2'为卷积层到全连接层参数,形状为((H/2)*(W/2)*num_filters, hidden_dim) #
# 'W3'隐藏层到全连接层参数 #
############################################################################
C, H, W = input_dim
self.params['W1'] = weight_scale*np.random.randn(num_filters, C,
filter_size, filter_size)
self.params['b1'] = np.zeros(num_filters)
self.params['W2'] = weight_scale*np.random.randn(int((H/2)*(W/2)*num_filters),
hidden_dim)
self.params['b2'] = np.zeros(hidden_dim)
self.params['W3'] = weight_scale*np.random.randn(hidden_dim, num_classes)
self.params['b3'] = np.zeros(num_classes)
############################################################################
# 结束编码 #
############################################################################


def loss(self, X, y=None):

# 初始化参数
W1, b1 = self.params['W1'], self.params['b1']
W2, b2 = self.params['W2'], self.params['b2']
W3, b3 = self.params['W3'], self.params['b3']
# 使用卷积层
filter_size = W1.shape[2]
# 设置卷积层和池化层所需要的参数
conv_param = {'stride': 1, 'pad': int((filter_size - 1) / 2)}
pool_param = {'pool_height': 2, 'pool_width': 2, 'stride': 2}

scores = None
############################################################################
# 任务: 实现前向传播 #
# 计算每类得分,将其存放在scores中 #
############################################################################
# 组合卷积层:卷积,ReLU,池化
conv_forward_out_1, cache_forward_1 = conv_relu_pool_forward(X,
self.params['W1'], self.params['b1'], conv_param, pool_param)
# affine层
affine_forward_out_2, cache_forward_2 = affine_forward(conv_forward_out_1,
self.params['W2'], self.params['b2'])
# relu层
affine_relu_2, cache_relu_2 = relu_forward(affine_forward_out_2)
# affine层
scores, cache_forward_3 = affine_forward(affine_relu_2, self.params['W3'],
self.params['b3'])
############################################################################
# 结束编码 #
############################################################################
if y is None:
return scores

loss, grads = 0, {}
############################################################################
# 任务:实现反向转播 #
# 注意:别忘了权重衰减项 #
############################################################################
loss, dout = softmax_loss(scores, y)
loss += self.reg*0.5*(np.sum(self.params['W1']**2)
+np.sum(self.params['W2']**2)
+np.sum(self.params['W3']**2))
dX3, grads['W3'], grads['b3'] = affine_backward(dout, cache_forward_3)
dX2 = relu_backward(dX3, cache_relu_2)
dX2, grads['W2'], grads['b2'] = affine_backward(dX2, cache_forward_2)
dX1, grads['W1'], grads['b1'] = conv_relu_pool_backward(dX2, cache_forward_1)
grads['W3'] = grads['W3']+self.reg*self.params['W3']
grads['W2'] = grads['W2']+self.reg*self.params['W2']
grads['W1'] = grads['W1']+self.reg*self.params['W1']
############################################################################
# 结束编码 #
############################################################################
return loss, grads
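
As a final smoke test (assuming the script is run from the directory containing cnn.py), an untrained network on random data should produce a softmax loss near log(10) ≈ 2.3 for 10 classes:

import numpy as np
from cnn import ThreeLayerConvNet

model = ThreeLayerConvNet(weight_scale=1e-3, reg=0.0)
X = np.random.randn(5, 3, 32, 32)
y = np.random.randint(10, size=5)
loss, grads = model.loss(X, y)
print(loss)                      # ~ 2.3
print(sorted(grads.keys()))      # ['W1', 'W2', 'W3', 'b1', 'b2', 'b3']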



Author: Fulequn · Published January 10, 2021 · https://fulequn.github.io/2021/01/Article202101103/