本代码取自周博磊强化学习课程:https://space.bilibili.com/511221970/channel/detail?cid=105354&ctype=0
源码:https://download.csdn.net/download/tianjuewudi/24541126
此处程序个人感觉过多过乱,应整理出属于自己风格的代码结构,这是编程实现必不可少的环节。
导入包
import gym
from gym import wrappers
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from IPython.display import clear_output
from matplotlib import pyplot as plt
%matplotlib inline
import random
from timeit import default_timer as timer
from datetime import timedelta
import math
from utils.wrappers import make_atari, wrap_deepmind, wrap_pytorch
from utils.hyperparameters import Config
from agents.BaseAgent import BaseAgent
# 这两行不加会导致Notebook出现内核停止的问题
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
超参数
utils/hyperparameters.py 文件:
import torch
import math
class Config(object):
    """Hyperparameter container for the agents in this tutorial.

    Every setting is a plain instance attribute, so callers may override any
    of them after construction (for example ``config.LR = 3e-4``).
    """

    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # PPO controls
        self.ppo_epoch = 3
        self.num_mini_batch = 32
        self.ppo_clip_param = 0.1

        # a2c controls
        self.num_agents = 8
        self.rollout = 16
        self.value_loss_weight = 0.5
        self.entropy_loss_weight = 0.001
        self.grad_norm_max = 0.5
        self.USE_GAE = True
        self.gae_tau = 0.95

        # algorithm control
        self.USE_NOISY_NETS = False
        self.USE_PRIORITY_REPLAY = False

        # Multi-step returns
        self.N_STEPS = 1

        # epsilon variables (consumed by epsilon_by_frame below)
        self.epsilon_start = 1.0
        self.epsilon_final = 0.01
        self.epsilon_decay = 30000

        # misc agent variables
        self.GAMMA = 0.99
        self.LR = 1e-4

        # memory
        self.TARGET_NET_UPDATE_FREQ = 1000
        self.EXP_REPLAY_SIZE = 100000
        self.BATCH_SIZE = 32
        self.PRIORITY_ALPHA = 0.6
        self.PRIORITY_BETA_START = 0.4
        self.PRIORITY_BETA_FRAMES = 100000

        # Noisy Nets
        self.SIGMA_INIT = 0.5

        # Learning control variables
        self.LEARN_START = 10000
        self.MAX_FRAMES = 100000

        # Categorical Params
        self.ATOMS = 51
        self.V_MAX = 10
        self.V_MIN = -10

        # Quantile Regression Parameters
        self.QUANTILES = 51

        # DRQN Parameters
        self.SEQUENCE_LENGTH = 8

    def epsilon_by_frame(self, frame_idx):
        """Return the exploration rate for frame ``frame_idx``.

        The rate decays exponentially from ``epsilon_start`` towards
        ``epsilon_final`` with time constant ``epsilon_decay``. The three
        values are read at call time, so later overrides take effect.

        This is a method rather than a lambda stored on the instance: a
        lambda created in ``__init__`` closes over the *original* object, so
        a copied config would silently keep using the source's settings.
        Callers can still replace it per instance by assigning a callable
        to ``config.epsilon_by_frame``.
        """
        return self.epsilon_final + (self.epsilon_start - self.epsilon_final) * math.exp(-1. * frame_idx / self.epsilon_decay)
主文件代码:
# Load the preset hyperparameters and override the ones this experiment uses.
config = Config()
config.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Epsilon is the exploration rate: the smaller it is, the less often a random
# action is taken. Start high so the agent explores early in training, then
# decay so that later updates are more stable.
config.epsilon_start = 1.0
config.epsilon_final = 0.01
config.epsilon_decay = 30000


def _epsilon_by_frame(frame_idx):
    """Exponentially decayed exploration rate for the given frame index."""
    return config.epsilon_final + (config.epsilon_start - config.epsilon_final) * math.exp(-1. * frame_idx / config.epsilon_decay)


