[#220]: link comids spatially
g.trantham committed Jul 31, 2023
1 parent 89773ae commit 1a4bfa7
Showing 4 changed files with 131 additions and 26 deletions.
2 changes: 1 addition & 1 deletion nldi-crawler.toml
@@ -1,5 +1,5 @@
[nldi-db]
hostname: 172.18.0.1
hostname: 172.21.0.2
port: 5432
username: nldi_schema_owner
password: changeMe
79 changes: 55 additions & 24 deletions notebooks/ORM.ipynb
@@ -10,16 +10,16 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'2.0.0b1'"
"'2.0.19'"
]
},
"execution_count": 3,
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
@@ -31,18 +31,18 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from sqlalchemy import create_engine\n",
"DB_URL=\"postgresql://nldi_schema_owner:changeMe@172.18.0.1:5432/nldi\" ## demo Database (CI is empty)\n",
"DB_URL=\"postgresql://nldi_schema_owner:changeMe@172.21.0.2:5432/nldi\" ## demo Database (CI is empty)\n",
"eng = create_engine(DB_URL, client_encoding=\"UTF-8\", echo=False, future=True)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
@@ -64,7 +64,7 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
@@ -95,7 +95,7 @@
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
@@ -142,7 +142,7 @@
},
{
"cell_type": "code",
"execution_count": 10,
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
@@ -152,35 +152,41 @@
},
{
"cell_type": "code",
"execution_count": 12,
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" 1 :: Water Quality Portal \n",
"\t Source URI: https://www.waterqualitydata.us/data/Station/search?mimeType=geojson&minactivities=1&counts=no\n",
" 2 :: HUC12 Pour Points \n",
" 1 :: HUC12 Pour Points \n",
"\t Source URI: https://www.sciencebase.gov/catalogMaps/mapping/ows/57336b02e4b0dae0d5dd619a?service=WFS&version=1.0.0&request=GetFeature&srsName=EPSG:4326&typeName=sb:fpp&outputFormat=json\n",
" 5 :: NWIS Surface Water Sites \n",
" 2 :: Water Quality Portal \n",
"\t Source URI: https://www.waterqualitydata.us/data/Station/search?mimeType=geojson&minactivities=1&counts=no\n",
" 3 :: NWIS Surface Water Sites \n",
"\t Source URI: https://www.sciencebase.gov/catalog/file/get/60c7b895d34e86b9389b2a6c?name=usgs_nldi_gages.geojson\n",
" 6 :: Water Data Exchange 2.0 Sites \n",
" 4 :: Water Data Exchange 2.0 Sites \n",
"\t Source URI: https://www.hydroshare.org/resource/5f665b7b82d74476930712f7e423a0d2/data/contents/wade.geojson\n",
" 7 :: geoconnex.us reference gages \n",
" 5 :: geoconnex.us reference gages \n",
"\t Source URI: https://www.hydroshare.org/resource/3295a17b4cc24d34bd6a5c5aaf753c50/data/contents/nldi_gages.geojson\n",
" 8 :: Streamgage catalog for CA SB19 \n",
" 6 :: Streamgage catalog for CA SB19 \n",
"\t Source URI: https://sb19.linked-data.internetofwater.dev/collections/ca_gages/items?f=json&limit=10000\n",
" 9 :: USGS Geospatial Fabric V1.1 Poin\n",
" 7 :: USGS Geospatial Fabric V1.1 Poin\n",
"\t Source URI: https://www.sciencebase.gov/catalogMaps/mapping/ows/609c8a63d34ea221ce3acfd3?service=WFS&version=1.0.0&request=GetFeature&srsName=EPSG:4326&typeName=sb::gfv11&outputFormat=json\n",
"10 :: Vigil Network Data \n",
" 8 :: Vigil Network Data \n",
"\t Source URI: https://www.sciencebase.gov/catalog/file/get/60c7b895d34e86b9389b2a6c?name=vigil.geojson\n",
"11 :: NWIS Groundwater Sites \n",
" 9 :: NWIS Groundwater Sites \n",
"\t Source URI: https://www.sciencebase.gov/catalog/file/get/60c7b895d34e86b9389b2a6c?name=nwis_wells.geojson\n",
"12 :: New Mexico Water Data Initative \n",
"10 :: New Mexico Water Data Initative \n",
"\t Source URI: https://locations.newmexicowaterdata.org/collections/Things/items?f=json&limit=100000\n",
"13 :: geoconnex contribution demo site\n",
"\t Source URI: https://geoconnex-demo-pages.internetofwater.dev/collections/demo-gpkg/items?f=json&limit=10000\n"
"11 :: geoconnex contribution demo site\n",
"\t Source URI: https://geoconnex-demo-pages.internetofwater.dev/collections/demo-gpkg/items?f=json&limit=10000\n",
"12 :: EPA National Rivers and Streams \n",
"\t Source URI: https://www.sciencebase.gov/catalog/file/get/60c7b895d34e86b9389b2a6c?name=nrsa_nldi.geojson\n",
"13 :: 2020 Census Block - NHDPlusV2 Ca\n",
"\t Source URI: https://storage.googleapis.com/nhgf/census/2020pts.geojson\n",
"14 :: NPDES Facilities that Discharge \n",
"\t Source URI: https://www.hydroshare.org/resource/495b65e56e994289baaa5feeb401358e/data/contents/npdes/NPDES_pts.geojson\n"
]
}
],
@@ -203,6 +209,31 @@
" # print(f\"\\t Feature Type {source.feature_type}\")"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"It's a point\n"
]
}
],
"source": [
"f = \"point\"\n",
"\n",
"match f:\n",
" case \"point\":\n",
" print(\"It's a point\")\n",
" case \"line\":\n",
" print(\"It's a line\")\n",
" case _:\n",
" print(\"Unknown shape\")"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -227,7 +258,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.16 (main, Dec 14 2022, 13:52:45) \n[GCC 11.3.0]"
"version": "3.10.6"
},
"orig_nbformat": 4,
"vscode": {
2 changes: 2 additions & 0 deletions src/nldi_crawler/cli.py
@@ -172,3 +172,5 @@ def ingest(ctx: click.Context, source_id: int) -> None:
ingestor.create_tmp_table(ctx.obj["DAL"], src)
ingestor.sql_ingestor(src, dal=ctx.obj["DAL"])
ingestor.install_data(ctx.obj["DAL"], src)
ingestor.link_comids(ctx.obj["DAL"], src)
ingestor.drop_null_comids(ctx.obj["DAL"], src)
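
For reference, a minimal sketch of the full ingest sequence as it stands after this change, driven directly rather than through the CLI. The five ingestor calls and their order come from the diff above; the wrapper function, and the assumption that a DataAccessLayer (dal) and CrawlerSource (src) are already in hand (as they are via the CLI context), are illustrative only.

from nldi_crawler import ingestor

def run_ingest(dal, src):
    # Create the temporary table and crawl the source's features into it.
    ingestor.create_tmp_table(dal, src)
    ingestor.sql_ingestor(src, dal=dal)
    # Install the staged data as the source's feature table.
    ingestor.install_data(dal, src)
    # New in this commit: spatially link features to NHDPlus COMIDs,
    # then drop any features that could not be linked (comid = 0).
    ingestor.link_comids(dal, src)
    ingestor.drop_null_comids(dal, src)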
74 changes: 73 additions & 1 deletion src/nldi_crawler/ingestor.py
Expand Up @@ -113,7 +113,9 @@ def sql_ingestor(src: CrawlerSource, dal: DataAccessLayer) -> int:
finally:
mapper_registry.dispose()
dal.disconnect()
return i - 1
ingested = i - 1

return ingested


def create_tmp_table(dal: DataAccessLayer, src: CrawlerSource) -> None:
@@ -173,3 +175,73 @@ def install_data(dal: DataAccessLayer, src: CrawlerSource) -> None:
session.execute(text(stmt))
session.commit()
dal.disconnect()

def drop_null_comids(dal: DataAccessLayer, src: CrawlerSource) -> None:
"""
Remove any rows from the feature table that were not matched to a COMID (comid = 0)
Args:
dal (DataAccessLayer): Connection information for the database
src (CrawlerSource): Details for the source we are crawling
"""

table = src.tablename()
schema = "nldi_data"
logging.info("Dropping features with empty COMID from %s", table)

dal.connect()
stmt = f"""
set search_path = {schema};
DELETE FROM "{table}" WHERE comid = 0;
"""
with dal.Session() as session:
session.execute(text(stmt))
session.commit()
dal.disconnect()


def link_comids(dal: DataAccessLayer, src: CrawlerSource) -> None:
"""
Args:
dal (DataAccessLayer): Connection information for the database
src (CrawlerSource): Details for the source we are crawling
"""
table = src.tablename()
schema = "nldi_data"

match src.ingest_type.upper():
case "POINT":
logging.info("Matching COMIDs for %s points", src.source_name)
stmt = f"""
UPDATE {schema}."{table}" upd_table
SET comid = featureid
FROM {schema}."{table}" src_table
JOIN nhdplus.catchmentsp
ON ST_covers(catchmentsp.the_geom, src_table.location)
WHERE upd_table.crawler_source_id = src_table.crawler_source_id AND
upd_table.identifier = src_table.identifier
"""
case "REACH":
logging.info("Matching COMIDs for %s reaches", src.source_name)
stmt = f"""
UPDATE {schema}."{table}" upd_table
set comid = nhdflowline_np21.nhdplus_comid
FROM {schema}."{table}" src_table
join nhdplus.nhdflowline_np21
on nhdflowline_np21.reachcode = src_table.reachcode and
src_table.measure between nhdflowline_np21.fmeasure and nhdflowline_np21.tmeasure
where upd_table.crawler_source_id = src_table.crawler_source_id and
upd_table.identifier = src_table.identifier
"""
case _:
logging.info("Unknown ingest_type: %s", src.ingest_type)
return

dal.connect()
with dal.Session() as session:
session.execute(text(stmt))
session.commit()
dal.disconnect()
logging.info("Done matching COMIDs for %s", src.source_name)
