# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import parl
import queue
import six
import threading
import time
import numpy as np
from actor import Actor
from opensim_model import OpenSimModel
from opensim_agent import OpenSimAgent
from parl.utils import logger, machine_info, get_gpu_count
from parl.remote.client import get_global_client
from shutil import copy2

ACT_DIM = 22  # action dimension of the osim environment
VEL_DIM = 19  # velocity-related features appended to the observation
OBS_DIM = 98 + VEL_DIM
GAMMA = 0.96  # reward discount factor
TAU = 0.001  # soft-update rate of the DDPG target networks
ACTOR_LR = 3e-5
CRITIC_LR = 3e-5
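
# Note on the two DDPG hyperparameters above: GAMMA is the reward discount
# factor, and TAU controls how fast the target networks track the learned
# networks after each learning step, roughly:
#     target_param := TAU * param + (1 - TAU) * target_param
# (see parl.algorithms.DDPG for the exact update).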


class TransitionExperience(object):
    """A single transition (obs, action, reward, info) of one step."""

    def __init__(self, obs, action, reward, info, **kwargs):
        """kwargs: any other attributes to save with the transition."""
        self.obs = obs
        self.action = action
        self.reward = reward
        self.info = info
        for k, v in six.iteritems(kwargs):
            setattr(self, k, v)
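
# The kwargs mechanism above lets callers attach arbitrary extra attributes
# to a transition; run_remote_sample below uses it to record a timestamp:
#     TransitionExperience(obs=obs, action=None, reward=None, info=None,
#                          timestamp=time.time())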


class ActorState(object):
    """Maintain the incomplete trajectory data of a remote actor."""

    def __init__(self):
        self.memory = []  # list of TransitionExperience
        self.model_name = None

    def reset(self):
        self.memory = []


class Evaluator(object):
    def __init__(self, args):
        if machine_info.is_gpu_available():
            assert get_gpu_count() == 1, (
                'Only single-GPU evaluation is supported, please set the '
                'environment variable: `export CUDA_VISIBLE_DEVICES=[GPU_ID_TO_USE]`.')
        else:
            cpu_num = os.environ.get('CPU_NUM')
            assert cpu_num is not None and cpu_num == '1', (
                'Only single-CPU evaluation is supported, please set the '
                'environment variable: `export CPU_NUM=1`.')

        model = OpenSimModel(OBS_DIM, VEL_DIM, ACT_DIM)
        algorithm = parl.algorithms.DDPG(
            model,
            gamma=GAMMA,
            tau=TAU,
            actor_lr=ACTOR_LR,
            critic_lr=CRITIC_LR)
        self.agent = OpenSimAgent(algorithm, OBS_DIM, ACT_DIM)

        self.evaluate_result = []

        self.lock = threading.Lock()
        self.model_lock = threading.Lock()
        self.model_queue = queue.Queue()

        self.best_shaping_reward = 0
        self.best_env_reward = 0

        if args.offline_evaluate:
            # Offline: enqueue every saved checkpoint once.
            self.offline_evaluate()
        else:
            # Online: poll the checkpoint directory in a background thread.
            t = threading.Thread(target=self.online_evaluate)
            t.start()

        with self.lock:
            self._wait_and_restore_model()

        self.create_actors()

    def _wait_and_restore_model(self):
        """Block until a checkpoint path is available in the model queue,
        then restore the agent from it.

        In online evaluation the queue is drained first so that only the
        latest checkpoint is evaluated. Must be called with `self.lock` held.
        """
        while True:
            model_path = self.model_queue.get()
            if not args.offline_evaluate:
                # Online evaluation only cares about the latest checkpoint.
                while not self.model_queue.empty():
                    model_path = self.model_queue.get()
            try:
                self.agent.restore(model_path)
                break
            except Exception as e:
                logger.warn('Agent restore Exception: {}'.format(e))
        self.cur_model = model_path

    def create_actors(self):
        """Connect to the cluster and start the sampling threads of remote actors.
        """
        parl.connect(args.cluster_address, ['official_obs_scaler.npz'])

        for i in range(args.actor_num):
            logger.info('Remote actor count: {}'.format(i + 1))

            remote_thread = threading.Thread(target=self.run_remote_sample)
            remote_thread.daemon = True
            remote_thread.start()

        # There is a memory-leak problem in the osim-rl package, so we
        # dynamically add actors whenever remote actors are killed due to
        # excessive memory usage.
        time.sleep(10 * 60)
        parl_client = get_global_client()
        while True:
            if parl_client.actor_num < args.actor_num:
                logger.info('Dynamically adding actor, current actor num: {}'.
                            format(parl_client.actor_num))
                remote_thread = threading.Thread(target=self.run_remote_sample)
                remote_thread.daemon = True
                remote_thread.start()
            time.sleep(5)

    def offline_evaluate(self):
        # Enqueue all saved checkpoints, latest first.
        ckpt_paths = [
            os.path.join(args.saved_models_dir, x)
            for x in os.listdir(args.saved_models_dir)
        ]
        ckpt_paths.sort(key=lambda p: int(p.split('-')[-1]), reverse=True)
        logger.info("All checkpoints: {}".format(ckpt_paths))
        for ckpt_path in ckpt_paths:
            self.model_queue.put(ckpt_path)

    def online_evaluate(self):
        # Poll the checkpoint directory and enqueue every newly saved model.
        last_model_step = None
        while True:
            ckpt_paths = [
                os.path.join(args.saved_models_dir, x)
                for x in os.listdir(args.saved_models_dir)
            ]
            if not ckpt_paths:
                time.sleep(60)
                continue
            ckpt_paths.sort(key=lambda p: int(p.split('-')[-1]))
            model_step = ckpt_paths[-1].split('-')[-1]
            if model_step != last_model_step:
                logger.info("Adding new checkpoint: {}".format(
                    ckpt_paths[-1]))
                self.model_queue.put(ckpt_paths[-1])
                last_model_step = model_step
            time.sleep(60)

    def run_remote_sample(self):
        remote_actor = Actor(
            difficulty=args.difficulty,
            vel_penalty_coeff=args.vel_penalty_coeff,
            muscle_penalty_coeff=args.muscle_penalty_coeff,
            penalty_coeff=args.penalty_coeff,
            only_first_target=args.only_first_target)

        actor_state = ActorState()

        while True:
            actor_state.model_name = self.cur_model
            actor_state.reset()

            obs = remote_actor.reset()

            while True:
                # If a new checkpoint has been restored, drop the unfinished
                # episode and start over with the new model.
                if actor_state.model_name != self.cur_model:
                    break

                actor_state.memory.append(
                    TransitionExperience(
                        obs=obs,
                        action=None,
                        reward=None,
                        info=None,
                        timestamp=time.time()))

                action = self.pred_batch(obs)

                obs, reward, done, info = remote_actor.step(action)

                actor_state.memory[-1].reward = reward
                actor_state.memory[-1].info = info
                actor_state.memory[-1].action = action
                if done:
                    self._parse_memory(actor_state)
                    break

    def _parse_memory(self, actor_state):
        mem = actor_state.memory
        episode_shaping_reward = np.sum(
            [exp.info['shaping_reward'] for exp in mem])
        episode_env_reward = np.sum([exp.info['env_reward'] for exp in mem])

        with self.lock:
            # Only record episodes that were run with the current model.
            if actor_state.model_name == self.cur_model:
                self.evaluate_result.append({
                    'shaping_reward': episode_shaping_reward,
                    'env_reward': episode_env_reward,
                    'episode_length': mem[-1].info['frame_count'],
                    'falldown': not mem[-1].info['timeout'],
                })
                logger.info('{}, finish_cnt: {}'.format(
                    self.cur_model, len(self.evaluate_result)))
                logger.info('{}'.format(self.evaluate_result[-1]))
                if len(self.evaluate_result) >= args.evaluate_times:
                    mean_value = {}
                    for key in self.evaluate_result[0].keys():
                        mean_value[key] = np.mean(
                            [x[key] for x in self.evaluate_result])
                    logger.info('Model: {}, mean_value: {}'.format(
                        self.cur_model, mean_value))

                    eval_num = len(self.evaluate_result)
                    falldown_num = len(
                        [x for x in self.evaluate_result if x['falldown']])
                    # float() guards against integer division under Python 2.
                    falldown_rate = float(falldown_num) / eval_num
                    logger.info('Falldown rate: {}'.format(falldown_rate))
                    # Recompute the means over the episodes that did not end
                    # with a fall.
                    for key in self.evaluate_result[0].keys():
                        mean_value[key] = np.mean([
                            x[key] for x in self.evaluate_result
                            if not x['falldown']
                        ])
                    logger.info(
                        'Model: {}, Exclude falldown, mean_value: {}'.format(
                            self.cur_model, mean_value))
                    if mean_value['shaping_reward'] > self.best_shaping_reward:
                        self.best_shaping_reward = mean_value['shaping_reward']
                        copy2(self.cur_model, './model_zoo')
                        logger.info(
                            '[best shaping reward updated:{}] path:{}'.format(
                                self.best_shaping_reward, self.cur_model))
                    if mean_value['env_reward'] > self.best_env_reward \
                            and falldown_rate < 0.3:
                        self.best_env_reward = mean_value['env_reward']
                        copy2(self.cur_model, './model_zoo')
                        logger.info(
                            '[best env reward updated:{}] path:{}, falldown rate: {}'
                            .format(self.best_env_reward, self.cur_model,
                                    falldown_rate))

                    self.evaluate_result = []
                    self._wait_and_restore_model()
            else:
                actor_state.model_name = self.cur_model
                actor_state.reset()

    def pred_batch(self, obs):
        # The agent expects a batch dimension, so wrap the single observation
        # in a batch of size 1 and unwrap the predicted action afterwards.
        batch_obs = np.expand_dims(obs, axis=0)
        with self.model_lock:
            action = self.agent.predict(batch_obs.astype('float32'))
        action = np.squeeze(action, axis=0)
        return action


if __name__ == '__main__':
    from evaluate_args import get_args
    args = get_args()
    if args.logdir is not None:
        logger.set_dir(args.logdir)

    evaluate = Evaluator(args)
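
# A sketch of how this evaluator might be launched (hypothetical values; the
# actual flag names are defined in evaluate_args.py and mirror the attributes
# accessed above, e.g. args.cluster_address, args.actor_num,
# args.saved_models_dir, args.evaluate_times):
#
#     xparl start --port 8010
#     python evaluate.py --cluster_address localhost:8010 --actor_num 16 \
#         --saved_models_dir ./output --evaluate_times 30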