config.epsilon_by_frame = _epsilon_by_frame

# Discount factor and learning rate.
config.GAMMA = 0.99
config.LR = 1e-4

# Replay memory / target network settings.
config.TARGET_NET_UPDATE_FREQ = 1000
config.EXP_REPLAY_SIZE = 100000
config.BATCH_SIZE = 32

# Learning control variables.
config.LEARN_START = 10000
# Upper bound on the number of frames (environment steps) to run.
config.MAX_FRAMES = 1000000
经验回放池
class ExperienceReplayMemory:
    """Fixed-capacity replay buffer that overwrites its oldest entry when full.

    Transitions are kept in the plain list ``self.memory``. Once the buffer is
    full, new transitions overwrite the oldest slot in place (ring buffer), so
    ``push`` stays O(1) instead of shifting the whole list with ``del lst[0]``.
    As a consequence the list order is not chronological once the buffer has
    wrapped around; ``sample`` draws uniformly, so this does not affect it.
    """

    # Class-level default so instances unpickled from dumps written before
    # this attribute existed still work (their list is oldest-first).
    _next_idx = 0

    def __init__(self, capacity):
        """Create an empty buffer holding at most ``capacity`` transitions."""
        self.capacity = capacity
        self.memory = []
        self._next_idx = 0  # slot that holds the oldest transition once full

    def push(self, transition):
        """Store ``transition``, evicting the oldest one when at capacity."""
        if self.capacity <= 0:
            # Nothing can be retained in a zero-capacity buffer.
            return
        if len(self.memory) < self.capacity:
            self.memory.append(transition)
            self._next_idx = len(self.memory) % self.capacity
        else:
            self.memory[self._next_idx] = transition
            self._next_idx = (self._next_idx + 1) % self.capacity

    def sample(self, batch_size):
        """Return ``batch_size`` distinct transitions drawn uniformly at random.

        Raises:
            ValueError: if ``batch_size`` exceeds the number of stored items
                (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)
神经网络
class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    Args:
        input_shape: shape of one observation without the batch dimension;
            ``input_shape[0]`` is used as the number of input channels
            (e.g. 3 for an RGB frame).
        num_actions: number of outputs, one Q-value per action.
    """

    def __init__(self, input_shape, num_actions):
        super(DQN, self).__init__()

        self.input_shape = input_shape
        self.num_actions = num_actions

        self.conv1 = nn.Conv2d(self.input_shape[0], 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        # The flattened conv output size depends on input_shape, so it is
        # measured with a probe pass rather than hard-coded.
        self.fc1 = nn.Linear(self.feature_size(), 512)
        self.fc2 = nn.Linear(512, self.num_actions)

    def forward(self, x):
        """Map a batch of observations to a ``(batch, num_actions)`` tensor.

        Pipeline: three ReLU conv layers, flatten, one ReLU linear layer and
        a final linear layer without activation.
        """
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

    def feature_size(self):
        """Return the flattened size of the conv stack output for one input."""
        # Shape probe only: no need to record an autograd graph for it.
        with torch.no_grad():
            probe = torch.zeros(1, *self.input_shape)
            return self.conv3(self.conv2(self.conv1(probe))).view(1, -1).size(1)
智能体
BaseAgent文件:
import numpy as np
import pickle
import os.path
import torch
import torch.optim as optim
class BaseAgent(object):
    """Common base for agents: bookkeeping lists plus save/load helpers.

    Subclasses are expected to assign ``self.model``, ``self.target_model``,
    ``self.optimizer`` and (for the replay helpers) ``self.memory``.
    """

    def __init__(self):
        self.model = None
        self.target_model = None
        self.optimizer = None

        self.losses = []
        self.rewards = []
        self.sigma_parameter_mag = []

    def huber(self, x):
        """Element-wise Huber loss with threshold 1.0.

        Returns ``0.5 * x**2`` where ``|x| < 1`` and ``|x| - 0.5`` elsewhere.
        """
        cond = (x.abs() < 1.0).float().detach()
        return 0.5 * x.pow(2) * cond + (x.abs() - 0.5) * (1.0 - cond)

    def save_w(self):
        """Save the model and optimizer state dicts under ``./saved_agents``."""
        # torch.save does not create missing directories.
        os.makedirs('./saved_agents', exist_ok=True)
        torch.save(self.model.state_dict(), './saved_agents/model.dump')
        torch.save(self.optimizer.state_dict(), './saved_agents/optim.dump')

    def load_w(self):
        """Restore model and optimizer state if the dump files exist.

        When the model dump is found, the target model is synchronised with
        the freshly loaded weights. Missing files are skipped silently.
        """
        fname_model = "./saved_agents/model.dump"
        fname_optim = "./saved_agents/optim.dump"

        if os.path.isfile(fname_model):
            self.model.load_state_dict(torch.load(fname_model))
            self.target_model.load_state_dict(self.model.state_dict())

        if os.path.isfile(fname_optim):
            self.optimizer.load_state_dict(torch.load(fname_optim))

    def save_replay(self):
        """Pickle ``self.memory`` to ``./saved_agents/exp_replay_agent.dump``."""
        os.makedirs('./saved_agents', exist_ok=True)
        # Context manager so the handle is closed even if pickling fails.
        with open('./saved_agents/exp_replay_agent.dump', 'wb') as dump_file:
            pickle.dump(self.memory, dump_file)

    def load_replay(self):
        """Load ``self.memory`` from the replay dump if the file exists."""
        fname = './saved_agents/exp_replay_agent.dump'
        if os.path.isfile(fname):
            # NOTE: pickle.load can execute arbitrary code; only load dumps
            # that were written by this program.
            with open(fname, 'rb') as dump_file:
                self.memory = pickle.load(dump_file)

    def save_sigma_param_magnitudes(self):
        """Record the mean absolute value of all trainable 'sigma' parameters.

        Collects every trainable parameter whose name contains ``'sigma'``
        and appends the mean of their absolute values to
        ``self.sigma_parameter_mag``. Nothing is appended when the model has
        no such parameter.
        """
        tmp = []
        for name, param in self.model.named_parameters():
            if param.requires_grad and 'sigma' in name:
                # Move to CPU, flatten to 1-D and convert to a Python list.
                tmp += param.data.cpu().numpy().ravel().tolist()
        if tmp:
            self.sigma_parameter_mag.append(np.mean(np.abs(np.array(tmp))))

    def save_loss(self, loss):
        """Append ``loss`` to the loss history."""
        self.losses.append(loss)

    def save_reward(self, reward):
        """Append ``reward`` to the reward history."""
        self.rewards.append(reward)
主文件代码agent:
class Model(BaseAgent):
    """DQN agent: online/target networks, replay memory, epsilon-greedy acting.

    Args:
        static_policy: when True the agent only acts (networks in eval mode,
            ``update`` is a no-op, actions are always greedy).
        env: environment providing ``observation_space.shape`` and
            ``action_space.n``.
        config: hyperparameter object providing ``device``, ``GAMMA``, ``LR``,
            ``TARGET_NET_UPDATE_FREQ``, ``EXP_REPLAY_SIZE``, ``BATCH_SIZE``
            and ``LEARN_START``.
    """

    def __init__(self, static_policy=False, env=None, config=None):
        super(Model, self).__init__()
        if env is None or config is None:
            raise ValueError("Model requires both `env` and `config`")

        # Hyperparameters
        self.device = config.device
        self.gamma = config.GAMMA
        self.lr = config.LR
        self.target_net_update_freq = config.TARGET_NET_UPDATE_FREQ
        self.experience_replay_size = config.EXP_REPLAY_SIZE
        self.batch_size = config.BATCH_SIZE
        self.learn_start = config.LEARN_START

        self.static_policy = static_policy
        self.num_feats = env.observation_space.shape
        self.num_actions = env.action_space.n
        self.env = env

        # Build the online and target networks with identical weights.
        self.declare_networks()
        self.target_model.load_state_dict(self.model.state_dict())

        # Move the networks to the target device before building the
        # optimizer, so the optimizer is created over the final parameters.
        self.model = self.model.to(self.device)
        self.target_model = self.target_model.to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)

        # Initial train/eval mode.
        if self.static_policy:
            self.model.eval()
            self.target_model.eval()
        else:
            self.model.train()
            self.target_model.train()

        self.update_count = 0

        self.declare_memory()

    def declare_networks(self):
        """Create the online network and the target network."""
        self.model = DQN(self.num_feats, self.num_actions)
        self.target_model = DQN(self.num_feats, self.num_actions)

    def declare_memory(self):
        """Create the experience replay memory."""
        self.memory = ExperienceReplayMemory(self.experience_replay_size)

    def append_to_replay(self, s, a, r, s_):
        """Store one transition; ``s_`` is None for a terminal transition."""
        self.memory.push((s, a, r, s_))

    def prep_minibatch(self):
        """Sample a minibatch and convert it to tensors on ``self.device``.

        Returns:
            A tuple ``(batch_state, batch_action, batch_reward,
            non_final_next_states, non_final_mask, empty_next_state_values)``.
            ``non_final_mask`` is a bool vector marking transitions that have
            a next state; ``non_final_next_states`` stacks only those next
            states, or is None (with ``empty_next_state_values`` True) when
            every sampled transition is terminal.
        """
        transitions = self.memory.sample(self.batch_size)
        batch_state, batch_action, batch_reward, batch_next_state = zip(*transitions)

        # (-1, *observation_shape): the batch dimension is inferred.
        shape = (-1,) + self.num_feats

        # Stack through numpy first: building a tensor from a tuple of
        # arrays element by element is far slower.
        batch_state = torch.tensor(np.array(batch_state), device=self.device, dtype=torch.float).view(shape)
        batch_action = torch.tensor(batch_action, device=self.device, dtype=torch.long).reshape(-1, 1)
        batch_reward = torch.tensor(batch_reward, device=self.device, dtype=torch.float).reshape(-1, 1)

        # Bool (not uint8) mask: indexing with uint8 masks is deprecated.
        non_final_mask = torch.tensor(
            [s is not None for s in batch_next_state],
            device=self.device,
            dtype=torch.bool,
        )

        # Explicit emptiness check instead of a bare try/except, which used
        # to hide real errors and mark every batch as "all terminal".
        next_states = [s for s in batch_next_state if s is not None]
        if next_states:
            non_final_next_states = torch.tensor(
                np.array(next_states), device=self.device, dtype=torch.float
            ).view(shape)
            empty_next_state_values = False
        else:
            non_final_next_states = None
            empty_next_state_values = True

        return batch_state, batch_action, batch_reward, non_final_next_states, non_final_mask, empty_next_state_values

    def compute_loss(self, batch_vars):
        """Return the mean Huber loss between current and target Q-values."""
        batch_state, batch_action, batch_reward, non_final_next_states, non_final_mask, empty_next_state_values = batch_vars

        # gather(1, idx) picks, for each row, the Q-value of the action taken.
        current_q_values = self.model(batch_state).gather(1, batch_action)

        # Target: r + gamma * Q_target(s', a*), computed without gradients.
        with torch.no_grad():
            max_next_q_values = torch.zeros(self.batch_size, device=self.device, dtype=torch.float).unsqueeze(dim=1)
            if not empty_next_state_values:
                max_next_action = self.get_max_next_state_action(non_final_next_states)
                # Rows without a next state keep the value 0.
                max_next_q_values[non_final_mask] = self.target_model(non_final_next_states).gather(1, max_next_action)
            expected_q_values = batch_reward + (self.gamma * max_next_q_values)

        diff = (expected_q_values - current_q_values)
        loss = self.huber(diff)
        loss = loss.mean()

        return loss

    def update(self, s, a, r, s_, frame=0):
        """Store the transition and, once learning has started, do one step.

        Does nothing for a static policy. Before ``learn_start`` frames (or
        while the memory holds fewer than ``batch_size`` transitions) the
        transition is only stored.
        """
        if self.static_policy:
            return None

        self.append_to_replay(s, a, r, s_)

        if frame < self.learn_start:
            return None
        if len(self.memory) < self.batch_size:
            # Not enough samples yet to draw a full minibatch.
            return None

        batch_vars = self.prep_minibatch()
        loss = self.compute_loss(batch_vars)

        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        for param in self.model.parameters():
            # Parameters that did not take part in the loss have no gradient.
            if param.grad is not None:
                param.grad.data.clamp_(-1, 1)
        self.optimizer.step()

        self.update_target_model()
        self.save_loss(loss.item())
        self.save_sigma_param_magnitudes()

    def get_action(self, s, eps=0.1):
        """Return an epsilon-greedy action index for observation ``s``.

        With probability ``eps`` a uniformly random action is returned,
        unless the policy is static, in which case the action is greedy.
        """
        with torch.no_grad():
            if np.random.random() >= eps or self.static_policy:
                X = torch.tensor(np.array([s]), device=self.device, dtype=torch.float)
                # max(1) returns (values, indices); the index is the action.
                a = self.model(X).max(1)[1].view(1, 1)
                return a.item()
            else:
                return np.random.randint(0, self.num_actions)

    def update_target_model(self):
        """Copy the online weights into the target network periodically.

        This is a hard update: every ``target_net_update_freq`` calls the
        target network is overwritten with the online network's weights.
        """
        self.update_count += 1
        self.update_count = self.update_count % self.target_net_update_freq
        if self.update_count == 0:
            self.target_model.load_state_dict(self.model.state_dict())

    def get_max_next_state_action(self, next_states):
        """Return the target network's greedy action index per next state."""
        return self.target_model(next_states).max(dim=1)[1].view(-1, 1)

    def huber(self, x):
        """Element-wise Huber loss with threshold 1.0."""
        cond = (x.abs() < 1.0).to(torch.float)
        return 0.5 * x.pow(2) * cond + (x.abs() - 0.5) * (1 - cond)
