feat: add table description and automatically list projects #534

Merged: 5 commits, Oct 20, 2023
7 changes: 3 additions & 4 deletions pipelines/rj_escritorio/data_catalog/flows.py
@@ -11,6 +11,7 @@
 from pipelines.rj_escritorio.data_catalog.schedules import update_data_catalog_schedule
 from pipelines.rj_escritorio.data_catalog.tasks import (
     generate_dataframe_from_list_of_tables,
+    list_projects,
     list_tables,
     merge_list_of_list_of_tables,
     update_gsheets_data_catalog,
@@ -28,18 +29,16 @@
     ],
 ) as rj_escritorio_data_catalog_flow:
     # Parameters
-    project_ids = Parameter("project_ids")
     spreadsheet_url = Parameter("spreadsheet_url")
     sheet_name = Parameter("sheet_name")
     bq_client_mode = Parameter("bq_client_mode", default="prod")
+    exclude_dev_projects = Parameter("exclude_dev_projects", default=True)
     exclude_staging = Parameter("exclude_staging", default=True)
     exclude_test = Parameter("exclude_test", default=True)
     exclude_logs = Parameter("exclude_logs", default=True)
 
     # Flow
-    project_ids = parse_comma_separated_string_to_list(
-        input_text=project_ids, output_type=str
-    )
+    project_ids = list_projects(mode=bq_client_mode, exclude_dev=exclude_dev_projects)
    list_of_list_of_tables = list_tables.map(
        project_id=project_ids,
        mode=unmapped(bq_client_mode),
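
The flows.py change removes the manual `project_ids` parameter: the list of projects is now produced at runtime by the `list_projects` task and fed straight into `list_tables.map`. A minimal sketch of this Prefect 1.x pattern, with stub tasks standing in for the real ones (names and return values here are illustrative only, not from the PR):

```python
from typing import List

from prefect import Flow, Parameter, task, unmapped


@task
def list_projects(mode: str = "prod", exclude_dev: bool = True) -> List[str]:
    # Stub: the real task queries the Cloud Resource Manager API.
    return ["datario", "rj-escritorio"]


@task
def list_tables(project_id: str, mode: str) -> List[dict]:
    # Stub: the real task lists BigQuery datasets and tables.
    return [{"project_id": project_id, "table_id": "example_table"}]


with Flow("data_catalog_sketch") as flow:
    bq_client_mode = Parameter("bq_client_mode", default="prod")
    exclude_dev_projects = Parameter("exclude_dev_projects", default=True)
    # project_ids is a task result, resolved when the flow runs, so new
    # projects are picked up without re-registering the flow.
    project_ids = list_projects(mode=bq_client_mode, exclude_dev=exclude_dev_projects)
    tables = list_tables.map(project_id=project_ids, mode=unmapped(bq_client_mode))
```

The practical win is that newly created GCP projects show up in the catalog automatically, instead of requiring a parameter update on the deployed flow.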
98 changes: 73 additions & 25 deletions pipelines/rj_escritorio/data_catalog/tasks.py
@@ -3,7 +3,11 @@
 """
 Tasks for generating a data catalog from BigQuery.
 """
+from typing import List
 
+from google.api_core.exceptions import NotFound
+from google.cloud import bigquery
+from googleapiclient import discovery
 import gspread
 import pandas as pd
 from prefect import task
@@ -15,6 +19,41 @@
 from pipelines.utils.utils import get_credentials_from_env, log
 
 
+@task
+def list_projects(
+    mode: str = "prod",
+    exclude_dev: bool = True,
+) -> List[str]:
+    """
+    Lists all GCP projects that we have access to.
+
+    Args:
+        mode: Credentials mode.
+        exclude_dev: Exclude projects whose IDs end with "-dev".
+
+    Returns:
+        List of project IDs.
+    """
+    credentials = get_credentials_from_env(mode=mode)
+    service = discovery.build("cloudresourcemanager", "v1", credentials=credentials)
+    request = service.projects().list()
+    projects = []
+    while request is not None:
+        response = request.execute()
+        for project in response.get("projects", []):
+            project_id = project["projectId"]
+            if exclude_dev and project_id.endswith("-dev"):
+                log(f"Excluding dev project {project_id}.")
+                continue
+            log(f"Found project {project_id}.")
+            projects.append(project_id)
+        request = service.projects().list_next(
+            previous_request=request, previous_response=response
+        )
+    log(f"Found {len(projects)} projects.")
+    return projects
+
+
 @task
 def list_tables(  # pylint: disable=too-many-arguments
     project_id: str,
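
The `list`/`list_next` loop in `list_projects` above is the standard pagination idiom for google-api-python-client discovery services. For a quick sanity check outside a flow run, Prefect 1.x lets you invoke a task's underlying function via `.run()`; a hypothetical local check, assuming valid credentials are available in the environment:

```python
# Hypothetical local check, not part of this PR.
project_ids = list_projects.run(mode="prod", exclude_dev=True)
print(f"Found {len(project_ids)} projects, e.g. {project_ids[:5]}")
```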
@@ -50,32 +89,41 @@ def list_tables(  # pylint: disable=too-many-arguments
     client = get_bigquery_client(mode=mode)
     log(f"Listing tables in project {project_id}.")
     tables = []
-    for dataset in client.list_datasets(project=project_id):
-        dataset_id: str = dataset.dataset_id
-        if exclude_staging and dataset_id.endswith("_staging"):
-            log(f"Excluding staging dataset {dataset_id}.")
-            continue
-        if exclude_test and "test" in dataset_id:
-            log(f"Excluding test dataset {dataset_id}.")
-            continue
-        if exclude_logs and (
-            dataset_id.startswith("logs_") or dataset_id.endswith("_logs")
-        ):
-            log(f"Excluding logs dataset {dataset_id}.")
-            continue
-        for table in client.list_tables(dataset):
-            table_id = table.table_id
-            if exclude_test and "test" in table_id:
-                log(f"Excluding test table {table_id}.")
-                continue
-            table_info = {
-                "project_id": project_id,
-                "dataset_id": dataset_id,
-                "table_id": table_id,
-                "url": f"https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table",
-                "private": not project_id == "datario",
-            }
-            tables.append(table_info)
+    try:
+        datasets = client.list_datasets(project=project_id)
+        for dataset in datasets:
+            dataset_id: str = dataset.dataset_id
+            if exclude_staging and dataset_id.endswith("_staging"):
+                log(f"Excluding staging dataset {dataset_id}.")
+                continue
+            if exclude_test and "test" in dataset_id:
+                log(f"Excluding test dataset {dataset_id}.")
+                continue
+            if exclude_logs and (
+                dataset_id.startswith("logs_") or dataset_id.endswith("_logs")
+            ):
+                log(f"Excluding logs dataset {dataset_id}.")
+                continue
+            for table in client.list_tables(dataset):
+                table_id = table.table_id
+                table_object = client.get_table(table.reference)
+                if exclude_test and "test" in table_id:
+                    log(f"Excluding test table {table_id}.")
+                    continue
+                table_description = table_object.description
+                table_info = {
+                    "project_id": project_id,
+                    "dataset_id": dataset_id,
+                    "table_id": table_id,
+                    "description": table_description,
+                    "url": f"https://console.cloud.google.com/bigquery?p={project_id}&d={dataset_id}&t={table_id}&page=table",
+                    "private": not project_id == "datario",
+                }
+                tables.append(table_info)
+    except NotFound:
+        # This will happen if the BigQuery API is not enabled for this project.
+        # Just return an empty list.
+        return tables
     log(f"Found {len(tables)} tables in project {project_id}.")
     return tables

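Two details of the tasks.py change are worth noting: `client.get_table(table.reference)` issues one extra API call per table, since the lightweight listing returned by `list_tables` does not include descriptions, and the `NotFound` guard covers projects where the BigQuery API is not enabled. A standalone sketch of the same lookup, with placeholder project, dataset, and table names:

```python
from google.api_core.exceptions import NotFound
from google.cloud import bigquery

client = bigquery.Client()  # uses application default credentials
try:
    # get_table accepts a fully-qualified "project.dataset.table" string.
    table = client.get_table("some-project.some_dataset.some_table")
    # description is None when no description has been set on the table.
    print(table.description or "<no description>")
except NotFound:
    # Raised for a missing table, or when the BigQuery API is disabled.
    print("Table not found or BigQuery API not enabled for this project.")
```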