diff --git a/nldi-crawler.toml b/nldi-crawler.toml index 38fa3dc..09be804 100644 --- a/nldi-crawler.toml +++ b/nldi-crawler.toml @@ -1,5 +1,5 @@ [nldi-db] -hostname: 172.18.0.1 +hostname: 172.21.0.2 port: 5432 username: nldi_schema_owner password: changeMe diff --git a/notebooks/ORM.ipynb b/notebooks/ORM.ipynb index b2f95b0..77d7a8e 100644 --- a/notebooks/ORM.ipynb +++ b/notebooks/ORM.ipynb @@ -10,16 +10,16 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "'2.0.0b1'" + "'2.0.19'" ] }, - "execution_count": 3, + "execution_count": 1, "metadata": {}, "output_type": "execute_result" } @@ -31,18 +31,18 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from sqlalchemy import create_engine\n", - "DB_URL=\"postgresql://nldi_schema_owner:changeMe@172.18.0.1:5432/nldi\" ## demo Database (CI is empty)\n", + "DB_URL=\"postgresql://nldi_schema_owner:changeMe@172.21.0.2:5432/nldi\" ## demo Database (CI is empty)\n", "eng = create_engine(DB_URL, client_encoding=\"UTF-8\", echo=False, future=True)" ] }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ @@ -64,7 +64,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 4, "metadata": {}, "outputs": [], "source": [ @@ -95,7 +95,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 7, "metadata": {}, "outputs": [], "source": [ @@ -142,7 +142,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 8, "metadata": {}, "outputs": [], "source": [ @@ -152,35 +152,41 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - " 1 :: Water Quality Portal \n", - "\t Source URI: https://www.waterqualitydata.us/data/Station/search?mimeType=geojson&minactivities=1&counts=no\n", - " 2 :: HUC12 Pour Points \n", + " 1 :: HUC12 Pour Points \n", "\t Source URI: https://www.sciencebase.gov/catalogMaps/mapping/ows/57336b02e4b0dae0d5dd619a?service=WFS&version=1.0.0&request=GetFeature&srsName=EPSG:4326&typeName=sb:fpp&outputFormat=json\n", - " 5 :: NWIS Surface Water Sites \n", + " 2 :: Water Quality Portal \n", + "\t Source URI: https://www.waterqualitydata.us/data/Station/search?mimeType=geojson&minactivities=1&counts=no\n", + " 3 :: NWIS Surface Water Sites \n", "\t Source URI: https://www.sciencebase.gov/catalog/file/get/60c7b895d34e86b9389b2a6c?name=usgs_nldi_gages.geojson\n", - " 6 :: Water Data Exchange 2.0 Sites \n", + " 4 :: Water Data Exchange 2.0 Sites \n", "\t Source URI: https://www.hydroshare.org/resource/5f665b7b82d74476930712f7e423a0d2/data/contents/wade.geojson\n", - " 7 :: geoconnex.us reference gages \n", + " 5 :: geoconnex.us reference gages \n", "\t Source URI: https://www.hydroshare.org/resource/3295a17b4cc24d34bd6a5c5aaf753c50/data/contents/nldi_gages.geojson\n", - " 8 :: Streamgage catalog for CA SB19 \n", + " 6 :: Streamgage catalog for CA SB19 \n", "\t Source URI: https://sb19.linked-data.internetofwater.dev/collections/ca_gages/items?f=json&limit=10000\n", - " 9 :: USGS Geospatial Fabric V1.1 Poin\n", + " 7 :: USGS Geospatial Fabric V1.1 Poin\n", "\t Source URI: https://www.sciencebase.gov/catalogMaps/mapping/ows/609c8a63d34ea221ce3acfd3?service=WFS&version=1.0.0&request=GetFeature&srsName=EPSG:4326&typeName=sb::gfv11&outputFormat=json\n", - "10 :: Vigil Network Data \n", + " 8 :: Vigil Network Data \n", "\t Source URI: https://www.sciencebase.gov/catalog/file/get/60c7b895d34e86b9389b2a6c?name=vigil.geojson\n", - "11 :: NWIS Groundwater Sites \n", + " 9 :: NWIS Groundwater Sites \n", "\t Source URI: https://www.sciencebase.gov/catalog/file/get/60c7b895d34e86b9389b2a6c?name=nwis_wells.geojson\n", - "12 :: New Mexico Water Data Initative \n", + "10 :: New Mexico Water Data Initative \n", "\t Source URI: https://locations.newmexicowaterdata.org/collections/Things/items?f=json&limit=100000\n", - "13 :: geoconnex contribution demo site\n", - "\t Source URI: https://geoconnex-demo-pages.internetofwater.dev/collections/demo-gpkg/items?f=json&limit=10000\n" + "11 :: geoconnex contribution demo site\n", + "\t Source URI: https://geoconnex-demo-pages.internetofwater.dev/collections/demo-gpkg/items?f=json&limit=10000\n", + "12 :: EPA National Rivers and Streams \n", + "\t Source URI: https://www.sciencebase.gov/catalog/file/get/60c7b895d34e86b9389b2a6c?name=nrsa_nldi.geojson\n", + "13 :: 2020 Census Block - NHDPlusV2 Ca\n", + "\t Source URI: https://storage.googleapis.com/nhgf/census/2020pts.geojson\n", + "14 :: NPDES Facilities that Discharge \n", + "\t Source URI: https://www.hydroshare.org/resource/495b65e56e994289baaa5feeb401358e/data/contents/npdes/NPDES_pts.geojson\n" ] } ], @@ -203,6 +209,31 @@ " # print(f\"\\t Feature Type {source.feature_type}\")" ] }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "It's a point\n" + ] + } + ], + "source": [ + "f = \"point\"\n", + "\n", + "match f:\n", + " case \"point\":\n", + " print(\"It's a point\")\n", + " case \"line\":\n", + " print(\"It's a line\")\n", + " case _:\n", + " print(\"Unknown shape\")" + ] + }, { "cell_type": "code", "execution_count": null, @@ -227,7 +258,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.16 (main, Dec 14 2022, 13:52:45) \n[GCC 11.3.0]" + "version": "3.10.6" }, "orig_nbformat": 4, "vscode": { diff --git a/src/nldi_crawler/cli.py b/src/nldi_crawler/cli.py index 68bfee7..947914d 100644 --- a/src/nldi_crawler/cli.py +++ b/src/nldi_crawler/cli.py @@ -172,3 +172,5 @@ def ingest(ctx: click.Context, source_id: int) -> None: ingestor.create_tmp_table(ctx.obj["DAL"], src) ingestor.sql_ingestor(src, dal=ctx.obj["DAL"]) ingestor.install_data(ctx.obj["DAL"], src) + ingestor.link_comids(ctx.obj["DAL"], src) + ingestor.drop_null_comids(ctx.obj["DAL"], src) diff --git a/src/nldi_crawler/ingestor.py b/src/nldi_crawler/ingestor.py index 9877d96..1d2cddd 100644 --- a/src/nldi_crawler/ingestor.py +++ b/src/nldi_crawler/ingestor.py @@ -113,7 +113,9 @@ def sql_ingestor(src: CrawlerSource, dal: DataAccessLayer) -> int: finally: mapper_registry.dispose() dal.disconnect() - return i - 1 + injested = i - 1 + + return injested def create_tmp_table(dal: DataAccessLayer, src: CrawlerSource) -> None: @@ -173,3 +175,73 @@ def install_data(dal: DataAccessLayer, src: CrawlerSource) -> None: session.execute(text(stmt)) session.commit() dal.disconnect() + +def drop_null_comids(dal: DataAccessLayer, src: CrawlerSource) -> None: + """ + Remove any rows from the feature table that have a null comid value + + Args: + dal (DataAccessLayer): Connection information for the database + src (CrawlerSource): Details for the source we are crawling + """ + + table = src.tablename() + schema = "nldi_data" + logging.info("Dropping features with empty COMID from %s", table) + + dal.connect() + stmt = f""" + set search_path = {schema}; + DELETE FROM "{table}" WHERE comid = 0; + """ + with dal.Session() as session: + session.execute(text(stmt)) + session.commit() + dal.disconnect() + + +def link_comids(dal: DataAccessLayer, src: CrawlerSource) -> None: + """ + + Args: + dal (DataAccessLayer): Connection information for the database + src (CrawlerSource): Details for the source we are crawling + """ + table = src.tablename() + schema = "nldi_data" + + match src.ingest_type.upper(): + case "POINT": + logging.info("Matching COMIDs for %s points", src.source_name) + stmt = f""" + UPDATE {schema}."{table}" upd_table + SET comid = featureid + FROM {schema}."{table}" src_table + JOIN nhdplus.catchmentsp + ON ST_covers(catchmentsp.the_geom, src_table.location) + WHERE upd_table.crawler_source_id = src_table.crawler_source_id AND + upd_table.identifier = src_table.identifier + """ + case "REACH": + logging.info("Matching COMIDs for %s reaches", src.source_name) + stmt = f""" + UPDATE {schema}."{table}" upd_table + set comid = nhdflowline_np21.nhdplus_comid + FROM {schema}."{table}" src_table + join nhdplus.nhdflowline_np21 + on nhdflowline_np21.reachcode = src_table.reachcode and + src_table.measure between nhdflowline_np21.fmeasure and nhdflowline_np21.tmeasure + where upd_table.crawler_source_id = src_table.crawler_source_id and + upd_table.identifier = src_table.identifier + """ + case _: + logging.info("Unknown ingest_type: %s", src.ingest_type) + return + + dal.connect() + with dal.Session() as session: + session.execute(text(stmt)) + session.commit() + dal.disconnect() + logging.info("Done matching COMIDs for %s", src.source_name) +