-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy path5_combine_results.py
114 lines (97 loc) · 3.28 KB
/
5_combine_results.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Ed Mountjoy
#
# Combines the outputs from the coloc pipeline into a single file
#
'''
# Set SPARK_HOME and PYTHONPATH to use 2.4.0
export PYSPARK_SUBMIT_ARGS="--driver-memory 8g pyspark-shell"
export SPARK_HOME=/Users/em21/software/spark-2.4.0-bin-hadoop2.7
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-2.4.0-src.zip:$PYTHONPATH
'''
import pyspark.sql
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Raw coloc result column name -> output column name, applied in this order.
_COLOC_COLUMN_RENAMES = [
    ('PP.H0.abf', 'coloc_h0'),
    ('PP.H1.abf', 'coloc_h1'),
    ('PP.H2.abf', 'coloc_h2'),
    ('PP.H3.abf', 'coloc_h3'),
    ('PP.H4.abf', 'coloc_h4'),
    ('nsnps', 'coloc_n_vars'),
]


def _coloc_result_schema():
    ''' Build the Spark schema of the per-test coloc result CSV files.

    Column order: nsnps, PP.H0.abf..PP.H4.abf, then the eight left_* fields,
    then the eight right_* fields. Only *_bio_feature and *_phenotype are
    declared nullable.

    Returns:
        StructType: schema used to parse the result CSVs
    '''
    schema = StructType().add('nsnps', IntegerType(), False)

    # Posterior probability columns PP.H0.abf .. PP.H4.abf
    for hypothesis in range(5):
        schema = schema.add('PP.H{}.abf'.format(hypothesis), DoubleType(), False)

    # Identical block of fields for each side of the comparison
    for side in ('left', 'right'):
        schema = (
            schema
            .add(side + '_type', StringType(), False)
            .add(side + '_study', StringType(), False)
            .add(side + '_bio_feature', StringType(), True)
            .add(side + '_phenotype', StringType(), True)
            .add(side + '_chrom', StringType(), False)
            .add(side + '_pos', IntegerType(), False)
            .add(side + '_ref', StringType(), False)
            .add(side + '_alt', StringType(), False)
        )

    return schema


def _rename_coloc_columns(df):
    ''' Rename the raw coloc output columns to their coloc_* names.

    Args:
        df (DataFrame): DataFrame containing the raw coloc columns

    Returns:
        DataFrame: DataFrame with the columns renamed
    '''
    for old_name, new_name in _COLOC_COLUMN_RENAMES:
        df = df.withColumnRenamed(old_name, new_name)
    return df


def main(in_res_dir='/output/coloc/',
         out_coloc='/output/coloc_raw.parquet',
         out_csv='/output/coloc_raw.csv.gz'):
    ''' Combine the outputs from the coloc pipeline into single output files.

    Reads every header-bearing CSV under `in_res_dir` using a fixed schema and
    renames the coloc columns. It then writes the result both as a
    snappy-compressed parquet dataset and as one gzipped CSV file. Both
    outputs are overwritten.

    Args:
        in_res_dir (str): directory containing the coloc result CSV files
        out_coloc (str): output parquet path
        out_csv (str): output gzipped CSV path

    Returns:
        int: 0 on success
    '''
    # Make spark session
    # Using `ignoreCorruptFiles` will skip empty files
    spark = (
        pyspark.sql.SparkSession.builder
        .config("spark.sql.files.ignoreCorruptFiles", "true")
        .config("spark.master", "local[*]")
        .getOrCreate()
    )
    print('Spark version: ', spark.version)

    # Load with an explicit schema (no type inference pass over the files)
    df = (
        spark.read
        .option('basePath', in_res_dir)
        .option("header", True)
        .csv(in_res_dir, schema=_coloc_result_schema())
    )
    df = _rename_coloc_columns(df)

    # Cap the number of partitions (and hence parquet part files)
    df = df.coalesce(200)

    # Write parquet
    df.write.parquet(out_coloc, compression='snappy', mode='overwrite')

    # Somewhat slow: toPandas collects the whole table onto the driver, so
    # this could fail if there is not enough memory on the machine. It also
    # re-evaluates `df` from the input CSVs because `df` is not cached.
    df.toPandas().to_csv(out_csv, index=False, compression='gzip')

    return 0
if __name__ == '__main__':
    # Propagate main()'s status code as the process exit status
    # instead of discarding it.
    raise SystemExit(main())