|
- from typing import Type, Union, List, Optional
- from mindspore import nn, train
- from mindspore.common.initializer import initializer as init
- import mindspore as ms
- import numpy as np
- from mindspore.ops import operations
-
-
-
class MultiTaskIQANet(nn.Cell):
    """
    Multi-task image-quality-assessment head on a ResNet-50 backbone.

    The backbone emits a feature vector of size ``target_in_size``; a shared
    fully-connected trunk maps it to a representation from which a regression
    head predicts the MOS (quality) score and a classification head predicts
    one of 11 distortion classes.

    Args:
        lda_out_channels: local distortion aware module output size.
        target_in_size: input vector size for the target network.
        target_fc1_size: width of the first fully-connected layer.
        target_fc4_size: width of the shared representation.

    Note:
        For size match, input args must satisfy: 'target_fc(i)_size *
        target_fc(i+1)_size' is divisible by 'feature_size ^ 2'.
    """

    def __init__(self, lda_out_channels: int, target_in_size: int,
                 target_fc1_size: int, target_fc4_size: int):
        super(MultiTaskIQANet, self).__init__()

        self.target_in_size = target_in_size
        self.f1 = target_fc1_size
        self.f4 = target_fc4_size
        self.num_cls = 11  # number of distortion categories

        # ResNet-50 backbone producing the multi-scale feature vector.
        self.res = resnet50(lda_out_channels, target_in_size)

        # Shared fully-connected trunk.
        self.fcs = nn.SequentialCell([
            nn.Dense(self.target_in_size, self.f1),
            nn.LeakyReLU(),
            nn.Dense(self.f1, self.f4),
            nn.LeakyReLU()]
        )

        # Classification head (raw logits; softmax is left to the caller/loss).
        self.fc_cls = nn.SequentialCell([
            nn.Dense(self.f4, self.f4),
            nn.LeakyReLU(),
            nn.Dense(self.f4, self.num_cls)]
        )
        # Regression head for the quality (MOS) score.
        self.fc_mos = nn.Dense(self.f4, 1)

    def construct(self, x):
        """Return ``(mos, cls_logits)`` for a batch of images."""
        backbone_vec = self.res(x).view(-1, self.target_in_size)
        shared = self.fcs(backbone_vec)
        mos = self.fc_mos(shared)
        cls = self.fc_cls(shared)
        return mos, cls
-
-
class ResidualBlockBase(nn.Cell):
    """Basic (two 3x3 conv) residual block, as used by ResNet-18/34.

    Args:
        in_channel: number of input feature channels.
        out_channel: number of output feature channels.
        stride: stride of the first convolution (downsamples when > 1).
        norm: optional normalization cell; a BatchNorm2d is created when None.
        down_sample: optional cell applied to the shortcut branch so its
            shape matches the main branch before the addition.
    """

    expansion: int = 1  # the last conv has as many kernels as the first

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, norm: Optional[nn.Cell] = None,
                 down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlockBase, self).__init__()
        if not norm:
            self.norm = nn.BatchNorm2d(out_channel)
        else:
            self.norm = norm
        # NOTE(review): the same norm instance is applied after both convs,
        # so their BatchNorm statistics are shared; standard ResNet uses two
        # separate norms — confirm this sharing is intended. (Not changed
        # here to keep parameter/checkpoint names stable.)

        self.conv1 = nn.Conv2d(in_channel, out_channel,
                               kernel_size=3, stride=stride)
        # Bug fix: conv2 consumes conv1's output, so its input channel count
        # must be out_channel (was in_channel, which fails whenever
        # in_channel != out_channel).
        self.conv2 = nn.Conv2d(out_channel, out_channel,
                               kernel_size=3)
        self.relu = nn.ReLU()
        self.down_sample = down_sample

    def construct(self, x):
        """ResidualBlockBase construct."""
        identity = x  # shortcut branch

        out = self.conv1(x)    # main branch, first 3x3 conv
        out = self.norm(out)
        out = self.relu(out)
        out = self.conv2(out)  # main branch, second 3x3 conv
        out = self.norm(out)

        if self.down_sample is not None:
            identity = self.down_sample(x)
        out += identity        # residual addition of main branch + shortcut
        out = self.relu(out)

        return out
