The content is all from Baidu AI Studio; I'm just taking notes here, nothing original.
I'm reposting a classmate's course notes on my own blog, adding some of my own understanding and explanations of the code, just in case the original blog gets deleted; I'm standing on someone else's shoulders to keep moving forward. The links are below:
https://aistudio.baidu.com/aistudio/education/group/info/1335
https://blog.csdn.net/qq_44635194/article/details/106812096
As usual, the important concepts come first:
The policy network takes a state obs and outputs an action.
The value network takes a state obs and an action and outputs a Q value (an estimate of the return).
In the example above there is one piece of experience, exp(obs=1, action=real_action, reward, next_obs=2).
The learn step then works as follows (a small numeric sketch follows this list):
- Given the current state obs=1, the policy network produces an action; obs=1 and this action are then fed into the value network, and the optimizer adjusts the policy so that the action earns a larger Q. This makes sense: the policy network produces an action precisely so that the value network will give it a good score, so we optimize the policy network to cater to the value network. The value network learns the rules of the environment and scores the (obs, action) pair; the policy network in turn learns the rules of the value network.
- From next_obs, the target policy network produces next_action, the target value network then gives next_Q, and the future return is folded in to obtain the target for (obs, real_action): target_Q = reward + (1.0 - terminal) * self.gamma * next_Q.
- Finally, Q is optimized towards target_Q.
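To make the second bullet concrete, here is a minimal NumPy sketch of the critic target. The numbers are made up; in the real code below, next_Q comes from the target networks: target_model.value(next_obs, target_model.policy(next_obs)).
import numpy as np
# Made-up batch of two transitions; next_Q stands in for the target critic's output.
gamma = 0.99
reward = np.array([1.0, 0.5], dtype='float32')
terminal = np.array([0.0, 1.0], dtype='float32')  # 1.0 marks the last step of an episode
next_Q = np.array([10.0, 3.0], dtype='float32')
target_Q = reward + (1.0 - terminal) * gamma * next_Q
print(target_Q)  # Q(obs, real_action) is then regressed towards this target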
There are four networks in total (the actor, the critic, and a target copy of each); their parameters are synced inside the agent's learn method.
DDPG is designed for continuous action spaces, e.g. autonomous driving: how many degrees to steer, how much force to apply, and so on. Pushing the cart is no longer just push-left or push-right; we also have to decide how hard to push.
Continuous actions can be obtained by scaling the network output, as the sketch below shows.
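A minimal sketch of that scaling, assuming the actor ends in tanh so its output lies in [-1, 1]; the bounds here are made-up examples, and scale_action is a hypothetical helper. This is what parl.utils.action_mapping is used for in train.py below.
import numpy as np
# Linearly map a tanh output in [-1, 1] onto the environment's action range [low, high].
def scale_action(model_output, low, high):
    model_output = np.clip(model_output, -1.0, 1.0)
    return low + (model_output + 1.0) * 0.5 * (high - low)
print(scale_action(np.array([-1.0, 0.0, 1.0]), low=0.1, high=15.0))  # -> [0.1, 7.55, 15.0]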
DQN's goal is to pick the action that maximizes Q. DDPG adds a policy network that outputs the action value, and this action is trained to maximize Q, so the policy network's loss is -Q: minimizing -Q is equivalent to maximizing Q.
In the course diagram, the two darker networks (the target networks) are there to keep the target value stable, while the lighter networks compute Q values that are continually pushed towards that target.
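The targets stay stable because they only move a small step towards the learned networks at each update. Here is a minimal sketch of that soft-update idea, assuming PARL's sync_weights_to interpolates the old target weights with the new ones using decay (sync_target in algorithm.py below passes decay = 1 - tau).
import numpy as np
# Soft update: the target weights creep towards the learned weights at rate tau.
def soft_update(target_w, w, tau=0.001):
    return (1.0 - tau) * target_w + tau * w
target_w = np.zeros(3)  # stand-in for a target-network parameter
w = np.ones(3)          # stand-in for the corresponding learned parameter
for _ in range(1000):
    target_w = soft_update(target_w, w)
print(target_w)  # after 1000 updates, roughly 63% of the way to w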
Let's first look at the project structure:
agent.py defines the agent. The predict method takes a state obs and returns an action; learn is the learning step (a shape sketch of its feed follows the code).
import numpy as np
import parl
from parl import layers
from paddle import fluid
class QuadrotorAgent(parl.Agent):
    def __init__(self, algorithm, obs_dim, act_dim=4):
        assert isinstance(obs_dim, int)
        assert isinstance(act_dim, int)
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        super(QuadrotorAgent, self).__init__(algorithm)

        # Note: at the very beginning, fully sync the target_model parameters with model
        self.alg.sync_target(decay=0)

    def build_program(self):
        self.pred_program = fluid.Program()
        self.learn_program = fluid.Program()

        with fluid.program_guard(self.pred_program):
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            self.pred_act = self.alg.predict(obs)

        with fluid.program_guard(self.learn_program):
            obs = layers.data(
                name='obs', shape=[self.obs_dim], dtype='float32')
            act = layers.data(
                name='act', shape=[self.act_dim], dtype='float32')
            reward = layers.data(name='reward', shape=[], dtype='float32')
            next_obs = layers.data(
                name='next_obs', shape=[self.obs_dim], dtype='float32')
            terminal = layers.data(name='terminal', shape=[], dtype='bool')
            _, self.critic_cost = self.alg.learn(obs, act, reward, next_obs,
                                                 terminal)

    def predict(self, obs):
        obs = np.expand_dims(obs, axis=0)
        act = self.fluid_executor.run(
            self.pred_program, feed={'obs': obs},
            fetch_list=[self.pred_act])[0]
        return act

    def learn(self, obs, act, reward, next_obs, terminal):
        feed = {
            'obs': obs,
            'act': act,
            'reward': reward,
            'next_obs': next_obs,
            'terminal': terminal
        }
        critic_cost = self.fluid_executor.run(
            self.learn_program, feed=feed, fetch_list=[self.critic_cost])[0]
        self.alg.sync_target()
        return critic_cost
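The learn method feeds a sampled batch straight into learn_program. Here is a quick shape sketch of what that feed dict looks like; the sizes are made up, and the dtypes follow the layers.data definitions above.
import numpy as np
B, obs_dim, act_dim = 256, 3, 4   # hypothetical batch size and dimensions
feed = {
    'obs': np.zeros((B, obs_dim), dtype='float32'),
    'act': np.zeros((B, act_dim), dtype='float32'),
    'reward': np.zeros((B,), dtype='float32'),
    'next_obs': np.zeros((B, obs_dim), dtype='float32'),
    'terminal': np.zeros((B,), dtype='bool'),
}
print({k: v.shape for k, v in feed.items()})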
algorithm.py implements the DDPG algorithm. _actor_learn is the policy network's update: given obs it outputs an action act, feeds obs and act into the Q network, and lets the optimizer learn to produce a larger Q (the code uses the batch mean, negated as the loss). _critic_learn is like DQN's Q-network update: the raw network output does not yet account for future reward, so _critic_learn folds the future return in, pushing the current Q estimate towards the "true" return (which includes future reward). sync_target updates the target networks' parameters and is likewise called from the agent's learn method.
import parl
from parl import layers
from copy import deepcopy
from paddle import fluid
class DDPG(parl.Algorithm):
    def __init__(self,
                 model,
                 gamma=None,
                 tau=None,
                 actor_lr=None,
                 critic_lr=None):
        """ DDPG algorithm

        Args:
            model (parl.Model): forward networks of the actor and the critic.
                model must implement the get_actor_params() method.
            gamma (float): discount factor for the reward.
            tau (float): coefficient of the soft update that syncs self.target_model with self.model.
            actor_lr (float): learning rate of the actor.
            critic_lr (float): learning rate of the critic.
        """
        assert isinstance(gamma, float)
        assert isinstance(tau, float)
        assert isinstance(actor_lr, float)
        assert isinstance(critic_lr, float)
        self.gamma = gamma
        self.tau = tau
        self.actor_lr = actor_lr
        self.critic_lr = critic_lr

        self.model = model
        self.target_model = deepcopy(model)

    def predict(self, obs):
        """ Use the actor of self.model to predict the action.
        """
        return self.model.policy(obs)

    def learn(self, obs, action, reward, next_obs, terminal):
        """ Update the actor and the critic with the DDPG algorithm.
        """
        actor_cost = self._actor_learn(obs)
        critic_cost = self._critic_learn(obs, action, reward, next_obs,
                                         terminal)
        return actor_cost, critic_cost

    def _actor_learn(self, obs):
        action = self.model.policy(obs)
        Q = self.model.value(obs, action)
        cost = layers.reduce_mean(-1.0 * Q)  # maximizing Q == minimizing -Q
        optimizer = fluid.optimizer.AdamOptimizer(self.actor_lr)
        optimizer.minimize(cost, parameter_list=self.model.get_actor_params())
        return cost

    def _critic_learn(self, obs, action, reward, next_obs, terminal):
        # The target networks provide next_action and next_Q for a stable target
        next_action = self.target_model.policy(next_obs)
        next_Q = self.target_model.value(next_obs, next_action)

        terminal = layers.cast(terminal, dtype='float32')
        target_Q = reward + (1.0 - terminal) * self.gamma * next_Q
        target_Q.stop_gradient = True

        Q = self.model.value(obs, action)
        cost = layers.square_error_cost(Q, target_Q)
        cost = layers.reduce_mean(cost)
        optimizer = fluid.optimizer.AdamOptimizer(self.critic_lr)
        optimizer.minimize(cost)
        return cost

    def sync_target(self, decay=None, share_vars_parallel_executor=None):
        """ Copy the parameters from self.model into self.target_model.
            If decay is None, the soft-update coefficient 1 - tau is used;
            decay=0 performs a full hard copy.
        """
        if decay is None:
            decay = 1.0 - self.tau
        self.model.sync_weights_to(
            self.target_model,
            decay=decay,
            share_vars_parallel_executor=share_vars_parallel_executor)
model.py defines the network structures: class ActorModel(parl.Model) is the policy network, and CriticModel is the neural network of the Q network. (A quick shape check follows the code.)
import paddle.fluid as fluid
import parl
from parl import layers
class ActorModel(parl.Model):
    def __init__(self, act_dim):
        hid_size = 100

        self.fc1 = layers.fc(size=hid_size, act='relu')
        self.fc2 = layers.fc(size=act_dim, act='tanh')

    def policy(self, obs):
        hid = self.fc1(obs)
        means = self.fc2(hid)  # tanh keeps each action component in [-1, 1]
        return means


class CriticModel(parl.Model):
    def __init__(self):
        hid_size = 100

        self.fc1 = layers.fc(size=hid_size, act='relu')
        self.fc2 = layers.fc(size=1, act=None)

    def value(self, obs, act):
        # The critic scores a state-action pair, so obs and act are concatenated
        concat = layers.concat([obs, act], axis=1)
        hid = self.fc1(concat)
        Q = self.fc2(hid)
        Q = layers.squeeze(Q, axes=[1])
        return Q


class QuadrotorModel(parl.Model):
    def __init__(self, act_dim):
        self.actor_model = ActorModel(act_dim)
        self.critic_model = CriticModel()

    def policy(self, obs):
        return self.actor_model.policy(obs)

    def value(self, obs, act):
        return self.critic_model.value(obs, act)

    def get_actor_params(self):
        return self.actor_model.parameters()
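To see what the critic consumes, here is a quick NumPy shape check mirroring the concatenation in CriticModel.value above; the batch size and obs_dim are made up, act_dim=4 as for the quadrotor.
import numpy as np
obs = np.zeros((2, 3), dtype='float32')  # hypothetical batch of 2, obs_dim=3
act = np.zeros((2, 4), dtype='float32')  # act_dim=4
concat = np.concatenate([obs, act], axis=1)
print(concat.shape)  # (2, 7): the final fc(size=1) then yields one Q value per row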
train.py is the program entry point. run_episode runs one episode, exploring as it goes; once the replay memory holds enough samples, it calls learn once per step. evaluate measures the model's performance.
import gym
import numpy as np
import parl
from parl.utils import logger
from parl.utils import action_mapping
from agent import QuadrotorAgent
from model import QuadrotorModel
from algorithm import DDPG # from parl.algorithms import DDPG
from parl.utils import ReplayMemory  # replay memory
from rlschool import make_env  # use RLSchool to create the quadrotor environment
ACTOR_LR = 1e-3  # learning rate of the actor network
CRITIC_LR = 1e-3  # learning rate of the critic network
GAMMA = 0.99  # discount factor for the reward
TAU = 0.001  # coefficient of the soft update
MEMORY_SIZE = int(1e6)  # capacity of the replay memory
MEMORY_WARMUP_SIZE = MEMORY_SIZE // 20  # collect this many transitions before training starts
BATCH_SIZE = 256
REWARD_SCALE = 0.1  # scaling factor applied to the reward
NOISE = 0.05  # variance of the action noise (note: run_episode below uses a fixed std of 1.0)
TRAIN_EPISODE = 6e3  # total number of training episodes (not used; the loop below counts steps)
TRAIN_TOTAL_STEPS = 1e6  # total number of training steps
TEST_EVERY_STEPS = 1e4  # evaluate every N steps; each evaluation averages the reward over 5 episodes
# Run one training episode
def run_episode(env, agent, rpm):
    obs = env.reset()
    total_reward = 0
    steps = 0
    while True:
        steps += 1
        batch_obs = np.expand_dims(obs, axis=0)
        action = agent.predict(batch_obs.astype('float32'))
        action = np.squeeze(action)

        # Add exploration noise to the action and clip it back into [-1.0, 1.0]
        action = np.clip(np.random.normal(action, 1.0), -1.0, 1.0)
        # Map the action onto the real action range of the environment;
        # action_mapping is imported from parl.utils
        action = action_mapping(action, env.action_space.low[0],
                                env.action_space.high[0])

        next_obs, reward, done, info = env.step(action)
        rpm.append(obs, action, REWARD_SCALE * reward, next_obs, done)

        if rpm.size() > MEMORY_WARMUP_SIZE:
            batch_obs, batch_action, batch_reward, batch_next_obs, \
                batch_terminal = rpm.sample_batch(BATCH_SIZE)
            critic_cost = agent.learn(batch_obs, batch_action, batch_reward,
                                      batch_next_obs, batch_terminal)

        obs = next_obs
        total_reward += reward

        if done:
            break
    return total_reward, steps


# Evaluate the agent: run 5 episodes and average the total reward
def evaluate(env, agent, render=False):
    eval_reward = []
    for i in range(5):
        obs = env.reset()
        total_reward, steps = 0, 0
        while True:
            # print("obs:", obs)
            batch_obs = np.expand_dims(obs, axis=0)
            action = agent.predict(batch_obs.astype('float32'))
            # print("before:", action)
            action = np.squeeze(action)
            # print("middle:", action)
            action = np.clip(action, -1.0, 1.0)  # no exploration noise during evaluation
            # print("after:", action)
            action = action_mapping(action, env.action_space.low[0],
                                    env.action_space.high[0])
            # print("last:", action)

            next_obs, reward, done, info = env.step(action)

            obs = next_obs
            total_reward += reward
            steps += 1

            if render:
                env.render()
            if done:
                break
        eval_reward.append(total_reward)
    return np.mean(eval_reward)


def main():
    env = make_env("Quadrotor", task="hovering_control")
    env.reset()
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]

    # Build the agent with the PARL framework
    model = QuadrotorModel(act_dim)
    algorithm = DDPG(
        model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
    agent = QuadrotorAgent(algorithm, obs_dim, act_dim)

    # Restore a previously saved checkpoint to continue training
    # (comment this out for a fresh run)
    agent.restore('model_dir/steps_1000177.ckpt')

    # Create the replay memory
    rpm = ReplayMemory(int(MEMORY_SIZE), obs_dim, act_dim)

    # Start training
    test_flag = 0
    total_steps = 0
    while total_steps < TRAIN_TOTAL_STEPS:
        train_reward, steps = run_episode(env, agent, rpm)
        total_steps += steps
        # logger.info('Steps: {} Reward: {}'.format(total_steps, train_reward))  # log the training reward

        if total_steps // TEST_EVERY_STEPS >= test_flag:  # evaluate the model every TEST_EVERY_STEPS steps
            while total_steps // TEST_EVERY_STEPS >= test_flag:
                test_flag += 1

            evaluate_reward = evaluate(env, agent, True)
            logger.info('Steps {}, Test reward: {}'.format(
                total_steps, evaluate_reward))  # log the evaluation reward

            # Save the model after every evaluation, named after the number of training steps
            ckpt = 'model_dir/steps_{}.ckpt'.format(total_steps)
            agent.save(ckpt)


if __name__ == '__main__':
    main()
The figure below shows the action printed at different stages of training. Note that because the action encodes several quadrotor control parameters (things like speed and direction), each action consists of 4 values.