|
- import time
- import torch
- import logging
- from sklearn import metrics
- from torch import nn
- import numpy as np
-
def load_weight(model, weight_path, map_location=None):
    '''
    Load a checkpoint's 'state_dict' into *model* and return it.

    :param model: torch.nn.Module that receives the weights
    :param weight_path: path to a checkpoint saved with a 'state_dict' key
    :param map_location: optional device remapping forwarded to torch.load
        (e.g. 'cpu' to load GPU-trained weights on a CPU-only machine);
        None keeps torch.load's default behavior, so existing callers
        are unaffected
    :return: the same model instance with the weights loaded
    '''
    checkpoint = torch.load(weight_path, map_location=map_location)
    model.load_state_dict(checkpoint['state_dict'])
    return model
-
-
def train_epoch_pre(summary, summary_writer, args, model, loss_fn,
                    optimizer, dataloader_tumor, dataloader_normal, text):
    '''
    Run one pre-training epoch over paired tumor/normal batches, optimizing
    a three-part feature-space loss (no label supervision is used here).

    :param summary: running record dict; 'epoch' and 'step' counters are updated
    :param summary_writer: tensorboardX writer receiving scalar losses
    :param args: hyper-parameters (uses .device, .clip, .batch_size)
    :param model: network; returns (img_feats, text_feats) when args.clip is
        set, otherwise img_feats only
    :param loss_fn: sequence of three callables:
        loss_fn[0](normal_feats), loss_fn[1](normal_feats, tumor_feats),
        loss_fn[2](tumor_feats)
    :param optimizer: optimizer stepping the model parameters
    :param dataloader_tumor: loader yielding (img, label) tumor batches
    :param dataloader_normal: loader yielding (img, label) normal batches
    :param text: text embedding forwarded to the model in clip mode
    :return: the updated summary dict
    '''
    steps = len(dataloader_tumor)
    dataiter_tumor = iter(dataloader_tumor)
    dataiter_normal = iter(dataloader_normal)

    # Switch to training mode.
    model.train()
    time_now = time.time()
    for step in range(steps):
        # Fetch one tumor batch and one normal batch.
        # Labels are intentionally not used: the pre-training losses operate
        # purely on feature groups, so the original label concatenation was
        # dead code and is removed.
        tumor_img, _ = next(dataiter_tumor)
        normal_img, _ = next(dataiter_normal)
        tumor_img = tumor_img.to(args.device)
        normal_img = normal_img.to(args.device)
        # Concatenate so rows [:batch_size] are tumor, the rest normal.
        imgs = torch.concat([tumor_img, normal_img])
        if args.clip:
            img_feats, text_feats = model(imgs, text)
            # Text features are grouped with the tumor features;
            # normal features form the second group.
            tumor_feats = torch.concat([text_feats, img_feats[:args.batch_size]])
            normal_feats = img_feats[args.batch_size:]
        else:
            img_feats = model(imgs)
            tumor_feats = img_feats[:args.batch_size]
            normal_feats = img_feats[args.batch_size:]
        # Three-part loss over the two feature groups.
        loss1 = loss_fn[0](normal_feats)
        loss2 = loss_fn[1](normal_feats, tumor_feats)
        loss3 = loss_fn[2](tumor_feats)
        loss = loss1 + loss2 + loss3
        # Backpropagate and step the optimizer.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Wall-clock time for this step.
        time_spent = time.time() - time_now
        time_now = time.time()

        # Log scalar values (.item()) rather than raw tensors.
        logging.info(
            '{}, Epoch: {}, Step: {}, Training Loss: {:.4f}+{:.4f}+{:.4f}={:.4f}, Run Time: {:.2f}'
            .format(
                time.strftime("%Y-%m-%d %H:%M:%S"), summary['epoch'] + 1,
                summary['step'] + 1, loss1.item(), loss2.item(),
                loss3.item(), loss.item(), time_spent)
        )

        summary['step'] += 1
        # Periodically record the training losses to tensorboard.
        if summary['step'] % 100 == 0:
            summary_writer.add_scalar('train/loss1', loss1.item(), summary['step'])
            summary_writer.add_scalar('train/loss2', loss2.item(), summary['step'])
            summary_writer.add_scalar('train/loss3', loss3.item(), summary['step'])
            summary_writer.add_scalar('train/loss', loss.item(), summary['step'])

    summary['epoch'] += 1

    return summary
