|
- import os
- import torch
- import torch.nn as nn
- from sklearn.preprocessing import StandardScaler, MinMaxScaler
- from sklearn.metrics import classification_report
- from torch.utils.data import Dataset, TensorDataset
- from tqdm import tqdm
-
- from dataset import TimeSeriesDataset, data2classes, data2features
- from models.resnet import resnet101, resnet50, resnet18
-
-
def data2str(used_data: list):
    """Build a short tag by joining the 6th character of each filename with '_'.

    Each entry is expected to look like ``data_<source>.csv``, so the character
    at index 5 (just after the ``data_`` prefix) identifies the data source,
    e.g. ["data_motor.csv", "data_gearbox.csv"] -> "m_g".
    """
    tag_chars = []
    for filename in used_data:
        tag_chars.append(filename[5])
    return '_'.join(tag_chars)
-
class Config():
    """Hyper-parameters and paths for the training script.

    NOTE: the class body runs at import time — it calls the project helpers
    data2features()/data2classes() and creates the checkpoint directory as a
    side effect of defining the class.
    """
    root_dir = r"D:\1\2024phm\Preliminary stage\Data_Pre Stage\Training data"
    # CSV files to load; the 6th character of each name tags the data source.
    used_data = ["data_motor.csv","data_gearbox.csv",
                 "data_leftaxlebox.csv","data_rightaxlebox.csv"]
    seq_length = 4096 # window length: how many time steps per sample (was 1024)
    img_size = 112
    batch_size = 32 # batch size (was 1024)
    # Number of features per time step — derived from the selected CSVs by the
    # project helper; the original note suggests 21 = 6 + 9 + 3 + ... — TODO confirm.
    feature_size = data2features(used_data)
    # hidden_size = 384 # hidden layer size (was 128)
    _, num_classes = data2classes(used_data)
    epochs = 100 # number of training epochs
    best_accuracy = 0.00 # best validation accuracy seen so far (updated by the loop)
    learning_rate = 0.0001 # learning rate
    weight_decay = 0.00001 # L2 weight decay for Adam (original comment wrongly said "learning rate")
    model_name = 'resnet' # model name
    dir = data2str(used_data)  # NOTE: shadows the builtin dir(); kept for compatibility
    save_dir = './checkpoint/{}_sample1'.format(dir) # where best checkpoints are saved
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
-
-
if __name__ == '__main__':
    config = Config()

    # Build the training and validation datasets.
    train_data = TimeSeriesDataset(config.root_dir, config.seq_length,
        img_size=config.img_size, used_data=config.used_data, mode="train")
    test_data = TimeSeriesDataset(config.root_dir, config.seq_length,
        img_size=config.img_size, used_data=config.used_data, mode="valid")

    # Wrap datasets in loaders. Shuffle only the training set: validation
    # order cannot affect accuracy, and a fixed order is reproducible.
    train_loader = torch.utils.data.DataLoader(train_data,
                                               batch_size=config.batch_size,
                                               shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_data,
                                              batch_size=config.batch_size,
                                              shuffle=False)

    # NOTE(review): assumes the dataset yields (feature_size*3)-channel
    # images of size img_size — confirm against TimeSeriesDataset.
    model = resnet18(in_channels=config.feature_size*3, num_classes=config.num_classes).cuda()

    # CrossEntropyLoss applies log-softmax internally, so the model must be
    # fed RAW LOGITS. (Bug fix: the original applied .softmax(dim=1) before
    # the loss — a double softmax that flattens gradients and stalls training.)
    loss_function = nn.CrossEntropyLoss()
    # optimizer = torch.optim.SGD(model.parameters(), lr=config.learning_rate)
    optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate,
                                 weight_decay=config.weight_decay)

    for epoch in range(config.epochs):
        # ---------- training ----------
        model.train()
        running_loss = 0.0
        train_bar = tqdm(train_loader)  # progress bar over the training batches
        for step, (x_train, y_train) in enumerate(train_bar, start=1):
            logits = model(x_train.cuda())          # raw logits — no softmax here
            loss = loss_function(logits, y_train.cuda())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Show the running mean loss for this epoch (the original
            # accumulated running_loss but never used it, and formatted the
            # raw tensor instead of a float).
            running_loss += loss.item()
            train_bar.desc = "train epoch[{}/{}] loss:{:.6f}".format(
                epoch + 1, config.epochs, running_loss / step)

        # ---------- validation ----------
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                outputs = model(inputs.cuda())
                _, predicted = torch.max(outputs, 1)
                # Labels arrive one-hot encoded; reduce to class indices.
                _, labels = torch.max(labels, 1)
                total += labels.size(0)
                # Compare on-device; no numpy round-trip needed.
                correct += (predicted == labels.cuda()).sum().item()

        accuracy = 100 * correct / total

        print("Epoch [{}/{}], Validation Accuracy: {:.2f}%".format(epoch + 1,
                                                                   config.epochs, accuracy))
        # Checkpoint whenever validation accuracy improves.
        if accuracy > config.best_accuracy:
            config.best_accuracy = accuracy
            # Format accuracy to 2 decimals so the filename stays readable.
            save_path = '{}/epoch{}_best_{:.2f}.pth'.format(config.save_dir, epoch, accuracy)
            torch.save(model.state_dict(), save_path)

    print('Finished Training')
|