绘图plot
# Draw up to three panels: rewards, loss and noisy-parameter magnitude.
def plot(frame_idx, rewards, losses, sigma, elapsed_time):
    """Redraw the training dashboard in the notebook output cell.

    The reward panel is always drawn; its title shows the frame index, the
    mean of the last ten rewards and the elapsed time. The loss panel and
    the noisy-parameter panel are drawn only when their series are non-empty.
    """
    clear_output(True)
    plt.figure(figsize=(20, 5))

    plt.subplot(131)
    recent_mean = np.mean(rewards[-10:])
    plt.title('frame %s. reward: %s. time: %s' % (frame_idx, recent_mean, elapsed_time))
    plt.plot(rewards)

    optional_panels = (
        (132, 'loss', losses),
        (133, 'noisy param magnitude', sigma),
    )
    for position, panel_title, series in optional_panels:
        if series:
            plt.subplot(position)
            plt.title(panel_title)
            plt.plot(series)

    plt.show()
主程序
# Training script: build the Pong environment and the agent, then run the
# frame loop until the recent mean reward exceeds 19 or MAX_FRAMES is reached.
start = timer()

env_id = "PongNoFrameskip-v4"
env = make_atari(env_id)
env = wrap_deepmind(env, frame_stack=False)
env = wrap_pytorch(env)
model = Model(env=env, config=config)

