Commit 3db7626a authored by Hudson Yeo's avatar Hudson Yeo

add files

parent 03a5f3e6
# -*- coding: utf-8 -*-
"""Untitled11.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/16AO5LtaahOGixsomKG9TvHMI8WM0SvfL
"""
import os
from typing import Dict, List, Tuple
import gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from IPython.display import clear_output  # used by DQNAgent._plot
from gym import spaces
class cliff_walking():
def __init__(self,start_pos=[3,0],goal=[3,9],e=0.1,windy_x=[0,3],windy_y=[2,9],pos_max=[4,12],reward_region=9):
self.d_names={0:'U',
1:'R',
2:'D',
3:'L'}
self.d={0:np.array([-1,0]),
1:np.array([0,1]),
2:np.array([1,0]),
3:np.array([0,-1])}
self.start_pos=np.array(start_pos)
self.pos=np.array(start_pos)
self.pos_max_x=pos_max[0]-1
self.pos_max_y=pos_max[1]-1
self.goal=np.array(goal)
self.action_space = spaces.Discrete(4)
self.observation_space = spaces.Tuple((
spaces.Discrete(self.pos_max_x+1),
spaces.Discrete(self.pos_max_y+1)))
self.windy_x=windy_x
self.windy_y=windy_y
self.e=e
self.reward_region=reward_region
def reset(self):
self.pos=self.start_pos
return self.pos
    def is_windy(self, pos):
        return (self.windy_x[0] <= pos[0] <= self.windy_x[1]
                and self.windy_y[0] <= pos[1] <= self.windy_y[1])
def next_pos(self,a):
if self.is_windy(self.pos) and np.random.random()<self.e:
next_pos=self.pos+self.d[2] #go down
else:
next_pos=self.pos+self.d[a]
if next_pos[0]<0:
next_pos[0]=0
if next_pos[0]>self.pos_max_x:
next_pos[0]=self.pos_max_x
if next_pos[1]<0:
next_pos[1]=0
if next_pos[1]>self.pos_max_y:
next_pos[1]=self.pos_max_y
return next_pos
    def reward_terminal(self, pos):
        """Reward and termination: row 3 past the start column ends the episode
        (+1 beyond the reward_region column, -1 on the cliff); every other move costs -0.01."""
        assert pos[0] <= self.pos_max_x
        assert pos[1] <= self.pos_max_y
        if pos[0] == 3 and pos[1] > 0:
            isdone = True
            if pos[1] > self.reward_region:
                reward = 1
            else:
                reward = -1
        else:
            reward = -0.01
            isdone = False
        return reward, isdone
def step(self,a):
a=int(a)
self.pos=self.next_pos(a)
reward,isdone=self.reward_terminal(self.pos)
        return self.pos, reward, isdone, {}  # empty info dict, matching the gym step API
    def render(self, *args, **kwargs):
        import pandas as pd
        a = np.full([self.pos_max_x + 1, self.pos_max_y + 1], ' ', dtype=object)
        a[-1, self.reward_region:] = 'T'  # mark the reward region on the bottom row
        a[tuple(self.pos)] = 'X'          # current agent position
        print(pd.DataFrame(a))
def close(self):
pass
def seed(self,seed):
        np.random.seed(seed)   # the wind in next_pos is sampled from np.random
        torch.manual_seed(seed)
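
# A minimal usage sketch of the cliff_walking environment (an illustration added
# here, not part of the original notebook): take a few random actions and render
# the grid. Assumes the default 4x12 windy layout defined above.
def demo_cliff_walking(steps=5):
    env = cliff_walking()
    env.seed(0)
    state = env.reset()
    for _ in range(steps):
        a = int(env.action_space.sample())        # random action: 0=U, 1=R, 2=D, 3=L
        state, reward, done, info = env.step(a)   # windy cells may push the agent down instead
        print(env.d_names[a], state, reward, done)
        if done:
            state = env.reset()
    env.render()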
class ReplayBuffer:
"""A simple numpy replay buffer."""
def __init__(self, obs_dim: int, size: int, batch_size: int = 32):
self.obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
self.next_obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
self.acts_buf = np.zeros([size], dtype=np.float32)
self.rews_buf = np.zeros([size], dtype=np.float32)
self.done_buf = np.zeros(size, dtype=np.float32)
self.max_size, self.batch_size = size, batch_size
        self.ptr, self.size = 0, 0
def store(
self,
obs: np.ndarray,
act: np.ndarray,
rew: float,
next_obs: np.ndarray,
done: bool,
):
self.obs_buf[self.ptr] = obs
self.next_obs_buf[self.ptr] = next_obs
self.acts_buf[self.ptr] = act
self.rews_buf[self.ptr] = rew
self.done_buf[self.ptr] = done
self.ptr = (self.ptr + 1) % self.max_size
self.size = min(self.size + 1, self.max_size)
def sample_batch(self) -> Dict[str, np.ndarray]:
idxs = np.random.choice(self.size, size=self.batch_size, replace=False)
return dict(obs=self.obs_buf[idxs],
next_obs=self.next_obs_buf[idxs],
acts=self.acts_buf[idxs],
rews=self.rews_buf[idxs],
done=self.done_buf[idxs])
def __len__(self) -> int:
return self.size
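
# A minimal sketch (an added illustration, not from the original notebook) of how
# ReplayBuffer is used: store (obs, act, rew, next_obs, done) transitions, then
# draw a uniform batch once at least batch_size transitions are in the buffer.
def demo_replay_buffer():
    buf = ReplayBuffer(obs_dim=2, size=100, batch_size=4)
    for i in range(10):
        obs = np.array([0.0, float(i)])
        next_obs = np.array([0.0, float(i + 1)])
        buf.store(obs, act=np.random.randint(4), rew=-0.01, next_obs=next_obs, done=False)
    batch = buf.sample_batch()              # dict of arrays keyed by obs/next_obs/acts/rews/done
    print(len(buf), batch["obs"].shape)     # 10 transitions stored, batch of shape (4, 2)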
class Network(nn.Module):
def __init__(
self,
in_dim: int,
out_dim: int,
atom_size: int,
support: torch.Tensor
):
"""Initialization."""
super(Network, self).__init__()
self.support = support
self.out_dim = out_dim
self.atom_size = atom_size
self.layers = nn.Sequential(
nn.Linear(in_dim, 300),
nn.ReLU(),
nn.Linear(300, 300),
nn.ReLU(),
nn.Linear(300, out_dim * atom_size)
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Forward method implementation."""
dist = self.dist(x)
q = torch.sum(dist * self.support, dim=2)
return q
def risk_action(self, x: torch.Tensor,alpha=1) -> torch.Tensor:
dist = self.dist(x)
q = torch.sum(dist * self.support, dim=2)
std=self.std_dev(dist,q)
return q-alpha*std
    def std_dev(self, dist, q):
        """Standard deviation of the return distribution for each action."""
        cm = (self.support.reshape(self.atom_size, 1) - q) ** 2  # squared distance of each atom from the mean
        var = torch.sum(dist * cm.T, dim=-1)
        return var.sqrt()
def dist(self, x: torch.Tensor) -> torch.Tensor:
"""Get distribution for atoms."""
q_atoms = self.layers(x).view(-1, self.out_dim, self.atom_size)
dist = F.softmax(q_atoms, dim=-1)
dist = dist.clamp(min=1e-3) # for avoiding nans
return dist
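
# A short shape-check sketch (added here as an illustration, not part of the
# original notebook): dist() returns per-action probabilities over the support
# atoms, forward() collapses them to expected Q-values, and risk_action()
# subtracts alpha standard deviations to favour lower-variance actions.
def demo_network():
    support = torch.linspace(-3, 3, 51)
    net = Network(in_dim=2, out_dim=4, atom_size=51, support=support)
    x = torch.FloatTensor([[3.0, 0.0]])     # a single (row, col) state
    dist = net.dist(x)                      # shape (1, 4, 51), each row sums to ~1
    q = net(x)                              # shape (1, 4), expected return per action
    risk_q = net.risk_action(x, alpha=1)    # mean minus one standard deviation
    print(dist.shape, q.shape, risk_q.shape)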
class DQNAgent:
"""DQN Agent interacting with environment.
    Attributes:
env (gym.Env): openAI Gym environment
memory (ReplayBuffer): replay memory to store transitions
batch_size (int): batch size for sampling
epsilon (float): parameter for epsilon greedy policy
epsilon_decay (float): step size to decrease epsilon
max_epsilon (float): max value of epsilon
min_epsilon (float): min value of epsilon
target_update (int): period for target model's hard update
gamma (float): discount factor
dqn (Network): model to train and select actions
dqn_target (Network): target model to update
optimizer (torch.optim): optimizer for training dqn
transition (list): transition information including
state, action, reward, next_state, done
v_min (float): min value of support
v_max (float): max value of support
atom_size (int): the unit number of support
support (torch.Tensor): support for categorical dqn
"""
def __init__(
self,
env: gym.Env,
memory_size: int,
batch_size: int,
target_update: int,
epsilon_decay: float,
max_epsilon: float = 1.0,
min_epsilon: float = 0.10,
gamma: float = 0.90,
# Categorical DQN parameters
v_min: float = -3,
v_max: float = 3,
atom_size: int = 51,
):
"""Initialization.
Args:
env (gym.Env): openAI Gym environment
memory_size (int): length of memory
batch_size (int): batch size for sampling
target_update (int): period for target model's hard update
epsilon_decay (float): step size to decrease epsilon
max_epsilon (float): max value of epsilon
min_epsilon (float): min value of epsilon
gamma (float): discount factor
v_min (float): min value of support
v_max (float): max value of support
atom_size (int): the unit number of support
"""
        obs_dim = 2  # the state is a 2-D (row, col) position; env.observation_space is a Tuple here
action_dim = env.action_space.n
self.env = env
self.memory = ReplayBuffer(obs_dim, memory_size, batch_size)
self.batch_size = batch_size
self.epsilon = max_epsilon
self.epsilon_decay = epsilon_decay
self.max_epsilon = max_epsilon
self.min_epsilon = min_epsilon
self.target_update = target_update
self.gamma = gamma
# device: cpu / gpu
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(self.device)
# Categorical DQN parameters
self.v_min = v_min
self.v_max = v_max
self.atom_size = atom_size
self.support = torch.linspace(
self.v_min, self.v_max, self.atom_size
).to(self.device)
# networks: dqn, dqn_target
self.dqn = Network(
obs_dim, action_dim, atom_size, self.support
).to(self.device)
self.dqn_target = Network(
obs_dim, action_dim, atom_size, self.support
).to(self.device)
self.dqn_target.load_state_dict(self.dqn.state_dict())
self.dqn_target.eval()
# optimizer
self.optimizer = optim.Adam(self.dqn.parameters(),lr=0.0001)
# transition to store in memory
self.transition = list()
# mode: train / test
self.is_test = False
def select_action(self, state: np.ndarray) -> np.ndarray:
"""Select an action from the input state."""
# epsilon greedy policy
if self.epsilon > np.random.random():
selected_action = self.env.action_space.sample()
else:
selected_action = self.dqn(
torch.FloatTensor(state).to(self.device)
).argmax()
selected_action = selected_action.detach().cpu().numpy()
if not self.is_test:
self.transition = [state, selected_action]
return selected_action
def greedy_action(self,state):
selected_action = self.dqn(
torch.FloatTensor(state).to(self.device)
).argmax()
selected_action = selected_action.detach().cpu().numpy()
return selected_action
    def split(self, next_state):
        """Split a one- or two-digit integer state index into a (row, col) float tensor."""
        s = str(next_state)
        if len(s) == 1:
            next_state = torch.tensor([0, int(s[0])]).float()
        else:
            next_state = torch.tensor([int(s[0]), int(s[1])]).float()
        return next_state
def step(self, action: np.ndarray) -> Tuple[np.ndarray, np.float64, bool]:
"""Take an action and return the response of the env."""
next_state, reward, done, _ = self.env.step(int(action))
if not self.is_test:
#print('transition',[reward, next_state, done])
self.transition += [reward, next_state, done]
#print(self.transition)
self.memory.store(*self.transition)
return next_state, reward, done
def update_model(self) -> torch.Tensor:
"""Update the model by gradient descent."""
samples = self.memory.sample_batch()
loss = self._compute_dqn_loss(samples)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
return loss.item()
def train(self, num_frames: int, plotting_interval: int = 200):
"""Train the agent."""
self.is_test = False
state = self.env.reset()
update_cnt = 0
epsilons = []
losses = []
scores = []
score = 0
for frame_idx in range(1, num_frames + 1):
action = self.select_action(state)
next_state, reward, done = self.step(action)
state = next_state
score += reward
# if episode ends
if done:
state = self.env.reset()
scores.append(score)
score = 0
# if training is ready
if len(self.memory) >= self.batch_size:
loss = self.update_model()
losses.append(loss)
update_cnt += 1
# linearly decrease epsilon
self.epsilon = max(self.min_epsilon, self.epsilon - (self.max_epsilon - self.min_epsilon) * self.epsilon_decay)
epsilons.append(self.epsilon)
# if hard update is needed
if update_cnt % self.target_update == 0:
self._target_hard_update()
# plotting
if frame_idx % plotting_interval == 0:
self._plot(frame_idx, scores, losses, epsilons)
self.env.close()
def test_(self,printout=False):
self.is_test=True
state = self.env.reset()
done = False
score = 0
while not done:
action = self.greedy_action(state)
if printout:
print(state,action)
next_state, reward, done = self.step(action)
state = next_state
score += reward
self.is_test=False
return score
def test(self) -> List[np.ndarray]:
"""Test the agent."""
self.is_test = True
state = self.env.reset()
done = False
score = 0
frames = []
while not done:
frames.append(self.env.render(mode="rgb_array"))
action = self.select_action(state)
next_state, reward, done = self.step(action)
state = next_state
score += reward
print("score: ", score)
self.env.close()
return frames
def _compute_dqn_loss(self, samples: Dict[str, np.ndarray]) -> torch.Tensor:
"""Return categorical dqn loss."""
device = self.device # for shortening the following lines
state = torch.FloatTensor(samples["obs"]).to(device)
next_state = torch.FloatTensor(samples["next_obs"]).to(device)
action = torch.LongTensor(samples["acts"]).to(device)
reward = torch.FloatTensor(samples["rews"].reshape(-1, 1)).to(device)
done = torch.FloatTensor(samples["done"].reshape(-1, 1)).to(device)
#print(samples)
# Categorical DQN algorithm
delta_z = float(self.v_max - self.v_min) / (self.atom_size - 1)
#print('dz',delta_z.shape)
with torch.no_grad():
next_action = self.dqn_target(next_state).argmax(1)
#print('next_action',next_action)
next_dist = self.dqn_target.dist(next_state)
#print('next dist',next_dist)
next_dist = next_dist[range(self.batch_size), next_action]
#print('final_dist',next_dist)
t_z = reward + (1 - done) * self.gamma * self.support
t_z = t_z.clamp(min=self.v_min, max=self.v_max)
b = (t_z - self.v_min) / delta_z
l = b.floor().long()
u = b.ceil().long()
offset = (
torch.linspace(
0, (self.batch_size - 1) * self.atom_size, self.batch_size
).long()
.unsqueeze(1)
.expand(self.batch_size, self.atom_size)
.to(self.device)
)
proj_dist = torch.zeros(next_dist.size(), device=self.device)
proj_dist.view(-1).index_add_(
0, (l + offset).view(-1), (next_dist * (u.float() - b)).view(-1)
)
proj_dist.view(-1).index_add_(
0, (u + offset).view(-1), (next_dist * (b - l.float())).view(-1)
)
dist = self.dqn.dist(state)
log_p = torch.log(dist[range(self.batch_size), action])
loss = -(proj_dist * log_p).sum(1).mean()
#print('loss shape',loss.shape)
return loss
def _target_hard_update(self):
"""Hard update: target <- local."""
self.dqn_target.load_state_dict(self.dqn.state_dict())
def _plot(
self,
frame_idx: int,
scores: List[float],
losses: List[float],
epsilons: List[float],
):
"""Plot the training progresses."""
clear_output(True)
plt.figure(figsize=(20, 5))
plt.subplot(131)
plt.title('frame %s. score: %s' % (frame_idx, np.mean(scores[-10:])))
plt.plot(scores)
plt.subplot(132)
plt.title('loss')
plt.plot(losses)
plt.subplot(133)
plt.title('epsilons')
plt.plot(epsilons)
plt.show()
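
# A minimal training-driver sketch (an added illustration, not from the original
# commit): wire the cliff_walking environment to the agent and train for a small
# number of frames. The hyperparameters below are illustrative only.
def demo_train_cliff_walking(num_frames=2000):
    env = cliff_walking()
    env.seed(0)
    agent = DQNAgent(
        env,
        memory_size=10000,
        batch_size=32,
        target_update=100,
        epsilon_decay=1 / 2000,
    )
    agent.train(num_frames)   # plots score / loss / epsilon every 200 frames
    return agent              # e.g. agent.test_(printout=True) for a greedy rollout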
class pacman():
def __init__(self):
self.d={0:np.array([-1,0]),
1:np.array([0,1]),
2:np.array([1,0]),
3:np.array([0,-1])}
self.grid=np.ones([5,18])
self.pos=np.array([4,8])
self.grid[4,8]=0
self.enemy_pos=np.array([0,9])
        self.obs_list = np.array(
            [[1, 1], [1, 2], [2, 1], [3, 1], [3, 2], [3, 3], [3, 4], [2, 4],
             [0, 6], [1, 6], [1, 7], [1, 10], [1, 11], [0, 11],
             [3, 6], [3, 7], [3, 8], [3, 9], [3, 10], [3, 11],
             [2, 13], [3, 13], [3, 14], [3, 15], [3, 16], [2, 16], [1, 16], [1, 15]],
            dtype=np.uint8)  # obstacle cells
for pos in self.obs_list:
self.grid[pos[0],pos[1]]=-1
self.count=0
def state(self):
out=self.grid.copy()
out[self.pos[0],self.pos[1]]=2
out[self.enemy_pos[0],self.enemy_pos[1]]=-2
return out.reshape(1,5,18)
def obs(self,pos):
return np.any(np.all(pos == self.obs_list,axis=1))
def reset(self):
self.grid=np.ones([5,18])
for pos in self.obs_list:
self.grid[pos[0],pos[1]]=-1
self.pos=np.array([4,8])
self.grid[4,8]=0
self.enemy_pos=np.array([0,9])
self.count=0
return self.state()
def move(self,pos,a):
#print(a)
next_pos=pos+self.d[a].copy()
if self.obs(next_pos):
return pos
if next_pos[0]<0:
next_pos[0]=0
if next_pos[0]>4:
next_pos[0]=4
if next_pos[1]<0:
next_pos[1]=0
if next_pos[1]>17:
next_pos[1]=17
return next_pos
    def shortest_path(self, present, goal):
        """Breadth-first search over the move model; returns a grid of step distances from goal."""
path=np.full(self.grid.shape,np.inf)
#for pos in self.obs_list:
#path[pos[0],pos[1]]=100
path[goal[0],goal[1]]=0
queue=[]
queue.append(goal)
while path[present[0],present[1]]==np.inf:
current=queue.pop(0)
for i in self.d:
out=self.move(current,i)
#print(out,path[out[0],out[1]],path[current[0],current[1]]+1)
if path[out[0],out[1]]>(path[current[0],current[1]]+1):
path[out[0],out[1]]=path[current[0],current[1]]+1
queue.append(out)
#plt.imshow(path)
return path
def shortest_path_direction(self,present,goal):
path=self.shortest_path(present,goal)
out=[]
for i in range(4):
next_pos=self.move(present,i)
out.append(path[next_pos[0],next_pos[1]])
return np.argmin(out)
def step(self,a):
isdone=False
next_pos=self.move(self.pos,a)
if np.random.random()<0.5:
enemy_next_pos=self.move(self.enemy_pos,np.random.randint(0,4))
else:
enemy_next_pos=self.move(self.enemy_pos,self.shortest_path_direction(self.enemy_pos,self.pos))
if self.grid[next_pos[0],next_pos[1]]==1:
self.grid[next_pos[0],next_pos[1]]=0
reward=0.1
else:
reward=0
if np.all(next_pos==enemy_next_pos) or (np.all(self.pos==enemy_next_pos) and np.all(next_pos==self.enemy_pos)): #run into same position or swapped position
reward=-1
isdone=True
elif np.all(self.grid!=1):
reward=1
isdone=True
self.pos=next_pos
self.enemy_pos=enemy_next_pos
return self.state(),reward, isdone, None
def render(self,*args,**kwargs):
        plt.imshow(self.state()[0])  # state() has shape (1, 5, 18); imshow needs the 2-D grid
def close(self):
pass
def seed(self,seed):
np.random.seed(seed)
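
# A brief usage sketch (an added illustration, not part of the original commit)
# of the pacman environment: the agent earns +0.1 per newly visited cell, +1 when
# the grid is cleared, and -1 on colliding with the enemy, which chases via
# shortest_path half of the time and moves randomly otherwise.
def demo_pacman(steps=20):
    env = pacman()
    env.seed(0)
    state = env.reset()                      # shape (1, 5, 18): 2 marks the agent, -2 the enemy
    total = 0.0
    for _ in range(steps):
        a = np.random.randint(0, 4)          # random policy, just to exercise the dynamics
        state, reward, done, _ = env.step(a)
        total += reward
        if done:
            break
    env.render()
    return total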