|
- # -*- coding: utf-8 -*-
- """
- @author: huangxs
- @License: (C)Copyright 2021, huangxs
- @CreateTime: 2021/11/16 16:59:35
- @Filename: han_net
- service api views
- """
- import os
-
- import mindspore.nn as nn
- import mindspore.ops.functional as F
- import mindspore.ops.operations as P
-
- from mindspore import dtype as mstype
- import mindspore.ops as ops
- import numpy as np
- from mindspore import Tensor
- from mindspore.common.initializer import One, Normal
-
- from mindspore import load_checkpoint, load_param_into_net
- from PIL import Image
- import mindspore.dataset.vision.py_transforms as py_vision
-
- from mindspore import context
-
-
- # context.set_context(mode=context.PYNATIVE_MODE, device_target="GPU")
- # context.set_context(mode=context.PYNATIVE_MODE, device_target="Ascend")
-
-
def ConvLayer(in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1):
    """Build a Conv2d -> LeakyReLU -> BatchNorm2d block.

    The convolution carries no bias (BatchNorm follows) and uses explicit
    'pad' mode so `padding`/`dilation` are honoured literally.
    """
    conv = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size,
                     stride=stride, pad_mode='pad', padding=padding,
                     dilation=dilation, has_bias=False, group=groups)
    # Note the unconventional order: activation before normalization,
    # kept exactly as the rest of the network expects.
    return nn.SequentialCell([conv, nn.LeakyReLU(), nn.BatchNorm2d(out_channels)])
-
-
- # --- different types of layers --- #
# --- different types of layers --- #
class BasicLayer(nn.SequentialCell):
    """Dense-layer unit: dilated 3x3 conv producing `growth_rate` channels,
    optional dropout, then channel-concat with the input (DenseNet style).

    Output channels = in_channels + growth_rate.
    """

    def __init__(self, in_channels, growth_rate, drop_rate, dilation=1):
        super(BasicLayer, self).__init__()
        self.conv = ConvLayer(in_channels, growth_rate, kernel_size=3, stride=1, padding=int(dilation),
                              dilation=int(dilation))
        self.drop_rate = drop_rate
        # Build sub-cells once here instead of inside construct(): re-creating
        # ops per forward pass is wasteful and not graph-mode friendly.
        # BUGFIX: nn.Dropout takes keep_prob (probability of KEEPING a unit).
        # The original passed drop_rate directly, so drop_rate=0.1 silently
        # dropped 90% of activations instead of 10%.
        if drop_rate > 0:
            self.dropout = nn.Dropout(keep_prob=1.0 - drop_rate)
        else:
            self.dropout = None
        self.concat = ops.Concat(axis=1)

    def construct(self, x):
        out = self.conv(x)
        if self.drop_rate > 0:
            out = self.dropout(out)
        # Dense connectivity: stack input and new features along channels.
        return self.concat((x, out))
-
-
def choose_hybrid_dilations(n_layers, dilation_schedule, is_hybrid):
    """Build the per-block dilation table.

    For each entry of `dilation_schedule`, return a row of `n_layers`
    dilation values: a hand-tuned hybrid-dilation pattern (HDC-style,
    avoids gridding) when `is_hybrid`, otherwise the same dilation
    repeated `n_layers` times.

    Returns an int32 ndarray of shape (len(dilation_schedule), n_layers).
    """
    # key: (dilation, n_layers) -> hybrid dilation pattern
    HD_dict = {(1, 4): [1, 1, 1, 1],
               (2, 4): [1, 2, 3, 2],
               (4, 4): [1, 2, 5, 9],
               (8, 4): [3, 7, 10, 13],
               (16, 4): [13, 15, 17, 19],
               (1, 6): [1, 1, 1, 1, 1, 1],
               (2, 6): [1, 2, 3, 1, 2, 3],
               (4, 6): [1, 2, 3, 5, 6, 7],
               (8, 6): [2, 5, 7, 9, 11, 14],
               (16, 6): [10, 13, 16, 17, 19, 21]}

    table = np.empty((len(dilation_schedule), n_layers), dtype=np.int32)
    for row, dilation in enumerate(dilation_schedule):
        table[row] = HD_dict[(dilation, n_layers)] if is_hybrid else [dilation] * n_layers
    return table
