-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbigquery_dataset.py
115 lines (104 loc) · 4.5 KB
/
bigquery_dataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
def bigquery_data():
    """Ensure the BigQuery dataset and its two tables exist.

    Creates the dataset returned by ``client_dataset()`` when it is missing
    (30 second timeout on the create call), then makes sure the ``raw_data``
    table (schema 1) and the ``fraud_prediction`` table (schema 2) exist,
    creating whichever is missing.

    Prints "Table Created" or "Table Exists" once per table.
    """
    dataset_ref = client_dataset()
    # NOTE: check_dataset_exists() returns True when the dataset is MISSING.
    if check_dataset_exists(dataset_ref):
        project_client().create_dataset(dataset_ref, timeout=30)

    # The table handling is identical whether or not the dataset was just
    # created, so it is written once instead of once per branch.
    for table_name, schema_id in (("raw_data", 1), ("fraud_prediction", 2)):
        table_ref = dataset_table(table_name)
        # NOTE: check_table_exists() returns True when the table is MISSING.
        if check_table_exists(table_ref):
            create_table(table_ref, get_schema(schema_id))
            print("Table Created")
        else:
            print("Table Exists")
# To Insert Data in BigQuery Table
def insert_df_data(df, table_ref):
    """Load the dataframe ``df`` into the table identified by ``table_ref``.

    Starts a load job with ``load_table_from_dataframe`` on a client obtained
    from ``project_client()`` and then calls ``.result()`` on the returned
    job. Nothing is returned.
    """
    client = project_client()
    load_job = client.load_table_from_dataframe(df, table_ref)
    load_job.result()
# Get Google Cloud Project client
def project_client():
    """Return a newly constructed ``bigquery.Client``.

    The client is built with no explicit arguments, and a fresh instance is
    created on every call.
    """
    return bigquery.Client()
# Get Dataset Reference
def client_dataset():
    """Return a reference to the ``fraud_prediction_dataset`` dataset.

    Returns:
        A ``bigquery.DatasetReference`` for ``fraud_prediction_dataset`` in
        the project of the client returned by ``project_client()``.
    """
    # Client.dataset() is deprecated (since google-cloud-bigquery 1.24.0).
    # Build the reference directly with the client's project, which is what
    # the deprecated helper did when no project was passed.
    return bigquery.DatasetReference(
        project_client().project, 'fraud_prediction_dataset'
    )
# Get Table Reference
def dataset_table(table_name):
    """Return a reference to ``table_name`` in the dataset from ``client_dataset()``."""
    return client_dataset().table(table_name)
# Check If Dataset Exists
def check_dataset_exists(dataset):
    """Report whether ``dataset`` is missing.

    NOTE: despite the function name, the result is inverted and callers rely
    on that.

    Returns:
        True if ``get_dataset`` raises ``NotFound`` (the dataset does not
        exist), False if the lookup succeeds.
    """
    try:
        project_client().get_dataset(dataset)
    except NotFound:
        return True
    else:
        return False
# Check If Table Exists
def check_table_exists(table):
    """Report whether ``table`` is missing.

    NOTE: despite the function name, the result is inverted and callers rely
    on that.

    Returns:
        True if ``get_table`` raises ``NotFound`` (the table does not exist),
        False if the lookup succeeds.
    """
    try:
        project_client().get_table(table)
    except NotFound:
        return True
    else:
        return False
# Define Table Schema
def get_schema(schema):
    """Return the list of ``bigquery.SchemaField`` objects for a table.

    Args:
        schema: Schema selector. ``1`` returns the 16 transaction columns;
            any other value returns those same columns followed by an
            ``isFraud`` STRING column.

    Returns:
        A newly built list of ``bigquery.SchemaField`` on every call.
    """
    # (name, type) pairs shared by both schemas, in column order. Type names
    # use the uppercase spellings listed in the BigQuery REST reference; the
    # previous code mixed "String"/"Integer" with "STRING"/"INTEGER".
    common_columns = (
        ("timestamp", "STRING"),
        ("transaction_id", "STRING"),
        ("step", "INTEGER"),
        ("type", "STRING"),
        ("amount", "INTEGER"),
        ("source_name", "STRING"),
        ("source_bank", "STRING"),
        ("source_bank_city", "STRING"),
        ("oldbalance_source", "INTEGER"),
        ("newbalance_source", "INTEGER"),
        ("destination_name", "STRING"),
        ("destination_bank", "STRING"),
        ("destination_bank_city", "STRING"),
        ("oldbalance_destination", "INTEGER"),
        ("newbalance_destination", "INTEGER"),
        ("isFlaggedFraud", "INTEGER"),
    )
    fields = [
        bigquery.SchemaField(column_name, column_type)
        for column_name, column_type in common_columns
    ]
    # Every selector other than 1 gets the extra prediction column.
    if schema != 1:
        fields.append(bigquery.SchemaField("isFraud", "STRING"))
    return fields
# Create Table
def create_table(table_ref, schema):
    """Create the table at ``table_ref`` with the given ``schema``.

    Builds a ``bigquery.Table`` from the reference and schema, then passes it
    to ``create_table`` on a client from ``project_client()``. Nothing is
    returned.
    """
    new_table = bigquery.Table(table_ref, schema=schema)
    client = project_client()
    client.create_table(new_table)