-
-
def valid_epoch_pre(summary, args, model, loss_fn, dataloader_tumor,
                    dataloader_normal, text):
    '''
    Run one clip-mode validation pass and record accuracy.

    :param summary: record dict; 'val_acc' is written into it
    :param args: hyper-parameters (uses .device)
    :param model: network returning (img_feats, text_feats)
    :param loss_fn: unused here; kept so the signature parallels training
    :param dataloader_tumor: loader yielding (img, label) tumor batches
    :param dataloader_normal: loader yielding (img, label) normal batches
    :param text: text embedding forwarded to the model
    :return: the updated summary dict
    '''
    steps = len(dataloader_tumor)
    dataiter_tumor = iter(dataloader_tumor)
    dataiter_normal = iter(dataloader_normal)
    # Switch to evaluation mode; no gradients needed.
    model.eval()
    with torch.no_grad():
        predicts_all = []
        label_all = []
        for step in range(steps):
            # Fetch one tumor batch and one normal batch, move to device.
            tumor_img, tumor_label = next(dataiter_tumor)
            normal_img, normal_label = next(dataiter_normal)
            tumor_img, tumor_label = tumor_img.to(args.device), tumor_label.to(args.device)
            normal_img, normal_label = normal_img.to(args.device), normal_label.to(args.device)
            # Concatenate tumor first, then normal.
            imgs = torch.concat([tumor_img, normal_img])
            labels = torch.concat([tumor_label, normal_label])
            img_feats, text_feats = model(imgs, text)

            # Image-text similarity as logits.
            # NOTE(review): this is a cosine similarity only if the model
            # returns L2-normalized features — confirm against the model.
            # (The unused logits_per_text transpose was removed.)
            logits_per_image = img_feats @ text_feats.t()
            # Threshold the sigmoid probabilities at 0.5.
            probs = torch.sigmoid(logits_per_image)
            predicts = (probs >= 0.5)
            predicts_all += predicts.cpu().numpy().tolist()
            label_all += labels.cpu().numpy().tolist()

        # Overall accuracy over all validation samples.
        val_acc = metrics.accuracy_score(label_all, predicts_all)

    summary['val_acc'] = val_acc

    return summary
-
-
def test_pre(summary, args, model, loss_fn, dataloader_tumor,
             dataloader_normal, text):
    '''
    Run the clip-mode test pass and record accuracy, F1, AUC, precision,
    recall and the confusion-matrix counts.

    :param summary: record dict; test metrics are written into it
    :param args: hyper-parameters (uses .device)
    :param model: network returning (img_feats, text_feats)
    :param loss_fn: unused here; kept so the signature parallels training
    :param dataloader_tumor: loader yielding (img, label) tumor batches
    :param dataloader_normal: loader yielding (img, label) normal batches
    :param text: text embedding forwarded to the model
    :return: the updated summary dict
    '''
    steps = len(dataloader_tumor)
    dataiter_tumor = iter(dataloader_tumor)
    dataiter_normal = iter(dataloader_normal)
    # Switch to evaluation mode; no gradients needed.
    model.eval()
    with torch.no_grad():
        predicts_all = []
        label_all = []
        for step in range(steps):
            # Fetch one tumor batch and one normal batch, move to device.
            tumor_img, tumor_label = next(dataiter_tumor)
            normal_img, normal_label = next(dataiter_normal)
            tumor_img, tumor_label = tumor_img.to(args.device), tumor_label.to(args.device)
            normal_img, normal_label = normal_img.to(args.device), normal_label.to(args.device)
            # Concatenate tumor first, then normal.
            imgs = torch.concat([tumor_img, normal_img])
            labels = torch.concat([tumor_label, normal_label])
            img_feats, text_feats = model(imgs, text)

            # Image-text similarity as logits.
            # NOTE(review): cosine similarity only if features are
            # L2-normalized — confirm against the model.
            logits_per_image = img_feats @ text_feats.t()
            # Threshold the sigmoid probabilities at 0.5.
            probs = torch.sigmoid(logits_per_image)
            predicts = (probs >= 0.5)
            predicts_all += predicts.cpu().numpy().tolist()
            label_all += labels.cpu().numpy().tolist()

        # Metrics over all test samples.
        confusion_matrix = metrics.confusion_matrix(label_all, predicts_all)
        test_acc = metrics.accuracy_score(label_all, predicts_all)
        test_f1 = metrics.f1_score(label_all, predicts_all)
        test_auc = metrics.roc_auc_score(label_all, predicts_all)
        test_precision = metrics.precision_score(label_all, predicts_all)
        test_recall = metrics.recall_score(label_all, predicts_all)
        # sklearn's confusion matrix is C[true][pred]: a false positive is
        # true 0 / predicted 1 (C[0][1]) and a false negative is
        # true 1 / predicted 0 (C[1][0]). The original FP/FN indices were
        # swapped; fixed here.
        TP = confusion_matrix[1][1]
        TN = confusion_matrix[0][0]
        FP = confusion_matrix[0][1]
        FN = confusion_matrix[1][0]

        summary['test_acc'] = test_acc
        summary['test_f1'] = test_f1
        summary['test_auc'] = test_auc
        summary['test_precision'] = test_precision
        summary['test_recall'] = test_recall
        summary['test_confusion_matrix'] = 'TP:{}, TN:{}, FP:{}, FN:{}'.format(TP, TN, FP, FN)

    return summary
