# 构建数据加载器 (build the data loader)
import os.path
import re
import torch
from torch.utils.data import DataLoader, Dataset
from lib import ws, max_len, batch_size
# Root directories of the aclImdb train/test splits (Windows-style paths).
train_data_path=r'E:\aclImdb\train'
test_data_path=r'E:\aclImdb\test'
def tokenlie(content):
    """Tokenize a review string.

    Strips HTML tags, drops a fixed set of noise characters,
    lower-cases, and splits on whitespace.

    :param content: raw review text
    :return: list of lower-cased tokens
    """
    # Remove HTML tags such as <br />.
    content = re.sub(r'<.*?>', '', content)
    # Characters to drop entirely. The original joined them with '|' without
    # escaping, so '$' and '^' acted as regex anchors and were never removed;
    # a character class treats every one of these characters literally.
    content = re.sub(r'[.\t\n\x97\x96#$%^&]', '', content)
    return [token.strip().lower() for token in content.split()]
def collate_fn(batch):
    """Collate (tokens, label) pairs into tensor batches.

    :param batch: iterable of (token_list, label) pairs
    :return: (content, label) LongTensors; content is [batch_size, max_len]
    """
    contents, labels = zip(*batch)
    # Encode every token list to a fixed-length index sequence.
    encoded = [ws.transform(tokens, max_len=max_len) for tokens in contents]
    return torch.LongTensor(encoded), torch.LongTensor(labels)
class ImdbDataset(Dataset):
    """aclImdb review dataset.

    Yields (token_list, label) pairs; label 1 = positive, 0 = negative.
    """

    def __init__(self, train=True):
        """Collect all .txt review paths under <root>/pos and <root>/neg.

        :param train: use the train split when True, else the test split
        """
        self.train_data_path = train_data_path
        self.test_data_path = test_data_path
        data_path = self.train_data_path if train else self.test_data_path
        temp_data_path = [os.path.join(data_path, 'pos'), os.path.join(data_path, 'neg')]
        self.total_file_path = []
        for path in temp_data_path:
            file_name_list = os.listdir(path)
            file_path_list = [os.path.join(path, i) for i in file_name_list if i.endswith('.txt')]
            self.total_file_path.extend(file_path_list)

    def __getitem__(self, index):
        """Return (tokens, label) for the review file at *index*."""
        file_path = self.total_file_path[index]
        # The parent directory name ('pos'/'neg') encodes the label.
        # os.path is used instead of split('\\') so this also works on POSIX
        # (the paths were built with os.path.join, so separators match the OS).
        label_str = os.path.basename(os.path.dirname(file_path))
        label = 0 if label_str == 'neg' else 1
        # 'with' guarantees the file handle is closed (the original leaked it).
        with open(file_path, encoding='utf-8') as f:
            tokens = tokenlie(f.read())
        return tokens, label

    def __len__(self):
        """Number of review files in this split."""
        return len(self.total_file_path)
def get_dataloader(train=True, batch=batch_size):
    """Build a shuffled DataLoader over the IMDB dataset.

    :param train: select the train (True) or test (False) split —
        the original ignored this flag and always loaded the train split
    :param batch: batch size — the original ignored this parameter and
        always used the module-level batch_size
    :return: DataLoader producing (content, label) tensor batches
    """
    imdb_dataset = ImdbDataset(train=train)
    return DataLoader(imdb_dataset, batch_size=batch, shuffle=True, collate_fn=collate_fn)
if __name__ == '__main__':
    # Smoke test: pull a single batch from the train loader and show it.
    for batch_idx, (batch_input, batch_target) in enumerate(get_dataloader(train=True)):
        print(batch_idx, batch_input, batch_target)
        break
