在上一篇文章ML-Agents与python的Low Level API通信中,我简要介绍了Python与Unity端的ML-Agents插件的通讯代码,如何正确运行一个能够进行强化学习训练的Unity环境,并获取到响应的信息,接下来将介绍如何利用自己的强化学习算法进行训练。
介绍
这里我利用了强化学习库PARL来训练ML-Agents的3DBall,使用的是PPO算法。
关于PPO的具体代码细节可以参考我以前的文章强化学习PPO代码讲解,这里不再讲述PPO的代码细节(之所以选择PARL,是因为感觉其代码通俗易懂)
PARL主要将代码分为了几个部分,首先是Model脚本,主要用来编写Actor,Critic等神经网络模型。然后是Algorithm脚本,主要编写具体的算法细节,主要有sample,predict,learn函数。还有storage脚本,主要用来存放经验池(reply buffer)。还有Config脚本,存放训练使用的超参数。Agent脚本,用来对Algorithm脚本进行进一步封装,是与环境交互的接口。最后才是训练入口脚本,调用agent脚本和环境进行交互。
主要源码分析
对于PPO算法,我们可以将其分为两个阶段。第一个是收集数据阶段,一个是训练模型阶段。
和SAC,DDPG等off-policy算法类型,PPO也有经验池,但是PPO是on-policy算法,所以收集数据和训练不能同时进行,每一次训练过后,我们都需要把旧的数据丢弃,重新用训练后的模型采集训练数据。
因此,大致流程是这样的:
- 所有智能体采集n个step的数据,存放到经验池中。
- 采集完成后,计算各个step的advantage,logprob等数据,同样存放起来。
- 利用经验池的数据进行m次PPO的更新
- 清空经验池数据,重新采样
from mlagents_envs.environment import UnityEnvironment
import numpy as np
from mlagents_envs.environment import ActionTuple
import argparse
import numpy as np
from parl.utils import logger, summary
from storage import RolloutStorage
from parl.algorithms import PPO
from agent import PPOAgent
from genenal_model import GenenalModel_Continuous_Divide
from genenal_config import genenal_config_continuous
from mlagents_envs.side_channel.engine_configuration_channel import EngineConfigurationChannel
# 创建环境
channel = EngineConfigurationChannel()
env = UnityEnvironment(file_name="UnityEnvironment", seed=1, side_channels=[channel])
channel.set_configuration_parameters(time_scale = 3.0)
env.reset()
# 获取环境信息
behavior_names = list(env.behavior_specs.keys())
behavior_value = list(env.behavior_specs.values())
for i in range(len(behavior_names)):
print(behavior_names[i])
print("obs:",behavior_value[i].observation_specs[0], " act:", behavior_value[0].action_spec)
discrete_actions = None
total_steps = 0
stepsNum = 0
obs_space = behavior_value[i].observation_specs[0]
act_space = behavior_value[i].action_spec.continuous_size
# 建立Actor Critic模型
model = GenenalModel_Continuous_Divide(obs_space, act_space, [256,128], [256,128])
config = genenal_config_continuous
config['batch_size'] = int(config['env_num'] * config['step_nums'])
config['num_updates'] = int(
config['train_total_steps'] // config['batch_size'])
# 建立PPO算法
ppo = PPO(
model,
clip_param=config['clip_param'],
entropy_coef=config['entropy_coef'],
initial_lr=config['initial_lr'],
continuous_action=config['continuous_action'])
agent = PPOAgent(ppo, config)
# 建立经验池
rollout = RolloutStorage(config['step_nums'], config['env_num'], obs_space,
act_space)
DecisionSteps, TerminalSteps = env.get_steps(behavior_names[0])
obs = DecisionSteps.obs[0]
agentsNum = len(DecisionSteps)
done = np.zeros(agentsNum, dtype='float32')
total_reward = np.zeros(agentsNum, dtype='float32')
this_action = np.zeros((agentsNum, act_space), dtype='float32')
next_obs = np.zeros((agentsNum, obs_space.shape[0]), dtype='float32')
for update in range(1, config['num_updates'] + 1):
# 数据收集
for step in range(0, config['step_nums']):
value, action, logprob, _ = agent.sample(obs)
agentsNumNow = len(DecisionSteps)
if agentsNumNow == 0:
action = np.random.rand(0, 2)
else:
action = action.reshape(agentsNumNow, act_space)
this_action = action
actions = ActionTuple(action, discrete_actions)
env.set_actions(behavior_names[0], actions)
env.step()
DecisionSteps, TerminalSteps = env.get_steps(behavior_names[0])
next_obs_Decision = DecisionSteps.obs[0]
next_obs_Terminal = TerminalSteps.obs[0]
if(len(next_obs_Terminal) != 0):
next_obs = np.zeros((agentsNum, obs_space.shape[-1]))
rewards = np.zeros(agentsNum, dtype=float)
next_done = np.zeros(agentsNum, dtype=bool)
j = 0
for i in TerminalSteps.agent_id:
next_obs[i] = next_obs_Terminal[j]
rewards[i] = TerminalSteps.reward[j]
next_done[i] = True
j += 1
rollout.append(obs, this_action, logprob, rewards, done, value.flatten())
obs, done = next_obs, next_done
total_reward += rewards
if(len(next_obs_Decision) != 0):
step += 1
next_obs = next_obs_Decision
rewards = DecisionSteps.reward
next_done = np.zeros(agentsNum, dtype=bool)
rollout.append(obs, this_action, logprob, rewards, done, value.flatten())
obs, done = next_obs, next_done
total_reward += rewards
total_steps += 1
stepsNum += 1
if(stepsNum % 200 == 199):
arv_reward = total_reward / 200
print("total_steps:{0}".format(total_steps))
print("arv_reward:", arv_reward)
stepsNum = 0
total_reward = 0
# PPO训练模型
value = agent.value(obs)
rollout.compute_returns(value, done)
value_loss, action_loss, entropy_loss, lr = agent.learn(rollout)
env.close()