# try/finally so the environment is released even when training raises or
# is interrupted (e.g. stopping the notebook cell).
try:
    episode_reward = 0
    observation = env.reset()
    for frame_idx in range(1, config.MAX_FRAMES + 1):
        epsilon = config.epsilon_by_frame(frame_idx)

        action = model.get_action(observation, epsilon)
        prev_observation = observation
        observation, reward, done, _ = env.step(action)
        # A terminal transition is stored with next state None.
        observation = None if done else observation

        model.update(prev_observation, action, reward, observation, frame_idx)
        episode_reward += reward

        if done:
            observation = env.reset()
            model.save_reward(episode_reward)
            episode_reward = 0

            # Stop once the mean reward of the last ten episodes exceeds 19.
            if np.mean(model.rewards[-10:]) > 19:
                plot(frame_idx, model.rewards, model.losses, model.sigma_parameter_mag, timedelta(seconds=int(timer()-start)))
                break

        if frame_idx % 10000 == 0:
            plot(frame_idx, model.rewards, model.losses, model.sigma_parameter_mag, timedelta(seconds=int(timer()-start)))

    model.save_w()
finally:
    env.close()
总结
下面是整体的代码架构(可自己灵活添加功能):
# Overall code layout (a template to fill in; add features as needed).

# Import packages
# Set hyperparameters


