DDPG「建议收藏」

DDPG「建议收藏」内容都是百度AIstudio的内容,我只是在这里做个笔记,不是原创。把一位同学的课程笔记转载在自己博客里吧,加上自己的一些理解并解释代码,万一人家的博客被删了呢,我是站在别人的肩膀上继续前进。以下是链接https://aistudio.baidu.com/aistudio/education/group/info/1335https://blog.csdn.net/qq_44635194/article/details/106812096ddpg就是用来处理连续动作空间的,比如无人驾驶,.

大家好,欢迎来到IT知识分享网。

内容都是百度AIstudio的内容,我只是在这里做个笔记,不是原创。

把一位同学的课程笔记转载在自己博客里吧,加上自己的一些理解并解释代码,万一人家的博客被删了呢,我是站在别人的肩膀上继续前进。以下是链接

https://aistudio.baidu.com/aistudio/education/group/info/1335

https://blog.csdn.net/qq_44635194/article/details/106812096
 

同样,重要的概念放在前面:

DDPG「建议收藏」

 

Policy网络是根据状态obs输出动作action

Value网络是根据状态obs和动作action输出Q值(收益值

 

上面这个例子,有一条经验exp(obs=1,action=real_action,reward,next_obs=2)

Learn过程如下:

  1. 根据当前状态obs=1,调用policy网络生成一个动作action,再把obs=1和action这2个参数给value网络,让优化器优化出产生最大Q值的网络结构。相当于在已知obs=1的情况下,调整参数使得这个action可以获得更大的Q。这个是有道理的,policy产生动作action,然后obs和action放到value网络中去产生Q值,policy产生动作action的目的就是希望value网络给个好分数,所以就是去优化policy网络去迎合value网络嘛,value网络就是学习环境的规律,根据环境状态obs和policy网络给出的action打分,所以policy网络去学习迎合value网路没问题。value学习环境的规律,policy学习value网络的规律
  2. 根据next_obs通过policy网络得出next_action,然后通过value网络得到next_Q,,再根据公式把未来收益考虑进来得到(obs,real_action)的   target_Q = reward + (1.0 – terminal) * self.gamma * next_Q
  3. 然后再优化Q和target_Q就可以了。

DDPG「建议收藏」

一共4个网络的,同步也是在agent的learn方法里同步参数的

DDPG「建议收藏」

ddpg就是用来处理连续动作空间的,比如无人驾驶,旋转多少度,力度多少等等。推小车不再简单地左推右推,我们还要考虑力度大小

DDPG「建议收藏」

在这里插入图片描述

 连续动作可以通过缩放求得

在这里插入图片描述

 DQN的目标是选取动作以达到使Q最大化的目的,DDPG多了一个策略网络用来输出动作值,此动作以让Q最大化为目标。因此此策略网络的loss=-Q,即最小化-Q,相当于最大化Q值

在这里插入图片描述

 深色的两个网络用来稳定真实值,浅色的网络用于计算Q值不断逼近真实值。

在这里插入图片描述

 先看项目结构:

DDPG「建议收藏」

 agent.py是定义智能体。predict方法就是你给一个状态obs,输出一个动作。learn就是学习的过程。

DDPG「建议收藏」

import numpy as np
import parl
from parl import layers
from paddle import fluid

class QuadrotorAgent(parl.Agent):
    def __init__(self, algorithm, obs_dim, act_dim=4):
        assert isinstance(obs_dim, int)
        assert isinstance(act_dim, int)
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        super(QuadrotorAgent, self).__init__(algorithm)

        # 注意,在最开始的时候,先完全同步target_model和model的参数
        self.alg.sync_target(decay=0)

    def build_program(self):
        self.pred_program = fluid.Program()
        self.learn_program = fluid.Program()

        with fluid.program_guard(self.pred_program):
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            self.pred_act = self.alg.predict(obs)

        with fluid.program_guard(self.learn_program):
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            act = layers.data(
                name='act', shape=[self.act_dim], dtype='float32')
            reward = layers.data(name='reward', shape=[], dtype='float32')
            next_obs = layers.data(
                name='next_obs', shape=[self.obs_dim], dtype='float32')
            terminal = layers.data(name='terminal', shape=[], dtype='bool')
            _, self.critic_cost = self.alg.learn(obs, act, reward, next_obs,
                                                 terminal)

    def predict(self, obs):
        obs = np.expand_dims(obs, axis=0)
        act = self.fluid_executor.run(
            self.pred_program, feed={'obs': obs},
            fetch_list=[self.pred_act])[0]
        return act

    def learn(self, obs, act, reward, next_obs, terminal):
        feed = {
            'obs': obs,
            'act': act,
            'reward': reward,
            'next_obs': next_obs,
            'terminal': terminal
        }
        critic_cost = self.fluid_executor.run(
            self.learn_program, feed=feed, fetch_list=[self.critic_cost])[0]
        self.alg.sync_target()
        return critic_cost

IT知识分享网

algorithm.py实现DDPG算法的部分。_actor_learn就是策略网络的学习,首先给定obs输出一个动作act,然后把当前状态obs和act放进Q网络,再让优化器去学习怎么输出更大Q(代码里是平均值)。_critic_learn和DQN的Q网络一样,原先网络没考虑未来的收益,通过这个_critic_learn要把未来收益考虑进来,就是把当前收益值逼近真实的收益(考虑未来收益)。sync_target是更新target网络参数,也是在agent的learn里调用

DDPG「建议收藏」

IT知识分享网import parl
from parl import layers
from copy import deepcopy
from paddle import fluid


class DDPG(parl.Algorithm):
    def __init__(self,
                 model,
                 gamma=None,
                 tau=None,
                 actor_lr=None,
                 critic_lr=None):
        """  DDPG algorithm
        
        Args:
            model (parl.Model): actor and critic 的前向网络.
                                model 必须实现 get_actor_params() 方法.
            gamma (float): reward的衰减因子.
            tau (float): self.target_model 跟 self.model 同步参数 的 软更新参数
            actor_lr (float): actor 的学习率
            critic_lr (float): critic 的学习率
        """
        assert isinstance(gamma, float)
        assert isinstance(tau, float)
        assert isinstance(actor_lr, float)
        assert isinstance(critic_lr, float)
        self.gamma = gamma
        self.tau = tau
        self.actor_lr = actor_lr
        self.critic_lr = critic_lr

        self.model = model
        self.target_model = deepcopy(model)

    def predict(self, obs):
        """ 使用 self.model 的 actor model 来预测动作
        """
        return self.model.policy(obs)

    def learn(self, obs, action, reward, next_obs, terminal):
        """ 用DDPG算法更新 actor 和 critic
        """
        actor_cost = self._actor_learn(obs)
        critic_cost = self._critic_learn(obs, action, reward, next_obs,
                                         terminal)
        return actor_cost, critic_cost

    def _actor_learn(self, obs):
        action = self.model.policy(obs)
        Q = self.model.value(obs, action)
        cost = layers.reduce_mean(-1.0 * Q)
        optimizer = fluid.optimizer.AdamOptimizer(self.actor_lr)
        optimizer.minimize(cost, parameter_list=self.model.get_actor_params())
        return cost

    def _critic_learn(self, obs, action, reward, next_obs, terminal):
        next_action = self.target_model.policy(next_obs)
        next_Q = self.target_model.value(next_obs, next_action)

        terminal = layers.cast(terminal, dtype='float32')
        target_Q = reward + (1.0 - terminal) * self.gamma * next_Q
        target_Q.stop_gradient = True

        Q = self.model.value(obs, action)
        cost = layers.square_error_cost(Q, target_Q)
        cost = layers.reduce_mean(cost)
        optimizer = fluid.optimizer.AdamOptimizer(self.critic_lr)
        optimizer.minimize(cost)
        return cost

    def sync_target(self, decay=None, share_vars_parallel_executor=None):
        """ self.target_model从self.model复制参数过来,若decay不为None,则是软更新
        """
        if decay is None:
            decay = 1.0 - self.tau
        self.model.sync_weights_to(
            self.target_model,
            decay=decay,
            share_vars_parallel_executor=share_vars_parallel_executor)

 model.py里定义了网络的结构,class ActorModel(parl.Model)是policy网络结构,CriticModel是Q网络的神经网络结构。

DDPG「建议收藏」

 

import paddle.fluid as fluid
import parl
from parl import layers



class ActorModel(parl.Model):
    def __init__(self, act_dim):
        hid_size = 100

        self.fc1 = layers.fc(size=hid_size, act='relu')
        self.fc2 = layers.fc(size=act_dim, act='tanh')

    def policy(self, obs):
        hid = self.fc1(obs)
        means = self.fc2(hid)
        return means


class CriticModel(parl.Model):
    def __init__(self):
        hid_size = 100

        self.fc1 = layers.fc(size=hid_size, act='relu')
        self.fc2 = layers.fc(size=1, act=None)

    def value(self, obs, act):
        concat = layers.concat([obs, act], axis=1)
        hid = self.fc1(concat)
        Q = self.fc2(hid)
        Q = layers.squeeze(Q, axes=[1])
        return Q

class QuadrotorModel(parl.Model):
    def __init__(self, act_dim):
        self.actor_model = ActorModel(act_dim)
        self.critic_model = CriticModel()

    def policy(self, obs):
        return self.actor_model.policy(obs)

    def value(self, obs, act):
        return self.critic_model.value(obs, act)

    def get_actor_params(self):
        return self.actor_model.parameters()

 train.py就是程序入口了,run_episode就跑一个周期,在探索,经验池数量够了,就每走1个step就learn一次。evaluate评估模型效果。

IT知识分享网import gym
import numpy as np
import parl
from parl.utils import logger
from parl.utils import action_mapping

from agent import QuadrotorAgent
from model import QuadrotorModel
from algorithm import DDPG  # from parl.algorithms import DDPG
from parl.utils import ReplayMemory # 经验回放
from rlschool import make_env  # 使用 RLSchool 创建飞行器环境

ACTOR_LR = 1e-3  # Actor网络的 learning rate
CRITIC_LR = 1e-3  # Critic网络的 learning rate
GAMMA = 0.99  # reward 的衰减因子
TAU = 0.001  # 软更新的系数
MEMORY_SIZE = int(1e6)  # 经验池大小
MEMORY_WARMUP_SIZE = MEMORY_SIZE // 20  # 预存一部分经验之后再开始训练
BATCH_SIZE = 256
REWARD_SCALE = 0.1  # reward 缩放系数
NOISE = 0.05  # 动作噪声方差
TRAIN_EPISODE = 6e3  # 训练的总episode数
TRAIN_TOTAL_STEPS = 1e6   # 总训练步数
TEST_EVERY_STEPS = 1e4    # 每个N步评估一下算法效果,每次评估5个episode求平均reward

# 训练一个episode
def run_episode(env,agent,rpm):
    obs = env.reset()
    total_reward = 0
    steps = 0
    while True:
        steps += 1
        batch_obs = np.expand_dims(obs, axis=0)
        action = agent.predict(batch_obs.astype('float32'))
        action = np.squeeze(action)

        # 给输出动作增加探索扰动,输出限制在 [-1.0, 1.0] 范围内
        action = np.clip(np.random.normal(action, 1.0), -1.0, 1.0)
        # 动作映射到对应的 实际动作取值范围 内, action_mapping是从parl.utils那里import进来的函数
        action = action_mapping(action, env.action_space.low[0],
                                env.action_space.high[0])

        next_obs, reward, done, info = env.step(action)
        rpm.append(obs, action, REWARD_SCALE * reward, next_obs, done)

        if rpm.size() > MEMORY_WARMUP_SIZE:
            batch_obs, batch_action, batch_reward, batch_next_obs, \
                    batch_terminal = rpm.sample_batch(BATCH_SIZE)
            critic_cost = agent.learn(batch_obs, batch_action, batch_reward,
                                      batch_next_obs, batch_terminal)

        obs = next_obs
        total_reward += reward

        if done:
            break
    return total_reward, steps


# 评估 agent, 跑 5 个episode,总reward求平均
def evaluate(env, agent, render=False):
    eval_reward = []
    for i in range(5):
        obs = env.reset()
        total_reward, steps = 0, 0
        while True:
            # print("obs:",obs)
            batch_obs = np.expand_dims(obs, axis=0)
            action = agent.predict(batch_obs.astype('float32'))
            # print("before:",action)
            action = np.squeeze(action)
            # print("middle:",action)
            action = np.clip(action, -1.0, 1.0)
            # print("after:",action)
            action = action_mapping(action, env.action_space.low[0], 
                                    env.action_space.high[0])
            # print("last:",action)
            next_obs, reward, done, info = env.step(action)

            obs = next_obs
            total_reward += reward
            steps += 1

            if render:
                env.render()
            if done:
                break
        eval_reward.append(total_reward)
    return np.mean(eval_reward)


def main():
    env = make_env("Quadrotor", task="hovering_control")
    env.reset()

    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]

    # 使用PARL框架创建agent
    model = QuadrotorModel(act_dim)
    algorithm = DDPG(
        model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
    agent = QuadrotorAgent(algorithm, obs_dim, act_dim)
    agent.restore('model_dir/steps_1000177.ckpt')
    # 创建经验池
    rpm = ReplayMemory(int(MEMORY_SIZE), obs_dim, act_dim)
    # 启动训练
    test_flag = 0
    total_steps = 0
    while total_steps < TRAIN_TOTAL_STEPS:
        train_reward, steps = run_episode(env, agent, rpm)
        total_steps += steps
        #logger.info('Steps: {} Reward: {}'.format(total_steps, train_reward)) # 打印训练reward

        if total_steps // TEST_EVERY_STEPS >= test_flag: # 每隔一定step数,评估一次模型
            while total_steps // TEST_EVERY_STEPS >= test_flag:
                test_flag += 1
    
            evaluate_reward = evaluate(env, agent,True)
            logger.info('Steps {}, Test reward: {}'.format(
                total_steps, evaluate_reward)) # 打印评估的reward

            # 每评估一次,就保存一次模型,以训练的step数命名
            ckpt = 'model_dir/steps_{}.ckpt'.format(total_steps)
            agent.save(ckpt)


if __name__ == '__main__':
    main()

下图是 把不同时期的action打印出来,注意这个action因为是无人机的一些参数,速度,方向之类的,是4个值组成一个action。

DDPG「建议收藏」

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/7659.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信