-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathLuby.java
241 lines (197 loc) · 9.3 KB
/
Luby.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import java.io.*;
import java.util.*;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Phaser;
/**
* Simulates a synchronous distributed system and implements LubyMIS algorithm using multi threading
* @author Anshul Pardhi, Jayita Roy, Ruchi Singh
*/
public class Luby {
// Shared memory
static Map<String, Integer> randomIdsMap = new HashMap<>();
static Map<String, Boolean> isCandidateMap = new HashMap<>();
static Map<String, Boolean> isIndependentMap = new HashMap<>();
static CyclicBarrier gate = null;
static int rounds = 0;
static int n = 0;
/**
* Simulates the work done by a process node. Here, every process has its own thread
*/
private static class ProcessNode implements Runnable {
private final List<Integer> neighbors;
int roundCount = 0;
ProcessNode(int pid, List<Integer> neighbors) {
int randomId = 1 + new Random().nextInt((int) Math.pow(n, 4)); // Assign a random id
randomIdsMap.put(Integer.toString(pid), randomId);
this.neighbors = neighbors; // Contains ids of neighboring process nodes
// Initially every process nodes are candidate nodes and none are present in the independent set
isCandidateMap.put(Integer.toString(pid), true);
isIndependentMap.put(Integer.toString(pid), false);
}
/**
* Luby MIS algorithm implementation
*/
public void run() {
try {
gate.await();
String currentThread = Thread.currentThread().getName();
while(isCandidateMap.get(currentThread)) {
int randomId = 1 + new Random().nextInt((int) Math.pow(n, 4)); // Assign a random id in range 1...n^4
randomIdsMap.put(currentThread, randomId);
System.out.println("Current process id: " + Thread.currentThread().getName() + "; Randomly assigned id: " + randomId);
// This is done to maintain a synchronous network
Thread.sleep(2000); // Wait for other processes to assign a random id
int currId = randomIdsMap.get(currentThread);
boolean isMax = true;
boolean isRoundWasted = false;
for(int neighbor : neighbors) {
String neighborThreadName = Integer.toString(neighbor);
if (isCandidateMap.get(neighborThreadName)) {
if(randomIdsMap.get(neighborThreadName) > currId) {
isMax = false; // Some other node has id greater than the current node
break;
} else if(randomIdsMap.get(neighborThreadName) == currId) {
isRoundWasted = true; // If two nodes have the same id, this round is wasted
break;
}
}
}
if(isRoundWasted) { // Invoked when two nodes have the same temp id
roundCount++;
continue;
}
// This is done to maintain a synchronous network
Thread.sleep(2000); // Wait for other processes to find leaders in their neighborhoods
if(isMax) { // Current node has max id among all its neighbors
isIndependentMap.put(currentThread, true); // Put the current node in MIS
isCandidateMap.put(currentThread, false);
for(int neighbor : neighbors) { // Neighbors won't be in MIS
isCandidateMap.put(Integer.toString(neighbor), false);
}
}
roundCount++;
// This is done to maintain a synchronous network
Thread.sleep(2000); // Wait for other processes to update their candidacy for next round
}
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
rounds = Math.max(rounds, roundCount);
}
}
/**
* Prints the MIS process ids
* @return Set containing MIS process ids
*/
private static Set<Integer> printMIS() {
Set<Integer> misSet = new HashSet<>();
for(Map.Entry<String, Boolean> node : isIndependentMap.entrySet()) {
if(node.getValue()) {
misSet.add(Integer.parseInt(node.getKey()));
}
}
return misSet;
}
/**
* Verifies if the MIS constructed is indeed correct
* @param ids Ids corresponding to each process
* @param misSet Set containing MIS process ids
* @param processNeighbors Contains a list of neighboring process ids for each process
*/
private static boolean verifyMIS(int[] ids, Set<Integer> misSet, List<List<Integer>> processNeighbors) {
Set<Integer> checkMISSet = new HashSet<>();
for(int i=0; i<n; i++) {
for(int j=0; j<processNeighbors.get(i).size(); j++) {
// Check if neighbors added in MIS
if(misSet.contains(ids[i]) && misSet.contains(processNeighbors.get(i).get(j))) {
return false;
}
// checkMISSet is used to make sure that all nodes have been considered for MIS
if(misSet.contains(ids[i])) {
checkMISSet.add(ids[i]);
checkMISSet.add(processNeighbors.get(i).get(j));
}
}
}
return checkMISSet.size() == n;
}
public static void main(String[] args) {
// Pass on input & output file through command line argument
if(args.length != 2) {
System.err.println("Error: Input and/or output file not provided in the argument!");
System.exit(1);
}
Scanner sc = null;
try {
sc = new Scanner(new File(args[0]));
} catch (FileNotFoundException e) {
System.err.println("Error: Input file not found!");
System.exit(1);
}
n = sc.nextInt(); // Total processes
// Every process is identified through the provided id
// This id is not used for comparison though, it's just used for identification
int[] ids = new int[n];
for(int i=0; i<n; i++) {
ids[i] = sc.nextInt();
}
Luby.ProcessNode[] processNodes = new Luby.ProcessNode[n];
List<List<Integer>> processNeighbors = new ArrayList<>(); // Used for checking if MIS is indeed correct
for(int i=0; i<n; i++) {
List<Integer> currentProcessNeighbors = new ArrayList<>();
for(int j=0; j<n; j++) {
if(sc.nextInt() == 1) {
currentProcessNeighbors.add(ids[j]); // Add neighbors of current process
}
}
// Create a new instance of a process node
processNodes[i] = new Luby.ProcessNode(ids[i], currentProcessNeighbors);
processNeighbors.add(currentProcessNeighbors);
}
// Start multi-threading!
try {
Thread[] threads = new Thread[processNodes.length];
gate = new CyclicBarrier(processNodes.length + 1); // Acts as the main thread
// Assign a thread to each process node
for (int i = 0; i < processNodes.length; i++) {
threads[i] = new Thread(processNodes[i]);
threads[i].setName(Integer.toString(ids[i])); // Process id is the name of the thread
threads[i].start();
}
gate.await();
// Main thread checks if the other threads have been terminated
for (int i = 0; i < processNodes.length; i++) {
while (threads[i].isAlive()) {
System.out.println("Computing MIS...");
threads[i].join(1000);
}
}
System.out.println();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("Finished computing MIS...Writing results to output file...");
// Write final output to a file
try {
BufferedWriter output = new BufferedWriter(new FileWriter(args[1]));
output.append("Number of rounds (phases): ").append(String.valueOf(rounds));
output.newLine();
output.append("MIS has the processes with IDs: ");
Set<Integer> misSet = printMIS(); // Prints processes in the MIS
for (Integer misId : misSet) {
output.append(String.valueOf(misId)).append(" ");
}
output.newLine();
output.append("Verifying the MIS constructed is correct...");
output.newLine();
if(verifyMIS(ids, misSet, processNeighbors)) { // Verify correctness of the MIS
output.append("Verified that the MIS constructed is indeed correct!");
} else {
output.append("THE MIS CONSTRUCTED IS NOT CORRECT....EXITING PROGRAM...");
}
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}