-
-
- # --- dense block structure --- #
- # class DenseBlock(nn.SequentialCell):
- # def __init__(self, in_channels, growth_rate, drop_rate, layer_type, dilations):
- # super(DenseBlock, self).__init__()
- # for i in range(len(dilations)):
- # layer = layer_type(in_channels + i * growth_rate, growth_rate, drop_rate, dilations[i])
- # self.insert_child_to_cell('denselayer{:d}'.format(i + 1), layer)
-
def DenseBlock(in_channels, growth_rate, drop_rate, layer_type, dilations):
    """Stack one `layer_type` cell per entry of `dilations`.

    Each layer's input width accounts for the `growth_rate` channels
    appended by every preceding dense layer.
    """
    dense_layers = [
        layer_type(in_channels + idx * growth_rate, growth_rate, drop_rate, dilation)
        for idx, dilation in enumerate(dilations)
    ]
    return nn.SequentialCell(dense_layers)
-
-
class sSE(nn.Cell):
    """Spatial squeeze-and-excitation: gate every spatial position of the
    input by a sigmoid-normalized 1x1-conv attention map."""

    def __init__(self, in_channels):
        super().__init__()
        self.Conv1x1 = nn.Conv2d(in_channels, 1, kernel_size=1, has_bias=False)
        self.norm = nn.Sigmoid()

    def construct(self, U):
        # Collapse channels into one spatial gate: [bs,c,h,w] -> [bs,1,h,w].
        gate = self.norm(self.Conv1x1(U))
        # Broadcasting expands the single-channel gate across all channels.
        return U * gate