# ReplayMemory: experience replay buffer
class ReplayMemory:
    def __init__(self, capacity):
        """Store the capacity and create the backing list."""
        raise NotImplementedError

    def push(self, transition):
        """Append a transition; drop the oldest one when over capacity."""
        raise NotImplementedError

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions from the buffer."""
        raise NotImplementedError

    def __len__(self):
        """Return the number of stored transitions."""
        raise NotImplementedError


# Model: the neural network
class Model(nn.Module):
    def __init__(self, input_shape, num_actions):
        """Define the network layers."""
        super().__init__()
        raise NotImplementedError

    def forward(self, x):
        """Run the network on a state (game frame).

        The last layer is fully connected with one output per action; the
        return value is the tensor of per-action outputs.
        """
        raise NotImplementedError


# Base class shared by all agents: common parameters and the various
# save/load helpers.
class BaseAgent(object):
    """Common agent functionality (to be filled in)."""


class Agent(BaseAgent):
    def __init__(self, static_policy = False,env = False):
        """Set hyperparameters and put the networks in train or eval mode."""
        self.static_policy = static_policy
        # TODO: set hyperparameters here and create self.model and
        # self.target_model before the mode switch below.
        if self.static_policy:
            self.model.eval()
            self.target_model.eval()
        else:
            self.model.train()
            self.target_model.train()

    def prep_minibatch(self):
        """Sample from the replay buffer and split the data into tensors."""
        raise NotImplementedError

    def Get_Action(self,obs):
        """Take a state and return the best action."""
        raise NotImplementedError

    def get_max_next_state_action(self,next_states):
        """Predict the index of the best action for each next state."""
        raise NotImplementedError

    def Compute_Loss(self,batch_vars):
        """Key step: compute the loss.

        Use the target Q-network to compute the target Q-values from the
        update formula, and the Q-network to compute the current Q-values.
        """
        # TODO: compute expected_q_values (target network) and
        # current_q_values (online network) from batch_vars first.
        diff = (expected_q_values - current_q_values) **2
        loss = diff.mean()
        return loss


# Plotting helpers go here
class Plot():
    """Plotting helpers (to be filled in)."""


# ----------- main run file --------------
def run_train_eposide(agent,env):
    """Run one training episode.

    Steps: store transitions in the replay buffer, sample a batch, compute
    the loss and backpropagate, then record the data.
    """
    raise NotImplementedError


def run_evaluate_eposide(agent,env):
    """Run one evaluation episode to show the agent's performance."""
    raise NotImplementedError


def main():
    """Create the environment and objects, start training, save regularly."""
    raise NotImplementedError


if __name__ == '__main__':
    main()