|
- # Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
- import unittest
-
- import numpy as np
- from op_test import OpTest
-
- import paddle
- from paddle import base
-
-
- class TestMultiplexOp(OpTest):
- def setUp(self):
- self.op_type = "multiplex"
- self.init_dtype()
- self.python_api = paddle.tensor.multiplex
- rows = 4
- index = np.arange(0, rows).astype('int32')
- np.random.shuffle(index)
- index = np.reshape(index, (rows, 1))
- ins1 = np.random.random((rows, 25)).astype(self.dtype)
- ins2 = np.random.random((rows, 25)).astype(self.dtype)
- ins3 = np.random.random((rows, 25)).astype(self.dtype)
- ins4 = np.random.random((rows, 25)).astype(self.dtype)
- if self.dtype == 'complex64' or self.dtype == 'complex128':
- ins1 = (
- np.random.random((rows, 25)) + 1j * np.random.random((rows, 25))
- ).astype(self.dtype)
- ins2 = (
- np.random.random((rows, 25)) + 1j * np.random.random((rows, 25))
- ).astype(self.dtype)
- ins3 = (
- np.random.random((rows, 25)) + 1j * np.random.random((rows, 25))
- ).astype(self.dtype)
- ins4 = (
- np.random.random((rows, 25)) + 1j * np.random.random((rows, 25))
- ).astype(self.dtype)
-
- self.inputs = {
- 'Ids': index,
- 'X': [('x1', ins1), ('x2', ins2), ('x3', ins3), ('x4', ins4)],
- }
- # multiplex output
- output = np.zeros_like(ins1)
- for i in range(0, rows):
- k = index[i][0]
- output[i] = self.inputs['X'][k][1][i]
- self.outputs = {'Out': output}
-
- def init_dtype(self):
- self.dtype = 'float64'
-
- def test_check_output(self):
- self.check_output(check_pir=True)
-
- def test_check_grad(self):
- self.check_grad(['x1', 'x2', 'x3', 'x4'], 'Out', check_pir=True)
-
- def test_check_grad_ignore_x1(self):
- self.check_grad(
- ['x2', 'x3', 'x4'], 'Out', no_grad_set=set('x1'), check_pir=True
- )
-
- def test_check_grad_ignore_x1_x2(self):
- self.check_grad(
- ['x3', 'x4'], 'Out', no_grad_set={'x1', 'x2'}, check_pir=True
- )
-
- def test_check_grad_ignore_x3(self):
- self.check_grad(
- ['x1', 'x2', 'x4'], 'Out', no_grad_set=set('x3'), check_pir=True
- )
-
-
- class TestMultiplexOp_complex64(TestMultiplexOp):
- def init_dtype(self):
- self.dtype = "complex64"
-
-
- class TestMultiplexOp_complex128(TestMultiplexOp):
- def init_dtype(self):
- self.dtype = "complex128"
-
-
- class TestMultiplexOpError(unittest.TestCase):
- def test_errors(self):
- with base.program_guard(base.Program(), base.Program()):
- x1 = paddle.static.data(name='x1', shape=[None, 2], dtype='int64')
- x2 = paddle.static.data(name='x2', shape=[None, 2], dtype='int64')
- index = paddle.static.data(
- name='index', shape=[None, 1], dtype='int32'
- )
-
- def test_list():
- # the inputs type must be list
- paddle.multiplex(inputs=x1, index=index)
-
- self.assertRaises(TypeError, test_list)
-
- def test_len():
- paddle.multiplex(inputs=[x1], index=index)
-
- self.assertRaises(ValueError, test_len)
-
- def test_type():
- y1 = paddle.static.data(
- name='y1', shape=[None, 2], dtype='int16'
- )
- y2 = paddle.static.data(
- name='y2', shape=[None, 2], dtype='int16'
- )
- paddle.multiplex(inputs=[y1, y2], index=index)
-
- self.assertRaises(TypeError, test_type)
-
- def test_type2():
- index2 = paddle.static.data(
- name='index2', shape=[None, 1], dtype='int16'
- )
- paddle.multiplex(inputs=[x1, x2], index=index2)
-
- self.assertRaises(TypeError, test_type2)
-
-
- class TestMultiplexODygrap(unittest.TestCase):
- def setUp(self):
- self.init_dtype()
- self.img1 = np.array([[1, 2], [3, 4]]).astype(self.dtype)
- self.img2 = np.array([[5, 6], [7, 8]]).astype(self.dtype)
- if self.dtype == np.complex64 or self.dtype == np.complex128:
- self.img1 = (
- np.array([[1, 2], [3, 4]]) + 1j * np.array([[1, 2], [3, 4]])
- ).astype(self.dtype)
- self.img2 = (
- np.array([[5, 6], [7, 8]]) + 1j * np.array([[1, 2], [3, 4]])
- ).astype(self.dtype)
-
- def init_dtype(self):
- self.dtype = np.float32
-
- def test_multiplex_dygraph(self):
- paddle.disable_static()
- inputs = [paddle.to_tensor(self.img1), paddle.to_tensor(self.img2)]
- index = paddle.to_tensor(np.array([[1], [0]]).astype(np.int32))
- res = paddle.multiplex(inputs, index)
- paddle.enable_static()
-
- def test_dygraph_api(self):
- with base.dygraph.guard():
- inputs = [paddle.to_tensor(self.img1), paddle.to_tensor(self.img2)]
- index = paddle.to_tensor(np.array([[1], [0]]).astype(np.int32))
- inputs[0].stop_gradient = False
- inputs[1].stop_gradient = False
- res = paddle.multiplex(inputs, index)
- res.backward()
- inputs_eager = [
- paddle.to_tensor(self.img1),
- paddle.to_tensor(self.img2),
- ]
- index_eager = paddle.to_tensor(
- np.array([[1], [0]]).astype(np.int32)
- )
- inputs_eager[0].stop_gradient = False
- inputs_eager[1].stop_gradient = False
- res_eager = paddle.multiplex(inputs_eager, index_eager)
- res_eager.backward()
- self.assertEqual((res.numpy() == res_eager.numpy()).all(), True)
- self.assertEqual(
- (inputs[0].grad.numpy() == inputs_eager[0].grad.numpy()).all(),
- True,
- )
- self.assertEqual(
- (inputs[1].grad.numpy() == inputs_eager[1].grad.numpy()).all(),
- True,
- )
-
-
- class TestMultiplexODygrap_complex64(TestMultiplexODygrap):
- def init_dtype(self):
- self.dtype = np.complex64
-
-
- class TestMultiplexODygrap_complex128(TestMultiplexODygrap):
- def init_dtype(self):
- self.dtype = np.complex128
-
-
- if __name__ == '__main__':
- unittest.main()
|