"""
Utils for loading/handling datasets for use with FL.
"""
import numpy as np
import tensorflow as tf
debugdata=0
def shuffle_client_data(data):
    """ Returns a copy of a client's (x, y) shuffled using the same order. """
    order = np.random.permutation(data[0].shape[0])
    x = data[0][order]
    y = data[1][order]
    return x, y
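
# Usage sketch (illustrative, not part of the original module): because x and
# y are permuted with the same index order, sample/label pairs stay aligned.
#
#   x = np.arange(6).reshape(3, 2)
#   y = np.array([0, 1, 2])               # label i marks original row i
#   sx, sy = shuffle_client_data((x, y))
#   assert all((sx[i] == x[sy[i]]).all() for i in range(3))
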
def to_oh(data, n_c):
    """
    Converts an array of class numbers into a one-hot encoding.

    Parameters:
        data (array): y values as integers
        n_c (int): number of classes in data

    Returns:
        o_h (array): y values as one-hot, shape is [len(y), n_c]
    """
    o_h = np.zeros([data.shape[0], n_c], dtype=np.float32)
    o_h[np.arange(data.shape[0]), data] = 1.0
    return o_h
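
# Example (illustrative sketch): to_oh(np.array([0, 2, 1]), 3) yields
#   [[1., 0., 0.],
#    [0., 0., 1.],
#    [0., 1., 0.]]
# as a float32 array; row i has a single 1 in column data[i].
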
def get_client_data(data, idx):
    """
    Returns (x, y) values using a client's index from all clients' data.

    Parameters:
        data (list): all clients' data (x, y)
        idx (int): client index

    Returns:
        (client's x data, client's y (label) data)
    """
    return data[0][idx], data[1][idx]
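
# Usage sketch (assumes `data` is the (xs, ys) pair produced by split_iid or
# split_niid below):
#   client_x, client_y = get_client_data((xs, ys), 0)   # data of worker 0
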
def split_iid(x, y, W):
    """
    Shuffles x and y training data, and splits the shuffled data into W equal
    parts.

    Parameters:
        x (array): x data of shape [n_samples, sample_shape]
        y (array): label data of shape [n_samples]
        W (int): number of portions (Workers) to split data into

    Returns:
        xs (list[array]): worker x data after shuffling, length W
        ys (list[array]): worker y data after shuffling, length W
    """
    order = np.random.permutation(y.shape[0])
    # split x into W shards
    # TODO 2: change np.array_split to sampling with replacement (replace=True)
    xs = np.array_split(x[order], W)
    ys = np.array_split(y[order], W)
    return xs, ys
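
# Usage sketch (illustrative): with 10 samples and W=5, each worker receives
# an equally sized, randomly drawn shard.
#   x = np.arange(10).reshape(10, 1)
#   y = np.arange(10)
#   xs, ys = split_iid(x, y, 5)
#   assert len(xs) == 5 and all(s.shape[0] == 2 for s in xs)
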
def split_niid(x, y, W, n_frag):
    """
    Sorts x and y training data by label (y), splits the data into W*n_frag
    shards, assigns n_frag shards to each worker, and returns the data as
    lists of length W. Although not the best way to split datasets, it is the
    same method as used by McMahan et al. in the FedAvg paper.

    Parameters:
        x (array): x data of shape [n_samples, sample_shape]
        y (array): label data of shape [n_samples]
        W (int): number of portions (Workers) to split data into
        n_frag (int): number of fragments/shards per worker

    Returns:
        xs (list[array]): worker x data after non-iid split, length W
        ys (list[array]): worker y data after non-iid split, length W
    """
    order = np.argsort(y)  # sort by label
    x_frags = np.array_split(x[order], W * n_frag)
    y_frags = np.array_split(y[order], W * n_frag)

    # fragment assignments for each worker
    frag_idxs = [[i + (W * j) for j in range(n_frag)] for i in range(W)]

    # concatenate fragments for each worker
    xs = []
    ys = []
    for w in range(W):
        xs.append(np.squeeze(np.concatenate([x_frags[i] for i in frag_idxs[w]])))
        ys.append(np.squeeze(np.concatenate([y_frags[i] for i in frag_idxs[w]])))
    return xs, ys
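
# Usage sketch (illustrative): because shards are cut from label-sorted data,
# each worker ends up holding only a few distinct classes. With 10 classes,
# W=5 and n_frag=2, each of the 10 shards covers one class, so every worker
# sees exactly 2 labels.
#   y = np.repeat(np.arange(10), 10)
#   x = y.reshape(-1, 1)
#   xs, ys = split_niid(x, y, 5, 2)
#   print([np.unique(c) for c in ys])  # e.g. [array([0, 5]), array([1, 6]), ...]
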
def load_dataset(dataset, W, iid):
    """
    Loads the given dataset and splits it into W partitions:
    1. Flattens the training/testing samples (mnist only).
    2. Normalizes pixel values from 0-255 to 0-1 and converts to float32.

    Parameters:
        dataset (str): 'mnist' or 'cifar'
        W (int): number of workers to split training data into
        iid (bool): shuffle iid or partition non-iid

    Returns:
        train (tuple): (x, y) data, each a list of length W of workers' data
        test (tuple): (x, y) data, each an array of all test samples/labels
    """
    if dataset == 'cifar':
        (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
        # cifar10 labels are shaped [n, 1]; flatten to [n] so that to_oh and
        # the split functions index correctly
        y_train = np.squeeze(y_train)
        y_test = np.squeeze(y_test)
    elif dataset == 'mnist':
        (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
        x_train = x_train.reshape((-1, 784))
        x_test = x_test.reshape((-1, 784))
    else:
        raise RuntimeError('Unsupported dataset string: {}'.format(dataset))

    if debugdata:
        # simplify training for debugging
        print("Taking 1/100 of the original data to expedite debugging")
        x_trains, y_trains = split_iid(x_train, y_train, 100)
        x_train, y_train = x_trains[0], y_trains[0]
        x_tests, y_tests = split_iid(x_test, y_test, 100)
        x_test, y_test = x_tests[0], y_tests[0]

    print("Training total {} samples".format(x_train.shape[0]))

    x_train = (x_train / 255).astype(np.float32)
    x_test = (x_test / 255).astype(np.float32)

    if iid:
        x_trains, y_trains = split_iid(x_train, y_train, W)
    else:
        x_trains, y_trains = split_niid(x_train, y_train, W, 2)

    y_trains = [to_oh(y, 10) for y in y_trains]
    y_test = to_oh(y_test, 10)

    return (x_trains, y_trains), (x_test, y_test)
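
# Usage sketch (downloads the dataset via tf.keras on first call):
#   train, test = load_dataset('mnist', W=10, iid=False)
#   x_trains, y_trains = train   # lists of length 10, one entry per worker
#   x_test, y_test = test        # full test set, labels one-hot encoded
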
def exp_details(args):
    """ Prints the experiment configuration held in the parsed args. """
    print('\nExperimental details:')
    print(f'    Model               : {args.model}')
    print(f'    Optimizer           : {args.optimizer}')
    print(f'    Learning rate       : {args.lr}')
    print(f'    Global Rounds       : {args.R}\n')
    print('    Federated parameters:')
    if args.iid:
        print('    IID')
    else:
        print('    Non-IID')
    print(f'    Fraction of users C : {args.C}')
    print(f'    Local Batch size B  : {args.B}')
    print(f'    Local Epochs E      : {args.E}\n')
    print(f'    Sparsity S          : {args.S}\n')
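
if __name__ == '__main__':
    # Minimal smoke test (illustrative sketch, not part of the original
    # module): exercises the split helpers on synthetic data so that no
    # dataset download is needed.
    x = np.random.rand(100, 4).astype(np.float32)
    y = np.repeat(np.arange(10), 10)

    xs, ys = split_iid(x, y, 5)
    print('iid shard sizes   :', [s.shape[0] for s in xs])

    xs, ys = split_niid(x, y, 5, 2)
    print('niid worker labels:', [np.unique(s).tolist() for s in ys])

    print('one-hot shape     :', to_oh(y, 10).shape)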