强化学习之DQN代码带讲解


本代码取自周博磊强化学习课程:https://space.bilibili.com/511221970/channel/detail?cid=105354&ctype=0

源码:https://download.csdn.net/download/tianjuewudi/24541126

此处程序个人感觉过多过乱,应整理出属于自己风格的代码结构,这是编程实现必不可少的环节。

导入包

import gym
from gym import wrappers

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import numpy as np

from IPython.display import clear_output
from matplotlib import pyplot as plt
%matplotlib inline

import random
from timeit import default_timer as timer
from datetime import timedelta
import math
from utils.wrappers import make_atari, wrap_deepmind, wrap_pytorch

from utils.hyperparameters import Config
from agents.BaseAgent import BaseAgent
# 这两行不加会导致Notebook出现内核停止的问题
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

超参数

utils/hyperparameters.py 文件:

import torch
import math


class Config(object):
    """Default hyper-parameters, stored as plain instance attributes.

    A training script can override any value after construction;
    ``epsilon_by_frame`` reads the epsilon settings at call time, so it
    follows such overrides.
    """

    def __init__(self):
        # Use the GPU when torch reports one, otherwise fall back to the CPU.
        device_name = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device_name)

        # --- PPO controls ---
        self.ppo_epoch = 3
        self.num_mini_batch = 32
        self.ppo_clip_param = 0.1

        # --- A2C controls ---
        self.num_agents = 8
        self.rollout = 16
        self.value_loss_weight = 0.5
        self.entropy_loss_weight = 0.001
        self.grad_norm_max = 0.5
        self.USE_GAE = True
        self.gae_tau = 0.95

        # --- Algorithm switches ---
        self.USE_NOISY_NETS = False
        self.USE_PRIORITY_REPLAY = False

        # --- Multi-step returns ---
        self.N_STEPS = 1

        # --- Epsilon schedule ---
        self.epsilon_start = 1.0
        self.epsilon_final = 0.01
        self.epsilon_decay = 30000

        def epsilon_by_frame(frame_idx):
            """Exponentially decay epsilon from epsilon_start towards epsilon_final."""
            return self.epsilon_final + (self.epsilon_start - self.epsilon_final) * math.exp(
                -1. * frame_idx / self.epsilon_decay
            )

        self.epsilon_by_frame = epsilon_by_frame

        # --- Misc agent variables ---
        self.GAMMA = 0.99
        self.LR = 1e-4

        # --- Memory ---
        self.TARGET_NET_UPDATE_FREQ = 1000
        self.EXP_REPLAY_SIZE = 100000
        self.BATCH_SIZE = 32
        self.PRIORITY_ALPHA = 0.6
        self.PRIORITY_BETA_START = 0.4
        self.PRIORITY_BETA_FRAMES = 100000

        # --- Noisy nets ---
        self.SIGMA_INIT = 0.5

        # --- Learning control ---
        self.LEARN_START = 10000
        self.MAX_FRAMES = 100000

        # --- Categorical parameters ---
        self.ATOMS = 51
        self.V_MAX = 10
        self.V_MIN = -10

        # --- Quantile regression parameters ---
        self.QUANTILES = 51

        # --- DRQN parameters ---
        self.SEQUENCE_LENGTH = 8

主文件代码:

# Start from the preset defaults, then set the values used by this DQN run.
config = Config()

config.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Epsilon is the exploration factor: the smaller it is, the less often a random
# action is taken. It starts high so early training explores more, then decays
# so later updates are more stable.
config.epsilon_start = 1.0
config.epsilon_final = 0.01
config.epsilon_decay = 30000
config.epsilon_by_frame = lambda frame_idx: (
    config.epsilon_final
    + (config.epsilon_start - config.epsilon_final)
    * math.exp(-1. * frame_idx / config.epsilon_decay)
)

# Discount factor and learning rate.
config.GAMMA = 0.99
config.LR = 1e-4

# Replay memory / target network.
config.TARGET_NET_UPDATE_FREQ = 1000
config.EXP_REPLAY_SIZE = 100000
config.BATCH_SIZE = 32

# Learning control: frames to collect before learning starts, and the upper
# bound on the number of frames (not episodes) the training loop runs.
config.LEARN_START = 10000
config.MAX_FRAMES = 1000000

经验回放池

