-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontroller.py
334 lines (284 loc) · 13.3 KB
/
controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
import os
from graphviz import render, Digraph, view
from cfsm import CFSM
from global_graph_parser.main import main as global_graph_parser
from dot_parser.main import main
from dot_parser.domitilla_converter import domitilla_converter
from dot_parser.MyErrorListener import parseError
from dot_parser.MyVisitor import ForkStatementDetected
import itertools
from well_formedness import well_sequencedness_conditions, well_branchedness_first_condition, well_branchedness_second_condition, well_branchedness_third_condition
class Controller:
ca_dict = {}
def get_participants(self, graph_name):
""" Return participants of a given graph """
return self.ca_dict.get(graph_name).participants
def get_all_ca(self):
""" Return all the opened graphs """
return self.ca_dict
def get_start_node(self, graph_name):
""" Return the start node of a given graph """
return self.ca_dict.get(graph_name).s0
def get_states(self, graph_name):
""" Return the states of a given graph """
return self.ca_dict.get(graph_name).states
def get_edges(self, graph_name):
""" Return the edges of a given graph """
return self.ca_dict.get(graph_name).edges
def get_labels(self, graph_name):
""" Return the labels of a given graph """
return self.ca_dict.get(graph_name).labels
def check_for_epsilon_moves(self, graph_name):
for edge in self.ca_dict.get(graph_name).edges:
if edge[1] == "":
return "Yes"
return "No"
def GGparser(self, path_file, path_store):
""" Parser a Chorgram file and create a converted
version (DOT) in a given path.
"""
# read the file and convert in DOT
message, path = global_graph_parser(path_file, path_store)
# parse the new DOT file and store the graph
result = self.DOTparser(path)
# return a log message and the name of the graph
return message, result[2]
def DOTparser(self, path_file):
""" Parser a DOT files, check if it was a Domitilla graph
and fill a Choreography Automata (CA) struct. """
try:
ca, domi_bool, graph_name = main(path_file)
except (parseError, ForkStatementDetected) as e:
return False, e.message # return eventually an error message
else:
# store the CA in a dictionary { 'graph_name' : CA }
self.ca_dict.update({graph_name: ca})
# return a boolean for Domitilla graphs, a log message
# and the name of the graph
return domi_bool, ["Read Successfully " + path_file], graph_name
def DomitillaConverter(self, graph_name, path_file, path_store):
""" Convert a Domitilla Graph, previously opened and stored.
Check /dot_parser/domitilla_visitor.py """
ca, message = domitilla_converter(self.ca_dict.get(graph_name), path_file, path_store)
self.ca_dict.update({graph_name: ca})
return message
def remove_record(self, graph_name):
""" Remove a graph from the opened graphs dictionary (self.ca_dict)"""
self.ca_dict.pop(graph_name)
def render(self, path_file, extension,V_rend_on_default_image_viewer):
try:
main(path_file) # check for errors
except (parseError, ForkStatementDetected) as e:
print("non riesco a creare il render")
return e.message[0] + " " + e.message[1] # return eventually an error message
else:
#V voglio mettere che lo apre se glielo dico io,altirmenti mi rendera quello
#che volgio
save_path = render('dot', extension, path_file)
if V_rend_on_default_image_viewer == True:
view(save_path) # open the default image viewer
return save_path
def make_product(self, graph_name1, graph_name2, path_to_store):
""" Make the product of two choreography automata (ca1, ca2),
building a c-automaton corresponding to the concurrent
execution of the two original c-automata. """
# get the two choreography automaton from graphs name
ca1 = self.ca_dict.get(graph_name1)
ca2 = self.ca_dict.get(graph_name2)
# check if the participants are disjoint,
if ca1.participants.intersection(ca2.participants):
return ["[ERROR] participants are not disjoint"]
# if the user didn't add a .gv extension to the name of
# product graph, we'll add it
if not path_to_store.endswith('.gv'):
path_to_store += '.gv'
# get the name from the path
path_splitted = os.path.split(path_to_store)
graph_name = path_splitted[1].split('.')
# initializes graph
g = Digraph(graph_name[0])
g.node('s0', label="", shape='none', height='0', width='0')
start_node = ca1.s0 + ',' + ca2.s0
g.edge('s0', start_node)
# Build the product graph
# NOTE: for each edge
# edge[0] == sender node
# edge[1] == label
# edge[2] == receiver node
for i in ca1.states:
for j in ca2.states:
# current node we re considering
node = i + ',' + j
# set j, and look for some edges from i to other nodes
for edge in ca1.edges:
if edge[0] == i:
g.edge(node, edge[2] + ',' + j, label=edge[1])
# set i, and look for some edges from j to other nodes
for edge in ca2.edges:
if edge[0] == j:
g.edge(node, i + ',' + edge[2], label=edge[1])
# draw and save the graph
g.save(path_to_store)
# parser the product graph and
# store the relative CA
result = self.DOTparser(path_to_store)
# return a message, and
# the graph name just created
return ["[CREATED] " + path_to_store]
def synchronize(self, graph_name_to_sync, interface1, interface2, path_to_store):
"""
Remove a pair of (compatible) roles by transforming them into forwarders.
e.g.: a synchronization over H and K removes participants H and K and
sends each message originally sent to H to whoever K used to send the
same message, and vice versa.
"""
# get the choreography automaton from graph name
ca = self.ca_dict.get(graph_name_to_sync)
# check if interfaces aren't the same
if interface1 == interface2:
return ["[ERROR] you selected the same participant for both interfaces"]
# if the user didn't add a .gv extension to the name of
# sync graph, we'll add it
if not path_to_store.endswith('.gv'):
path_to_store += '.gv'
# get the name from the path
path_splitted = os.path.split(path_to_store)
graph_name = path_splitted[1].split('.')
# initializes graph
g = Digraph(graph_name[0])
g.node('s0', label="", shape='none', height='0', width='0')
g.edge('s0', ca.s0)
#
# NOTE: each edge in ca.edges contains a 6-uple like this:
# (source_node, label, dest_node, sender, receiver, message)
#
# ------------ STEP (1) -----------------
# each transition (p, A -> H: m, q) is removed, and for each
# transition (q, K -> B: m, r) a transition (p, A -> B: m, r)
# is added, and similarly by swapping H and K
new_edges = set()
edges_to_remove = set()
for i in ca.edges:
if i[4] == interface1:
edges_to_remove.add(i)
for j in ca.edges:
if j[0] == i[2] and j[3] == interface2 and j[5] == i[5] and i[3] != j[4]:
edges_to_remove.add(i)
label = i[3] + '->' + j[4] + ':' + i[5]
new_edges.add((i[0], label, j[2], i[3], j[4], i[5]))
if i[4] == interface2:
for j in ca.edges:
if j[0] == i[2] and j[3] == interface1 and j[5] == i[5] and i[3] != j[4]:
edges_to_remove.add(i)
label = i[3] + '->' + j[4] + ':' + i[5]
new_edges.add((i[0], label, j[2], i[3], j[4], i[5]))
ca.edges = ca.edges.difference(edges_to_remove)
ca.edges = ca.edges.union(new_edges)
# ------------ STEP (2) -----------------
# Transitions involving neither H nor K are preserved,
# whereas all other transitions are removed
edges_to_remove.clear()
for i in ca.edges:
if i[3] == interface1 or i[3] == interface2 or i[4] == interface1 or i[4] == interface2:
edges_to_remove.add(i)
ca.edges = ca.edges.difference(edges_to_remove)
# ------------ STEP (3) -----------------
# States and transitions unreachable from the initial
# state are removed.
ca.delete_unreachable_nodes()
for edge in ca.edges:
g.edge(edge[0], edge[2], label=str(edge[1]))
# draw and save the graph
g.save(path_to_store)
# parser the product graph and
# store the relative CA
result = self.DOTparser(path_to_store)
return ["[CREATED] " + path_to_store]
def projection(self, graph_name, participant, path_to_store):
"""
The projection of a c-automaton on a participant A
is a CFSM, obtained by minimising the c-automaton
after updating the labels.
"""
# get the choreography automaton from graph name
ca = self.ca_dict.get(graph_name)
# if the user didn't add a .gv extension to the name of
# sync graph, we'll add it
if not path_to_store.endswith('.gv'):
path_to_store += '.gv'
# get the name from the path
path_splitted = os.path.split(path_to_store)
graph_name = path_splitted[1].split('.')
# initializes graph
g = Digraph(graph_name[0])
# NOTE: each edge in ca.edges contains a 6-uple like this:
# (source_node, label, dest_node, sender, receiver, message)
new_edges = set()
new_labels = set()
for edge in ca.edges:
# if the participant is the sender
if edge[3] == participant:
label = participant + ' ' + edge[4] + '!' + edge[5]
new_edges.add((edge[0], label, edge[2]))
new_labels.add(label)
# if the participant is the receiver
elif edge[4] == participant:
label = edge[3] + ' ' + participant + '?' + edge[5]
new_edges.add((edge[0], label, edge[2]))
new_labels.add(label)
# in all the other cases, empty_label edges
else:
new_edges.add((edge[0], "", edge[2]))
c = CFSM(ca.states, new_labels, new_edges, ca.s0, ca.participants)
c.delete_epsilon_moves()
c.minimization()
for edge in c.edges:
g.edge(str(edge[0]), str(edge[2]), label=edge[1])
# draw and save the graph
g.save(path_to_store)
return ["[CREATED] " + path_to_store]
# Check each condition of Well-Branchedness one by one. It stops as soon as a
# condition is not met and returns the associated error
def make_well_branchedness(self, graph_name):
ca = self.ca_dict.get(graph_name)
# First error check
res1 = well_branchedness_first_condition(ca.edges,ca.states)
if res1 != None:
result = ['Verified: NO Well-branched in first condition: ' + res1]
return [result]
# Second error check
res2 = well_branchedness_second_condition(ca.edges,ca.states,ca.participants)
if res2 != None:
result = ['Verified: NO Well-branched in second condition ' + res2]
return [result]
# Third error check
res3 = well_branchedness_third_condition(ca.states,ca.edges,ca.participants)
if res3 != None:
result = ['Verified: NO Well-branched in third condition ' + res3]
return [result]
else:
result = ['Verified: Well-branched']
return [result]
# Call the Well-Sequencedness condition check. If no error is returned
# then it returns that it has passed the check, otherwise it returns
# the error
def make_well_sequencedness(self, graph_name):
ca = self.ca_dict.get(graph_name)
ret = well_sequencedness_conditions(ca)
if ret == None:
result = ['Verified: Well-sequenced']
return [result]
else:
result = ['Verified: NO Well-sequenced, not verified in ' + ret]
return [result]
# First it does the well-Sequencedness check and if it
# passes it then does the well-Branchedness check
def make_well_formedness(self, graph_name):
resultWS = self.make_well_sequencedness(graph_name)
if resultWS[0][0] == 'Verified: Well-sequenced':
resultWB = self.make_well_branchedness(graph_name)
if resultWB[0][0] == 'Verified: Well-branched':
result = ['Verified: Well-formed']
return [result]
return [resultWB[0]]
return [resultWS[0]]