-
-
class cSE(nn.Cell):
    """Channel squeeze-and-excitation: global-average-pool to per-channel
    statistics, squeeze/excite through two 1x1 convs, and rescale the input
    channels by the resulting sigmoid gates."""

    def __init__(self, in_channels):
        super().__init__()
        # Global average pooling implemented as a spatial ReduceMean.
        # BUGFIX: the original selected among fixed-kernel AvgPool2d layers for
        # sizes {16, 32, 64, 128, 256} and left `z` UNDEFINED for any other
        # spatial size. ReduceMean over (H, W) is numerically identical to
        # those pools and works for every input resolution.
        self.gap = ops.ReduceMean(keep_dims=True)

        self.Conv_Squeeze = nn.Conv2d(in_channels, in_channels // 2, kernel_size=1, has_bias=False)
        self.Conv_Excitation = nn.Conv2d(in_channels // 2, in_channels, kernel_size=1, has_bias=False)
        self.norm = nn.Sigmoid()

    def construct(self, U):
        z = self.gap(U, (2, 3))   # shape: [bs, c, h, w] -> [bs, c, 1, 1]
        z = self.Conv_Squeeze(z)      # shape: [bs, c/2, 1, 1]
        z = self.Conv_Excitation(z)   # shape: [bs, c, 1, 1]
        z = self.norm(z)
        # Broadcast the per-channel gates back over the spatial dims.
        return U * z.expand_as(U)
-
-
- # Series attention module SAM 20200311
# Series attention module SAM 20200311
class SAM(nn.Cell):
    """Series attention: channel SE first, then spatial SE on its output."""

    def __init__(self, in_channels):
        super().__init__()
        self.cSE = cSE(in_channels)
        self.sSE = sSE(in_channels)

    def construct(self, U):
        # Sequential composition: spatial attention applied to the
        # channel-attended feature map.
        return self.sSE(self.cSE(U))
-
- # Parallel attention module PAM = csSE 20200311
-
-
class PAM(nn.Cell):
    """Parallel attention (csSE): spatial SE and channel SE computed
    independently on the same input, then summed."""

    def __init__(self, in_channels):
        super().__init__()
        self.cSE = cSE(in_channels)
        self.sSE = sSE(in_channels)

    def construct(self, U):
        spatial_branch = self.sSE(U)
        channel_branch = self.cSE(U)
        return channel_branch + spatial_branch
-
-
class attetion_conv_block_V3(nn.Cell):
    """Dual-branch attention conv block: a 3x3 conv branch gated by series
    attention (SAM) and a 1x1 conv branch gated by parallel attention (PAM),
    averaged into one output.

    (Class name typo "attetion" is kept for checkpoint/caller compatibility.)
    """

    def __init__(self, in_channels, out_channels):
        super(attetion_conv_block_V3, self).__init__()

        # 3x3 branch: conv -> BN -> ReLU.
        self.conv_block1 = nn.SequentialCell(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, pad_mode='pad', padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )

        # 1x1 branch: conv -> BN -> ReLU.
        self.conv_block2 = nn.SequentialCell(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, pad_mode='pad', padding=0),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )

        self.SAM = SAM(out_channels)
        self.PAM = PAM(out_channels)

    def construct(self, x):
        branch_a = self.SAM(self.conv_block1(x))
        branch_b = self.PAM(self.conv_block2(x))
        # Average the two attended branches.
        return (branch_a + branch_b) / 2
-
-
def resizeScale(x_copy, x):
    """Zero-pad `x` spatially so its (H, W) match `x_copy` (NCHW tensors).

    Used after bilinear upsampling, where odd input sizes can leave the
    upsampled map one pixel short of the skip-connection tensor.
    """
    diffY = x_copy.shape[2] - x.shape[2]  # height deficit
    diffX = x_copy.shape[3] - x.shape[3]  # width deficit
    if diffY != 0 or diffX != 0:
        # ops.Pad paddings are per-axis in (N, C, H, W) order.
        # BUGFIX: the original padded the height axis with diffX and the
        # width axis with diffY — swapped whenever diffX != diffY.
        x = ops.Pad(((0, 0), (0, 0),
                     (diffY // 2, diffY - diffY // 2),
                     (diffX // 2, diffX - diffX // 2)))(x)
    return x
-
-
def debug_view(x):
    """Return a 'shape---min---max' summary string for a tensor (debug aid)."""
    lo = float(x.min().asnumpy())
    hi = float(x.max().asnumpy())
    return '%s---%.5f---%.5f' % (str(x.shape), lo, hi)
-
-
class HanNet(nn.Cell):
    """
    HanNet

    A UNet++-style nested encoder-decoder: dense blocks with dilated convs
    on the top (full-resolution) row, attention conv blocks
    (attetion_conv_block_V3) as transitions and decoder nodes, max-pool
    downsampling, bilinear upsampling, and a final 1x1 conv producing
    `output_channels` maps.

    NOTE(review): `layer_type` and `dilations` constructor arguments are
    unconditionally overwritten in __init__ below, so callers cannot
    actually change them — confirm whether that is intentional.
    """

    def __init__(self,
                 in_channels,
                 output_channels=3,
                 n_layers=6,
                 growth_rate=24,
                 compress_ratio=0.5,
                 drop_rate=0.1,
                 dilations=(1, 2, 4, 8, 16, 8, 4, 2, 1),
                 is_hybrid=True,
                 layer_type='basic'
                 ):
        super(HanNet, self).__init__()
        # The two arguments below are overridden regardless of what the
        # caller passed (see class NOTE above).
        layer_type = BasicLayer
        dilations = (2, 2, 2, 2, 2)

        # One row of per-layer dilations for each dense block.
        dilation_list = choose_hybrid_dilations(n_layers, dilations, is_hybrid)
        # print('n_blocks = ', n_blocks)
        # print('dilation_list = ')
        # print(dilation_list)
        channel_n1 = 64
        filters = [channel_n1, channel_n1 * 2, channel_n1 * 4, channel_n1 * 8,
                   channel_n1 * 16]  # 64, 128, 256, 512, 1024

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # self.up = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
        self.Up = nn.ResizeBilinear()  # (x, scale_factor=2, align_corners=True)

        # 1st conv before any dense block
        self.conv1 = ConvLayer(in_channels, filters[0], kernel_size=3, padding=1)
        # the first conv of the first (top) row goes here
        # self.conv0_0 = attetion_conv_block(in_channels, filters[0]) # 3, 64

        # channel_n1 = 64
        # Top-row nodes x0_0..x0_4: each is DenseBlock (adds
        # growth_rate * n_layers channels) followed by an attention
        # transition back down to channel_n1 * 2 channels.
        self.denseblock0 = DenseBlock(channel_n1 * 1, growth_rate, drop_rate, layer_type, dilation_list[3])
        # self.transblock0 = ConvLayer(channel_n1 * 1 + growth_rate * n_layers, channel_n1*2, kernel_size=1, padding=0)
        self.transblock0 = attetion_conv_block_V3(channel_n1 * 1 + growth_rate * n_layers, channel_n1 * 2)

        self.denseblock1 = DenseBlock(channel_n1 * 2, growth_rate, drop_rate, layer_type, dilation_list[0])
        # self.transblock1 = ConvLayer(channel_n1 * 2 + growth_rate * n_layers, channel_n1*2, kernel_size=1, padding=0)
        self.transblock1 = attetion_conv_block_V3(channel_n1 * 2 + growth_rate * n_layers, channel_n1 * 2)

        self.denseblock2 = DenseBlock(channel_n1 * 2, growth_rate, drop_rate, layer_type, dilation_list[1])
        # self.transblock2 = ConvLayer(channel_n1 * 2 + growth_rate * n_layers, channel_n1*2, kernel_size=1, padding=0)
        self.transblock2 = attetion_conv_block_V3(channel_n1 * 2 + growth_rate * n_layers, channel_n1 * 2)

        self.denseblock3 = DenseBlock(channel_n1 * 2, growth_rate, drop_rate, layer_type, dilation_list[2])
        # self.transblock3 = ConvLayer(channel_n1 * 2 + growth_rate * n_layers, channel_n1*2, kernel_size=1, padding=0)
        self.transblock3 = attetion_conv_block_V3(channel_n1 * 2 + growth_rate * n_layers, channel_n1 * 2)

        self.denseblock4 = DenseBlock(channel_n1 * 2, growth_rate, drop_rate, layer_type, dilation_list[3])
        # self.transblock4 = ConvLayer(channel_n1 * 2 + growth_rate * n_layers, channel_n1*2, kernel_size=1, padding=0)
        self.transblock4 = attetion_conv_block_V3(channel_n1 * 2 + growth_rate * n_layers, channel_n1 * 2)

        # final conv
        # self.conv2 = nn.Conv2d(channel_n1 * 4 + growth_rate * n_layers, output_channels, kernel_size=3, stride=1, padding=1, bias=False)

        # Encoder backbone nodes x1_0..x4_0 (one per down-sampling level).
        self.conv1_0 = attetion_conv_block_V3(filters[0] * 2, filters[1])  # 64, 128
        self.conv2_0 = attetion_conv_block_V3(filters[1], filters[2])  # 128,256
        self.conv3_0 = attetion_conv_block_V3(filters[2], filters[3])  # 256,512
        self.conv4_0 = attetion_conv_block_V3(filters[3], filters[4])  # 512,1024

        # Nested decoder nodes x{i}_{j}: input width is the concat of the
        # pooled node above, same-level skip(s), and the upsampled node below.
        # self.conv0_1 = attetion_conv_block(filters[0] + filters[1], filters[0])
        self.conv1_1 = attetion_conv_block_V3(filters[0] * 2 + filters[1] + filters[2], filters[1])
        self.conv2_1 = attetion_conv_block_V3(filters[1] + filters[2] + filters[3], filters[2])
        self.conv3_1 = attetion_conv_block_V3(filters[2] + filters[3] + filters[4], filters[3])

        # self.conv0_2 = attetion_conv_block(filters[0] * 2 + filters[1], filters[0])
        self.conv1_2 = attetion_conv_block_V3(filters[0] * 2 + filters[1] * 2 + filters[2], filters[1])
        self.conv2_2 = attetion_conv_block_V3(filters[1] + filters[2] * 2 + filters[3], filters[2])

        # self.conv0_3 = attetion_conv_block(filters[0] * 3 + filters[1], filters[0])
        self.conv1_3 = attetion_conv_block_V3(filters[0] * 2 + filters[1] * 3 + filters[2], filters[1])

        # self.conv0_4 = attetion_conv_block(filters[0] * 4 + filters[1], filters[0])

        # 1x1 projection from the last top-row node (128 ch) to the output maps.
        self.final_conv = nn.Conv2d(64 * 2, output_channels, kernel_size=1)

    def construct(self, x):
        # Top-row node x0_0: stem conv -> dense block -> attention transition.
        x0_0 = self.conv1(x)  # [4, 64, 208, 208]
        x0_0 = self.denseblock0(x0_0)
        # x0_0 = self.csSE0_0(x0_0)
        x0_0 = self.transblock0(x0_0)  # [4, 64, 208, 208]

        # Level-1 encoder node from pooled x0_0.
        _x0_0 = self.pool(x0_0)
        x1_0 = self.conv1_0(_x0_0)

        # Upsample x1_0 and pad to x0_0's spatial size (odd sizes round down
        # under pooling, so the upsampled map can be one pixel short).
        _x1_0 = self.Up(x1_0, scale_factor=2, align_corners=True)
        x1_0_up = resizeScale(x0_0, _x1_0)

        # Top-row fusion by element-wise sum (channel widths match), unlike
        # the original torch version which concatenated:
        # x0_1 = self.denseblock1(torch.cat([x0_0, self.Up(x1_0)], 1)) # [4, 64*1+128+144=336, 208, 208]
        x0_1 = self.denseblock1(x0_0 + x1_0_up)
        # x0_1 = self.csSE0_1(x0_1)
        x0_1 = self.transblock1(x0_1)  # [4, 64, 208, 208]

        # Level-2 encoder node and its upsampled view for the nested skips.
        x2_0 = self.conv2_0(self.pool(x1_0))
        x2_0_up = resizeScale(x1_0, self.Up(x2_0, scale_factor=2, align_corners=True))

        # Nested node x1_1: concat(pooled x0_1, x1_0, upsampled x2_0).
        x1_1 = self.conv1_1(P.Concat(axis=1)((self.pool(x0_1), x1_0, x2_0_up)))
        x1_1_up = resizeScale(x0_1, self.Up(x1_1, scale_factor=2, align_corners=True))
        # x0_2 = self.denseblock2(torch.cat([x0_1, self.Up(x1_1)], 1)) # [4, 64*1+128+144=400, 208, 208]
        x0_2 = self.denseblock2(x0_1 + x1_1_up)
        # x0_2 = self.csSE0_2(x0_2)
        x0_2 = self.transblock2(x0_2)  # [4, 64, 208, 208]

        # Level-3 encoder node.
        x3_0 = self.conv3_0(self.pool(x2_0))
        x3_0_up = resizeScale(x2_0, self.Up(x3_0, scale_factor=2, align_corners=True))

        x2_1 = self.conv2_1(P.Concat(axis=1)((self.pool(x1_1), x2_0, x3_0_up)))
        x2_1_up = resizeScale(x1_1, self.Up(x2_1, scale_factor=2, align_corners=True))

        # Nested node x1_2 gathers both earlier level-1 nodes as skips.
        x1_2 = self.conv1_2(P.Concat(axis=1)((self.pool(x0_2), x1_0, x1_1, x2_1_up)))
        x1_2_up = resizeScale(x0_2, self.Up(x1_2, scale_factor=2, align_corners=True))
        # x0_3 = self.denseblock3(torch.cat([x0_2, self.Up(x1_2)], 1)) # [4, 64*1+128+144=464, 208, 208]
        x0_3 = self.denseblock3(x0_2 + x1_2_up)
        # x0_3 = self.csSE0_3(x0_3)
        x0_3 = self.transblock3(x0_3)  # [4, 64, 208, 208]

        # Level-4 (bottleneck) encoder node.
        x4_0 = self.conv4_0(self.pool(x3_0))
        x4_0_up = resizeScale(x3_0, self.Up(x4_0, scale_factor=2, align_corners=True))

        x3_1 = self.conv3_1(P.Concat(axis=1)((self.pool(x2_1), x3_0, x4_0_up)))
        x3_1_up = resizeScale(x2_1, self.Up(x3_1, scale_factor=2, align_corners=True))

        x2_2 = self.conv2_2(P.Concat(axis=1)((self.pool(x1_2), x2_0, x2_1, x3_1_up)))
        x2_2_up = resizeScale(x1_2, self.Up(x2_2, scale_factor=2, align_corners=True))

        x1_3 = self.conv1_3(P.Concat(axis=1)((self.pool(x0_3), x1_0, x1_1, x1_2, x2_2_up)))
        x1_3_up = resizeScale(x0_3, self.Up(x1_3, scale_factor=2, align_corners=True))
        # x0_4 = self.conv0_4(torch.cat([x0_0, x0_1, x0_2, x0_3, self.Up(x1_3)], 1))
        # x0_4 = self.denseblock4(torch.cat([x0_3, self.Up(x1_3)], 1)) # [4, 64*1+128+144=528, 208, 208]
        x0_4 = self.denseblock4(x0_3 + x1_3_up)
        # x0_4 = self.csSE0_4(x0_4)
        x0_4 = self.transblock4(x0_4)

        # Final 1x1 projection to `output_channels` maps at input resolution.
        x_final = self.final_conv(x0_4)

        return x_final
-
-
if __name__ == "__main__":
    # Smoke test: load one RGB image from a hard-coded local path, duplicate
    # it into a batch of two, run a single forward pass, print output shape.
    _image = Image.open(
        '/tmp/pycharm_cdnet_i7/data/MoNuSeg_oridata/images/train_300/TCGA-18-5592-01Z-00-DX1_0.png').convert('RGB')
    _input = []
    # py_vision.ToTensor converts HWC PIL image to CHW array; presumably
    # scaled to [0, 1] — verify against the MindSpore transform docs.
    _input.append(py_vision.ToTensor()(_image))
    _input.append(py_vision.ToTensor()(_image))
    _input = Tensor(_input, dtype=mstype.float32)

    # 3 input channels (RGB), 3 output channels.
    _hannet = HanNet(3, 3, n_layers=6, growth_rate=24)
    _output = _hannet(_input)

    print(_output.shape)
|