# 将文本转换为数据 (convert text to index sequences)
class Word2Sequence():
    """Maps words to integer indices (and back) for sequence models.

    Index 0 is reserved for unknown words (UNK), index 1 for padding (PAD).
    Usage: fit() on many sentences, then bulid_vocab(), then transform().
    """
    UNK_TAG = 'UNK'
    PAD_TAG = 'PAD'
    UNK = 0
    PAD = 1

    def __init__(self):
        # word -> index; starts with the two special tokens.
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        # word -> raw frequency, accumulated by fit().
        self.count = {}

    def fit(self, sentence):
        """Accumulate word frequencies from one tokenized sentence."""
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1

    def bulid_vocab(self, min=None, max=None, max_futures=None):
        """Build the vocabulary from accumulated counts.

        (Misspelled name kept for backward compatibility; build_vocab
        below is a correctly spelled alias.)

        :param min: keep only words with frequency strictly greater than this
        :param max: keep only words with frequency strictly lower than this
        :param max_futures: keep only the N most frequent words
        """
        if min is not None:
            self.count = {word: value for word, value in self.count.items() if value > min}
        if max is not None:
            self.count = {word: value for word, value in self.count.items() if value < max}
        if max_futures is not None:
            temp = sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_futures]
            self.count = dict(temp)
        for word in self.count:
            self.dict[word] = len(self.dict)
        # index -> word, for inverse_transform().
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))

    # Correctly spelled alias (existing callers of bulid_vocab keep working).
    build_vocab = bulid_vocab

    def transform(self, sentense, max_len=None):
        """Encode a token list to indices, padding/truncating to max_len.

        Unknown words map to UNK (0); padding positions map to PAD (1).
        """
        if max_len is not None:
            if max_len > len(sentense):
                sentense = sentense + [self.PAD_TAG] * (max_len - len(sentense))
            if max_len < len(sentense):
                sentense = sentense[:max_len]
        return [self.dict.get(word, self.UNK) for word in sentense]

    def inverse_transform(self, indices):
        """Decode indices back to words (None for out-of-vocab indices)."""
        return [self.inverse_dict.get(idx) for idx in indices]

    def __len__(self):
        """Vocabulary size, including the UNK and PAD entries."""
        return len(self.dict)
# 保存转换为数据的文本 (fit the vocabulary on the corpus and save it)
import os
import pickle
from tqdm import tqdm
from dataset import tokenlie
from word_sequence import Word2Sequence
if __name__ == '__main__':
    # Fit a Word2Sequence vocabulary on the whole train split and pickle it.
    ws = Word2Sequence()
    path = r'E:\aclImdb\train'
    temp_data_path = [os.path.join(path, 'pos'), os.path.join(path, 'neg')]
    for data_path in temp_data_path:
        # '.txt' (with the dot) so names like 'foo.mytxt' are not picked up;
        # this matches the filter ImdbDataset uses for the same files.
        file_paths = [os.path.join(data_path, file_name)
                      for file_name in os.listdir(data_path)
                      if file_name.endswith('.txt')]
        for file_path in tqdm(file_paths):
            with open(file_path, encoding='utf-8') as f:
                sentence = tokenlie(f.read())
            ws.fit(sentence)
    # Drop words seen 10 times or fewer, keep the 10000 most frequent.
    ws.bulid_vocab(min=10, max_futures=10000)
    # 'with' guarantees the pickle file is flushed and closed
    # (the original left the handle to the garbage collector).
    with open('../data/model/情感分类/ws.pkl', 'wb') as f:
        pickle.dump(ws, f)
    print(len(ws))
# 配置文件 (shared configuration)
import pickle
import torch
# Load the fitted Word2Sequence vocabulary; 'with' closes the handle
# (the original passed an unclosed open() straight to pickle.load).
# NOTE(review): pickle.load is only safe because this file is produced
# locally by our own fitting script — never load untrusted pickles.
with open('../../data/model/情感分类/ws.pkl', 'rb') as _f:
    ws = pickle.load(_f)
