-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathflow_crawlers.py
54 lines (42 loc) · 1.39 KB
/
flow_crawlers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from prefect.context import FlowRunContext
from prefect import flow, task
from crawler.crawler_otodom import CrawlerOTODOM
from crawler.crawler_olx import CrawlerOLX
from crawler.data_cleaner.data_cleaner import DataCleaner
from _common.email_sender.send_finish_message import send_finish_message
@task(name='scrape_otodom_data', log_prints=True)
def scrape_otodom_data():
    """Scrape Otodom listings with CrawlerOTODOM.

    Always tears down any webdriver processes the crawler spawned, whether
    the scrape succeeds or fails; an exception from ``scrape()`` propagates
    after cleanup so Prefect still marks the task as failed.
    """
    crawler = CrawlerOTODOM()
    try:
        crawler.scrape()
    finally:
        # Consolidates the cleanup that was duplicated in the original
        # except/else branches; `finally` runs on both paths and lets the
        # exception re-raise naturally (no `raise e` needed).
        crawler.kill_webdriver_processes()
@task(name='scrape_olx_data', log_prints=True)
def scrape_olx_data():
    """Scrape OLX listings with CrawlerOLX.

    Always tears down any webdriver processes the crawler spawned, whether
    the scrape succeeds or fails; an exception from ``scrape()`` propagates
    after cleanup so Prefect still marks the task as failed.
    """
    crawler = CrawlerOLX()
    try:
        crawler.scrape()
    finally:
        # Consolidates the cleanup that was duplicated in the original
        # except/else branches; `finally` runs on both paths and lets the
        # exception re-raise naturally (no `raise e` needed).
        crawler.kill_webdriver_processes()
@task(name='clean_data', log_prints=True)
def clean_data():
    """Clean and persist the scraped data, tagged with the current flow-run name.

    Raises:
        RuntimeError: if called outside an active Prefect flow run, where
        ``FlowRunContext.get()`` returns ``None``.
    """
    context = FlowRunContext.get()
    if context is None:
        # .get() returns None when no flow run is active; fail with a clear
        # message instead of an opaque AttributeError on the chain below.
        raise RuntimeError('clean_data must be called from within a flow run')
    flow_name = context.flow_run.dict().get('name')
    print(f'The flow name is: {flow_name}')
    cleaner = DataCleaner(flow_name=flow_name)
    cleaner.clean_and_save_data()
@flow(
    name='run_crawlers', retries=3, log_prints=True,
    on_completion=[send_finish_message], on_failure=[send_finish_message]
)
def run_crawlers():
    # Orchestrates the scrape-then-clean pipeline. The whole flow retries up
    # to 3 times, and send_finish_message fires on both completion and failure.
    # NOTE(review): the OLX scrape is deliberately disabled below — confirm
    # whether it should be re-enabled before removing this toggle.
    # scrape_olx_data()
    scrape_otodom_data()
    clean_data()
if __name__ == "__main__":
    # Serve the flow as a long-running deployment named "2023-12-05",
    # scheduled by cron at 18:00 and 06:00 every day.
    run_crawlers.serve(name="2023-12-05", cron='0 18,6 * * *')