class ExperienceReplayMemory:
    """Fixed-capacity store of transitions with uniform random sampling.

    Until ``capacity`` transitions are stored, new ones are appended. After
    that each new transition overwrites the oldest one in place (ring
    buffer), so ``push`` is O(1) instead of shifting the whole list. As a
    consequence ``self.memory`` is not kept in chronological order once the
    buffer has wrapped around.
    """

    # Slot holding the oldest transition once the buffer is full. Defined at
    # class level as well so instances unpickled from dumps written before
    # this attribute existed start overwriting at index 0, which is their
    # oldest entry.
    _oldest = 0

    def __init__(self, capacity):
        """Create an empty buffer that keeps at most ``capacity`` transitions."""
        self.capacity = capacity
        self.memory = []
        self._oldest = 0

    def push(self, transition):
        """Store ``transition``, evicting the oldest one when the buffer is full."""
        if self.capacity <= 0:
            # Nothing can be retained; matches the append-then-delete result.
            return
        if len(self.memory) < self.capacity:
            self.memory.append(transition)
            return
        self.memory[self._oldest] = transition
        self._oldest = (self._oldest + 1) % self.capacity

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        Raises:
            ValueError: if fewer than ``batch_size`` transitions are stored
                (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

神经网络

class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    Args:
        input_shape: shape of a single observation without the batch
            dimension; its first entry is the number of input channels
            (e.g. 3 for an RGB frame).
        num_actions: number of outputs of the final linear layer.
    """

    def __init__(self, input_shape, num_actions):
        super().__init__()

        self.input_shape = input_shape
        self.num_actions = num_actions

        in_channels = self.input_shape[0]
        self.conv1 = nn.Conv2d(in_channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        # The flattened conv output size depends on input_shape, so it is
        # measured with a dummy forward pass through the conv stack.
        flat_features = self.feature_size()
        self.fc1 = nn.Linear(flat_features, 512)
        self.fc2 = nn.Linear(512, self.num_actions)

    def forward(self, x):
        """Return one value per action for every observation in the batch ``x``."""
        # ReLU after every conv layer, flatten, ReLU after fc1, raw fc2 output.
        for conv in (self.conv1, self.conv2, self.conv3):
            x = F.relu(conv(x))
        flat = x.view(x.size(0), -1)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

    def feature_size(self):
        """Return the flattened size of the conv stack output for one observation."""
        probe = torch.zeros(1, *self.input_shape)
        feature_map = self.conv3(self.conv2(self.conv1(probe)))
        return feature_map.view(1, -1).size(1)

智能体

BaseAgent文件:

import numpy as np
import pickle
import os.path

import torch
import torch.optim as optim


class BaseAgent(object):
    """Common state and persistence helpers shared by agents.

    Subclasses are expected to assign ``self.model``, ``self.target_model``,
    ``self.optimizer`` and (for the replay helpers) ``self.memory``.
    """

    def __init__(self):
        self.model = None
        self.target_model = None
        self.optimizer = None
        self.losses = []
        self.rewards = []
        self.sigma_parameter_mag = []

    def huber(self, x):
        """Element-wise Huber loss with threshold 1.

        Returns ``0.5 * x**2`` where ``|x| < 1`` and ``|x| - 0.5`` elsewhere.
        """
        cond = (x.abs() < 1.0).float().detach()
        return 0.5 * x.pow(2) * cond + (x.abs() - 0.5) * (1.0 - cond)

    def save_w(self):
        """Save the model and optimizer state dicts under ``./saved_agents``."""
        # Create the target directory so the first save does not fail.
        os.makedirs('./saved_agents', exist_ok=True)
        torch.save(self.model.state_dict(), './saved_agents/model.dump')
        torch.save(self.optimizer.state_dict(), './saved_agents/optim.dump')

    def load_w(self):
        """Load model and optimizer state dicts if the dump files exist.

        The target model is synchronised with the freshly loaded model.
        Missing files are skipped silently.
        """
        fname_model = "./saved_agents/model.dump"
        fname_optim = "./saved_agents/optim.dump"

        if os.path.isfile(fname_model):
            self.model.load_state_dict(torch.load(fname_model))
            self.target_model.load_state_dict(self.model.state_dict())

        if os.path.isfile(fname_optim):
            self.optimizer.load_state_dict(torch.load(fname_optim))

    def save_replay(self):
        """Pickle the replay memory to ``./saved_agents/exp_replay_agent.dump``."""
        os.makedirs('./saved_agents', exist_ok=True)
        # The context manager closes the file even if pickling fails.
        with open('./saved_agents/exp_replay_agent.dump', 'wb') as dump_file:
            pickle.dump(self.memory, dump_file)

    def load_replay(self):
        """Restore the replay memory from its dump file if that file exists."""
        fname = './saved_agents/exp_replay_agent.dump'
        if os.path.isfile(fname):
            # NOTE: pickle.load can execute arbitrary code; only load dumps
            # that this program wrote itself.
            with open(fname, 'rb') as dump_file:
                self.memory = pickle.load(dump_file)

    def save_sigma_param_magnitudes(self):
        """Record the mean absolute value of all trainable 'sigma' parameters.

        Nothing is recorded when the model has no trainable parameter whose
        name contains ``'sigma'``.
        """
        sigma_values = []
        for name, param in self.model.named_parameters():
            if param.requires_grad and 'sigma' in name:
                # Move to the CPU, flatten to 1-D and convert to a Python list.
                sigma_values += param.data.cpu().numpy().ravel().tolist()
        if sigma_values:
            # Append the mean magnitude to the history.
            self.sigma_parameter_mag.append(np.mean(np.abs(np.array(sigma_values))))

    def save_loss(self, loss):
        """Append ``loss`` to the loss history."""
        self.losses.append(loss)

    def save_reward(self, reward):
        """Append ``reward`` to the reward history."""
        self.rewards.append(reward)

主文件代码agent:

class Model(BaseAgent):
    """DQN agent: online network, target network, replay memory and update step.

    Args:
        static_policy: when True the networks are put in eval mode, actions
            are always greedy and ``update`` does nothing.
        env: environment providing ``observation_space.shape`` and
            ``action_space.n``.
        config: object providing ``device``, ``GAMMA``, ``LR``,
            ``TARGET_NET_UPDATE_FREQ``, ``EXP_REPLAY_SIZE``, ``BATCH_SIZE``
            and ``LEARN_START``.
    """

    def __init__(self, static_policy=False, env=None, config=None):
        super(Model, self).__init__()
        # Copy the hyper-parameters used by this agent.
        self.device = config.device
        self.gamma = config.GAMMA
        self.lr = config.LR
        self.target_net_update_freq = config.TARGET_NET_UPDATE_FREQ
        self.experience_replay_size = config.EXP_REPLAY_SIZE
        self.batch_size = config.BATCH_SIZE
        self.learn_start = config.LEARN_START

        self.static_policy = static_policy
        self.num_feats = env.observation_space.shape
        self.num_actions = env.action_space.n
        self.env = env

        # Build the online and target networks; the target starts as a copy.
        self.declare_networks()

        self.target_model.load_state_dict(self.model.state_dict())
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)

        # Move both networks to the configured device.
        self.model = self.model.to(self.device)
        self.target_model.to(self.device)

        # Select train or eval mode once, at construction time.
        if self.static_policy:
            self.model.eval()
            self.target_model.eval()
        else:
            self.model.train()
            self.target_model.train()

        self.update_count = 0

        # Create the replay memory.
        self.declare_memory()

    def declare_networks(self):
        """Create the online network and the target network."""
        self.model = DQN(self.num_feats, self.num_actions)
        self.target_model = DQN(self.num_feats, self.num_actions)

    def declare_memory(self):
        """Create the experience replay memory."""
        self.memory = ExperienceReplayMemory(self.experience_replay_size)

    def append_to_replay(self, s, a, r, s_):
        """Store one transition ``(s, a, r, s_)``; ``s_`` is None at episode end."""
        self.memory.push((s, a, r, s_))

    def prep_minibatch(self):
        """Sample a minibatch from the replay memory and convert it to tensors.

        Returns:
            A tuple ``(batch_state, batch_action, batch_reward,
            non_final_next_states, non_final_mask, empty_next_state_values)``.
            ``non_final_mask`` is a bool tensor marking the transitions whose
            next state is not None, ``non_final_next_states`` stacks only
            those next states (None when there are none), and
            ``empty_next_state_values`` is True exactly in that last case.
        """
        transitions = self.memory.sample(self.batch_size)
        # Regroup the list of transitions into four parallel tuples.
        batch_state, batch_action, batch_reward, batch_next_state = zip(*transitions)

        # Batch dimension first, then the observation shape
        # (e.g. (-1, 1, 84, 84) for single 84x84 frames).
        shape = (-1,) + self.num_feats

        # Go through one contiguous ndarray; building a tensor straight from
        # a tuple of ndarrays is very slow.
        batch_state = torch.tensor(np.array(batch_state), device=self.device, dtype=torch.float).view(shape)
        batch_action = torch.tensor(batch_action, device=self.device, dtype=torch.long).view(-1, 1)
        batch_reward = torch.tensor(batch_reward, device=self.device, dtype=torch.float).view(-1, 1)

        # One flag per transition: does a next state exist?
        non_final_mask = torch.tensor(
            [s is not None for s in batch_next_state],
            device=self.device,
            dtype=torch.bool,
        )

        # Every sampled transition may be terminal, so check explicitly
        # instead of relying on an exception from an empty tensor.
        existing_next_states = [s for s in batch_next_state if s is not None]
        if existing_next_states:
            non_final_next_states = torch.tensor(
                np.array(existing_next_states), device=self.device, dtype=torch.float
            ).view(shape)
            empty_next_state_values = False
        else:
            non_final_next_states = None
            empty_next_state_values = True

        return batch_state, batch_action, batch_reward, non_final_next_states, non_final_mask, empty_next_state_values

    def compute_loss(self, batch_vars):
        """Return the mean Huber loss between TD targets and current Q-values.

        Args:
            batch_vars: the tuple produced by ``prep_minibatch``.
        """
        batch_state, batch_action, batch_reward, non_final_next_states, non_final_mask, empty_next_state_values = batch_vars

        # gather(dim, index) picks, per row, the Q-value of the action taken.
        current_q_values = self.model(batch_state).gather(1, batch_action)

        # Target: r + gamma * Q_target(s', a*), with 0 for terminal next states.
        with torch.no_grad():
            max_next_q_values = torch.zeros(self.batch_size, 1, device=self.device, dtype=torch.float)
            if not empty_next_state_values:
                max_next_action = self.get_max_next_state_action(non_final_next_states)
                # Rows without a next state keep their initial value of 0.
                max_next_q_values[non_final_mask.to(torch.bool)] = self.target_model(non_final_next_states).gather(1, max_next_action)
            expected_q_values = batch_reward + (self.gamma * max_next_q_values)

        diff = (expected_q_values - current_q_values)
        loss = self.huber(diff)
        loss = loss.mean()

        return loss

    def update(self, s, a, r, s_, frame=0):
        """Store the transition and, once learning has started, do one update.

        Returns None in every case. Nothing is stored or learned when
        ``static_policy`` is set; learning is skipped while ``frame`` is below
        ``learn_start`` or the memory holds fewer than ``batch_size`` items.
        """
        if self.static_policy:
            return None

        self.append_to_replay(s, a, r, s_)

        if frame < self.learn_start or len(self.memory) < self.batch_size:
            return None

        batch_vars = self.prep_minibatch()

        loss = self.compute_loss(batch_vars)

        # Optimize the model, clamping each gradient element to [-1, 1].
        self.optimizer.zero_grad()
        loss.backward()
        for param in self.model.parameters():
            if param.grad is not None:
                param.grad.data.clamp_(-1, 1)
        self.optimizer.step()

        self.update_target_model()
        self.save_loss(loss.item())
        self.save_sigma_param_magnitudes()

    def get_action(self, s, eps=0.1):
        """Return an epsilon-greedy action for observation ``s``.

        A uniformly random action is taken with probability ``eps`` unless
        ``static_policy`` is set, in which case the action is always greedy.
        """
        with torch.no_grad():
            if np.random.random() >= eps or self.static_policy:
                X = torch.as_tensor(s, device=self.device, dtype=torch.float).unsqueeze(0)
                # max(1) returns (values, indices) along dim 1; keep the index.
                a = self.model(X).max(1)[1].view(1, 1)
                return a.item()
            else:
                return np.random.randint(0, self.num_actions)

    def update_target_model(self):
        """Copy the online weights into the target network every
        ``target_net_update_freq`` calls (a periodic hard update)."""
        self.update_count += 1
        self.update_count = self.update_count % self.target_net_update_freq
        if self.update_count == 0:
            self.target_model.load_state_dict(self.model.state_dict())

    def get_max_next_state_action(self, next_states):
        """Return, as a column tensor, the target network's greedy action per state."""
        return self.target_model(next_states).max(dim=1)[1].view(-1, 1)

    def huber(self, x):
        """Element-wise Huber loss with threshold 1."""
        cond = (x.abs() < 1.0).to(torch.float)
        return 0.5 * x.pow(2) * cond + (x.abs() - 0.5) * (1 - cond)

绘图plot

def plot(frame_idx, rewards, losses, sigma, elapsed_time):
    """Redraw the training figure with up to three side-by-side panels.

    The reward curve is always drawn; the loss and noisy-parameter panels
    are drawn only when their series are non-empty. The title of the reward
    panel shows the frame index, the mean of the last ten rewards and the
    elapsed time.
    """
    clear_output(True)
    plt.figure(figsize=(20, 5))

    plt.subplot(131)
    recent_mean = np.mean(rewards[-10:])
    plt.title('frame %s. reward: %s. time: %s' % (frame_idx, recent_mean, elapsed_time))
    plt.plot(rewards)

    optional_panels = (
        (132, 'loss', losses),
        (133, 'noisy param magnitude', sigma),
    )
    for position, panel_title, series in optional_panels:
        if not series:
            continue
        plt.subplot(position)
        plt.title(panel_title)
        plt.plot(series)

    plt.show()

主程序

# Train the agent on Pong until the mean reward of the last ten episodes
# exceeds 19 or config.MAX_FRAMES frames have been played.
start = timer()

env_id = "PongNoFrameskip-v4"
env = wrap_pytorch(wrap_deepmind(make_atari(env_id), frame_stack=False))
model = Model(env=env, config=config)

episode_reward = 0
observation = env.reset()

for frame_idx in range(1, config.MAX_FRAMES + 1):
    # Act epsilon-greedily with the epsilon scheduled for this frame.
    epsilon = config.epsilon_by_frame(frame_idx)
    action = model.get_action(observation, epsilon)

    prev_observation = observation
    observation, reward, done, _ = env.step(action)
    if done:
        # A terminal transition is stored with no next state.
        observation = None

    model.update(prev_observation, action, reward, observation, frame_idx)
    episode_reward += reward

    if done:
        observation = env.reset()
        model.save_reward(episode_reward)
        episode_reward = 0

        # Stop early once the recent average reward is high enough.
        if np.mean(model.rewards[-10:]) > 19:
            plot(frame_idx, model.rewards, model.losses, model.sigma_parameter_mag, timedelta(seconds=int(timer()-start)))
            break

    # Refresh the plots every 10000 frames.
    if frame_idx % 10000 == 0:
        plot(frame_idx, model.rewards, model.losses, model.sigma_parameter_mag, timedelta(seconds=int(timer()-start)))

model.save_w()
env.close()

总结

下面是整体的代码架构(可自己灵活添加功能):

# Skeleton of the overall code layout. Every TODO / NotImplementedError marks a
# part to fill in; extend it freely with extra features.

# Imports
import torch.nn as nn

# Hyper-parameters
# TODO: define the hyper-parameters here.


# ReplayMemory: experience replay pool
class ReplayMemory:
    def __init__(self, capacity):
        """Store the capacity and create the backing list."""
        raise NotImplementedError

    def push(self, transition):
        """Append a transition; drop the oldest one once over capacity."""
        raise NotImplementedError

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions from the pool."""
        raise NotImplementedError

    def __len__(self):
        """Return the number of stored transitions."""
        raise NotImplementedError


# Model: neural-network model
class Model(nn.Module):
    def __init__(self, input_shape, num_actions):
        super().__init__()
        # TODO: define the layers of the network.

    def forward(self, x):
        """Map a state (game frame) to one output per action.

        The last layer is fully connected with ``num_actions`` outputs; the
        result is returned as a tensor.
        """
        raise NotImplementedError


# Base class shared by all agents: common parameters plus the various
# save / load helpers.
class BaseAgent(object):
    pass


class Agent(BaseAgent):
    def __init__(self, static_policy=False, env=False):
        super().__init__()
        self.static_policy = static_policy
        self.env = env
        # TODO: set the hyper-parameters and build self.model and
        # self.target_model before the mode switch below.
        if self.static_policy:
            self.model.eval()
            self.target_model.eval()
        else:
            self.model.train()
            self.target_model.train()

    def prep_minibatch(self):
        """Sample from the replay pool and sort the data into tensors."""
        raise NotImplementedError

    def Get_Action(self, obs):
        """Return the best action for the given state."""
        raise NotImplementedError

    def get_max_next_state_action(self, next_states):
        """Return the index of the best action for each next state."""
        raise NotImplementedError

    def Compute_Loss(self, batch_vars):
        """Key step: compute the loss."""
        # TODO: compute `expected_q_values` with the target network (Bellman
        # target) and `current_q_values` with the online network.
        diff = (expected_q_values - current_q_values) ** 2
        loss = diff.mean()
        return loss


# Plotting helpers go here.
class Plot():
    pass


# ----------- main run file --------------
def run_train_eposide(agent, env):
    """Run one training episode."""
    # TODO: store transitions in the replay pool, sample a batch, compute the
    # loss and back-propagate, then record the results.
    raise NotImplementedError


def run_evaluate_eposide(agent, env):
    """Run one evaluation episode to show the learned behaviour."""
    raise NotImplementedError


def main():
    """Create the environment and all objects, then start training."""
    # TODO: remember to save the agent periodically.
    raise NotImplementedError


if __name__ == '__main__':
    main()
    

文章作者: 微笑紫瞳星
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 微笑紫瞳星 !
  目录