max_len = 200          # tokens kept per review (pad/truncate target)
batch_size = 1024      # training batch size
hidden_size = 128      # LSTM hidden units per direction
num_layers = 2         # stacked LSTM layers
bidirectional = True   # use a bidirectional LSTM
dropout = 0.4          # dropout between stacked LSTM layers
# Train on the first GPU when available, otherwise fall back to CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
#print(len(ws))
# 建立神经网络模型 (build the neural-network model)
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from tqdm import tqdm
from dataset import get_dataloader
from lib import ws, max_len,hidden_size,num_layers,bidirectional,dropout,device
class Mymodel(nn.Module):
    """Bidirectional LSTM sentiment classifier over word-index sequences."""

    def __init__(self):
        super(Mymodel, self).__init__()
        # vocabulary size x embedding dimension (300).
        self.embedding = nn.Embedding(len(ws), 300)
        self.lstm = nn.LSTM(input_size=300, hidden_size=hidden_size, num_layers=num_layers,
                            batch_first=True, bidirectional=bidirectional, dropout=dropout)
        # Forward and backward final states are concatenated -> 2 * hidden_size in.
        self.fc = nn.Linear(hidden_size * 2, 2)

    def forward(self, input):
        """
        :param input: LongTensor [batch_size, max_len] of word indices
        :return: log-probabilities, shape [batch_size, 2]
        """
        # [batch_size, max_len] -> [batch_size, max_len, 300]
        x = self.embedding(input)
        # h_n: [num_layers * num_directions, batch_size, hidden_size]
        x, (h_n, c_n) = self.lstm(x)
        # Concatenate the final hidden state of each direction.
        output_fw = h_n[-2, :, :]  # last layer, forward direction
        output_bw = h_n[-1, :, :]  # last layer, backward direction
        output = torch.concat([output_fw, output_bw], dim=-1)  # [batch_size, hidden_size*2]
        out = self.fc(output)
        # The debug print of F.softmax(out) was removed: it ran on every
        # forward pass and flooded stdout during training.
        return F.log_softmax(out, dim=-1)
def fy(a):
    """Map a predicted class index to its Chinese sentiment label."""
    return '积极' if a == 1 else '消极'
# Instantiate the model and move it to the configured device.
model=Mymodel().to(device)
#model.load_state_dict(torch.load('../../data/model/情感分类/model.pt'))
# Adam with lr=0.01 (high for Adam — presumably deliberate, TODO confirm).
optimizer=Adam(model.parameters(),0.01)
#optimizer.load_state_dict(torch.load('../../data/model/情感分类/model_optimizer.pt'))
def train(epoch):
    """Run one training epoch over the train split.

    :param epoch: epoch index; the loss is printed each batch on even epochs
    """
    # Ensure training mode (test() switches the shared model to eval mode).
    model.train()
    for idx, (batch_input, target) in enumerate(get_dataloader(train=True)):
        batch_input = batch_input.to(device)
        target = target.to(device)
        optimizer.zero_grad()
        # Call the module, not .forward() directly, so hooks are honored.
        output = model(batch_input)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if epoch % 2 == 0:
            print('损失:', loss.item())
def test():
    """Evaluate the model on the test split; print mean accuracy and loss."""
    loss_list = []
    acc_list = []
    model.eval()
    test_dataloader = get_dataloader(train=False, batch=1000)
    with torch.no_grad():
        for idx, (data, target) in enumerate(test_dataloader):
            # Move the batch to the model's device (the original left the
            # tensors on CPU, which fails when the model lives on the GPU).
            data = data.to(device)
            target = target.to(device)
            output = model(data)
            cur_loss = F.nll_loss(output, target)
            # .item() converts to a Python float so np.mean works regardless
            # of device (np.mean over CUDA tensors raises).
            loss_list.append(cur_loss.item())
            pred = output.max(dim=1)[-1]
            cur_acc = pred.eq(target).float().mean()
            acc_list.append(cur_acc.item())
    print(f"平均准确率:{np.mean(acc_list)}\n平均损失:{np.mean(loss_list)}")
if __name__ == '__main__':
    # Train for a single epoch; increase the range bound to train longer.
    for i in tqdm(range(1)):
        train(i)
        # torch.save(model.state_dict(),'../data/model/情感分类/model.pt')
        # torch.save(optimizer.state_dict(),'../data/model/情感分类/model_optimizer.pt')
    #test()
# (blog boilerplate) 文章出处登录后可见!
# (blog boilerplate) 已经登录?立即刷新