-
class ResidualBlock(nn.Cell):
    """Bottleneck residual block (1x1 -> 3x3 -> 1x1), as used by ResNet-50+.

    Args:
        in_channel: input feature channels.
        out_channel: bottleneck width; the block emits
            ``out_channel * expansion`` channels.
        stride: stride of the middle 3x3 convolution.
        down_sample: optional cell that reshapes the shortcut branch to
            match the main branch.
    """

    expansion = 4  # output channels are 4x the bottleneck width

    def __init__(self, in_channel: int, out_channel: int,
                 stride: int = 1, down_sample: Optional[nn.Cell] = None) -> None:
        super(ResidualBlock, self).__init__()

        width = out_channel
        # 1x1 reduce.
        self.conv1 = nn.Conv2d(in_channel, width, kernel_size=1)
        self.norm1 = nn.BatchNorm2d(width)
        # 3x3 spatial conv (carries the stride).
        self.conv2 = nn.Conv2d(width, width, kernel_size=3, stride=stride)
        self.norm2 = nn.BatchNorm2d(width)
        # 1x1 expand.
        self.conv3 = nn.Conv2d(width, width * self.expansion, kernel_size=1)
        self.norm3 = nn.BatchNorm2d(width * self.expansion)

        self.relu = nn.ReLU()
        self.down_sample = down_sample

    def construct(self, x):
        """Apply the bottleneck transform and add the (possibly projected) shortcut."""
        shortcut = x
        if self.down_sample is not None:
            shortcut = self.down_sample(x)

        out = self.relu(self.norm1(self.conv1(x)))
        out = self.relu(self.norm2(self.conv2(out)))
        out = self.norm3(self.conv3(out))

        out += shortcut
        return self.relu(out)
-
def make_layer(last_out_channel, block: Type[Union[ResidualBlockBase, ResidualBlock]],
               channel: int, block_nums: int, stride: int = 1):
    """Stack ``block_nums`` residual blocks into one sequential stage.

    A projection shortcut (1x1 conv + BatchNorm) is created for the first
    block whenever the spatial size or channel count changes.

    Args:
        last_out_channel: channels entering the stage.
        block: residual block class to instantiate.
        channel: bottleneck width for every block in the stage.
        block_nums: number of blocks to stack.
        stride: stride of the first block (downsamples when > 1).
    """
    # Projection shortcut, only needed when shapes differ between branches.
    down_sample = None
    if stride != 1 or last_out_channel != channel * block.expansion:
        down_sample = nn.SequentialCell([
            nn.Conv2d(last_out_channel, channel * block.expansion,
                      kernel_size=1, stride=stride),
            nn.BatchNorm2d(channel * block.expansion)
        ])

    # First block may downsample; the rest keep the stage's channel count.
    blocks = [block(last_out_channel, channel, stride=stride,
                    down_sample=down_sample)]
    stage_channel = channel * block.expansion
    blocks.extend(block(stage_channel, channel) for _ in range(block_nums - 1))

    return nn.SequentialCell(blocks)
