-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcartesian_200k_cached_100part.py
44 lines (36 loc) · 1.42 KB
/
cartesian_200k_cached_100part.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import time
import random
import pyspark.sql.functions as func
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import HiveContext, SQLContext
import math
from pyspark.mllib.random import RandomRDDs
from pyspark.sql.types import *
#from pyspark.sql.functions import *
from pyspark.sql.types import Row
spark = SparkSession.builder.config("spark.sql.crossJoin.enabled","true").getOrCreate()
n=200000
# create rdd of random floats
nRow = n
nCol = 4
seed = 5
numPartitions=100
rdd1 = RandomRDDs.normalVectorRDD(spark, nRow, nCol,numPartitions,seed)
seed = 3
rdd2 = RandomRDDs.normalVectorRDD(spark, nRow, nCol,numPartitions,seed)
sc = spark.sparkContext
# convert each tuple in the rdd to a row
randomNumberRdd1 = rdd1.map(lambda x: Row(A=float(x[0]), B=float(x[1]), C=float(x[2]), D=float(x[3])))
randomNumberRdd2 = rdd2.map(lambda x: Row(E=float(x[0]), F=float(x[1]), G=float(x[2]), H=float(x[3])))
# create dataframe from rdd
schemaRandomNumberDF1 = spark.createDataFrame(randomNumberRdd1)
schemaRandomNumberDF2 = spark.createDataFrame(randomNumberRdd2)
# cache the dataframe
schemaRandomNumberDF1.cache()
cart_df = schemaRandomNumberDF1.crossJoin(schemaRandomNumberDF2)
# aggregate
results = schemaRandomNumberDF1.groupBy("A").agg(func.max("B"),func.sum("C"))
results.show(n=100)
print "----------Count in cross-join--------------- {0}".format(cart_df.count())