CS231n assignment 2

2019-05-04
School Courses / Image Recognition

Introduction

This assignment goes a bit deeper than the first one. It asks you to implement, in order, a Fully Connected Network, Batch Normalization, Dropout, and a Convolutional Neural Network, and to modularize the steps involved.

Fully Connected Network

Modular network

The first task is to modularize the forward and backward passes of the FCN, including the affine layer and the ReLU activation. During the forward pass, the variables needed later are stored in a cache so the backward pass can reuse them.

import numpy as np  # all of the snippets below assume numpy is imported

def affine_forward(x, w, b):
    # Flatten each example, apply the affine transform, and cache the inputs.
    out = x.reshape(x.shape[0], -1).dot(w) + b
    cache = (x, w, b)
    return out, cache
def affine_backward(dout, cache):
    x, w, b = cache

    dx = dout.dot(w.T).reshape(x.shape)
    dw = x.reshape(x.shape[0], -1).T.dot(dout)
    db = np.sum(dout, axis=0)

    return dx, dw, db
def relu_forward(x):
    out = np.maximum(x, 0)
    cache = x
    return out, cache
def relu_backward(dout, cache):
    x = cache
    dx = dout * (x > 0)
    return dx
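
The TwoLayerNet and FullyConnectedNet below also call affine_relu_forward and affine_relu_backward, which the assignment ships in cs231n/layer_utils.py; they are just the two modules above chained together, roughly:

def affine_relu_forward(x, w, b):
    # Affine transform followed by ReLU; keep both caches for the backward pass.
    a, fc_cache = affine_forward(x, w, b)
    out, relu_cache = relu_forward(a)
    return out, (fc_cache, relu_cache)

def affine_relu_backward(dout, cache):
    # Undo the ReLU first, then the affine layer.
    fc_cache, relu_cache = cache
    da = relu_backward(dout, relu_cache)
    return affine_backward(da, fc_cache)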

Use the modules we just finished to build TwoLayerNet, as well as FullyConnectedNet, whose layer sizes can be configured freely.

class TwoLayerNet(object):

    def __init__(self, input_dim=3*32*32, hidden_dim=100, num_classes=10,
                 weight_scale=1e-3, reg=0.0):
        self.params = {}
        self.reg = reg

        self.params['W1'] = np.random.normal(0, weight_scale, (input_dim, hidden_dim))
        self.params['b1'] = np.zeros((hidden_dim, ))
        self.params['W2'] = np.random.normal(0, weight_scale, (hidden_dim, num_classes))
        self.params['b2'] = np.zeros((num_classes, ))

    def loss(self, X, y=None):
        h1, cache_h1 = affine_relu_forward(X, self.params['W1'], self.params['b1'])
        scores, cache_scores = affine_forward(h1, self.params['W2'], self.params['b2'])

        # If y is None then we are in test mode so just return scores
        if y is None:
            return scores

        loss, grads = 0, {}

        loss, dS = softmax_loss(scores, y)
        dh1, grads['W2'], grads['b2'] = affine_backward(dS, cache_scores)
        dx, grads['W1'], grads['b1'] = affine_relu_backward(dh1, cache_h1)

        loss += 0.5 * self.reg * (np.sum(self.params['W2'] ** 2) + np.sum(self.params['W1'] ** 2))
        grads['W1'] += self.reg * self.params['W1']
        grads['W2'] += self.reg * self.params['W2']

        return loss, grads
class FullyConnectedNet(object):

    def __init__(self, hidden_dims, input_dim=3*32*32, num_classes=10,
                 dropout=1, normalization=None, reg=0.0,
                 weight_scale=1e-2, dtype=np.float32, seed=None):
        self.normalization = normalization
        self.use_dropout = dropout != 1
        self.reg = reg
        self.num_layers = 1 + len(hidden_dims)
        self.dtype = dtype
        self.params = {}

        # First hidden layer, the remaining hidden layers, then the output layer.
        self.params['W1'] = np.random.normal(0, weight_scale, (input_dim, hidden_dims[0]))
        self.params['b1'] = np.zeros((hidden_dims[0], ))

        for i in range(1, self.num_layers - 1):
            self.params['W'+str(i+1)] = np.random.normal(0, weight_scale, (hidden_dims[i-1], hidden_dims[i]))
            self.params['b'+str(i+1)] = np.zeros((hidden_dims[i], ))

        self.params['W'+str(self.num_layers)] = np.random.normal(0, weight_scale, (hidden_dims[-1], num_classes))
        self.params['b'+str(self.num_layers)] = np.zeros((num_classes, ))

        # When using dropout we need to pass a dropout_param dictionary to each
        # dropout layer so that the layer knows the dropout probability and the mode
        # (train / test). You can pass the same dropout_param to each dropout layer.
        self.dropout_param = {}
        if self.use_dropout:
            self.dropout_param = {'mode': 'train', 'p': dropout}
            if seed is not None:
                self.dropout_param['seed'] = seed

        # With batch normalization we need to keep track of running means and
        # variances, so we need to pass a special bn_param object to each batch
        # normalization layer. You should pass self.bn_params[0] to the forward pass
        # of the first batch normalization layer, self.bn_params[1] to the forward
        # pass of the second batch normalization layer, etc.
        self.bn_params = []
        if self.normalization == 'batchnorm':
            self.bn_params = [{'mode': 'train'} for i in range(self.num_layers - 1)]
        if self.normalization == 'layernorm':
            self.bn_params = [{} for i in range(self.num_layers - 1)]

        # Cast all parameters to the correct datatype
        for k, v in self.params.items():
            self.params[k] = v.astype(dtype)

    def loss(self, X, y=None):
        X = X.astype(self.dtype)
        mode = 'test' if y is None else 'train'

        # Set train/test mode for batchnorm params and dropout param since they
        # behave differently during training and testing.
        if self.use_dropout:
            self.dropout_param['mode'] = mode
        if self.normalization == 'batchnorm':
            for bn_param in self.bn_params:
                bn_param['mode'] = mode

        # Forward pass: (affine -> ReLU) x (L - 1), then a final affine layer.
        h, cache = [None] * (self.num_layers + 1), [None] * (self.num_layers + 1)
        h[0] = X
        for i in range(self.num_layers - 1):
            W, b = self.params['W'+str(i+1)], self.params['b'+str(i+1)]
            h[i+1], cache[i+1] = affine_relu_forward(h[i], W, b)

        W = self.params['W'+str(self.num_layers)]
        b = self.params['b'+str(self.num_layers)]
        h[self.num_layers], cache[self.num_layers] = affine_forward(h[self.num_layers - 1], W, b)

        scores = h[-1]

        # If test mode return early
        if mode == 'test':
            return scores

        loss, grads = 0.0, {}

        loss, dS = softmax_loss(scores, y)
        dh = [None] * (self.num_layers + 1)
        dh[-1] = dS

        # Backward pass: last affine layer, then each affine-ReLU block,
        # adding L2 regularization to the loss and gradients along the way.
        i = self.num_layers
        dh[i-1], grads['W'+str(i)], grads['b'+str(i)] = affine_backward(dh[i], cache[i])
        loss += 0.5 * self.reg * np.sum(self.params['W'+str(i)] ** 2)
        grads['W'+str(i)] += self.reg * self.params['W'+str(i)]
        i -= 1

        while i > 0:
            dh[i-1], grads['W'+str(i)], grads['b'+str(i)] = affine_relu_backward(dh[i], cache[i])
            loss += 0.5 * self.reg * np.sum(self.params['W'+str(i)] ** 2)
            grads['W'+str(i)] += self.reg * self.params['W'+str(i)]
            i -= 1

        return loss, grads

SGD + Momentum

The vanilla Stochastic Gradient Descent update:

\(x_{t+1}=x_t-\alpha\nabla f(x_t)\)

SGD + Momentum:

\(v_{t+1}=\rho v_t-\alpha\nabla f(x_t)\)

\(x_{t+1}=x_t+v_{t+1}\)

\(v\) is the current velocity, initialized to 0. If the negative gradient points in the same direction as the current velocity, the velocity keeps growing and the parameter updates get larger; otherwise the velocity shrinks and the updates get smaller.

\(\rho\) is a hyperparameter, usually set to around 0.9.

SGD + Momentum usually converges faster than vanilla SGD.

def sgd_momentum(w, dw, config=None):
    if config is None: config = {}
    config.setdefault('learning_rate', 1e-2)
    config.setdefault('momentum', 0.9)
    v = config.get('velocity', np.zeros_like(w))

    v = config['momentum'] * v - config['learning_rate'] * dw
    next_w = w + v

    config['velocity'] = v

    return next_w, config

RMSProp

\(v_t=\rho v_{t-1}+(1-\rho) \times (\nabla f(x_t))^2\)

\(\Delta x_t=-\dfrac{\alpha}{\sqrt{v_t+\epsilon}} \times \nabla f(x_t)\)

\(x_{t+1}=x_t+\Delta x_t\)

\(\rho\) is the decay rate, typically set to 0.9, 0.99, or 0.999.

\(\epsilon\) is a small constant that prevents division by zero.

def rmsprop(w, dw, config=None):
    if config is None: config = {}
    config.setdefault('learning_rate', 1e-2)
    config.setdefault('decay_rate', 0.99)
    config.setdefault('epsilon', 1e-8)
    config.setdefault('cache', np.zeros_like(w))

    config['cache'] = config['decay_rate'] * config['cache'] + (1 - config['decay_rate']) * (dw ** 2)
    next_w = w - config['learning_rate'] * dw / np.sqrt(config['cache'] + config['epsilon'])

    return next_w, config

Adam
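
Adam combines a momentum-style moving average of the gradient with an RMSProp-style moving average of the squared gradient, with bias correction for both; the update (my summary, matching the code below) is:

\(m_t=\beta_1 m_{t-1}+(1-\beta_1) \times \nabla f(x_t)\)

\(v_t=\beta_2 v_{t-1}+(1-\beta_2) \times (\nabla f(x_t))^2\)

\(\hat{m}_t=\dfrac{m_t}{1-\beta_1^t},\quad \hat{v}_t=\dfrac{v_t}{1-\beta_2^t}\)

\(x_{t+1}=x_t-\dfrac{\alpha}{\sqrt{\hat{v}_t}+\epsilon} \times \hat{m}_t\)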

def adam(w, dw, config=None):
    if config is None: config = {}
    config.setdefault('learning_rate', 1e-3)
    config.setdefault('beta1', 0.9)
    config.setdefault('beta2', 0.999)
    config.setdefault('epsilon', 1e-8)
    config.setdefault('m', np.zeros_like(w))
    config.setdefault('v', np.zeros_like(w))
    config.setdefault('t', 0)

    config['t'] += 1
    config['m'] = config['beta1'] * config['m'] + (1 - config['beta1']) * dw
    mt = config['m'] / (1 - config['beta1'] ** config['t'])
    config['v'] = config['beta2'] * config['v'] + (1 - config['beta2']) * (dw ** 2)
    vt = config['v'] / (1 - config['beta2'] ** config['t'])
    next_w = w - config['learning_rate'] * mt / (np.sqrt(vt) + config['epsilon'])

    return next_w, config

Comparing the performance of the different optimizers:

(Figure: optimizer comparison)
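
The comparison comes from training the same FullyConnectedNet with each update rule through the assignment's Solver. A rough sketch of how that might look (assuming the usual cs231n Solver interface, a data dict with the CIFAR-10 splits, and hyperparameters chosen only for illustration):

solvers = {}
for rule in ['sgd', 'sgd_momentum', 'rmsprop', 'adam']:
    model = FullyConnectedNet([100, 100, 100, 100, 100], weight_scale=5e-2)
    solvers[rule] = Solver(model, data,
                           num_epochs=5, batch_size=100,
                           update_rule=rule,
                           optim_config={'learning_rate': 1e-3},
                           verbose=False)
    solvers[rule].train()

# solvers[rule].loss_history / train_acc_history can then be plotted per rule.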

Batch Normalization

The implementation follows this paper:

Batch Normalization: Accelerating Deep Network Training by Reducing Internal Covariate Shift

The goal of batch normalization is to keep each layer's outputs roughly Gaussian-distributed, mainly to avoid vanishing gradients. During the forward pass we compute the mean and variance of the current training mini-batch, and use these mini-batch statistics to update running estimates of the overall mean and variance.

Forward Pass

The concrete algorithm from the paper is as follows:

(Figure: the batch normalization algorithm from the paper)
def batchnorm_forward(x, gamma, beta, bn_param):
    mode = bn_param['mode']
    eps = bn_param.get('eps', 1e-5)
    momentum = bn_param.get('momentum', 0.9)

    N, D = x.shape
    running_mean = bn_param.get('running_mean', np.zeros(D, dtype=x.dtype))
    running_var = bn_param.get('running_var', np.zeros(D, dtype=x.dtype))

    out, cache = None, None
    if mode == 'train':
        sample_mean = x.mean(axis=0)
        sample_var = x.var(axis=0)
        sqrtvar = np.sqrt(sample_var + eps)
        xmu = x - sample_mean
        ivar = 1. / sqrtvar
        x_hat = xmu * ivar
        out = gamma * x_hat + beta

        cache = (xmu, sample_var, ivar, sqrtvar, x_hat, gamma, eps)

        running_mean = momentum * running_mean + (1 - momentum) * sample_mean
        running_var = momentum * running_var + (1 - momentum) * sample_var
    elif mode == 'test':
        x_hat = (x - running_mean) / np.sqrt(running_var + eps)
        out = gamma * x_hat + beta
    else:
        raise ValueError('Invalid forward batchnorm mode "%s"' % mode)

    # Store the updated running means back into bn_param
    bn_param['running_mean'] = running_mean
    bn_param['running_var'] = running_var

    return out, cache

Backward Pass

The paper also describes how to compute the backward pass for BN:

(Figure: the batch normalization backward pass from the paper)

It is quite involved; the best approach is to draw the computational graph yourself and then work through the backward pass. This article explains this part quite well and is worth a look.

(Figure: my hand-drawn computational graph)
def batchnorm_backward(dout, cache):
    N, D = dout.shape
    xmu, var, ivar, sqrtvar, x_hat, gamma, eps = cache

    # Gradients for the shift and scale parameters.
    dbeta = np.sum(dout, axis=0)
    dgamma = np.sum(dout * x_hat, axis=0)

    # Walk backwards through the graph of x_hat = (x - mu) / sqrt(var + eps).
    dx_hat = dout * gamma
    divar = np.sum(dx_hat * xmu, axis=0)
    dx_mu1 = dx_hat * ivar

    dsqrtvar = -divar / (sqrtvar ** 2)
    dvar = 0.5 * dsqrtvar / np.sqrt(var + eps)
    dsq = dvar * np.ones((N, D)) / N

    dx_mu2 = 2 * xmu * dsq

    # x contributes both directly through (x - mu) and indirectly through mu.
    dx1 = dx_mu1 + dx_mu2
    dmu = -np.sum(dx_mu1 + dx_mu2, axis=0)
    dx2 = dmu * np.ones((N, D)) / N

    dx = dx1 + dx2

    return dx, dgamma, dbeta

A simplified version:

def batchnorm_backward_alt(dout, cache):
    xmu, var, ivar, sqrtvar, x_hat, gamma, eps = cache
    N, D = dout.shape

    dbeta = np.sum(dout, axis=0)
    dgamma = np.sum(x_hat * dout, axis=0)
    dx = (gamma * ivar / N) * (N * dout - x_hat * dgamma - dbeta)

    return dx, dgamma, dbeta
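
To convince myself that the full and the simplified backward passes agree, a quick numerical comparison is enough. A minimal sketch with a hand-rolled central-difference helper (num_grad is my own name, not one of the assignment's gradient_check utilities):

def num_grad(f, x, df, h=1e-5):
    # Central-difference gradient of an array-valued f w.r.t. x,
    # contracted against the upstream gradient df.
    grad = np.zeros_like(x)
    it = np.nditer(x, flags=['multi_index'])
    while not it.finished:
        idx = it.multi_index
        old = x[idx]
        x[idx] = old + h
        pos = f(x).copy()
        x[idx] = old - h
        neg = f(x).copy()
        x[idx] = old
        grad[idx] = np.sum((pos - neg) * df) / (2 * h)
        it.iternext()
    return grad

np.random.seed(0)
x = np.random.randn(4, 5)
gamma, beta = np.ones(5), np.zeros(5)
dout = np.random.randn(4, 5)

_, cache = batchnorm_forward(x, gamma, beta, {'mode': 'train'})
dx, _, _ = batchnorm_backward_alt(dout, cache)
dx_num = num_grad(lambda x: batchnorm_forward(x, gamma, beta, {'mode': 'train'})[0], x, dout)
print(np.max(np.abs(dx - dx_num)))  # should be tiny, around 1e-8 or less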

Layer Normalization

Batch normalization makes training neural networks more efficient, but for complex architectures it may not work well when the batch size is too small. An alternative is to normalize over the features of each example instead; see the paper: Layer Normalization

def layernorm_forward(x, gamma, beta, ln_param):
    eps = ln_param.get('eps', 1e-5)

    # Transpose so the per-example statistics reuse the batchnorm-style code.
    x_T = x.T
    sample_mean = np.mean(x_T, axis=0)
    sample_var = np.var(x_T, axis=0)
    x_norm_T = (x_T - sample_mean) / np.sqrt(sample_var + eps)
    x_norm = x_norm_T.T
    out = x_norm * gamma + beta
    cache = (x, x_norm, gamma, sample_mean, sample_var, eps)

    return out, cache
def layernorm_backward(dout, cache):
    x, x_norm, gamma, sample_mean, sample_var, eps = cache
    x_T = x.T
    dout_T = dout.T
    N = x_T.shape[0]

    dbeta = np.sum(dout, axis=0)
    dgamma = np.sum(x_norm * dout, axis=0)

    dx_norm = dout_T * gamma[:, np.newaxis]
    dv = ((x_T - sample_mean) * -0.5 * (sample_var + eps)**-1.5 * dx_norm).sum(axis=0)
    dm = (dx_norm * -1 * (sample_var + eps)**-0.5).sum(axis=0) + (dv * (x_T - sample_mean) * -2 / N).sum(axis=0)
    dx_T = dx_norm / (sample_var + eps)**0.5 + dv * 2 * (x_T - sample_mean) / N + dm / N
    dx = dx_T.T

    return dx, dgamma, dbeta

Dropout

Dropout: A Simple Way to Prevent Neural Networks from Overfitting

Dropout is a regularization method that randomly discards the values of some neurons during the forward pass. Like L1 and L2 regularization, its purpose is to prevent overfitting.

(Figure: dropout)

The implementation randomly generates a mask of True/False values during training according to a probability p (here the probability of keeping each neuron); multiplying x by the mask sets the values of some neurons to 0. At prediction time we simply multiply x by p.

But instead of multiplying by p at prediction time, we can divide by p during training. This keeps the expected value of each activation the same while removing the extra work from prediction, which is where we usually care most about efficiency. This trick is called inverted dropout.

def dropout_forward(x, dropout_param):
    p, mode = dropout_param['p'], dropout_param['mode']
    if 'seed' in dropout_param:
        np.random.seed(dropout_param['seed'])

    mask = None
    out = None

    if mode == 'train':
        # Inverted dropout: keep each unit with probability p and rescale by 1/p.
        mask = (np.random.rand(*x.shape) < p) / p
        out = x * mask
    elif mode == 'test':
        out = x

    cache = (dropout_param, mask)
    out = out.astype(x.dtype, copy=False)

    return out, cache
def dropout_backward(dout, cache):
    dropout_param, mask = cache
    mode = dropout_param['mode']

    dx = None
    if mode == 'train':
        dx = dout * mask
    elif mode == 'test':
        dx = dout
    return dx

Convolutional Neural Network

Convolution Layer Forward Pass

Implement the forward pass of the convolution layer. The input \(x\) has shape \((N,C,H,W)\), and the \(F\) filters are stacked into a weight tensor of shape \((F,C,HH,WW)\). After the convolution the output is a tensor of shape \((N,F,H^\prime,W^\prime)\).
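
With zero padding \(pad\) and stride \(stride\), the output spatial size used in the code below is:

\(H^\prime=1+\dfrac{H+2 \times pad-HH}{stride}\)

\(W^\prime=1+\dfrac{W+2 \times pad-WW}{stride}\)

(The code uses integer division, assuming the sizes divide evenly.)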

def conv_forward_naive(x, w, b, conv_param):
    N, C, H, W = x.shape
    F, C, HH, WW = w.shape
    pad, stride = conv_param['pad'], conv_param['stride']

    # Zero-pad the spatial dimensions only.
    x_pad = np.pad(x, ((0,), (0,), (pad,), (pad,)), 'constant', constant_values=0)
    H_prime = 1 + (H + 2 * pad - HH) // stride
    W_prime = 1 + (W + 2 * pad - WW) // stride

    out = np.empty((N, F, H_prime, W_prime))

    # For every filter and output location, take the dot product of the filter
    # with the corresponding window of the padded input (over all N at once).
    for f in range(F):
        for i in range(H_prime):
            for j in range(W_prime):
                out[:, f, i, j] = np.sum(x_pad[:, :, i*stride:i*stride+HH, j*stride:j*stride+WW] * w[f], axis=(1, 2, 3))

    out += b.reshape(F, 1, 1)

    cache = (x, w, b, conv_param)
    return out, cache

Convolution Layer Backward Pass

This article is a good reference for the convolution layer's backpropagation. Since the forward pass is essentially \(x\) multiplied by \(w\), the backward pass computes \(dx\) from \(dout\) and \(w\), and \(dw\) from \(dout\) and \(x\). The idea is not hard to grasp, but implementing it with numpy requires a firm grip on the tensor dimensions.

def conv_backward_naive(dout, cache):
    x, w, b, conv_param = cache

    N, C, H, W = x.shape
    F, C, HH, WW = w.shape
    pad, stride = conv_param['pad'], conv_param['stride']

    x_pad = np.pad(x, ((0,), (0,), (pad,), (pad,)), mode='constant', constant_values=0)
    H_prime = 1 + (H + 2 * pad - HH) // stride
    W_prime = 1 + (W + 2 * pad - WW) // stride

    dx_pad = np.zeros_like(x_pad)
    dw = np.zeros_like(w)
    db = np.sum(dout, axis=(0, 2, 3))

    for i in range(H_prime):
        for j in range(W_prime):
            # dw accumulates the input window weighted by the upstream gradient;
            # dx_pad accumulates the filters weighted by the upstream gradient.
            for f in range(F):
                dw[f] += np.sum(dout[:, f, i, j].reshape(-1, 1, 1, 1) * x_pad[:, :, i*stride:i*stride+HH, j*stride:j*stride+WW], axis=0)
            for n in range(N):
                dx_pad[n, :, i*stride:i*stride+HH, j*stride:j*stride+WW] += np.sum(w * dout[n, :, i, j].reshape(-1, 1, 1, 1), axis=0)

    # Strip the padding to recover dx.
    dx = dx_pad[:, :, pad:-pad, pad:-pad]

    return dx, dw, db

Max Pooling Forward Pass

def max_pool_forward_naive(x, pool_param):
    N, C, H, W = x.shape
    pool_height, pool_width, stride = pool_param['pool_height'], pool_param['pool_width'], pool_param['stride']

    H_prime = 1 + (H - pool_height) // stride
    W_prime = 1 + (W - pool_width) // stride
    out = np.empty((N, C, H_prime, W_prime))

    for i in range(H_prime):
        for j in range(W_prime):
            out[:, :, i, j] = np.max(x[:, :, i*stride:i*stride+pool_height, j*stride:j*stride+pool_width], axis=(2, 3))

    cache = (x, pool_param)
    return out, cache

Max Pooling Backward Pass

def max_pool_backward_naive(dout, cache):
    x, pool_param = cache
    N, C, H, W = x.shape
    pool_height, pool_width, stride = pool_param['pool_height'], pool_param['pool_width'], pool_param['stride']

    H_prime = 1 + (H - pool_height) // stride
    W_prime = 1 + (W - pool_width) // stride
    dx = np.zeros_like(x)

    for i in range(H_prime):
        for j in range(W_prime):
            # The gradient only flows back to the element(s) that attained the
            # maximum in each pooling window.
            window = x[:, :, i*stride:i*stride+pool_height, j*stride:j*stride+pool_width]
            arg = np.max(window, axis=(2, 3), keepdims=True) == window
            dx[:, :, i*stride:i*stride+pool_height, j*stride:j*stride+pool_width] += arg * dout[:, :, i, j][:, :, np.newaxis, np.newaxis]

    return dx

Finally, there are also Spatial Batch Normalization and Group Normalization to implement, but I am not very familiar with those parts, so I am skipping them here.