forked from spartalab/uav-path-plan
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnodeModel.py
113 lines (96 loc) · 3.87 KB
/
nodeModel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# -*- coding: utf-8 -*-
"""
Node models: Zone (origin/destination), SeriesNode and DivergeNode.
@author: cesny
"""
from node import Node
class Zone(Node):
    """
    Boundary node of the network: either an origin or a destination.

    The role is derived from the link lists after the base Node is built:
    an empty fstar (no outgoing links) makes the zone a 'Destination', an
    empty rstar (no incoming links) makes it an 'Origin'.
    """

    def __init__(self, nodeID, nodeModel, fstar, rstar):
        """
        builds the base Node, then classifies the zone via __initialize
        """
        Node.__init__(self, nodeID, nodeModel, fstar, rstar)
        self.__initialize()

    def __initialize(self):
        """
        sets self.subType ('Origin' or 'Destination') and, for origins,
        creates an empty self.demandRates dict

        raises Exception if the zone has both incoming and outgoing links
        """
        # presumably self.fstar / self.rstar were filled by Node.__init__ -- confirm in node.py
        numOut = len(self.fstar)
        numIn = len(self.rstar)

        if numOut == 0:
            # nothing leaves this node
            self.subType = 'Destination'
        if numIn == 0:
            # nothing enters this node; evaluated after the destination test,
            # so a zone with no links at all ends up tagged as an origin
            self.subType = 'Origin'
            self.demandRates = {}  # later indexed by time in nodeUpdate

        if not (numIn == 0 or numOut == 0):
            raise Exception('... wrong zone initialization ...')
        # NOTE(review): this condition is equivalent to the one just above, so
        # this raise can never be reached; the message suggests the intent was
        # to reject more than one in/out link -- confirm before changing it.
        if numIn >= 1 and numOut >= 1:
            raise Exception('... code note ready for multiple ins or outs for des or orig ...')

    def calculateTransitionFlows(self, sendingFlow, receivingFlow, proportions=None):
        """
        not supported for zones: always raises Exception
        (zones move flow through nodeUpdate instead)
        """
        raise Exception('... load vehicles for Zones error in calculate transition flows ...')

    def nodeUpdate(self, time, timeStep):
        """
        overrides nodeUpdate: an origin writes inFlow on each downstream
        link, a destination writes outFlow on each upstream link

        time indexes self.demandRates; timeStep scales the hourly demand
        """
        # NOTE(review): downstreamLinks / upstreamLinks are not defined in this
        # file; presumably attached by Node or the network builder -- confirm.
        if self.subType == 'Origin':
            for link in self.downstreamLinks:
                # demand is in veh/hr (per the original author's note); the
                # 1/3600 factor suggests timeStep is in seconds -- confirm
                link.inFlow = self.demandRates[time] * (1.0/3600) * timeStep
        elif self.subType == 'Destination':
            for link in self.upstreamLinks:
                # a destination absorbs everything the upstream link can send
                link.outFlow = link.calculateSendingFlow(time, timeStep)
class SeriesNode(Node):
    """
    Node joining one upstream link to one downstream link.

    Only the first entry of rstar and the first entry of fstar are used
    when computing flows.
    """

    def __init__(self, nodeID, nodeModel, fstar, rstar):
        """
        defers entirely to the base Node constructor
        """
        Node.__init__(self, nodeID, nodeModel, fstar, rstar)

    def calculateTransitionFlows(self, sendingFlow, receivingFlow, proportions=None):
        """
        sendingFlow and receivingFlow are dictionaries keyed by link id
        sendingFlow = {uplink1: val, uplink2: val..}

        returns {inLinkID: {outLinkID: flow}} with one (possibly empty) inner
        dict per id in rstar; the single flow set is the smaller of what the
        first upstream link sends and what the first downstream link receives

        proportions is accepted for signature compatibility and is not used
        """
        flows = {linkID: {} for linkID in self.rstar}
        inID, outID = self.rstar[0], self.fstar[0]
        flows[inID][outID] = min(sendingFlow[inID], receivingFlow[outID])
        return flows
class DivergeNode(Node):
    """
    Node splitting the flow of one upstream link over several downstream
    links according to fixed proportions.

    self.proportions has the shape {inLinkID: {outLinkID: proportion}} and
    only ever holds an entry for the first upstream link.
    """

    def __init__(self, nodeID, nodeModel, fstar, rstar):
        """
        creates the proportions dict before the base constructor runs
        """
        # must exist before Node.__init__, which presumably calls
        # _processStars (overridden below) -- confirm in node.py
        self.proportions = {}
        Node.__init__(self, nodeID, nodeModel, fstar, rstar)

    def _processStars(self, fstar, rstar):
        """
        overrides node processStars to create proportions if they are static

        rstar entries are converted with int(); fstar entries are strings of
        the form 'linkID:proportion'
        """
        # presumably self.rstar / self.fstar are already empty lists created
        # by Node.__init__ before this hook is invoked -- confirm
        for entry in rstar:
            self.rstar.append(int(entry))

        inLink = self.rstar[0]
        shares = {}
        self.proportions[inLink] = shares

        for entry in fstar:
            linkText, propText = entry.split(':')
            outLink = int(linkText)
            share = float(propText)
            self.fstar.append(outLink)
            shares[outLink] = share

    def calculateTransitionFlows(self, sendingFlow, receivingFlow, proportions=None):
        """
        sendingFlow and receivingFlow are dictionaries keyed by link id
        sendingFlow = {uplink1: val, uplink2: val..}

        returns {inLinkID: {outLinkID: flow}}; every outgoing flow is the
        proportional share of the sending flow scaled by one common factor
        theta <= 1, the tightest receivingFlow / desired-flow ratio

        the proportions argument is not used; self.proportions is read instead
        """
        flows = {linkID: {} for linkID in self.rstar}
        inLink = self.rstar[0]

        ratios = []
        for outLink in self.fstar:
            # links with no desired flow cannot constrain theta (and would
            # otherwise divide by zero)
            if sendingFlow[inLink] == 0 or self.proportions[inLink][outLink] == 0:
                continue
            capacity = receivingFlow[outLink]
            desired = self.proportions[inLink][outLink] * sendingFlow[inLink]
            ratios.append(capacity / desired)
        ratios.append(1.0)  # never send more than the sending flow itself
        theta = min(ratios)

        for outLink in self.fstar:
            share = self.proportions[inLink][outLink]
            flows[inLink][outLink] = theta * share * sendingFlow[inLink]
        return flows