-
- from mindspore import load_checkpoint, load_param_into_net
-
-
class ResNet(nn.Cell):
    """ResNet backbone with local-distortion-aware (LDA) feature taps.

    Instead of a classification head, features are tapped after each of the
    four residual stages, pooled, projected through a Dense layer, and
    concatenated into a single vector of total size ``input_channel``.

    Args:
        block: residual block class (ResidualBlockBase or ResidualBlock).
        layer_nums: number of blocks in each of the four stages.
        lda_out_channels: size of each of the first three LDA projections.
        input_channel: total size of the concatenated output vector.
    """

    def __init__(self, block: Type[Union[ResidualBlockBase, ResidualBlock]],
                 layer_nums: List[int], lda_out_channels: int, input_channel: int) -> None:
        super(ResNet, self).__init__()

        self.relu = nn.ReLU()
        # Stem: 7x7 conv on a 3-channel (RGB) input, 64 output channels.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2)
        self.norm = nn.BatchNorm2d(64)
        # Max pooling shrinks the spatial size before the residual stages.
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
        # The four residual stages.
        self.layer1 = make_layer(64, block, 64, layer_nums[0])
        self.layer2 = make_layer(64 * block.expansion, block, 128, layer_nums[1], stride=2)
        self.layer3 = make_layer(128 * block.expansion, block, 256, layer_nums[2], stride=2)
        self.layer4 = make_layer(256 * block.expansion, block, 512, layer_nums[3], stride=2)
        # Kept for interface compatibility; not used in construct().
        self.avg_pool = nn.AvgPool2d()
        self.flatten = nn.Flatten()

        # Local distortion aware modules: one tap per stage output.
        # NOTE(review): the Dense input sizes hard-code the pooled spatial
        # extents (17x17, 8x8, 4x4, and 8192 for stage 4), which fixes the
        # expected input image size — confirm against the data pipeline.
        self.lda1_pool = nn.SequentialCell([
            nn.Conv2d(256, 16, kernel_size=1, stride=1),
            nn.AvgPool2d(kernel_size=7, stride=7)
        ])
        self.lda1_fc = nn.Dense(16 * 17 * 17, lda_out_channels)

        self.lda2_pool = nn.SequentialCell([
            nn.Conv2d(512, 32, kernel_size=1, stride=1),
            nn.AvgPool2d(kernel_size=7, stride=7)
        ])
        self.lda2_fc = nn.Dense(32 * 8 * 8, lda_out_channels)

        self.lda3_pool = nn.SequentialCell([
            nn.Conv2d(1024, 64, kernel_size=1, stride=1),
            nn.AvgPool2d(kernel_size=7, stride=7)
        ])
        self.lda3_fc = nn.Dense(64 * 4 * 4, lda_out_channels)

        self.lda4_pool = nn.AvgPool2d(kernel_size=7, stride=7)
        self.lda4_fc = nn.Dense(8192, input_channel - lda_out_channels * 3)

        # Created once here instead of on every construct() call.
        self.concat = operations.Concat(axis=1)

    def construct(self, x):
        """Return the concatenated multi-scale LDA feature vector."""
        x = self.conv1(x)
        x = self.norm(x)
        x = self.relu(x)
        x = self.max_pool(x)

        x = self.layer1(x)
        # Pool each stage exactly once (the original recomputed lda1_pool
        # three times and left two unused temporaries).
        lda_1 = self.lda1_fc(self.lda1_pool(x).view(x.shape[0], -1))
        x = self.layer2(x)
        lda_2 = self.lda2_fc(self.lda2_pool(x).view(x.shape[0], -1))
        x = self.layer3(x)
        lda_3 = self.lda3_fc(self.lda3_pool(x).view(x.shape[0], -1))
        x = self.layer4(x)
        lda_4 = self.lda4_fc(self.lda4_pool(x).view(x.shape[0], -1))

        return self.concat((lda_1, lda_2, lda_3, lda_4))
-
-
def _resnet(block: Type[Union[ResidualBlockBase, ResidualBlock]],
            layers: List[int], lda_out_channels: int,
            input_channel: int,
            ckpt_path: str = '/cache/data/resnet50/resnet50_224_new.ckpt'):
    """Build a ResNet and initialize it from a pretrained checkpoint.

    Args:
        block: residual block class to use.
        layers: number of blocks per residual stage.
        lda_out_channels: size of each LDA projection.
        input_channel: total output vector size.
        ckpt_path: path to the pretrained checkpoint file (default keeps
            the original hard-coded location for backward compatibility).

    Returns:
        The initialized ResNet model.
    """
    model = ResNet(block, layers, lda_out_channels, input_channel)
    # Load pretrained backbone weights; parameters absent from the checkpoint
    # (e.g. the LDA heads) keep their fresh initialization.
    param_dict = load_checkpoint(ckpt_path)
    load_param_into_net(model, param_dict)
    return model
-
-
def resnet50(lda_out_channels: int, input_channel: int):
    """Build a ResNet-50 backbone ([3, 4, 6, 3] bottleneck blocks) with LDA taps."""
    return _resnet(ResidualBlock, [3, 4, 6, 3], lda_out_channels, input_channel)
|