-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtrain_mmlspark.py
75 lines (57 loc) · 2.7 KB
/
train_mmlspark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# This PySpark script uses the mmlspark library.
# It is much simpler than the equivalent plain Spark ML version.
import os
import sys

import numpy as np
import pandas as pd
import pyspark
import requests
from pyspark.ml.classification import LogisticRegression
import mmlspark
from mmlspark.TrainClassifier import TrainClassifier
from mmlspark.ComputeModelStatistics import ComputeModelStatistics
from azureml.logging import get_azureml_logger
# Create the outputs folder (the trained model is saved under ./outputs).
os.makedirs('./outputs', exist_ok=True)
# Initialize the Azure ML run logger used to record parameters and metrics.
run_logger = get_azureml_logger()
# Start the Spark application (getOrCreate reuses an existing session if any).
spark = pyspark.sql.SparkSession.builder.appName("Adult Census Income").getOrCreate()
# Download AdultCensusIncome.csv from the Azure CDN unless a local copy already
# exists. This file has 32,561 rows.
dataFile = "AdultCensusIncome.csv"
if not os.path.isfile(dataFile):
    # A timeout keeps the script from hanging forever on a stalled connection.
    r = requests.get("https://amldockerdatasets.azureedge.net/" + dataFile, timeout=60)
    # Fail loudly on an HTTP error. Otherwise the error body would be written to
    # dataFile and, because of the isfile() check above, never re-downloaded.
    r.raise_for_status()
    with open(dataFile, 'wb') as f:
        f.write(r.content)
# Read the CSV with pandas (forcing " hours-per-week" to float64), then turn it
# into a Spark DataFrame. The column names used in this script carry a leading
# space, so each one below is spelled that way.
census_pdf = pd.read_csv(dataFile, dtype={" hours-per-week": np.float64})
feature_and_label_cols = [" education", " marital-status", " hours-per-week", " income"]
# Keep three feature columns plus the " income" label column.
data = spark.createDataFrame(census_pdf).select(feature_and_label_cols)
# 75% / 25% train/test split with a fixed seed.
train, test = data.randomSplit([0.75, 0.25], seed=123)
print("********* TRAINING DATA ***********")
# Preview ten training rows as a pandas frame.
print(train.limit(10).toPandas())
# Regularization rate for the logistic regression: defaults to 0.1 and can be
# overridden by the first command-line argument.
reg = 0.1
if len(sys.argv) > 1:
    reg = float(sys.argv[1])
print("Regularization Rate is {}.".format(reg))
# Use mmlspark's TrainClassifier to fit a logistic regression model. We don't
# have to do any one-hot encoding or vectorization, nor convert the string
# label column to binary; mmlspark does all of these tasks for us.
model = TrainClassifier(model=LogisticRegression(regParam=reg), labelCol=" income", numFeatures=256).fit(train)
run_logger.log("Regularization Rate", reg)
# Predict on the held-out test split.
prediction = model.transform(test)
# Compute model metrics. collect() is a Spark action, so fetch the metrics row
# once and reuse it rather than triggering a new action for every value read.
metrics = ComputeModelStatistics().transform(prediction)
metrics_row = metrics.collect()[0]
print("******** MODEL METRICS ************")
print("Accuracy is {}.".format(metrics_row['accuracy']))
print("Precision is {}.".format(metrics_row['precision']))
print("Recall is {}.".format(metrics_row['recall']))
print("AUC is {}.".format(metrics_row['AUC']))
# Log the headline metrics to the run logger.
run_logger.log("Accuracy", metrics_row['accuracy'])
run_logger.log("AUC", metrics_row['AUC'])
print("******** SAVE THE MODEL ***********")
model.write().overwrite().save("./outputs/AdultCensus.mml")
# To save the model to WASB storage when running on HDInsight, use instead:
# model.write().overwrite().save("wasb:///models/AdultCensus.mml")