-
Notifications
You must be signed in to change notification settings - Fork 18
Traffic Signal Controller Sample Code
Here we provide a sample code to illustrate how to generate a traffic signal plan through interacting with the simulator. The sample code is here: https://github.com/tianrang-intelligence/TSCC2019 In the sample code, we implement two baseline methods: Self-organizing traffic light (SOTL) and reinforcement learning.
You can run SOTL using the following command:
python3 run_sotl.py --scenario "hangzhou_bc_tyc_1h_7_8_1848"
You can run a basic reinforcement learning code using the following command:
python3 run_rl.py --scenario "hangzhou_bc_tyc_1h_7_8_1848"
Figure 1 Description of the intersection
- Road: For an intersection, there are two kinds of roads: start road and end road. Start road is where a car enters an intersection; end road is where a car leaves an intersection. Figure 1 shows a typical intersection with four start roads and four end roads.
- Lane: Each road can be divided into one or multiple lanes, indexed by 0, 1, ... from the inner lane to the outter lane. Lane_id = road_id_{i}, where i is the lane index.
- Roadlink (road id - road id): A roadlink is defined as the traffic moving from one start road towards another end road. Roadlinks are contained in the list of intersection["roadLinks"]. Each roadlink is stored in a dictionary like:
road_link = {"type":..., "startRoad":..., "endRoad":...,
"direction":..., "laneLinks":...}
- Lanelink: A roadlink consists of several lanelinks. The dictionary of lanelink is like:
lane_link = {"startLaneIndex":..., "endLaneIndex":..., "points":...}
The part of points are the geometric points to define the lane physical positions. "startLaneIndex" means the lane index of the start road while "endLaneIndex" is the lane index of the end road.
- Signal phase: A specific combination of roadlinks can form a signal phase.
In the cityflow_env.py, a class of the cityflow environment is provided. There is a function to get the state features of the environment.
def get_state(self):
state = {
# {lane_id: lane_count, ...}
state['lane_vehicle_count'] = self.eng.get_lane_vehicle_count()
# {lane_id: lane_waiting_count, ...}
state['lane_waiting_vehicle_count'] = self.eng.get_lane_waiting_vehicle_count()
# {lane_id: [vehicle1_id, vehicle2_id, ...], ...}
state['lane_vehicles'] = self.eng.get_lane_vehicles()
# {vehicle_id: vehicle_speed, ...}
state['vehicle_speed'] = self.eng.get_vehicle_speed()
# {vehicle_id: distance, ...}
state['vehicle_distance'] = self.eng.get_vehicle_distance()
state['current_time'] = self.eng.get_current_time()
state['current_phase'] = self.current_phase
state['current_phase_time'] = self.current_phase_time
return state
As mentioned above, for a control algorithm, the mapping from lanes to a phase is crucial. We obtain the mapping by the function parse_roadnet(roadnetfile) in the utility.py.
The first step is to read the roadnet json file into a dictionary with the intersection_id as the key and the details of this intersection as the value.
roadnet = json.load(open(roadnetFile))
Then we process each intersection respectively.
lane_phase_info_dict = {}
for intersection in roadnet["intersections"]:
if intersection['virtual']:
continue
lane_phase_info_dict[intersection['id']] = {
"start_lane": [],
"end_lane": [],
"phase": [],
"phase_startLane_mapping": {},
"phase_roadLink_mapping": {}
}
For an intersection, we record the mapping in a dictionary. Note that only non-virtual intersections are controlled by the traffic signal.
You can traverse the list of intersection["roadLinks"] and then the list of road_link["laneLinks"] to record the roadLink_lane_pair, showing which lane pairs form a roadlink.
road_links = intersection["roadLinks"]
start_lane = []
end_lane = []
# roadLink includes some lane_pair: (start_lane, end_lane)
roadLink_lane_pair = {ri: [] for ri in
range(len(road_links))}
for ri in range(len(road_links)):
road_link = road_links[ri]
for lane_link in road_link["laneLinks"]:
sl = road_link['startRoad'] + "_" + str(lane_link["startLaneIndex"])
el = road_link['endRoad'] + "_" + str(lane_link["endLaneIndex"])
start_lane.append(sl)
end_lane.append(el)
roadLink_lane_pair[ri].append((sl, el))
lane_phase_info_dict[intersection['id']]["start_lane"] = sorted(list(set(start_lane)))
lane_phase_info_dict[intersection['id']]["end_lane"] = sorted(list(set(end_lane)))
Finally, we can figure out the mapping from lanes to a phase.
for phase_i in range(1, len(intersection["trafficLight"]["lightphases"])):
p = intersection["trafficLight"]["lightphases"][phase_i]
lane_pair = []
start_lane = []
for ri in p["availableRoadLinks"]:
lane_pair.extend(roadLink_lane_pair[ri])
if roadLink_lane_pair[ri][0][0] not in start_lane:
start_lane.append(roadLink_lane_pair[ri][0][0])
lane_phase_info_dict[intersection['id']]["phase"].append(phase_i)
lane_phase_info_dict[intersection['id']]["phase_startLane_mapping"][phase_i] = start_lane
lane_phase_info_dict[intersection['id']]["phase_roadLink_mapping"][phase_i] = lane_pair
The information of phases is stored in the list of intersection["trafficLight"]["lightphases"] which is listed below. The index of the list represents a phase. Note that the phase #0 is ignored since it serves as the yellow signal and it is not controlled by the algorithm.
"lightphases": [
{"time": 5, "availableRoadLinks": []},
{"time": 30, "availableRoadLinks": [0, 4]},
{"time": 30, "availableRoadLinks": [2, 7]},
{"time": 30, "availableRoadLinks": [1, 5]},
{"time": 30, "availableRoadLinks": [3, 6]},
{"time": 30, "availableRoadLinks": [0, 1]},
{"time": 30, "availableRoadLinks": [4, 5]},
{"time": 30, "availableRoadLinks": [2, 3]},
{"time": 30, "availableRoadLinks": [6, 7]},
]
Each phase contains a certain number of roadlinks so given available roadlinks, lanes of the phase can be known through roadLink_lane_pair.
As for the SOTL(run_sotl.py), it is a simple adaptive method for traffic signal control.
def choose_action(self, state):
cur_phase = state["current_phase"]
print("Time: {}, Phase: {}".format(state['current_time'], cur_phase))
if state["current_phase_time"] >= self.phi:
num_green_vehicle = sum([state["lane_waiting_vehicle_count"][i] for i in self.phase_startLane_mapping[cur_phase]])
num_red_vehicle = sum([state["lane_waiting_vehicle_count"][i] for i in self.lane_phase_info[self.intersection_id]["start_lane"]]) - num_green_vehicle
if num_green_vehicle <= self.min_green_vehicle and num_red_vehicle > self.max_red_vehicle:
self.action = cur_phase % len(self.phase_list) + 1
return self.action
There are some pre-defined parameters, minimum green time phi, minimum waiting vehicles on the lanes with a green signal, and the threshold of waiting vehicles on the lanes with a red signal. When the following three conditions are satisfied, the phase will be set to the next one. (1) the time of current phase exceeds the minimum green time \phi; (2) the number of "green vehicles" is less than the min waiting vehicles on green lanes; (3) the number of "red vehicles" surpasses the threshod of waiting vehicles on red lanes;
Apart from SOTL, a simple deep q-learning algorithm (run_rl.py) is also provided to calculate traffic signal plan. In this sample code, there are following definitions.
- state: number of vehicles on the starting lane, current phase
- reward: the total number of waiting vehicles on the starting lane
- action: set the signal to some phase every 10 seconds
- model: DQN with 2 hidden layers
The following code segement illustrates these four definitions in the sample code.
# state definition in run_rl.py
state = env.get_state()
state = np.array(list(state['start_lane_vehicle_count'].values()) + [state['current_phase']]) # a sample state definition
state = np.reshape(state, [1, state_size])
# action definition in run_rl.py
action = phase_list[agent.choose_action(state)]
# reward defined in CityFlow_env.py
reward = env.get_reward()
'''
def get_reward():
lane_waiting_vehicle_count = self.eng.get_lane_waiting_vehicle_count()
reward = -1 * sum(list(lane_waiting_vehicle_count.values()))
return reward
'''
# model network defined in dqn_agent.py
model = Sequential()
model.add(Dense(40, input_dim=self.state_size, activation='relu'))
model.add(Dense(40, activation='relu'))
model.add(Dense(self.action_size, activation='linear'))
Please note that this reinforcement learning algorithm is just for your reference and you may need to redesign more suitable rl algorithm, including network structure, state definition, reward function and so on.
In particular, we recommend that you implement your control method in an agent class as we write in sotl_agent.py and dqn_agent.py. A function of choose_action returns an action indicating the phase to be set in the simulator.
In cityflow_env.py, the step function in the cityflow_env class controls the simulator to carry out an action. And then the phase is set through the function self.eng.set_tl_phase(self.intersection_id, self.current_phase). Two paramters of set_tl_phase are the intersection_id and phase_id.
def step(self, next_phase):
if self.current_phase == next_phase:
self.current_phase_time += 1
else:
self.current_phase = next_phase
self.current_phase_time = 1
self.eng.set_tl_phase(self.intersection_id, self.current_phase)
self.eng.next_step()
self.phase_log.append(self.current_phase)
Everytime when you finish controlling, the traffic signal plan can be recorded by runing 'env.log()', like in the 'run_sotl.py' and 'run_rl.py'.
# defined in CityFlow_env.py
env.log()
'''
def log(self):
df = pd.DataFrame({self.intersection_id: self.phase_log[:self.num_step]})
if not os.path.exists(self.config['data']):
os.makedirs(self.config['data'])
df.to_csv(os.path.join(self.config['data'], 'signal_plan.txt'), index=None
'''