-
-
def train_epoch(summary, summary_writer, args, model, loss_fn,
                optimizer, dataloader_tumor, dataloader_normal, text):
    '''
    Run one supervised training epoch over paired tumor/normal batches.

    :param summary: running record dict; 'epoch' and 'step' counters are updated
    :param summary_writer: tensorboardX writer receiving scalar loss/accuracy
    :param args: hyper-parameters (uses .device, .clip, .batch_size)
    :param model: classifier; called as model(imgs, text) when args.clip is
        set, otherwise model(imgs); returns per-class logits
    :param loss_fn: supervised loss, called as loss_fn(outputs, labels)
    :param optimizer: optimizer stepping the model parameters
    :param dataloader_tumor: loader yielding (img, label) tumor batches
    :param dataloader_normal: loader yielding (img, label) normal batches
    :param text: text embedding forwarded to the model in clip mode
    :return: the updated summary dict
    '''
    steps = len(dataloader_tumor)
    dataiter_tumor = iter(dataloader_tumor)
    dataiter_normal = iter(dataloader_normal)

    # Switch to training mode.
    model.train()
    time_now = time.time()
    for step in range(steps):
        # Fetch one tumor batch and one normal batch, move to device.
        tumor_img, tumor_label = next(dataiter_tumor)
        normal_img, normal_label = next(dataiter_normal)
        tumor_img, tumor_label = tumor_img.to(args.device), tumor_label.to(args.device)
        normal_img, normal_label = normal_img.to(args.device), normal_label.to(args.device)
        # Concatenate tumor first, then normal.
        imgs = torch.concat([tumor_img, normal_img])
        labels = torch.concat([tumor_label, normal_label])
        if args.clip:
            outputs = model(imgs, text)
        else:
            outputs = model(imgs)
        loss = loss_fn(outputs, labels)
        # Backpropagate and step the optimizer.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # sigmoid is strictly monotonic, so argmax over the raw logits equals
        # argmax over their sigmoid — the redundant activation is removed.
        predicts = outputs.argmax(1)
        # Batch accuracy: each step sees batch_size tumor + batch_size normal.
        acc_data = (predicts == labels).sum().item() / (args.batch_size * 2)
        loss_data = loss.item()

        # Wall-clock time for this step.
        time_spent = time.time() - time_now
        time_now = time.time()

        logging.info(
            '{}, Epoch: {}, Step: {}, Training Loss: {:.5f}, Training Acc: {:.3f}, Run Time: {:.2f}'
            .format(
                time.strftime("%Y-%m-%d %H:%M:%S"), summary['epoch'] + 1,
                summary['step'] + 1, loss_data, acc_data, time_spent)
        )

        summary['step'] += 1
        # Periodically record training loss and accuracy to tensorboard.
        if summary['step'] % 100 == 0:
            summary_writer.add_scalar('train/loss', loss_data, summary['step'])
            summary_writer.add_scalar('train/acc', acc_data, summary['step'])

    summary['epoch'] += 1

    return summary
