T1:手动实现FNN模型
4.1实验过程
🤣啃PPT还有python的numpy文档
😅和bugs斗智斗勇
欢迎讨论以及批评指正
4.2 实验代码以及工程文件
数据处理文件就不放出来了,比较基础~
有机会贴一下github叭,不过看样子大概率会拖到毕业再填坑🤣
main.py
#########################################################################
# # 手动实现FNN实验:
# # (1)dropout,激活函数,损失函数修改
# # (2)网络层数、内部结点个数修改
#########################################################################
from model import BPNN
from utils import DataLoader
import numpy as np
import matplotlib.pyplot as plt
import argparse
import json
import datetime
def example_1(optimizer,epoch,batch_size=32,lr=0.001,dropout=None,num_layers=4,hidden_nodes=1024,activate='tanh'):
"""
:param epoch:
:param batch_size: 批次
:param lr: 学习率
:param dropout:
:param num_layers: 层数
:param hidden_nodes: 隐层结点数
:return:
"""
# 加载数据
#初始化层的结点数
unit_list=[]
unit_list.append(32*32)
for i in range(num_layers-2):
unit_list.append(hidden_nodes)
unit_list.append(10)
train_data=DataLoader(batch_size=batch_size,data_type='train',scale=False)
test_data=DataLoader(batch_size=batch_size,data_type='test',scale=False)
# 定义神经网络的时候,需要申明batch_size
model=BPNN(num_layers=num_layers,unit_list=unit_list,initializer='xavier',optimizer=optimizer,batch_size=batch_size,dropout=dropout,activate=activate)
# 训练
loss_dict=model.train(train_data,train_data,epoch=epoch,lr=lr,dropout_prob=dropout)# 这里测试集就是验证集
plt.plot(loss_dict['train_loss'])
print("loss:",loss_dict['train_loss'])
#plt.savefig("1.png")
# 显示损失曲线
filename = "./log/" + "BP-scratch-" + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + '.jpg'
plt.savefig(filename)
# 问题
h,label=model.predict(test_data)
acc=np.sum(h==label)/len(label)
print("*"*50)
print("测试集上的正确率为{:.3f}".format(acc))
return acc
if __name__=="__main__":
#show100_ada()
# 命令行调用
parser=argparse.ArgumentParser()
parser.add_argument("--batch_size",default=64,type=int)
parser.add_argument("--epoch",default=280,type=int)
parser.add_argument("--num_layers",default=4,type=int)
parser.add_argument("--hidden_nodes",default=505,type=int)
parser.add_argument("--dropout",default=0.3,type=float)
parser.add_argument("--optimizer",default="adam",type=str)
parser.add_argument("--lr",default=0.01,type=float)
parser.add_argument("--loss_func",default="mle",type=str)
parser.add_argument("--activate",default="sigmoid",type=str)
parser.add_argument("--acc",default=0.0,type=float)
args=parser.parse_args()
#实验模块
print("*"*50)
args.acc = example_1(epoch=args.epoch,batch_size=args.batch_size,dropout=args.dropout,lr=args.lr,num_layers=args.num_layers,optimizer=args.optimizer,activate=args.activate)
#记录实验设置的参数以及结果
with open('./log/bp_scratch_args.txt', 'a') as f:
json.dump(args.__dict__, f, indent=2)
print('参数保存成功')
model.py
############################################################################################
# 构建FNN反向梯度更新模型
# 需要手动构建dropout
# 需要自动的网络层数
# 需要自动的隐层结点个数
# 损失函数
# 激活函数
# 结论:
#############################################################################################
import numpy as np
from tqdm import tqdm
from time import sleep
# 自己实现的相关工具
from initializers import xavier,zeros
from optimizers import Adam,SGD,AdaGrad,RMSProp
from activations import softmax,tanh,softmax_gradient,tanh_gradient,sigmoid,sigmoid_gradient
from utils import onehot
from loss import cross_entropy,loss_error,mle
def dropout(x,dropout_prob):
keep_prob=1-dropout_prob
d_temp=np.random.binomial(1,keep_prob,size=x.shape[1:])/keep_prob
#print(d_temp)
d_temp=d_temp.reshape(-1)
#print(d_temp)
x_dropout=x*d_temp
return x_dropout,d_temp
class BPNN(object):
def __init__(self,num_layers,unit_list=None,initializer=None,optimizer='adam',activate='tanh',loss_fn='mle',dropout=0,batch_size=64):
"""
:param num_layers: 层数
:param unit_list: 隐层参数
:param initializer: 初始化函数
:param optimizer: 优化器
:param activate: 激活函数
"""
self.hidden_num=num_layers-1
self.loss_fn=loss_fn
self.batch_size=batch_size
self.dropout=dropout
# 初始化的方式
if initializer=='xavier':
self.params=xavier(num_layers,unit_list)
else:
self.params=zeros(num_layers,unit_list)
# 激活函数
self.activate=activate
# 优化方式为随机梯度下降
if optimizer=='sgd':
print("sgd")
self.optimizer=SGD()
elif optimizer=='adam':
self.optimizer=Adam(weights=self.params, weight_num=self.hidden_num)
elif optimizer=='ada':
print("ada")
self.optimizer=AdaGrad(weights=self.params, weight_num=self.hidden_num)
elif optimizer=='rmsprop':
print("rmsprop")
self.optimizer=RMSProp(weights=self.params, weight_num=self.hidden_num)
else:
# 默认是最好的优化器
self.optimizer=Adam(weights=self.params, weight_num=self.hidden_num)
def forward(self,x,dropout_prob=None):
"""
前向传播,针对一个批次处理数据
"""
net_inputs=[]#各层的输入
net_outputs=[]#各层输出
net_d=[]
# 为了层号对应,将输入层直接添加
net_inputs.append(x)
net_outputs.append(x)
net_d.append(np.ones(x.shape[1:]))# 输入层没有丢弃概率
for i in range(1,self.hidden_num):
x=x@self.params['w'+str(i)].T# @是矩阵的叉乘
net_inputs.append(x)
# 激活函数
if self.activate=='tanh':
x=tanh(x)
else:
x=sigmoid(x)
if dropout_prob:
# 丢弃一些参数
x,d_temp=dropout(x,dropout_prob)
net_d.append(d_temp)#某一层的遮罩
net_outputs.append(x)
out=x@self.params['w'+str(self.hidden_num)].T # 最后一层的输出
net_inputs.append(out)# 本层的输出作为下一次的输入
out=softmax(out)
net_outputs.append(out)
# 用字典储存每一层的状况,便于backpropagation
return {'net_inputs':net_inputs,'net_outputs':net_outputs,'d':net_d},out
def backward(self,nets,y,h,drop_prob=None):
"""反向更新参数"""
# 最后一层单独讨论
grads=dict()#反向存储梯度
if self.loss_fn=='mle':
grads['dz'+str(self.hidden_num)]=h-y # 最后一层的梯度就是”预测值-真实值"
else:
grads['dz' + str(self.hidden_num)] = loss_error(h,y)
# 这边,可以换成其他的损失函数
grads['dw'+str(self.hidden_num)]=grads['dz'+str(self.hidden_num)].T@nets['net_outputs'][self.hidden_num-1]#误差*输入
#其他层套用模板
for i in reversed(range(1,self.hidden_num)):
# 误差*该层的梯度
if self.activate=='tanh':
temp=grads['dz'+str(i+1)]@self.params['w'+str(i+1)]*tanh_gradient(nets['net_inputs'][i])
else:
temp=grads['dz'+str(i+1)]@self.params['w'+str(i+1)]*sigmoid_gradient(nets['net_inputs'][i])
if drop_prob:
temp=temp*nets['d'][i]/(1-drop_prob)
grads['dz'+str(i)]=temp#当前层的结果,是需要输入前一层的误差
grads['dw'+str(i)]=grads['dz'+str(i)].T@nets['net_outputs'][i-1]
return grads
def train(self,dataloader,valid_loader,epoch,lr,dropout_prob=None):
"""
:param dataloader:
:param valid_loader:
:param epoch:
:param lr:
:param dropout_prob:
:return:
"""
train_loss=[]
test_loss=[]
epochs=epoch
for epoch in range(epochs):
epoch_loss = 0.0
for step, (x, y) in enumerate(dataloader):
x = x.reshape(-1, 32 * 32)
y = onehot(y, 10)
nets, h = self.forward(x, dropout_prob)
if self.loss_fn == 'mle':
loss = cross_entropy(y, h)
else:
loss=cross_entropy(y,h)
epoch_loss += loss
grads = self.backward(nets, y, h, dropout_prob)
self.params = self.optimizer.optimize(self.hidden_num, self.params, grads, self.batch_size)
if step % 100 == 0:
print("epoch:{} step:{} loss:{:.4f}".format(epoch,step,loss))
train_loss.append(epoch_loss)
print("*"*50)
print("epoch_loss:",epoch_loss)
dataloader.restart()
# 验证部分一个
epoch_loss_test = 0.0
# for step, (x, y) in enumerate(valid_loader):
# x = x.reshape(-1, 32 * 32)
# y = onehot(y, 10)
# nets, h = self.forward(x, dropout_prob)
# loss = cross_entropy(y, h)
#
# epoch_loss_test += loss
# test_loss.append(epoch_loss_test)
# valid_loader.restart()
loss_all = {'train_loss': train_loss, 'valid_loss': test_loss}
return loss_all
def predict(self,data_loader,bn=False):
labels=[]#真实的标签值
h=[]#预测值
losses=0
for (x,y) in data_loader:
x=x.reshape(-1,32*32)#将数据展开成为1维向量
y=onehot(y,10)#标签,输出为10类
if bn:
# TODO:正则化层
_,out=self.forward(x,'test')
else:
_,out=self.forward(x)
loss=cross_entropy(y,out)
losses+=loss
out=list(np.argmax(out,axis=-1).flatten())
y=list(np.argmax(y,axis=1).flatten())
labels+=y
h+=out
return np.array(h).astype('int'), np.array(labels).astype('int')
loss.py
###########################################
# 多分类的损失函数
# (1)交叉熵损失函数
# 其他损失函数
# (2)mle损失函数
#
###########################################
import numpy as np
def loss_error(y,h):
# 交叉熵损失
"""onehot编码,y真实值,h预测值"""
if y.ndim==1 or h.ndim==1:
y=y.reshape(1,-1)
h=h.reshape(1,-1)
return y*np.log(h)
# 交叉熵损失函数
def cross_entropy(y,h):
"""onehot编码,y真实值,h预测值"""
if y.ndim==1 or h.ndim==1:
y=y.reshape(1,-1)
h=h.reshape(1,-1)
return -np.sum(y*np.log(h+1e-5))/y.shape[0]
def mle(y,h):
if y.ndim==1 or h.ndim==1:
y=y.reshape(1,-1)
h=h.reshape(1,-1)
return np.sum(h-y)
optimizers.py
########################################################################################
# 优化器:
# 一直都是直接调用库函数,从来没有注意过具体实现,感谢这次有机会实现:
# 深度学习流行优化方式公式算法见此专栏:https://zhuanlan.zhihu.com/p/90169812
# SGD
# * Adam(目前最流行的方式)
# Ada
# RMSProp
#######################################################################################
import numpy as np
np.random.seed(2021)
class SGD(object):
def __init__(self,lr=0.001):
self.lr=lr
def optimize(self,weight_num,params,grad,batch_size):
"""随机梯度下降,按照批次更新权重"""
for i in range(1,weight_num+1):
params['w'+str(i)]-=self.lr*grad['dw'+str(i)]/batch_size
return params
class SGD_M(object):
"""
带动量的SGD
"""
def __init__(self,lr=0.001):
pass
def optimize(self,weight_num,params,grad,batch_size,bn=False):
pass
class Adam(object):
def __init__(self,lr=0.0001,beta1=0.9,beta2=0.999,epsilon=1e-8,weights=None,weight_num=None):
"""
Adam优化器
:param lr:
:param beta1:
:param beta2:
:param epsilon:
:param weights:
:param weight_num:
"""
self.lr=lr
self.t=0
self.beta1=beta1
self.beta2=beta2
self.epsilon=epsilon
self.m=dict()
self.v=dict()
self.r=dict()
for i in range(1,weight_num+1):
self.m['m'+str(i)]=np.zeros(shape=weights['w'+str(i)].shape)
self.v['v'+str(i)]=np.zeros(shape=weights['w'+str(i)].shape)
def optimize(self,weight_num,params,grads,batch_size=64):
"""
Adam优化的方式:
:param weight_num:
:param params:
:param grads:
:param batch_size:
:return:
"""
self.t+=1
for i in range(1,weight_num+1):
w=params['w'+str(i)]
g=grads['dw'+str(i)]/batch_size
self.m['m'+str(i)]=self.beta1*self.m['m'+str(i)]+(1-self.beta1)*g
self.v['v'+str(i)]=self.beta2*self.v['v'+str(i)]+(1-self.beta2)*(g**2)
m_hat=self.m['m'+str(i)]/(1-self.beta1**self.t)
v_hat=self.v['v'+str(i)]/(1-self.beta2**self.t)
w=w-self.lr*m_hat/(np.sqrt(v_hat)+self.epsilon)
params['w'+str(i)]=w # 更新第i层的权重
return params
class AdaGrad(object):
def __init__(self,lr=0.0001,epsilon=1e-7,weights=None,weight_num=None):
self.epsilon=epsilon# 全局最小
self.lr=lr
self.r=dict()
for i in range(1,weight_num+1):
self.r['r'+str(i)]=np.zeros(shape=weights['w'+str(i)].shape)
def optimize(self,weight_num,params,grads,batch_size=64):
for i in range(1,weight_num+1):
w=params['w'+str(i)]
g=grads['dw'+str(i)]/batch_size
# 积累平方梯度
self.r['r'+str(i)]+=g*g
dw=g*self.lr/(self.epsilon+np.sqrt(self.r['r'+str(i)]))
params['w'+str(i)]-=dw
return params
class RMSProp(object):
def __init__(self,lr=0.0001,epsilon=1e-7,mu=0.9,weights=None,weight_num=None):
self.epsilon=epsilon# 全局最小
self.lr=lr
self.mu=mu # 衰减速率
self.r=dict()
for i in range(1,weight_num+1):
self.r['r'+str(i)]=np.zeros(shape=weights['w'+str(i)].shape)
def optimize(self,weight_num,params,grads,batch_size=64):
for i in range(1,weight_num+1):
w=params['w'+str(i)]
g=grads['dw'+str(i)]/batch_size
# 积累平方梯度-----与AdaGrad唯一的区别
self.r['r'+str(i)]=self.r['r'+str(i)]*self.mu+(1-self.mu)*g*g
dw=g*self.lr/(self.epsilon+np.sqrt(self.r['r'+str(i)]))
params['w'+str(i)]-=dw
return params
activations.py
#################################
# 激活函数
# tanh,softmax
#################################
import numpy as np
def sigmoid(x):
return 1/(1+np.exp(-x))
def sigmoid_gradient(x):
"""对于sigmoid函数求导"""
return sigmoid(x)*(1-sigmoid(x))
def tanh(x):
"""tanh激活函数"""
return np.tanh(x)
def tanh_gradient(x):
"""对于tanh求导"""
return 1-tanh(x)**2
def softmax(x):
"""softmax激活函数"""
# 将输出变成概率,归一化
if x.ndim==1:
x=x.reshape(1,-1)#求转置
f1=lambda x:np.exp(x-np.max(x))
f2=lambda x:x/np.sum(x)
#TODO: 沿着行的维度应用上述的变化
x=np.apply_along_axis(f1,axis=1,arr=x)
x=np.apply_along_axis(f2,axis=1,arr=x)
return x
def softmax_gradient(x,label):
"""softmax梯度:s-y"""
return softmax(x)-label
initializers.py
##############################################
# 初始化的一些函数
# (1)xavier
# (2)zero
##############################################
import numpy as np
#固定随机化种子
np.random.seed(2021)
def xavier(num_layers,units_list):
"""
保证参数服从[-a,a]的均匀分布,其中a为sqrt(6)/sqrt(n_in+n_out)
:param num_layers:
:param units_list:
:return:
"""
params={}#每一层的参数用字典
for layer in range(1,num_layers):
# 第一层是输入层,没有参数,之后每一层都有参数
a=np.sqrt(6)/np.sqrt(units_list[layer-1]+units_list[layer])
params['w'+str(layer)]=np.random.uniform(-a,a,size=(units_list[layer],units_list[layer-1]))
params['gamma'+str(layer)]=np.ones(shape=(1,units_list[layer]))#偏置项
params['beta'+str(layer)]=np.zeros(shape=(1,units_list[layer]))
return params
def zeros(num_layers,units_list):
"""
全零初始化
:param num_layers:
:param units_list:
:return:
"""
params = {} # 每一层的参数用字典
for layer in range(1, num_layers):
params['w' + str(layer)] = np.zeros(shape=(units_list[layer], units_list[layer - 1]))
return params
4.3 优化的一些idea
- 本来想从数据入手的,但考虑课程性质还是从算法本身切入,因为是机器学习,所以上课老师并没有讲深度学习的一些算法,正好可以“钻空子”,手写了一下adam、ada等等依次尝试代替原本的sgd取得了还阔以的实验结果,最后魔改了一下四舍五入算是自己的“优化算法”啦,论
懒货的自我修养
如果你对于这些算法不了解,传送门在这里 - 关于数据的话图片特征提取的算法可以试一下,还有图片增强的一些trick,但是咱本身数据就已经60000了,清晰度也不高,感觉效果应该不会很好就搁置了这个尝试的想法
############################################################################
# 优化器:
# 一直都是直接调用库函数,从来没有注意过具体实现,感谢这次有机会实现:
# 深度学习流行优化方式公式算法见此专栏:https://zhuanlan.zhihu.com/p/90169812
# SGD
# * Adam(目前最流行的方式)
#
############################################################################
import numpy as np
np.random.seed(2021)
class SGD(object):
def __init__(self,lr=0.001):
self.lr=lr
def optimize(self,weight_num,params,grad,batch_size,bn=False):
"""随机梯度下降,按照批次更新权重"""
for i in range(1,weight_num):
params['w'+str(i)]-=self.lr*grad['dw'+str(i)]/batch_size
if bn:
params['gamma'+str(i)]-=self.lr*grad['dgamma'+str(i)]/batch_size
params['beta'+str(i)]-=self.lr*grad['dbeta'+str(i)]/batch_size
return params
class Adam(object):
def __init__(self,lr=0.0001,beta1=0.9,beta2=0.999,epsilon=1e-8,weights=None,weight_num=None):
"""
Adam优化器
:param lr:
:param beta1:
:param beta2:
:param epsilon:
:param weights:
:param weight_num:
"""
self.lr=lr
self.t=0
self.beta1=beta1
self.beta2=beta2
self.epsilon=epsilon
self.m=dict()
self.v=dict()
for i in range(1,weight_num+1):
self.m['m'+str(i)]=np.zeros(shape=weights['w'+str(i)].shape)
self.v['v'+str(i)]=np.zeros(shape=weights['w'+str(i)].shape)
def optimize(self,weight_num,params,grads,batch_size=64):
"""
Adam优化的方式:
:param weight_num:
:param params:
:param grads:
:param batch_size:
:return:
"""
self.t+=1
for i in range(1,weight_num+1):
w=params['w'+str(i)]
g=grads['dw'+str(i)]/batch_size
self.m['m'+str(i)]=self.beta1*self.m['m'+str(i)]+(1-self.beta1)*g
self.v['v'+str(i)]=self.beta2*self.v['v'+str(i)]+(1-self.beta2)*(g**2)
m_hat=self.m['m'+str(i)]/(1-self.beta1**self.t)
v_hat=self.v['v'+str(i)]/(1-self.beta2**self.t)
w=w-self.lr*m_hat/(np.sqrt(v_hat)+self.epsilon)
params['w'+str(i)]=w # 更新第i层的权重
return params
class AdaGrad(object):
def __init__(self,lr=0.0001,epsilon=1e-7,weights=None,weight_num=None):
self.epsilon=epsilon# 全局最小
self.lr=lr
for i in range(1,weight_num+1):
self.r['r'+str(i)]=np.zeros(shape=weights['w'+str(i)].shape)
def optimize(self,weight_num,params,grads,batch_size=64):
for i in range(1,weight_num+1):
w=params['w'+str(i)]
g=grads['dw'+str(i)]/batch_size
# 积累平方梯度
self.r['r'+str(i)]+=g*g
dw=g*self.lr/(self.epsilon+np.sqrt(self.r['r'+str(i)]))
params['w'+str(i)]-=dw
return params
class RMSProp(object):
def __init__(self,lr=0.0001,epsilon=1e-7,mu=0.9,weights=None,weight_num=None):
self.epsilon=epsilon# 全局最小
self.lr=lr
self.mu=mu # 衰减速率
for i in range(1,weight_num+1):
self.r['r'+str(i)]=np.zeros(shape=weights['w'+str(i)].shape)
def optimize(self,weight_num,params,grads,batch_size=64):
for i in range(1,weight_num+1):
w=params['w'+str(i)]
g=grads['dw'+str(i)]/batch_size
# 积累平方梯度-----与AdaGrad唯一的区别
self.r['r'+str(i)]=self.r['r'+str(i)]*self.mu+(1-self.mu)*g*g
dw=g*self.lr/(self.epsilon+np.sqrt(self.r['r'+str(i)]))
params['w'+str(i)]-=dw
return params
4.4实验结果
- 效率上还有提升空间,但是基本可以与torch框架做出来的差不多啦,
- 自己尝试的话,隐层节点还是设置得大一些会好,学习率可以适当调高一点,亲测有效😃
- 因为可变参数还是蛮多的,
时间问题还有放假在家比较懒所以很多情况没有做消融实验,我自己尝试最高正确率是40%+,结点变少+dropout=0.3在大多数情况下效果是30%左右,看损失图应该还没有完全收敛,可以试着改lr或者epoch~因为确实跑一次很慢…… - 提醒一下main中的保存实验数据写得不是很好,读者可以自己加一下专门处理实验参数和对应结果的part,实现结果与models自动收割,我自己在这上面吃亏蛮大的,导致一些比较好的models参数没有保存,后来就难复现🐸
- loss那边应该是两个写混了,有点bug,这个坑之后自己良心发现再填叭,希望没被改作业的老师发现🤪
T2:基于torch框架实现FNN
范式,注意一下规范和数据的读取就好
不过偷懒直接用了包里自带的数据,和老师发的其实是一样的,由于torch的加速所以可以同时尝试灰度数据与彩色数据的正确率,后者正确率高一点50%+
###########################################################
# 基于torch的FNN
# 修改层数,结点
# 修改优化器,dropout
############################################################
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
import torch.optim as optim
import torch.nn as nn
import datetime
# 可视化与进度条
import json
from tqdm import tqdm
import matplotlib.pyplot as plt
import argparse
plt.rcParams['font.sans-serif'] = ['SimHei'] # 指定默认字体
plt.rcParams['axes.unicode_minus']=False # 正常显示负号
# 定义BP神经网络
class FNN(nn.Module):
"""初始化参数"""
def __init__(self,dropout=0.1,hidden_nodes=2048,numlayers=2,activate='relu'):
'''三层网络,四层网络'''
super(FNN,self).__init__()
self.numlayers=numlayers
self.activate=activate
# 定义不同的层数
if numlayers==3:
self.fc1 = nn.Linear(32 * 32 * 3, hidden_nodes)
self.fc2 = nn.Linear(hidden_nodes,hidden_nodes)
self.fc3 = nn.Linear(hidden_nodes, 10)
elif numlayers==2:
self.fc1 = nn.Linear(32 * 32 * 3, hidden_nodes)
self.fc2 = nn.Linear(hidden_nodes, 10)
else:
RuntimeError("numlayers must choose from 2 or 3")
if dropout !=0:
# 设置dropout
self.dropout=nn.Dropout(dropout)
else:
self.dropout=None
def forward(self,x):
x=x.view(-1,32*32*3)# 展平处理
if self.numlayers==3:
x=self.fc1(x)
if self.dropout is not None:
x=self.dropout(x)
x = F.relu(x)
x=self.fc2(x)
if self.dropout is not None:
x=self.dropout(x)
x = F.relu(x)
x = self.fc3(x)
else:
x=self.fc1(x)
if self.dropout is not None:
x=self.dropout(x)
x = F.relu(x)
x = self.fc2(x)
return x
def test_FNN(lr=0.001,EPOCH=1,BATCH_SIZE=4,hidden_nodes=2048,num_layers=2,dropout=0.1,optimizer='sgd'):
'''Step 1: 加载本地的cifar数据集&数据预处理'''
transform = transforms.Compose([transforms.Grayscale(num_output_channels=3),#去掉的话是彩色数据
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])]
)
# 划分训练集
trainset = torchvision.datasets.CIFAR10(root='D:/homework_for_ML/task4/data', train=True,
download=False, transform=transform)
trainloader = torch.utils.data.DataLoader(
trainset, batch_size=BATCH_SIZE, shuffle=True, num_workers=1
)
# 划分测试集
testset = torchvision.datasets.CIFAR10(root='D:/homework_for_ML/task4/data', train=False,
download=False, transform=transform)
testloader = torch.utils.data.DataLoader(
testset, batch_size=BATCH_SIZE, shuffle=False, num_workers=1
)
print("加载数据集结束~")
# 类别标签的定义
classes = ('plane', 'car', 'bird', 'cat',
'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
net=FNN(hidden_nodes=hidden_nodes,numlayers=num_layers,dropout=dropout)
if optimizer=='sgd':
optimizer=optim.SGD(net.parameters(),lr=lr,momentum=0.9)
if optimizer=='adam':
optimizer=optim.Adam(net.parameters(),lr=lr)
if optimizer=='resprop':
optimizer=optim.rmsprop.RMSprop(net.parameters())
else:
optimizer=optim.Adagrad(net.parameters())
loss_func=torch.nn.CrossEntropyLoss()#采用的是交叉熵损失
all_loss=[]
'''Step2:训练'''
for epoch in range(EPOCH):
cur_loss=0.0
print("\n第",epoch+1,"次训练")
for step,data in enumerate(tqdm(trainloader)):
bx,by=data # 特征bx 标签by
outputs=net.forward(bx)
loss=loss_func(outputs,by)
optimizer.zero_grad()# 梯度清零
loss.backward()
optimizer.step()
cur_loss+=loss.item()
if step%100==0 and step!=0:
# 显示损失
#print("当前损失:",cur_loss/100)
all_loss.append(cur_loss/100)
cur_loss=0.0
print("结束训练")
#显示损失曲线
plt.plot(all_loss)
filename = "./log/" + "BPNN-" + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + '.jpg'
plt.savefig(filename)
'''Step3:预测'''
dataiter=iter(trainloader)
images,labels=dataiter.next()
predicts=net.forward(images)
correct=0
total=0
with torch.no_grad():
for (images,labels) in testloader:
#print("images",images)
#print("labels",labels)
outputs=net(images)
numbers,predicted=torch.max(outputs.data,1)
total+=labels.size(0)
correct+=(predicted==labels).sum().item()
print("*"*50)
print('FNN测试集的正确率为:{:.2f}%'.format(100*correct/total))
return correct*1.0/total
#测试
if __name__ == '__main__':
# 命令行调用
parser = argparse.ArgumentParser()
parser.add_argument("--batch_size", default=32, type=int)
parser.add_argument("--epoch", default=1, type=int)
parser.add_argument("--num_layers", default=2,help="2 or 3", type=int)
parser.add_argument("--hidden_nodes", default=1024, type=int)
parser.add_argument("--dropout", default=0.1, type=float)
parser.add_argument("--optimizer", default="adam", type=str)
parser.add_argument("--lr", default=0.001, type=float)
parser.add_argument("--loss_func", default="cross_entropy", type=str)
parser.add_argument("--acc", default=0.0, type=float)
args = parser.parse_args()
# 实验模块
print("*" * 50)
args.acc =test_FNN(EPOCH=args.epoch,BATCH_SIZE=args.batch_size,lr=args.lr,hidden_nodes=args.hidden_nodes,dropout=args.dropout,num_layers=args.num_layers,optimizer=args.optimizer)# 可以手动设置迭代次数以及学习率
# 记录实验设置的参数以及结果
with open('./log/bp_nn_args.txt', 'a') as f:
json.dump(args.__dict__, f, indent=2)
print('参数保存成功')
T3:基于torch框架实现CNN
同上,就是多了卷积和池化的“积木”
"""
基于Torch的CNN模型
超参数说明:
(1)batchsize:设置大一点是为了跑得快一些
(2)dropout:防止过拟合
(3)优化器:SGD以及Adam等
(4)loss记录放在./log文件夹下
"""
import numpy as np
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
import torch.optim as optim
import torch.nn as nn
import argparse
import datetime
# 可视化与进度条
import json
from tqdm import tqdm
import matplotlib.pyplot as plt
class CNN(nn.Module):
"""初始化参数"""
def __init__(self,stride=1,padding=0,dropout=0.1):
super(CNN,self).__init__()
self.conv1=torch.nn.Conv2d(in_channels=3,
out_channels=64,
kernel_size=5,
stride=stride,
padding=padding)
self.pool=torch.nn.MaxPool2d(kernel_size=3,
stride=stride*2)
self.conv2=torch.nn.Conv2d(64,64,5)
self.fc1=torch.nn.Linear(64*4*4,384)
self.fc2=torch.nn.Linear(384,192)
self.fc3=torch.nn.Linear(192,10)
if dropout !=0:
# 设置dropout
self.dropout=nn.Dropout(dropout)
else:
self.dropout=None
def forward(self,x):
x=self.pool(F.relu(self.conv1(x)))
x=self.pool(F.relu(self.conv2(x)))
x=x.view(-1,64*4*4)
x=self.fc1(x)
if self.dropout is not None:
x = self.dropout(x)
x=F.relu(x)
x=self.fc2(x)
if self.dropout is not None:
x = self.dropout(x)
x=F.relu(x)
x=self.fc3(x)
return x
def test_CNN(stride=1,padding=0,lr=0.001,EPOCH=1,BATCH_SIZE=4,dropout=0.1):
'''Step 1: 加载本地的cifar数据集&数据预处理'''
# If num_output_channels == 3 : returned image is 3 channel with r == g == b
transform = transforms.Compose([transforms.Grayscale(num_output_channels=3),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])]
)
# 划分训练集
trainset = torchvision.datasets.CIFAR10(root='D:/homework_for_ML/task4/data', train=True,
download=False, transform=transform)
trainloader = torch.utils.data.DataLoader(
trainset, batch_size=BATCH_SIZE, shuffle=True, num_workers=1
)
# 划分测试集
testset = torchvision.datasets.CIFAR10(root='D:/homework_for_ML/task4/data', train=False,
download=False, transform=transform)
testloader = torch.utils.data.DataLoader(
testset, batch_size=BATCH_SIZE, shuffle=False, num_workers=1
)
print("加载数据集结束~")
# 类别标签的定义
classes = ('plane', 'car', 'bird', 'cat',
'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
net=CNN(stride=stride,padding=padding)
optimizer=optim.SGD(net.parameters(),lr=lr,momentum=0.9)
loss_func=torch.nn.CrossEntropyLoss()#采用的是交叉熵损失
all_loss=[]
'''Step2:训练'''
for epoch in range(EPOCH):
cur_loss=0.0
print("第",epoch+1,"次训练")
for step,data in enumerate(tqdm(trainloader)):
bx,by=data # 特征bx 标签by
outputs=net.forward(bx)
loss=loss_func(outputs,by)
optimizer.zero_grad()# 梯度清零
loss.backward()
optimizer.step()
cur_loss+=loss.item()
if step%100==0 and step!=0:
# 显示损失
#print("当前损失:",cur_loss/100)
all_loss.append(cur_loss/100)
cur_loss=0.0
print("结束训练")
#显示损失曲线
plt.plot(all_loss)
#plt.show()
filename = "./log/" + "CNN-" + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + '.jpg'
plt.savefig(filename)
'''Step3:预测'''
correct=0
total=0
step=0
with torch.no_grad():
for (images,labels) in testloader:
outputs=net(images)
# 测试
step+=1
numbers, predicted = torch.max(outputs.data, 1)
if step%100==0:
imshow(torchvision.utils.make_grid(images),predicted,labels)
total+=labels.size(0)
correct+=(predicted==labels).sum().item()
print("*"*50)
print('CNN测试集的正确率为:{:.2f}%'.format(100*correct/total))
return 1.0*correct/total
def imshow(img,labels,real_labels=None):
"""
:param img:
:return:
"""
classes = ('plane', 'car', 'bird', 'cat',
'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
img=img/2+0.5
npimg=img.numpy()
plt.imshow(np.transpose(npimg,(1,2,0)))
print(' '.join('%5s' % classes[labels[j]] for j in range(16)))
print(' '.join('%5s' % classes[real_labels[j]] for j in range(16)))
plt.show()
def show():
classes = ('plane', 'car', 'bird', 'cat',
'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
# 加载数据集合
transform = transforms.Compose([
transforms.Grayscale(num_output_channels=3),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
]
)
testset = torchvision.datasets.CIFAR10(root='D:/homework_for_ML/task4/data', train=False,
download=False, transform=transform)
testloader = torch.utils.data.DataLoader(
testset, batch_size=16, shuffle=False, num_workers=1
)
dataiter = iter(testloader)
images, labels = dataiter.next()
print(' '.join('%5s' % classes[labels[j]] for j in range(16)))
imshow(torchvision.utils.make_grid(images),labels)
#测试
if __name__ == '__main__':
# 显示图片
#show()
# 命令行调用
parser=argparse.ArgumentParser()
parser.add_argument("--batch_size", default=64, type=int)
parser.add_argument("--epoch", default=1, type=int)
parser.add_argument("--dropout", default=0, type=float)
parser.add_argument("--lr", default=0.001, type=float)
parser.add_argument("--stride",default=1,type=int)
parser.add_argument("--padding",default=0,type=int)
parser.add_argument("--acc",default=0,type=float)
args = parser.parse_args()
acc=test_CNN(lr=args.lr,BATCH_SIZE=args.batch_size,EPOCH=args.epoch,dropout=args.dropout)
args.acc=acc
# 记录实验设置的参数以及结果
with open('./log/cnn_args.txt', 'a') as f:
json.dump(args.__dict__, f, indent=2)
print('参数保存成功')
T4:基于LibSVM实现SVM
4.1实验过程
- 首先,安装libsvm,命令行直接pip就好,网上的一些教程是面向C和matlab的,不要踩坑~
- 接着,数据预处理,< label > < index1 >:< value1 > < index2 >:< value2 >
- 直接调用相关命令就好,注意看一下官网和博客样例
- 建议存一下效果最好的模型,因为跑一次要很久,还没法加速就难顶
一份实例代码
############################################
# 测试LibSvm在图像分类上的表现
#
#---------------------------------------------------------
# 参数说明
#---------------------------------------------------------
#-s svm_type : set type of SVM (default 0)
# 0 -- C-SVC
# 1 -- nu-SVC
# 2 -- one-class SVM
# 3 -- epsilon-SVR
# 4 -- nu-SVR
#-t kernel_type : set type of kernel function (default 2)
# 0 -- linear: u'*v
# 1 -- polynomial: (gamma*u'*v + coef0)^degree
# 2 -- radial basis function: exp(-gamma*|u-v|^2)
# 3 -- sigmoid: tanh(gamma*u'*v + coef0)
#-h shrinking: whether to use the shrinking heuristics, 0 or 1 (default 1)
#是否使用收缩试探法
#
############################################
from libsvm.svm import *
from libsvm.svmutil import *
from libsvm.commonutil import *
import argparse
# 数据集加载
train_label,train_value=svm_read_problem('./data/gray_scale_train_svm.txt') #改变了函数的存放位置:target只有一个
test_label,test_value=svm_read_problem('./data/gray_scale_test_svm.txt')
print('*'*50)
print("数据集加载完成")
# 模型加载
def Exam(t,c):
model=svm_train(train_label,train_value,t+' '+c)
print('*' * 50)
print("训练完成")
# 正确率的检验
print('*' * 50)
print("*"*50)
print("test:")
print(t,c)
p_label, p_acc, p_val = svm_predict(train_label, train_value, model)
print(p_acc)
p_label, p_acc, p_val = svm_predict(test_label, test_value, model)
print(p_acc)
if __name__=='__main__':
# 首先是线性核函数
#Exam('-t 0','-c 10')
#Exam('-t 0','-c 1000000000')
# polynominal核函数
#Exam('-t 1', '-c 10')
#Exam('-t 1', '-c 1000000000')#软间隔设置很小
# radial核函数
#Exam('-t 2', '-c 10')
#Exam('-t 2', '-c 10')#test: 38.52%
#Exam('-t 2', '-c 100')
# sigmoid核函数
#Exam('-t 3', '-c 10')
#Exam('-t 3', '-c 10000000')
# 首先是线性核函数
parser=argparse.ArgumentParser()
parser.add_argument("--t",default="2",help="0:linear, 1:polynominal, 2:radial, 3:sigmoid")
parser.add_argument("--c",default="1000",help="0->1000000:hard->soft")
parser.add_argument("--save",default=False,help="save or not",type=bool)
args=parser.parse_args()
Exam("-t "+args.t,"-c "+args.c)
关于数据处理:
没有贴上所有的代码,因为我自己做的时候是从txt->txt,效率不是很高,中间生成的temp文件很杂,其实有更好的做法的~
def covert(src_file,target_file):
'''
+1 4:-0.320755
-1 1:0.583333 2:-1 3:0.333333
+1 1:0.166667 2:1 3:-0.333333 4:-0.433962
-1 1:0.458333 3:1 4:-0.358491
'''
# 将指定的数据转换成目标文件
# read data file
readin = open(src_file, 'r')
# write data file
output = open(target_file, 'w')
try:
the_line = readin.readline()
while the_line:
# delete the \n
the_line = the_line.strip('\n')
index = 0
output_line = ''
for sub_line in the_line.split(' '):
# the label col
if index == 0:
output_line = sub_line
# the features cols
if sub_line != 'NULL' and index != 0:
the_text = ' ' + str(index) + ':' + sub_line
output_line = output_line + the_text
index = index + 1
output_line = output_line + '\n'
output.write(output_line)
the_line = readin.readline()
finally:
readin.close()
4.2 实验结果
数据一定要归一化,尤其是svm,不归一化的话,你会训练两个晚上然后收获一枚“人工智障”,
.官方给的解释是:缩放的主要优点是避免较大数值范围内的属性支配较小数值范围内的属性。另一个优点是在计算过程中避免数值困难。由于核值通常依赖于特征向量的内积,例如线性核和多项式核,因此较大的属性值可能会导致数值问题。建议将每个属性线性缩放到范围[-1;+1]或[0;1]。
目前我尝试过的核函数里面最好的也是用的最多的==RBF核==,在归一化后的灰度数据的cifar10数据上的正确率可以冲到==40%+==,应该经过特征提取后可以用svm达到更好的效果,但是这部分没有继续深入了(可以之后有兴趣做下图像特征提取的算法)
线性核函数跑得很慢,效果没有前者好
不过目前尝试下来,sigmoid核好像是表现最不理想的~虽然速度还是比较快的(相对于前两者跑一整天天而言)