-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathNumber test.py
114 lines (94 loc) · 3.56 KB
/
Number test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector
import csv
@task
def setup_table_1(block_name: str, num_iterations: int) -> None:
"""
Creates a Snowflake table named `snowflake_loop_test` and inserts numbers from 1 to `num_iterations`.
Args:
block_name (str): The name of the Snowflake connection block.
num_iterations (int): The number of rows to insert into the table.
"""
with SnowflakeConnector.load(block_name) as connector:
# Create the table if it doesn't exist
connector.execute(
"CREATE TABLE IF NOT EXISTS snowflake_loop_test (NUM1 integer);"
)
# Generate numbers dynamically based on num_iterations
seq_of_parameters = [{"num": i} for i in range(1, num_iterations + 1)]
# Insert numbers into the table
connector.execute_many(
"INSERT INTO snowflake_loop_test (NUM1) VALUES (%(num)s);",
seq_of_parameters=seq_of_parameters,
)
@task
def fetch_data(block_name: str) -> list:
"""
Fetches data from the `customers_11` table in batches of 2 rows at a time.
Args:
block_name (str): The name of the Snowflake connection block.
Returns:
list: A list of rows fetched from the table.
"""
all_rows = []
with SnowflakeConnector.load(block_name) as connector:
while True:
# Repeated fetch* calls using the same operation will
# skip re-executing and instead return the next set of results
new_rows = connector.fetch_many("SELECT * FROM customers_11", size=2)
if len(new_rows) == 0:
break
all_rows.append(new_rows)
return all_rows
@task
def export_table_to_csv(block_name: str, output_file: str):
"""
Exports data from the `snowflake_loop_test` table in Snowflake to a CSV file.
Args:
block_name (str): The name of the Snowflake connection block.
output_file (str): The path to the output CSV file.
Raises:
Exception: If an error occurs during database operations or file writing.
"""
try:
# Load the Snowflake connection block
snowflake_connector = SnowflakeConnector.load(block_name)
# Open connection
conn = snowflake_connector.get_connection()
cursor = conn.cursor()
# Define your query
query = "SELECT * FROM snowflake_loop_test"
# Execute query
cursor.execute(query)
if cursor.description: # Check if there are results
result = cursor.fetchall() # Fetch all results from the query
else:
result = []
# Write to CSV
with open(output_file, "w") as f:
writer = csv.writer(f)
# Write header
writer.writerow([col[0] for col in cursor.description])
# Write data
writer.writerows(result)
except Exception as e:
raise
finally:
# Close connection
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()
@flow
def snowflake_flow(block_name: str) -> list:
"""
Orchestrates the setup of a Snowflake table, inserts data, and exports it to a CSV file.
Args:
block_name (str): The name of the Snowflake connection block.
Returns:
list: A list of rows fetched from the table (optional).
"""
setup_table_1(block_name, 50)
export_table_to_csv(block_name, 'number_test_output.csv')
if __name__ == "__main__":
snowflake_flow("snowflake-connection-block")