-
-
def valid_epoch(summary, args, model, loss_fn, dataloader_tumor,
                dataloader_normal, text):
    '''
    Run one supervised validation pass and record mean loss and accuracy.

    :param summary: record dict; 'val_loss' and 'val_acc' are written into it
    :param args: hyper-parameters (uses .device, .clip, .batch_size)
    :param model: classifier; called as model(imgs, text) when args.clip is
        set, otherwise model(imgs); returns per-class logits
    :param loss_fn: supervised loss, called as loss_fn(outputs, labels)
    :param dataloader_tumor: loader yielding (img, label) tumor batches
    :param dataloader_normal: loader yielding (img, label) normal batches
    :param text: text embedding forwarded to the model in clip mode
    :return: the updated summary dict
    '''
    steps = len(dataloader_tumor)
    dataiter_tumor = iter(dataloader_tumor)
    dataiter_normal = iter(dataloader_normal)
    # Switch to evaluation mode; no gradients needed.
    model.eval()
    with torch.no_grad():
        loss_sum = 0.0
        acc_sum = 0.0
        for step in range(steps):
            # Fetch one tumor batch and one normal batch, move to device.
            tumor_img, tumor_label = next(dataiter_tumor)
            normal_img, normal_label = next(dataiter_normal)
            tumor_img, tumor_label = tumor_img.to(args.device), tumor_label.to(args.device)
            normal_img, normal_label = normal_img.to(args.device), normal_label.to(args.device)
            # Concatenate tumor first, then normal.
            imgs = torch.concat([tumor_img, normal_img])
            labels = torch.concat([tumor_label, normal_label])
            if args.clip:
                outputs = model(imgs, text)
            else:
                outputs = model(imgs)
            loss = loss_fn(outputs, labels)
            # sigmoid is strictly monotonic, so argmax over the raw logits
            # equals argmax over their sigmoid — redundant activation removed.
            predicts = outputs.argmax(1)
            # Accumulate per-step accuracy and loss.
            acc_sum += (predicts == labels).sum().item() / (args.batch_size * 2)
            loss_sum += loss.item()

        # Average over the number of steps (equals the loop-final step + 1).
        summary['val_loss'] = loss_sum / steps
        summary['val_acc'] = acc_sum / steps

    return summary
-
-
def test(summary, args, model, loss_fn, dataloader_tumor,
         dataloader_normal, text):
    '''
    Run the supervised test pass and record accuracy, F1, AUC, precision,
    recall and the confusion-matrix counts.

    :param summary: record dict; test metrics are written into it
    :param args: hyper-parameters (uses .device, .clip)
    :param model: classifier; called as model(imgs, text) when args.clip is
        set, otherwise model(imgs); returns per-class logits
    :param loss_fn: supervised loss (computed per batch but not aggregated)
    :param dataloader_tumor: loader yielding (img, label) tumor batches
    :param dataloader_normal: loader yielding (img, label) normal batches
    :param text: text embedding forwarded to the model in clip mode
    :return: the updated summary dict
    '''
    steps = len(dataloader_tumor)
    dataiter_tumor = iter(dataloader_tumor)
    dataiter_normal = iter(dataloader_normal)
    # Switch to evaluation mode; no gradients needed.
    model.eval()
    with torch.no_grad():
        predicts_all = []
        label_all = []
        for step in range(steps):
            # Fetch one tumor batch and one normal batch, move to device.
            tumor_img, tumor_label = next(dataiter_tumor)
            normal_img, normal_label = next(dataiter_normal)
            tumor_img, tumor_label = tumor_img.to(args.device), tumor_label.to(args.device)
            normal_img, normal_label = normal_img.to(args.device), normal_label.to(args.device)
            # Concatenate tumor first, then normal.
            imgs = torch.concat([tumor_img, normal_img])
            labels = torch.concat([tumor_label, normal_label])
            if args.clip:
                outputs = model(imgs, text)
            else:
                outputs = model(imgs)
            loss = loss_fn(outputs, labels)
            # sigmoid is strictly monotonic, so argmax over the raw logits
            # equals argmax over their sigmoid — redundant activation removed.
            predicts = outputs.argmax(1)
            predicts_all += predicts.cpu().numpy().tolist()
            label_all += labels.cpu().numpy().tolist()

        # Metrics over all test samples.
        confusion_matrix = metrics.confusion_matrix(label_all, predicts_all)
        test_acc = metrics.accuracy_score(label_all, predicts_all)
        test_f1 = metrics.f1_score(label_all, predicts_all)
        test_auc = metrics.roc_auc_score(label_all, predicts_all)
        test_precision = metrics.precision_score(label_all, predicts_all)
        test_recall = metrics.recall_score(label_all, predicts_all)
        # sklearn's confusion matrix is C[true][pred]: a false positive is
        # true 0 / predicted 1 (C[0][1]) and a false negative is
        # true 1 / predicted 0 (C[1][0]). The original FP/FN indices were
        # swapped; fixed here.
        TP = confusion_matrix[1][1]
        TN = confusion_matrix[0][0]
        FP = confusion_matrix[0][1]
        FN = confusion_matrix[1][0]

        summary['test_acc'] = test_acc
        summary['test_f1'] = test_f1
        summary['test_auc'] = test_auc
        summary['test_precision'] = test_precision
        summary['test_recall'] = test_recall
        summary['test_confusion_matrix'] = 'TP:{}, TN:{}, FP:{}, FN:{}'.format(TP, TN, FP, FN)

    return summary
|