Natural Language Processing with Recurrent Neural Networks

Building the Data Loader

import os.path
import re

import torch
from torch.utils.data import DataLoader, Dataset

from lib import ws, max_len, batch_size

train_data_path=r'E:\aclImdb\train'
test_data_path=r'E:\aclImdb\test'

def tokenize(content):
    # Strip HTML tags left over from the raw IMDB reviews
    content = re.sub('<.*?>', '', content)
    # Remove punctuation and control characters we do not want inside tokens
    # ($ and ^ must be escaped, since they are regex metacharacters)
    filters = [r'\.', '\t', '\n', '\x97', '\x96', '#', r'\$', '%', r'\^', '&']
    content = re.sub('|'.join(filters), '', content)
    tokens = [i.strip().lower() for i in content.split()]
    return tokens

def collate_fn(batch):
    # Each sample is (token_list, label); zip splits the batch into two tuples
    content, label = list(zip(*batch))
    # Convert every token list to a fixed-length sequence of vocabulary indices
    content = [ws.transform(i, max_len=max_len) for i in content]
    content = torch.LongTensor(content)
    label = torch.LongTensor(label)
    return content, label

class ImdbDataset(Dataset):
    def __init__(self, train=True):
        self.train_data_path = train_data_path
        self.test_data_path = test_data_path
        data_path = self.train_data_path if train else self.test_data_path
        # Collect every review file from the pos and neg subdirectories
        temp_data_path = [os.path.join(data_path, 'pos'), os.path.join(data_path, 'neg')]
        self.total_file_path = []
        for path in temp_data_path:
            file_name_list = os.listdir(path)
            file_path_list = [os.path.join(path, i) for i in file_name_list if i.endswith('.txt')]
            self.total_file_path.extend(file_path_list)

    def __getitem__(self, index):
        file_path = self.total_file_path[index]
        # The parent directory name ('pos' or 'neg') is the label
        label_str = os.path.basename(os.path.dirname(file_path))
        label = 0 if label_str == 'neg' else 1
        with open(file_path, encoding='utf-8') as f:
            tokens = tokenize(f.read())
        return tokens, label

    def __len__(self):
        return len(self.total_file_path)

def get_dataloader(train=True, batch=batch_size):
    # Pass both arguments through; the original ignored them and always
    # built the training set with the global batch_size
    imdb_dataset = ImdbDataset(train=train)
    data_loader = DataLoader(imdb_dataset, batch_size=batch, shuffle=True, collate_fn=collate_fn)
    return data_loader


if __name__ == '__main__':
    for idx, (input, target) in enumerate(get_dataloader(train=True)):
        print(idx, input, target)
        break
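
A custom collate_fn is required here because the default collator cannot stack variable-length token lists into one tensor; ws.transform pads or truncates every review to max_len first. As a quick sanity check, a minimal sketch of what tokenize produces on a raw review (the sample string is made up for illustration):

from dataset import tokenize

sample = "This movie was <br /><br />GREAT. Loved it & would watch again!"
print(tokenize(sample))
# -> ['this', 'movie', 'was', 'great', 'loved', 'it', 'would', 'watch', 'again!']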

Converting Text to Sequences

class Word2Sequence():
    UNK_TAG = 'UNK'
    PAD_TAG = 'PAD'
    UNK = 0
    PAD = 1

    def __init__(self):
        # Reserve indices 0 and 1 for the unknown and padding tokens
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.count = {}

    def fit(self, sentence):
        # Accumulate word frequencies over the whole corpus
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min_count=None, max_count=None, max_features=None):
        # Drop words that are too rare or too frequent
        if min_count is not None:
            self.count = {word: value for word, value in self.count.items() if value > min_count}
        if max_count is not None:
            self.count = {word: value for word, value in self.count.items() if value < max_count}
        # Keep only the max_features most frequent words
        if max_features is not None:
            temp = sorted(self.count.items(), key=lambda x: x[1], reverse=True)[:max_features]
            self.count = dict(temp)
        for word in self.count:
            self.dict[word] = len(self.dict)
        # Index-to-word mapping for inverse_transform
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None):
        # Pad short sentences with PAD_TAG and truncate long ones to max_len
        if max_len is not None:
            if max_len > len(sentence):
                sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))
            if max_len < len(sentence):
                sentence = sentence[:max_len]
        # Words missing from the vocabulary map to UNK
        return [self.dict.get(word, self.UNK) for word in sentence]

    def inverse_transform(self, indices):
        return [self.inverse_dict.get(idx) for idx in indices]

    def __len__(self):
        return len(self.dict)
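
A minimal sketch of the fit / build_vocab / transform round trip (the toy sentences below are made up for illustration):

ws = Word2Sequence()
ws.fit(['the', 'movie', 'was', 'great'])
ws.fit(['the', 'plot', 'was', 'thin'])
ws.build_vocab()  # no frequency filtering for this tiny corpus
print(ws.transform(['the', 'movie', 'was', 'awful'], max_len=6))
# 'awful' is unseen -> UNK (0); padded with PAD (1) to length 6: [2, 3, 4, 0, 1, 1]
print(ws.inverse_transform([2, 3, 4, 0, 1, 1]))
# ['the', 'movie', 'was', 'UNK', 'PAD', 'PAD']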

Building and Saving the Vocabulary

import os
import pickle

from tqdm import tqdm

from dataset import tokenize
from word_sequence import Word2Sequence

if __name__ == '__main__':
    ws = Word2Sequence()
    path = r'E:\aclImdb\train'
    temp_data_path = [os.path.join(path, 'pos'), os.path.join(path, 'neg')]
    for data_path in temp_data_path:
        file_paths = [os.path.join(data_path, file_name) for file_name in os.listdir(data_path) if file_name.endswith('.txt')]
        for file_path in tqdm(file_paths):
            with open(file_path, encoding='utf-8') as f:
                sentence = tokenize(f.read())
            ws.fit(sentence)
    # Keep words seen more than 10 times, capped at the 10000 most frequent
    ws.build_vocab(min_count=10, max_features=10000)
    pickle.dump(ws, open('../data/model/情感分类/ws.pkl', 'wb'))
    print(len(ws))

Configuration File

import pickle

import torch

# The vocabulary pickled in the previous step. Note that word_sequence.py must
# be importable here, since pickle reconstructs the Word2Sequence object from it.
ws = pickle.load(open('../../data/model/情感分类/ws.pkl', 'rb'))

max_len = 200        # sequence length after padding/truncation
batch_size = 1024
hidden_size = 128    # LSTM hidden units per direction
num_layers = 2
bidirectional = True
dropout = 0.4

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

Building the Neural Network Model

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from tqdm import tqdm

from dataset import get_dataloader
from lib import ws, max_len, hidden_size, num_layers, bidirectional, dropout, device


class Mymodel(nn.Module):
    def __init__(self):
        super(Mymodel, self).__init__()
        # Embedding: vocabulary size x embedding dimension
        self.embedding = nn.Embedding(len(ws), 300)
        self.lstm = nn.LSTM(input_size=300, hidden_size=hidden_size, num_layers=num_layers,
                            batch_first=True, bidirectional=bidirectional, dropout=dropout)
        # The two directions are concatenated, hence hidden_size * 2
        self.fc = nn.Linear(hidden_size * 2, 2)

    def forward(self, input):
        '''
        :param input: [batch_size, max_len]
        :return: log-probabilities of shape [batch_size, 2]
        '''
        x = self.embedding(input)  # [batch_size, max_len, 300]

        # x: [batch_size, max_len, 2*hidden_size]
        # h_n: [num_layers*2, batch_size, hidden_size]
        x, (h_n, c_n) = self.lstm(x)

        # Concatenate the final hidden states of the two directions
        output_fw = h_n[-2, :, :]  # last forward hidden state
        output_bw = h_n[-1, :, :]  # last backward hidden state
        output = torch.cat([output_fw, output_bw], dim=-1)  # [batch_size, hidden_size*2]

        out = self.fc(output)
        return F.log_softmax(out, dim=-1)

def fy(a):
    # Map a predicted class index to a human-readable label
    if a == 1:
        return 'positive'
    else:
        return 'negative'
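
For a bidirectional multi-layer LSTM, PyTorch orders h_n layer by layer with the forward and backward directions interleaved, so h_n[-2] and h_n[-1] are the top layer's final forward and backward states. A minimal sketch verifying that indexing against the documented view-based layout (batch size 4 is arbitrary):

lstm = nn.LSTM(input_size=300, hidden_size=hidden_size, num_layers=num_layers,
               batch_first=True, bidirectional=True)
dummy = torch.randn(4, max_len, 300)            # [batch, seq, embed]
_, (h_n, _) = lstm(dummy)                       # h_n: [num_layers*2, batch, hidden]
h = h_n.view(num_layers, 2, 4, hidden_size)     # [layer, direction, batch, hidden]
assert torch.equal(h_n[-2], h[-1, 0])           # top layer, forward direction
assert torch.equal(h_n[-1], h[-1, 1])           # top layer, backward direction
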
model=Mymodel().to(device)
#model.load_state_dict(torch.load('../../data/model/情感分类/model.pt'))
optimizer=Adam(model.parameters(),0.01)
#optimizer.load_state_dict(torch.load('../../data/model/情感分类/model_optimizer.pt'))
def train(epoch):
    for idx, (input, target) in enumerate(get_dataloader(train=True)):
        input = input.to(device)
        target = target.to(device)
        optimizer.zero_grad()
        output = model(input)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if epoch % 2 == 0:
            print('loss:', loss.item())


def test():
    loss_list = []
    acc_list = []
    model.eval()
    test_dataloader = get_dataloader(train=False, batch=1000)
    for idx, (data, target) in enumerate(test_dataloader):
        # The model lives on `device`, so the batch must be moved there too
        data = data.to(device)
        target = target.to(device)
        with torch.no_grad():
            output = model(data)
            cur_loss = F.nll_loss(output, target)
            loss_list.append(cur_loss.item())
            # Predicted class = argmax over the log-probabilities
            pred = output.max(dim=1)[-1]
            cur_acc = pred.eq(target).float().mean()
            acc_list.append(cur_acc.item())
    print(f"mean accuracy: {np.mean(acc_list)}\nmean loss: {np.mean(loss_list)}")

if __name__ == '__main__':
    for i in tqdm(range(1)):
        train(i)
    # torch.save(model.state_dict(),'../data/model/情感分类/model.pt')
    # torch.save(optimizer.state_dict(),'../data/model/情感分类/model_optimizer.pt')
    #test()
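
Once trained and saved, the model can score a new review. A minimal inference sketch, assuming the commented-out torch.save call above has been run (the sample review string is made up for illustration):

from dataset import tokenize

model.load_state_dict(torch.load('../data/model/情感分类/model.pt'))
model.eval()

review = "One of the best films I have seen in years."
tokens = tokenize(review)
input = torch.LongTensor([ws.transform(tokens, max_len=max_len)]).to(device)
with torch.no_grad():
    output = model(input)
print(fy(output.max(dim=1)[-1].item()))  # 'positive' or 'negative'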
