Here in the fourth part of our self-driving cars with Carla, Python, TensorFlow, and reinforcement learning project, we're going to be working on coding our actual agent. In the previous tutorial, we worked on the environment class, which our agent will interact with.
As we consider how to create our agent, we also have to consider the model itself. Assuming you've been through the reinforcement learning tutorials (which you had better have, otherwise things are about to get very confusing for you), you know that every step an agent takes comes not only with a possible prediction (depending on exploration/epsilon), but also with a fitment (a call to .fit())! This means we're training and predicting at the same time, but it's pretty essential that our agent gets the most FPS (frames per second) possible, and we'd also like our trainer to run as quickly as possible. To achieve this, we can use either multiprocessing or threading. Threading allows us to keep things fairly simple. I will likely open-source some multiprocessing code for this task at some point, but, for now, threading it is.
Code up to this point:
import glob
import os
import sys
import random
import time
import numpy as np
import cv2
import math
try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass
import carla
SHOW_PREVIEW = False
IM_WIDTH = 640
IM_HEIGHT = 480
SECONDS_PER_EPISODE = 10
class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IM_WIDTH
    im_height = IM_HEIGHT
    front_camera = None

    def __init__(self):
        self.client = carla.Client("localhost", 2000)
        self.client.set_timeout(2.0)
        self.world = self.client.get_world()
        self.blueprint_library = self.world.get_blueprint_library()
        self.model_3 = self.blueprint_library.filter("model3")[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.blueprint_library.find('sensor.camera.rgb')
        self.rgb_cam.set_attribute("image_size_x", f"{self.im_width}")
        self.rgb_cam.set_attribute("image_size_y", f"{self.im_height}")
        self.rgb_cam.set_attribute("fov", f"110")

        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
        time.sleep(4)

        colsensor = self.blueprint_library.find("sensor.other.collision")
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)

        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))

        return self.front_camera

    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        #print(i.shape)
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("", i3)
            cv2.waitKey(1)
        self.front_camera = i3

    def step(self, action):
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        elif action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        elif action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))

        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done = True

        return self.front_camera, reward, done, None
Now, we're going to create a new class, the DQNAgent class. We'll start with:
class DQNAgent:
    def __init__(self):
        self.model = self.create_model()
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())
This is the same concept as in the reinforcement learning tutorials: we have a main network, which is constantly evolving, and then a target network, which we update every n steps or episodes, where n is whatever you choose.
When we train, we train on randomly selected samples from our replay memory:
        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
For the same reasons as before (the RL tutorial), we will be modifying TensorBoard:
        self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")
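Note that ModifiedTensorBoard isn't defined in this script yet. Below is roughly the version from the earlier deep Q-learning tutorial, assuming Keras 2.x running on TensorFlow 1.x (treat it as a sketch; use whatever version you already have from that tutorial):

from keras.callbacks import TensorBoard
import tensorflow as tf


# Own TensorBoard class that keeps writing to one log file instead of creating a new one per .fit()
class ModifiedTensorBoard(TensorBoard):

    # Overriding init to set an initial step and writer (we want one log file for all .fit() calls)
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.step = 1
        self.writer = tf.summary.FileWriter(self.log_dir)

    # Overriding this method to stop creating the default log writer
    def set_model(self, model):
        pass

    # Overridden, saves logs with our own step number (otherwise every .fit() would start at step 0)
    def on_epoch_end(self, epoch, logs=None):
        self.update_stats(**logs)

    # Overridden, we train for one batch only, no need to save anything at batch end
    def on_batch_end(self, batch, logs=None):
        pass

    # Overridden, so the writer doesn't get closed after each .fit()
    def on_train_end(self, _):
        pass

    # Custom method for saving our own metrics at the current step
    def update_stats(self, **stats):
        self._write_logs(stats, self.step)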
Now we can wrap up our init method by setting some final values:
        self.target_update_counter = 0  # will track when it's time to update the target model
        self.graph = tf.get_default_graph()

        self.terminate = False  # should we quit?
        self.last_logged_episode = 0
        self.training_initialized = False  # waiting for TF to get rolling
We're going to use self.training_initialized to track when TensorFlow is ready to get going. The first predictions/fitments after a model is created take extra long, so we're going to pass some nonsense information through the model initially to prime it and actually get it moving.
Now let's create our model:
    def create_model(self):
        base_model = Xception(weights=None, include_top=False, input_shape=(IM_HEIGHT, IM_WIDTH, 3))

        x = base_model.output
        x = GlobalAveragePooling2D()(x)

        predictions = Dense(3, activation="linear")(x)
        model = Model(inputs=base_model.input, outputs=predictions)
        model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])
        return model
Here, we're just making use of the premade Xception model, but you could build some other model or import a different one. Note that we're adding a GlobalAveragePooling2D layer to the base model's output, followed by a 3-neuron dense output layer, one neuron for each possible action the agent can take. Let's make the required imports for this method:
from keras.applications.xception import Xception
from keras.layers import Dense, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.models import Model
We need a quick method in our DQNAgent for updating replay memory:
    def update_replay_memory(self, transition):
        # transition = (current_state, action, reward, new_state, done)
        self.replay_memory.append(transition)
With a method this simple, we arguably don't even need a separate method for it, but I'll leave it for now.
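For reference, in the episode loop (which we'll write in the next part), this will get called roughly like so after every env.step(); consider this a hypothetical preview, with env, agent, action, and current_state coming from that loop:

# hypothetical preview of the episode loop from the next part
new_state, reward, done, _ = env.step(action)
agent.update_replay_memory((current_state, action, reward, new_state, done))
current_state = new_state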
Now to the train method. To begin, we only want to train if we have a bare minimum of samples in replay memory:
    def train(self):
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return
Since we have a lot of constants like this, let's go ahead and just slap all of them in at once before I forget:
REPLAY_MEMORY_SIZE = 5_000
MIN_REPLAY_MEMORY_SIZE = 1_000
MINIBATCH_SIZE = 16
PREDICTION_BATCH_SIZE = 1
TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4
UPDATE_TARGET_EVERY = 5
MODEL_NAME = "Xception"

MEMORY_FRACTION = 0.8
MIN_REWARD = -200

EPISODES = 100

DISCOUNT = 0.99
epsilon = 1
EPSILON_DECAY = 0.95 ## 0.9975 99975
MIN_EPSILON = 0.001

AGGREGATE_STATS_EVERY = 10
Put those at the top of the script, obviously, outside of all classes/functions/methods...etc. If at any point you're confused about where things go, you can check the end of the tutorial to see code up to that point.
Back to our train method:
    def train(self):
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return
If we don't have enough samples, we'll just return and be done. If we do, then we will begin our training. First, we need to grab a random minibatch:
        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)
Once we have our minibatch, we want to grab the current and future Q values:
        current_states = np.array([transition[0] for transition in minibatch])/255
        with self.graph.as_default():
            current_qs_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)

        new_current_states = np.array([transition[3] for transition in minibatch])/255
        with self.graph.as_default():
            future_qs_list = self.target_model.predict(new_current_states, PREDICTION_BATCH_SIZE)
Recall a transition is: transition = (current_state, action, reward, new_state, done)
Now, we create our inputs (X) and outputs (y):
        X = []
        y = []

        for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):
            if not done:
                max_future_q = np.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward

            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            X.append(current_state)
            y.append(current_qs)
Nothing new or fancy here, pretty much identical to our reinforcement learning tutorials (DQN).
        log_this_step = False
        if self.tensorboard.step > self.last_logged_episode:
            log_this_step = True
            self.last_logged_episode = self.tensorboard.step
We only want to log once per episode, not per training step, so we'll use the above to keep track.
Next, we'll fit:
        with self.graph.as_default():
            self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)
Notice that we're setting the TensorBoard callback only if log_this_step is True. If it's False, we'll still fit, we just won't log to TensorBoard.
Next, we want to increment our target-model update counter, once per episode, which is why it's tied to log_this_step:
        if log_this_step:
            self.target_update_counter += 1
Finally, we'll check to see if it's time to update our target_model:
        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0
Now we just need two more methods and our agent class is complete. First, we need a method to get Q values (basically, to make a prediction):
    def get_qs(self, state):
        return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]
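Just to see where this is headed: in the episode loop (next part), action selection will look roughly like the sketch below, with epsilon handling exploration. This is a hypothetical preview, not the final code; agent and current_state come from that loop:

# hypothetical preview: epsilon-greedy action selection in the episode loop
if np.random.random() > epsilon:
    # exploit: pick the action with the highest predicted Q value
    action = np.argmax(agent.get_qs(current_state))
else:
    # explore: pick a random action (we have 3: left, straight, right)
    action = np.random.randint(0, 3)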
Finally, we just need to actually do training:
    def train_in_loop(self):
        X = np.random.uniform(size=(1, IM_HEIGHT, IM_WIDTH, 3)).astype(np.float32)
        y = np.random.uniform(size=(1, 3)).astype(np.float32)
        with self.graph.as_default():
            self.model.fit(X, y, verbose=False, batch_size=1)

        self.training_initialized = True
To start, we use some random data like above to initialize, then we begin our infinite loop:
        while True:
            if self.terminate:
                return
            self.train()
            time.sleep(0.01)
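Recall from the top that the plan is to run this loop on its own thread, so training happens in the background while the main thread steps the environment. A minimal sketch of how we'll kick it off (the actual launch code comes in the next tutorial):

from threading import Thread

agent = DQNAgent()

# start training in the background; daemon=True so it won't block program exit
trainer_thread = Thread(target=agent.train_in_loop, daemon=True)
trainer_thread.start()

# wait until the priming .fit() has run before asking for predictions
while not agent.training_initialized:
    time.sleep(0.01)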
Alright, that's our DQNAgent class. Full code up to this point:
import glob
import os
import sys
import random
import time
import numpy as np
import cv2
import math
from collections import deque
from keras.applications.xception import Xception
from keras.layers import Dense, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.models import Model
import tensorflow as tf
try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass
import carla
SHOW_PREVIEW = False
IM_WIDTH = 640
IM_HEIGHT = 480
SECONDS_PER_EPISODE = 10
REPLAY_MEMORY_SIZE = 5_000
MIN_REPLAY_MEMORY_SIZE = 1_000
MINIBATCH_SIZE = 16
PREDICTION_BATCH_SIZE = 1
TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4
UPDATE_TARGET_EVERY = 5
MODEL_NAME = "Xception"
MEMORY_FRACTION = 0.8
MIN_REWARD = -200
EPISODES = 100
DISCOUNT = 0.99
epsilon = 1
EPSILON_DECAY = 0.95 ## 0.9975 99975
MIN_EPSILON = 0.001
AGGREGATE_STATS_EVERY = 10
class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IM_WIDTH
    im_height = IM_HEIGHT
    front_camera = None

    def __init__(self):
        self.client = carla.Client("localhost", 2000)
        self.client.set_timeout(2.0)
        self.world = self.client.get_world()
        self.blueprint_library = self.world.get_blueprint_library()
        self.model_3 = self.blueprint_library.filter("model3")[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.blueprint_library.find('sensor.camera.rgb')
        self.rgb_cam.set_attribute("image_size_x", f"{self.im_width}")
        self.rgb_cam.set_attribute("image_size_y", f"{self.im_height}")
        self.rgb_cam.set_attribute("fov", f"110")

        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
        time.sleep(4)

        colsensor = self.blueprint_library.find("sensor.other.collision")
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)

        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))

        return self.front_camera

    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        #print(i.shape)
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("", i3)
            cv2.waitKey(1)
        self.front_camera = i3

    def step(self, action):
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        elif action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        elif action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))

        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done = True

        return self.front_camera, reward, done, None
# ModifiedTensorBoard (from the earlier DQN tutorial, sketched above) also needs to be defined in this script
class DQNAgent:
    def __init__(self):
        self.model = self.create_model()
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())

        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)

        self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")
        self.target_update_counter = 0
        self.graph = tf.get_default_graph()

        self.terminate = False
        self.last_logged_episode = 0
        self.training_initialized = False

    def create_model(self):
        base_model = Xception(weights=None, include_top=False, input_shape=(IM_HEIGHT, IM_WIDTH, 3))

        x = base_model.output
        x = GlobalAveragePooling2D()(x)

        predictions = Dense(3, activation="linear")(x)
        model = Model(inputs=base_model.input, outputs=predictions)
        model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])
        return model

    def update_replay_memory(self, transition):
        # transition = (current_state, action, reward, new_state, done)
        self.replay_memory.append(transition)

    def train(self):
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)

        current_states = np.array([transition[0] for transition in minibatch])/255
        with self.graph.as_default():
            current_qs_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)

        new_current_states = np.array([transition[3] for transition in minibatch])/255
        with self.graph.as_default():
            future_qs_list = self.target_model.predict(new_current_states, PREDICTION_BATCH_SIZE)

        X = []
        y = []

        for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):
            if not done:
                max_future_q = np.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward

            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            X.append(current_state)
            y.append(current_qs)

        log_this_step = False
        if self.tensorboard.step > self.last_logged_episode:
            log_this_step = True
            self.last_logged_episode = self.tensorboard.step

        with self.graph.as_default():
            self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)

        if log_this_step:
            self.target_update_counter += 1

        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0

    def get_qs(self, state):
        return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]

    def train_in_loop(self):
        X = np.random.uniform(size=(1, IM_HEIGHT, IM_WIDTH, 3)).astype(np.float32)
        y = np.random.uniform(size=(1, 3)).astype(np.float32)
        with self.graph.as_default():
            self.model.fit(X, y, verbose=False, batch_size=1)

        self.training_initialized = True

        while True:
            if self.terminate:
                return
            self.train()
            time.sleep(0.01)
Now, in the next tutorial, we'll put the finishing touches on things and actually run to see what we've got!