diff --git a/README.md b/README.md deleted file mode 100644 index 4f5a33a..0000000 --- a/README.md +++ /dev/null @@ -1,104 +0,0 @@ -[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.3549078.svg)](https://doi.org/10.5281/zenodo.3549078) -___ - -# ERA5 - -The era5 python code is an interface to the CDS api to download ERA5 data from the CDS data server. -It uses a modified version of the CDS api which stops after a request has been submitted and executed. The target download url is saved and downloads are run in parallel by the code using the Pool multiprocessing module. -As well as managing the downloads the code gets all the necessary information on available variables from local json configuration files. -Before submitting a request the code will check that the file is not already available locally by quering a sqlite database. After downloading new files it is important to update the database to avoid downloading twice the same file. Files are first downloaded in a staging area, a quick qc to see if the file is a valid netcdf file is run and finally the file is converted to netcdf4 format with internal compression. -The code default behaviour is to download netcdf files, but it's also possible to download grib, however we haven't tested the full workflow for this option. - -## Getting started - -To run a download:: - - python cli.py download -s surface -y 2018 -m 11 -p 228.128 - - Download ERA5 variables, to be preferred if adding a new variable, if - month argument is not passed then the entire year will be downloaded. By - default it downloads hourly data in netcdf format. 
- -Options: -+ -q, --queue Create json file to add request to queue -+ -s, --stream [surface|wave|pressure|land] - ECMWF stream currently operative analysis - surface, pressure levels, wave model and - ERA5 land [required] -+ -y, --year TEXT year to download [required] -+ -m, --month TEXT month/s to download, if not specified all - months for year will be downloaded -+ -t, --timestep [mon|hr] timestep hr or mon, if not specified hr -+ -b, --back Request backwards all years and months as - one file, works only for monthly data -+ --format [grib|netcdf] Format output: grib or netcdf -+ -p, --param TEXT Grib code parameter for selected variable, - pass as param.table i.e. 132.128 [required] -+ --help Show this message and exit. - -To update files when a new month is released:: - - python cli.py update -s surface -y 2019 -m 05 - - Update ERA5 variables, to be used for regular monthly updates. Only the stream argument without params will update all the variables listed in the era5__.json file. - Options are the same as for download. - -The 'download' and 'update' commands will actually download the data unless you use the 'queue' flag. 
-If you want only to create a request:: - - python cli.py download -s surface -y 2018 -m 11 -p 228.128 -q - -This will create a json file which stores the arguments passed: - - era5_request_.json - {"update": false, "format": "netcdf", "stream": "surface", "params": ["228.128"], - "year": "2018", "months": ["11"], "timestep": "hr", "back": false} - -For this request to be downloaded the file should be moved to - - /g/data/ub4/Work/Scripts/ERA5/Requests - -To execute the request the tool is used with the 'scan' command option:: - - python cli.py scan -f era5_request_.json - -### click era5 command code and input files -+ cli.py -- main code, includes click commands and functions to buil request and submit it with pool - python cli.py --help to check usage -+ era5_functions.py -- functions used by cli.py, just separated them so it is overall more readable -+ update_json_vars.py -- script that uses the clef db to update era5_vars.json - needs clef module, currently - module use /g/data/hh5/tmp/public/modules - module load conda/analysis3 -+ era5_update_db.py -- crawls /g/data/ub4/era5/netcdf and adds new netcdf files to the sqlite database - -### To configure the tool - -+ config.json -- to set configuration: -..+ staging, data, logs directories, -..+ database name and location, -..+ bash commands to download, resume download, qc and compress files, -..+ number of threads, -..+ number of resume download attempts, -..+ slow and fast ips - -+ era5_pressure_hr.json -- pressure levels stream arguments to build request and list of params to download at hourly temporal resolution -+ era5_pressure_mon.json -- pressure levels stream arguments to build request and list of params to download at monthly temporal resolution -+ era5_wave_hr.json -- wave model surface level stream arguments to build request and list of params to download at hourly temporal resolution -+ era5_wave_mon.json -- wave model surface level stream arguments to build request and list of params to download 
at monthly temporal resolution -+ era5_surface_hr.json -- surface level stream arguments to build request and list of params to download at hourly temporal resolution -+ era5_surface_mon.json -- surface level stream arguments to build request and list of params to download at monthly temporal resolution -+ era5_land_hr.json -- Land model surface level stream arguments to build request and list of params to download at hourly temporal resolution -+ era5_vars.json -- Json file with list of grib codes that can be downloaded from CDS and respective variable and cds names - -### Other files -(not included in git) - -+ era5.sqlite -- sqlite database -+ setup.py.template -- template for setup.py - -### Modified cdsapi code -+ cdsapi: - __init__.py - __pycache__ - api.py diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..cdec0d4 --- /dev/null +++ b/README.rst @@ -0,0 +1,147 @@ +|DOI| \_\_\_ + +ERA5 +==== + +The era5 python code is an interface to the CDS api to download ERA5 +data from the CDS data server. It uses a modified version of the CDS api +which stops after a request has been submitted and executed. The target +download url is saved and downloads are run in parallel by the code +using the Pool multiprocessing module. As well as managing the downloads +the code gets all the necessary information on available variables from +local json configuration files. Before submitting a request the code +will check that the file is not already available locally by querying a +sqlite database. After downloading new files it is important to update +the database to avoid downloading the same file twice. Files are first +downloaded in a staging area, a quick qc to see if the file is a valid +netcdf file is run and finally the file is converted to netcdf4 format +with internal compression. The code default behaviour is to download +netcdf files, but it's also possible to download grib, however we +haven't tested the full workflow for this option.
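The workflow described above (skip files already recorded in the sqlite database, then fetch the remainder in parallel with the Pool module) can be sketched roughly as below. This is an illustration only, not the actual era5 code: the single-column table and the `fetch`/`already_downloaded` helpers are hypothetical stand-ins for the real catalogue schema and for the CDS retrieve plus nccopy compression step.

```python
import sqlite3
from multiprocessing.dummy import Pool as ThreadPool

def already_downloaded(conn, filename):
    # The real tool queries its era5.sqlite catalogue; this minimal
    # version assumes a hypothetical single-column "file" table.
    cur = conn.execute("SELECT 1 FROM file WHERE name = ?", (filename,))
    return cur.fetchone() is not None

def fetch(filename):
    # Placeholder for the actual CDS download + nccopy compression step.
    return f"downloaded {filename}"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE file (name TEXT)")
conn.execute("INSERT INTO file VALUES ('era5_u10_2018.nc')")

requested = ["era5_u10_2018.nc", "era5_v10_2018.nc", "era5_tp_2018.nc"]
# skip files the database already knows about
todo = [f for f in requested if not already_downloaded(conn, f)]

with ThreadPool(2) as pool:  # parallel downloads, as with Pool in the tool
    results = pool.map(fetch, todo)
print(results)
```

The staging/qc/compression steps would slot into `fetch`; keeping the database update as a separate explicit step matches the behaviour described above.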
+ +Getting started +--------------- + +To run a download:: + + era5 download -s surface -y 2018 -m 11 -p 228.128 + +Download ERA5 variables. If the month argument is not passed then the entire year will be downloaded. By default it downloads hourly data in netcdf format. + +Options: + * -q, --queue Create json file to add request to queue + * -u, --urgent In conjunction with queue, save the request file in a separate directory that can then be prioritised. + * -s, --stream [surface|wave|pressure|land|cems_fire|agera5|wfde5] ECMWF stream currently operative analysis surface, pressure levels, wave model, and derived products ERA5 land, CEMS_fire, AGERA5 and WFDE5 [required] + * -y, --year TEXT year to download [required] + * -m, --month TEXT month/s to download, if not specified all months for year will be downloaded + * -t, --timestep [mon|hr|day] timestep, hr is the default + * -b, --back Request backwards all years and months as one file, works only for monthly data + * --format [grib|netcdf|tgz|zip] Format output: netcdf default, some formats work only for certain streams + * -p, --param TEXT Grib code parameter for selected variable, pass as param.table i.e. 132.128. If none passed then all the params listed in era5__.json will be used. + * --help Show this message and exit. + +To update files when a new month is released, omit the param flag:: + + era5 download -s surface -y 2019 -m 05 + + +The 'download' sub-command will actually request and download the data +unless you use the 'queue' flag.
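As a rough sketch of how options like those above map onto click declarations (this mirrors only a few of the listed flags, is not the real cli.py, and echoes the parsed request instead of submitting it):

```python
import click
from click.testing import CliRunner

@click.command()
@click.option('--stream', '-s', required=True,
              type=click.Choice(['surface', 'wave', 'pressure', 'land']))
@click.option('--year', '-y', multiple=True, required=True)
@click.option('--month', '-m', multiple=True)
@click.option('--queue', '-q', is_flag=True, default=False)
def download(stream, year, month, queue):
    """Echo the parsed request instead of submitting it."""
    click.echo(f"{stream} {list(year)} {list(month)} queue={queue}")

# exercise the command in-process rather than from a shell
runner = CliRunner()
result = runner.invoke(download, ['-s', 'surface', '-y', '2018', '-m', '11', '-q'])
print(result.output)
```

Because `--year` and `--month` use `multiple=True`, click collects repeated flags into tuples, which is why the real code calls `list(year)` and `list(month)` before building requests.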
If you only want to create a request:: + + era5 download -s surface -y 2018 -m 11 -p 228.128 -q + +This will create a json file which stores the arguments passed:: + + era5_request_.json + {"update": false, "format": "netcdf", "stream": "surface", "params": ["228.128"], + "year": "2018", "months": ["11"], "timestep": "hr", "back": false} + +To execute the request the tool is used with the 'scan' command option:: + + era5 scan -f era5_request_.json + +To manage the database use the 'era5 db' subcommand:: + + era5 db -s surface -p u10 + +for example will add all new surface hourly (tstep is hr by default) u10 files to the database. NB in this case you pass the variable name, not the grib code, to the -p flag. +'db' can also list all variables in a stream:: + + era5 db -a list -s land -t mon + +Unless a variable is specified this will show all variables regularly updated for the stream, and how many corresponding files are on the filesystem and in the database. These numbers are compared to the expected number of files based on the current date. + +Finally to delete records:: + + era5 db -a delete -s land -t mon -p ro -y 2019 + + This will delete all corresponding records, but will list all records to be deleted and ask for confirmation first.
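The queue/scan round trip above is a plain json serialisation of the parsed arguments. A minimal sketch (field names taken from the example request file; the request filename itself stays elided as in the docs):

```python
import json

# the arguments the CLI would dump with the -q/--queue flag
request = {"update": False, "format": "netcdf", "stream": "surface",
           "params": ["228.128"], "year": "2018", "months": ["11"],
           "timestep": "hr", "back": False}

# what `era5 download ... -q` writes to the request json file
serialized = json.dumps(request)

# what `era5 scan -f <file>` reads back before calling api_request()
args = json.loads(serialized)
print(args["stream"], args["params"], args["months"])
```

Because everything in the dict is a string, list or bool, the round trip is lossless, which is what lets `scan` replay a queued request exactly.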
+ + +Latest updates +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +August 2020 - Added --urgent flag + Added support to distribute requests across multiple user accounts + +click era5 command code and input files +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +- cli.py -- main code, includes click commands and functions to build + request and submit it with pool; python cli.py --help to check usage +- era5_functions.py -- functions used by cli.py, just separated them + so it is overall more readable +- era5_db.py -- has all the functions relating to db operations +- update_json_vars.py -- script that uses the clef db to update + era5_vars.json; needs the clef module +- setup.py and setup.cfg -- to install the module + +To configure the tool +~~~~~~~~~~~~~~~~~~~~~ + +These files are in era5/data/ + +- config.json -- to set configuration: + * staging, data, logs directories, + * database name and location, + * bash commands to download, resume download, qc, compress and concatenate files, + * number of threads, + * number of resume download attempts, + * slow and fast ips + +- era5_pressure_hr.json -- pressure levels stream arguments to build + request and list of params to download at hourly temporal resolution +- era5_pressure_mon.json -- pressure levels stream arguments to build + request and list of params to download at monthly temporal resolution +- era5_wave_hr.json -- wave model surface level stream arguments to + build request and list of params to download at hourly temporal + resolution +- era5_wave_mon.json -- wave model surface level stream arguments to + build request and list of params to download at monthly temporal + resolution +- era5_surface_hr.json -- surface level stream arguments to build + request and list of params to download at hourly temporal resolution +- era5_surface_mon.json -- surface level stream arguments to build + request and list of params to download at monthly temporal resolution +- era5_land_hr.json -- Land model surface level stream arguments to
build request and list of params to download at hourly temporal + resolution +- era5_vars.json -- Json file with list of grib codes that can be + downloaded from CDS and respective variable and cds names +- era5_derived.json -- Json file with list of derived products variables + +Other files +~~~~~~~~~~~ + +(not included in git) + +- era5.sqlite -- sqlite database + +Modified cdsapi code +~~~~~~~~~~~~~~~~~~~~ + +- cdsapi: ``__init__.py`` ``__pycache__`` ``api.py`` + +.. |DOI| image:: https://zenodo.org/badge/DOI/10.5281/zenodo.3549078.svg + :target: https://doi.org/10.5281/zenodo.3549078 diff --git a/docs/gettingstarted.rst b/docs/gettingstarted.rst new file mode 100644 index 0000000..8d4c669 --- /dev/null +++ b/docs/gettingstarted.rst @@ -0,0 +1,75 @@ +Getting Started +=============== + +ERA5 is an interface to the CDSapi (Copernicus Climate Data Store API). It helps automate and submit cdsapi requests. ERA5 is python3 based and uses the click module so it can be run from the command line. +The configuration of the tool and specific information on the variables is provided by json files. + +Once installed the tool is accessed through the command-line `era5` program. There are +presently three subcommands: + + * :code:`era5 download` to submit a request and download new files, at least one stream has to be specified. If you omit the variable then all the variables listed in the corresponding stream json file will be downloaded. This is useful for regular updates. + + * :code:`era5 scan` to execute a download request starting from a json constraints file previously saved using download with the --queue/-q flag. + + * :code:`era5 db` to update, list or delete records in the sqlite database that tracks downloaded files. + +Examples +-------- + + +download +++++++++ +:: + + $ era5 download -s pressure -y 2020 -m 02 -p 132.128 + +This will put in a request for the v component of wind (whose grib code is 132.128) for February 2020 on pressure levels and at 1hr timesteps.
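For illustration, a request dict in the general shape the CDS API expects for the command above might be assembled like this. The helper and its field values are assumptions for the sketch (the real tool builds the dict in build_dict() from era5_pressure_hr.json); only the mapping of 132.128 to v_component_of_wind comes from era5_vars.json:

```python
def build_request(cds_name, year, month, levels, area):
    # illustrative only: field names follow general CDS API conventions
    return {
        "product_type": "reanalysis",
        "variable": cds_name,
        "year": year,
        "month": month,
        "day": [f"{d:02d}" for d in range(1, 30)],   # Feb 2020 had 29 days
        "time": [f"{h:02d}:00" for h in range(24)],  # hourly timesteps
        "pressure_level": levels,
        "area": area,
        "format": "netcdf",
    }

# levels and area here are sample values, not the full json config
req = build_request("v_component_of_wind", "2020", "02",
                    ["500", "850"], "20/78/-57/-140")
print(len(req["day"]), len(req["time"]))
```

In the real tool the day list comes from `target()` (which knows the calendar) and the level list and area from the stream json file.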
+ +The area and grid resolution are defined in the +era5/data/era5_pressure_hr.json file + +NB you can change the timestep by using the `-t` flag; `hr` is the default value, other possibilities are `mon` and `day`. Depending on the stream some of these options might not be valid. + +Other flags are: + * -h/--help + * -q/--queue which saves the constraints to a json file that can be passed later to the `scan` subcommand + * -f/--file works only with `scan` + * --back works only with monthly data, it allows downloading backwards all available data to the year passed with `-y`. + +Scan example:: + + $ era5 scan -f era5_request_file.json + +Update the database:: + + $ era5 db -s land -t mon -p ro + +NB + * -a/--action works only for the db subcommand to choose which db action to execute: update (default), list, or delete + + +Installation +============ +The tool uses python >=3.6 and you need to install the click package. + +To set up the tool:: + + $ python setup.py install + +The github repository includes a modified version of the cdsapi. +For this to work you need to set up a .cdsapirc file in your home directory with the api url and your own api key. It should look like:: + + url: https://cds.climate.copernicus.eu/api/v2 + key: ######################### + +The instructions are available from the ECMWF confluence. +NB most of the tool functionalities are preserved even if you use the original api. What the modified api does is stop the client before it can download the files. Separating the file download from the requests helps manage the downloads as parallel tasks and avoids potential issues should the connection be interrupted.
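The .cdsapirc file is plain yaml, and cli.py reads it with yaml.safe_load before constructing the client. A sketch parsing the same two-line format from a string, with a dummy key standing in for the real one (requires the PyYAML package):

```python
import yaml

# same two-line format as ~/.cdsapirc; the key below is a dummy placeholder
cdsapirc = """\
url: https://cds.climate.copernicus.eu/api/v2
key: 12345:abcdef
"""
credentials = yaml.safe_load(cdsapirc)
print(credentials["url"])
# the modified client is then created as in cli.py:
# c = cdsapi.Client(url=credentials['url'], key=credentials['key'], verify=1)
```

Reading the file explicitly (rather than relying on the client's default lookup) is what lets the tool alternate between several numbered `.cdsapirc<n>` files and so several user accounts.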
+ +Step 2 +------ +Configure the tool by defining the following settings in era5/data/config.json: + + The following folders: + * log for log outputs + * staging where the files are initially downloaded + * netcdf for the compressed files + * era5_derived if you are downloading ERA5 derived products + And also: + * update the directories paths and the sqlite db name + * the commands you want to use to download/compress the files + diff --git a/era5/__init__.py b/era5/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cdsapi/__init__.py b/era5/cdsapi/__init__.py similarity index 100% rename from cdsapi/__init__.py rename to era5/cdsapi/__init__.py diff --git a/cdsapi/api.py b/era5/cdsapi/api.py similarity index 100% rename from cdsapi/api.py rename to era5/cdsapi/api.py diff --git a/cli.py b/era5/cli.py similarity index 55% rename from cli.py rename to era5/cli.py index c383e4d..9063067 100644 --- a/cli.py +++ b/era5/cli.py @@ -28,15 +28,30 @@ # depends on cdapi.py that can be downloaded from the Copernicus website # https://cds.climate.copernicus.eu/api-how-to # contact: paolap@utas.edu.au -# last updated 28/02/2019 +# last updated 09/06/2020 #!/usr/bin/python import click import os import sys +import yaml +from itertools import product as iproduct from multiprocessing.dummy import Pool as ThreadPool -from era5_update_db import db_connect, query -from era5_functions import * +#from era5.era5_update_db import db_connect, query +from era5.era5_functions import * +from era5.era5_db import query, db_connect, update_db, delete_record, variables_stats +import era5.cdsapi as cdsapi + + +def era5_catch(): + debug_logger = logging.getLogger('era5_debug') + debug_logger.setLevel(logging.CRITICAL) + try: + era5() + except Exception as e: + click.echo('ERROR: %s'%e) + debug_logger.exception(e) + sys.exit(1) def do_request(r): @@ -47,6 +62,7 @@ def do_request(r): [2] file staging path [3] file target path [4] ip for download url + [5] userid Download to staging area first,
compress netcdf (nccopy) """ @@ -54,8 +70,11 @@ def do_request(r): fn = r[3] # the actual retrieve part + # set api key explicitly so you can alternate + with open(f'/mnt/pvol/era5/.cdsapirc{r[5]}', 'r') as f: + credentials = yaml.safe_load(f) # create client instance - c = cdsapi.Client() + c = cdsapi.Client(url=credentials['url'], key=credentials['key'], verify=1) era5log.info(f'Requesting {tempfn} ... ') era5log.info(f'Request: {r[1]}') # need to capture exceptions @@ -78,11 +97,26 @@ def do_request(r): if f'.{ip}/' in res.location: url = res.location.replace(f'.{ip}/', f'.{r[4]}/') if file_down(url, tempfn, size, era5log): # successful - # do some compression on the file - assuming 1. it's netcdf, 2. that nccopy will fail if file is corrupt - era5log.info(f'Compressing {tempfn} ...') - cmd = f"{cfg['nccmd']} {tempfn} {fn}" + # if netcdf compress file, assuming it'll fail if file is corrupted + # if tgz untar, if zip unzip + # if grib skip + if tempfn[-3:] == '.nc': + era5log.info(f'Compressing {tempfn} ...') + cmd = f"{cfg['nccmd']} {tempfn} {fn}" + elif tempfn[-4:] == '.tgz': + era5log.info(f'Untarring and concatenating {tempfn} ...') + base = os.path.dirname(tempfn) + cmd = f"{cfg['untar']} {tempfn} -C {base}; {cfg['concat']} {base}/*.nc {fn[:-4]}.nc; rm {base}/*.nc" + elif tempfn[-4:] == '.zip': + era5log.info(f'Unzipping and concatenating {tempfn} ...') + base = os.path.dirname(tempfn) + cmd = f"unzip {tempfn} -d {base}; {cfg['concat']} {base}/*.nc {fn[:-4]}.nc; rm {base}/*.nc" + else: + cmd = "echo 'nothing to do'" + era5log.debug(f"{cmd}") p = sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE) out,err = p.communicate() + era5log.debug(f"Popen out/err: {out}, {err}") if not p.returncode: # check was successful era5log.info(f'ERA5 download success: {fn}') else: @@ -90,8 +124,7 @@ def do_request(r): return - -def api_request(update, oformat, stream, params, yr, mntlist, tstep, back): +def api_request(oformat, stream, params, yr, mntlist, tstep, back): 
""" Build a list of CDSapi requests based on arguments Call do_request to submit them and start parallel download If download successful, compress file and move to era5/netcdf @@ -102,33 +135,41 @@ def api_request(update, oformat, stream, params, yr, mntlist, tstep, back): rqlist = [] # list of faster ips to alternate ips = cfg['altips'] + users = cfg['users'] i = 0 - # assign year and list of months - if type(yr) is list: - yrs = yr - else: - yrs = [yr] - + # list of years when ERA5.1 should be donwloaded instead of ERA5 + era51 = [str(y) for y in range(2000,2007)] if mntlist == []: mntlist = ["%.2d" % i for i in range(1,13)] # retrieve stream arguments dsargs = define_args(stream, tstep) era5log.debug(f'Stream attributes: {dsargs}') # get variables details from json file - vardict = read_vars() + vardict = read_vars(stream) # define params to download - if update and params == []: + if params == []: params = dsargs['params'] + era5log.debug(f'Params: {params}') # according to ECMWF, best to loop through years and months and do either multiple # variables in one request, or at least loop through variables in the innermost loop. 
- for y in yrs: + for y in yr: + era5log.debug(f'Year: {y}') + # change dsid if pressure and year between 2000 and 2006 included + mars = False + if y in era51 and stream == 'pressure': + era5log.debug(f'Submitting using mars for ERA5.1') + mars = True + dsargs = define_args(stream+"51", tstep) + dsargs['dsid'] = 'reanalysis-era5.1-complete' # build Copernicus requests for each month and submit it using cdsapi modified module for mn in mntlist: + era5log.debug(f'Month: {mn}') # for each output file build request and append to list # loop through params and months requested for varp in params: + era5log.debug(f'Param: {varp}') queue, var, cdsname = define_var(vardict, varp, era5log) # if grib code exists but cds name is not defined skip var and print warning if not queue: @@ -142,22 +183,27 @@ def api_request(update, oformat, stream, params, yr, mntlist, tstep, back): nclist += query(conn, sql, tup) era5log.debug(nclist) - stagedir, destdir, fname, daylist = target(stream, var, y, mn, dsargs, tstep, back) + stagedir, destdir, fname, daylist = target(stream, var, + y, mn, dsargs, tstep, back, oformat) # if file already exists in datadir then skip if file_exists(fname, nclist): era5log.info(f'Skipping {fname} already exists') continue - rdict = build_dict(dsargs, y, mn, cdsname, daylist, oformat, tstep, back) + if mars: + rdict = build_mars(dsargs, y, mn, varp, oformat, tstep, back) + else: + rdict = build_dict(dsargs, y, mn, cdsname, daylist, oformat, tstep, back) rqlist.append((dsargs['dsid'], rdict, os.path.join(stagedir,fname), - os.path.join(destdir, fname), ips[i % len(ips)])) - # progress index to alternate between ips + os.path.join(destdir, fname), ips[i % len(ips)], + users[i % len(users)])) + # progress index to alternate between ips and users i+=1 era5log.info(f'Added request for {fname}') if back: + era5log.debug(f'Breaking cycle back is True') break era5log.debug(f'{rqlist}') - # parallel downloads if len(rqlist) > 0: # set num of threads = number of 
params, or use default from config @@ -188,55 +234,64 @@ def era5(debug): def common_args(f): constraints = [ - click.option('--queue', '-q', is_flag=True, default=False, - help="Create json file to add request to queue"), - click.option('--stream', '-s', required=True, type=click.Choice(['surface','wave','pressure', 'land']), - help="ECMWF stream currently operative analysis surface, pressure levels, wave model and ERA5 land"), - click.option('--year', '-y', multiple=True, required=True, - help="year to download"), click.option('--month', '-m', multiple=True, help="month/s to download, if not specified all months for year will be downloaded "), - click.option('--timestep', '-t', type=click.Choice(['mon','hr']), default='hr', + click.option('--timestep', '-t', type=click.Choice(['mon','hr','day']), default='hr', help="timestep hr or mon, if not specified hr"), + click.option('--format', 'oformat', type=click.Choice(['grib','netcdf','zip','tgz']), default='netcdf', + help="Format output: grib, nc (for netcdf), tgz (compressed tar file) or zip") + ] + for c in reversed(constraints): + f = c(f) + return f + + +def download_args(f): + '''Arguments to use with download sub-command ''' + constraints = [ + click.option('--stream', '-s', required=True, + type=click.Choice(['surface','wave','pressure', 'land', 'cems_fire', 'agera5', 'wfde5']), + help="ECMWF stream currently operative analysis surface, pressure levels, "+\ + "wave model, ERA5 land, CEMS_Fire, AgERA5, WFDE5"), + click.option('--param', '-p', multiple=True, + help="Grib code parameter for selected variable, pass as param.table i.e. 132.128.
If not passed all parameters in .json will be downloaded"), + click.option('--year', '-y', multiple=True, required=True, + help="year to download"), + click.option('--queue', '-q', is_flag=True, default=False, + help="Create json file to add request to queue"), click.option('--back', '-b', is_flag=True, default=False, - help="Request backwards all years and months as one file, works only for monthly data"), - click.option('--format', 'oformat', type=click.Choice(['grib','netcdf']), default='netcdf', - help="Format output: grib or netcdf") + help="Request backwards all years and months as one file, works only for monthly or daily data"), + click.option('--urgent', '-u', is_flag=True, default=False, + help="high priority request, default False, if specified the request is saved in the Urgent folder which is picked first by the wrapper. Works only for queued requests.") ] for c in reversed(constraints): f = c(f) return f -@era5.command() -@common_args -@click.option('--param', '-p', multiple=True, - help="Grib code parameter for selected variable, pass as param.table i.e. 132.128. If not passed all parameters for the stream will be updated") -def update(oformat, param, stream, year, month, timestep, back, queue): - """ - Update ERA5 variables, to be used for regular monthly update - if passing only the stream argument without params it will update - all the variables listed in the era5__.json file. - \f - Grid and other stream settings are in the era5__.json file.
- """ - ####I'm separating this in update and , so eventually update can check if no yr/mn passed or only yr passed which was the last month downloaded - - update = True - if back: - print('You cannot use the backwards option with update') - sys.exit() - if queue: - dump_args(update, oformat, stream, list(param), year, list(month), timestep, back) - else: - api_request(update, oformat, stream, list(param), year, list(month), timestep, back) +def db_args(f): + '''Arguments to use with db sub-command ''' + constraints = [ + click.option('--stream', '-s', required=False, + type=click.Choice(['surface','wave','pressure', 'land', 'cems_fire', 'agera5', 'wfde5']), + help="ECMWF stream currently operative analysis surface, pressure levels, "+\ + "wave model, ERA5 land, CESM_Fire, AgERA5, WFDE5. It is required in list and delete mode."), + click.option('--param', '-p', multiple=True, + help="Variable name. At least one value is required in delete mode. If not passed in list mode all variables .json will be listed"), + click.option('--year', '-y', multiple=True, required=False, + help="year to download"), + click.option('-a','--action', type=click.Choice(['list','delete','update']), default='update', + help="db subcommand running mode: `update` (default) updates the db, `delete` deletes a record from db, `list` list all variables in db for the stream") + ] + for c in reversed(constraints): + f = c(f) + return f @era5.command() @common_args -@click.option('--param', '-p', multiple=True, required=True, - help="Grib code parameter for selected variable, pass as param.table i.e. 
132.128") -def download(oformat, param, stream, year, month, timestep, back, queue): +@download_args +def download(oformat, param, stream, year, month, timestep, back, queue, urgent): """ Download ERA5 variables, to be preferred if adding a new variable, @@ -246,14 +301,19 @@ def download(oformat, param, stream, year, month, timestep, back, queue): \f Grid and other stream settings are in the era5__.json files. """ - update = False - if back and timestep != 'mon': - print('You can the backwards option only with monthly data') + if back and stream != 'wfde5' and timestep not in ['mon', 'day']: + print('You can the backwards option only with monthly and some daily data') + sys.exit() + valid_format = list(iproduct(['tgz','zip'],['cems_fire','agera5', 'wfde5'])) + valid_format.extend( list(iproduct( ['netcdf', 'grib'], + ['pressure','surface','land','wave']))) + if (oformat,stream) not in valid_format: + print(f'Download format {oformat} not available for {stream} product') sys.exit() if queue: - dump_args(update, oformat, stream, list(param), year, list(month), timestep, back) + dump_args(oformat, stream, list(param), list(year), list(month), timestep, back, urgent) else: - api_request(update, oformat, stream, list(param), year, list(month), timestep, back) + api_request(oformat, stream, list(param), list(year), list(month), timestep, back) @era5.command() @@ -265,10 +325,34 @@ def scan(infile): """ with open(infile, 'r') as fj: args = json.load(fj) - api_request(args['update'], args['format'], args['stream'], + api_request( args['format'], args['stream'], args['params'], args['year'], args['months'], args['timestep'], args['back']) +@era5.command() +@common_args +@db_args +def db(oformat, param, stream, year, month, timestep, action): + """ + Work on database, options are + - update database, + - delete record, + - build variables list for a stream, check how many files are on disk, + on db and how many are expected for each of them + - check missing files for a 
variable + """ + + if action == 'update': + update_db(cfg, stream, timestep, list(param)) + elif action == 'delete': + if not stream or not param: + print('A stream and at least one variable should be selected: -s -p ') + sys.exit() + delete_record(cfg, stream, list(param), list(year), list(month), timestep) #, oformat) + else: + varlist = [] + variables_stats(cfg, stream, timestep, list(param)) + if __name__ == '__main__': era5() diff --git a/config.json.tmp b/era5/data/config.json.tmp similarity index 63% rename from config.json.tmp rename to era5/data/config.json.tmp index ce6a11f..897657a 100644 --- a/config.json.tmp +++ b/era5/data/config.json.tmp @@ -3,13 +3,17 @@ "datadir": "...../ub4/era5/netcdf", "staging": "...../ub4/era5/staging", "logdir": "../log", + "requestdir": "....../era5/Requests/", "getcmd": "curl -o", "resumewget": "wget -c -O", "resumecmd": "curl -C - -o", "retry": 5, "qccmd": "ncdump -h", "nccmd": "nccopy -k 4 -d5 -s", + "untar": "tar -xf", + "concat": "cdo --history -L -s -f nc4c -z zip_5 cat -setreftime,1900-01-01,00:00:00", "db": "..../era5.sqlite", - "slowips": [ "198" ], - "altips": [ "105", "110", "153", "201", "210", "235", "236" ] + "slowips": [ "198", "201" ], + "altips": [ "105", "110", "153", "201", "210", "235", "236" ], + "users": [ "1", "2", "3" ] } diff --git a/era5/data/era5_agera5_day.json b/era5/data/era5_agera5_day.json new file mode 100644 index 0000000..678e48d --- /dev/null +++ b/era5/data/era5_agera5_day.json @@ -0,0 +1,13 @@ +{ + "area": "90/-180/-90/179.9", + "dsid": "sis-agrometeorological-indicators", + "grid": "global", + "format": "tgz", + "statistics": [ + "24_hour_maximum", "24_hour_mean", "24_hour_minimum", + "day_time_maximum", "day_time_mean", "night_time_mean", + "night_time_minimum"], + "levels": [], + "params": ["cloud_cover" + ] +} diff --git a/era5/data/era5_cems_fire_day.json b/era5/data/era5_cems_fire_day.json new file mode 100644 index 0000000..9021182 --- /dev/null +++ 
b/era5/data/era5_cems_fire_day.json @@ -0,0 +1,14 @@ +{ + "area": "90/-180/-90/179.75", + "dsid": "cems-fire-historical", + "version": "3.1", + "format": "tgz", + "product_type": "reanalysis", + "dataset": "Consolidated dataset", + "grid": "global", + "levels": [], + "params": [ + "bui", "danger_risk", "dc", "dmc", + "dsr", "ffmc", "fwi", "isi", "fdi", + "kbdi", "bi", "erc", "ic", "sc"] +} diff --git a/era5/data/era5_derived.json b/era5/data/era5_derived.json new file mode 100644 index 0000000..c04bd60 --- /dev/null +++ b/era5/data/era5_derived.json @@ -0,0 +1,24 @@ +{"bui": ["bui", "build_up_index"], + "danger_risk": ["danger_risk", "danger_risk"], + "dc": ["dc", "drought_code"], + "dmc": ["dmc", "duff_moisture_code"], + "dsr": ["dsr", "fire_daily_severity_rating"], + "ffmc": ["ffmc", "fine_fuel_moisture_code"], + "fwi": ["fwi", "fire_weather_index"], + "isi": ["isi", "initial_fire_spread_index"], + "fdi": ["fdi", "fire_danger_index"], + "kbdi": ["kbdi", "keetch_byram_drought_index"], + "bi": ["bi", "burning_index"], + "erc": ["erc", "energy_release_component"], + "ic": ["ic", "ignition_component"], + "sc": ["sc", "spread_component"], + "ASurf": ["ASurf","grid_point_altitude"], + "Tair": ["Tair", "near_surface_air_temperature"], + "Qair": ["Qair", "near_surface_specific_humidity"], + "Wind": ["Wind", "near_surface_wind_speed"], + "Rainf": ["Rainf", "rainfall_flux"], + "Snowf": ["Snowf", "snowfall_flux"], + "PSurf": ["PSurf", "surface_air_pressure"], + "LWdown": ["LWdown", "surface_downwelling_longwave_radiation"], + "SWdown": ["SWdown", "surface_downwelling_shortwave_radiation"] +} diff --git a/era5_land_hr.json b/era5/data/era5_land_hr.json similarity index 52% rename from era5_land_hr.json rename to era5/data/era5_land_hr.json index 8a7991e..3f4cc3b 100644 --- a/era5_land_hr.json +++ b/era5/data/era5_land_hr.json @@ -4,11 +4,23 @@ "grid": "global", "levels": [], "params": [ + "8.128", + "9.128", "39.128", "40.128", "41.128", "42.128", +
"66.128", + "67.128", + "101.128", + "103.128", + "139.128", + "167.128", + "168.128", + "205.128", "228.128", + "235.128", + "251.228", "182.260" ] } diff --git a/era5/data/era5_land_mon.json b/era5/data/era5_land_mon.json new file mode 100644 index 0000000..dc84b97 --- /dev/null +++ b/era5/data/era5_land_mon.json @@ -0,0 +1,15 @@ +{ + "product_type":"monthly_averaged_reanalysis", + "area": "90/-180/-90/179.9", + "dsid": "reanalysis-era5-land-monthly-means", + "grid": "global", + "levels": [], + "params": [ + "146.128", + "147.128", + "176.128", + "177.128", + "205.128", + "228.128" + ] +} diff --git a/era5/data/era5_pressure51_hr.json b/era5/data/era5_pressure51_hr.json new file mode 100644 index 0000000..bed127d --- /dev/null +++ b/era5/data/era5_pressure51_hr.json @@ -0,0 +1,16 @@ +{ + "product_type":"reanalysis", + "dsid" : "reanalysis-era5.1-complete", + "area": "20/78/-57/-140", + "format" : "netcdf", + "grid" : "aus", + "levels": "1/2/3/5/7/10/20/30/50/70/100/125/150/175/200/225/250/300/350/400/450/500/550/600/650/700/750/775/800/825/850/875/900/925/950/975/1000", + "params": [ + "129.128", + "130.128", + "131.128", + "132.128", + "133.128", + "135.128", + "157.128"] + } diff --git a/era5_pressure_hr.json b/era5/data/era5_pressure_hr.json similarity index 100% rename from era5_pressure_hr.json rename to era5/data/era5_pressure_hr.json diff --git a/era5_pressure_mon.json b/era5/data/era5_pressure_mon.json similarity index 98% rename from era5_pressure_mon.json rename to era5/data/era5_pressure_mon.json index 47e1f60..d2d5c7c 100644 --- a/era5_pressure_mon.json +++ b/era5/data/era5_pressure_mon.json @@ -43,6 +43,7 @@ "1000" ], "params": [ + "129.128", "131.128", "132.128" ] diff --git a/era5/data/era5_pressure_mon_au.json b/era5/data/era5_pressure_mon_au.json new file mode 100644 index 0000000..6ad761e --- /dev/null +++ b/era5/data/era5_pressure_mon_au.json @@ -0,0 +1,47 @@ +{ + "product_type":"monthly_averaged_reanalysis", + "area": "20/78/-57/-140", + 
"dsid": "reanalysis-era5-pressure-levels-monthly-means", + "grid": "global", + "levels": [ + "1", + "2", + "3", + "5", + "7", + "10", + "20", + "30", + "50", + "70", + "100", + "125", + "150", + "175", + "200", + "225", + "250", + "300", + "350", + "400", + "450", + "500", + "550", + "600", + "650", + "700", + "750", + "775", + "800", + "825", + "850", + "875", + "900", + "925", + "950", + "975", + "1000" + ], + "params": [ + ] +} diff --git a/era5_surface_hr.json b/era5/data/era5_surface_hr.json similarity index 100% rename from era5_surface_hr.json rename to era5/data/era5_surface_hr.json diff --git a/era5_surface_mon.json b/era5/data/era5_surface_mon.json similarity index 97% rename from era5_surface_mon.json rename to era5/data/era5_surface_mon.json index 63e9d9d..79f7432 100644 --- a/era5_surface_mon.json +++ b/era5/data/era5_surface_mon.json @@ -20,6 +20,7 @@ "147.128", "149.128", "150.128", + "151.128", "165.128", "166.128", "167.128", diff --git a/era5_vars.json b/era5/data/era5_vars.json similarity index 96% rename from era5_vars.json rename to era5/data/era5_vars.json index dcafccc..cf9212f 100644 --- a/era5_vars.json +++ b/era5/data/era5_vars.json @@ -41,7 +41,7 @@ "203.128": ["o3", "ozone_mass_mixing_ratio"], "60.128": ["pv", "potential_vorticity"], "157.128": ["r", "relative_humidity"], - "205.128": ["ro", "runoff "], + "205.128": ["ro", "runoff"], "34.128": ["sst", "sea_surface_temperature"], "31.128": ["siconc", "sea_ice_cover"], "198.128": ["src", "skin_reservoir_content"], @@ -92,10 +92,10 @@ "131.128": ["u", "u_component_of_wind"], "132.128": ["v", "v_component_of_wind"], "135.128": ["w", "vertical_velocity"], - "39.128": ["swsl1", "volumetric_soil_water_layer_1"], - "40.128": ["swsl2", "volumetric_soil_water_layer_2"], - "41.128": ["swl1", "volumetric_soil_water_layer_3"], - "42.128": ["swsl4", "volumetric_soil_water_layer_4"], + "39.128": ["swvl1", "volumetric_soil_water_layer_1"], + "40.128": ["swvl2", "volumetric_soil_water_layer_2"], + 
"41.128": ["swvl3", "volumetric_soil_water_layer_3"], + "42.128": ["swvl4", "volumetric_soil_water_layer_4"], "138.128": ["vo", "vorticity"], "49.128": ["fg10", "10m_wind_gust_since_previous_post_processing"], "75.128": ["crwc", "specific_rain_water_content"], @@ -167,7 +167,7 @@ "17.128": ["alnip", "near_ir_albedo_for_direct_radiation"], "18.128": ["alnid", "near_ir_albedo_for_diffuse_radiation"], "67.128": ["lai_hv", "leaf_area_index_high_vegetation"], - "66.128": ["lai_lv", "leaf_area_index_high_vegetation"], + "66.128": ["lai_lv", "leaf_area_index_low_vegetation"], "182.260": ["evapt", "evapotranspiration"], "38.260": ["snowc", "snow_cover"], "101.228": ["evabs", "evaporation_from_bare_soil"], diff --git a/era5_wave_hr.json b/era5/data/era5_wave_hr.json similarity index 100% rename from era5_wave_hr.json rename to era5/data/era5_wave_hr.json diff --git a/era5_wave_mon.json b/era5/data/era5_wave_mon.json similarity index 100% rename from era5_wave_mon.json rename to era5/data/era5_wave_mon.json diff --git a/era5/data/era5_wfde5_hr.json b/era5/data/era5_wfde5_hr.json new file mode 100644 index 0000000..c3fbb3f --- /dev/null +++ b/era5/data/era5_wfde5_hr.json @@ -0,0 +1,11 @@ +{ + "area": "90/-180/-90/179.75", + "dsid": "derived-near-surface-meteorological-variables", + "format": "tgz", + "reference_dataset": "cru", + "grid": "global", + "levels": [], + "params": [ + "Rainf", "Tair", "SWdown", "LWdown", + "Wind", "Snowf", "PSurf", "Qair"] +} diff --git a/era5/era5_db.py b/era5/era5_db.py new file mode 100644 index 0000000..4a978d4 --- /dev/null +++ b/era5/era5_db.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python +# Copyright 2019 ARC Centre of Excellence for Climate Extremes (CLEx) and NCI +# Author: Paola Petrelli for CLEx +# Matt Nethery for NCI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Crawl era5 netcdf directories and update ERA db with new files found +# contact: paolap@utas.edu.au +# last updated 28/04/2020 + +import os +import sqlite3 +from datetime import datetime +from glob import glob +from itertools import repeat +from era5.era5_functions import define_args, read_vars +import sys + + +def db_connect(cfg): + """ connect to ERA5 files sqlite db + """ + return sqlite3.connect(cfg['db'], timeout=10, isolation_level=None) + + +def create_table(conn): + """ create file table if database doesn't exists or empty + :conn connection object + """ + file_sql = 'CREATE TABLE IF NOT EXISTS file( filename TEXT PRIMARY KEY, location TEXT, ncidate TEXT, size INT);' + try: + c = conn.cursor() + c.execute(file_sql) + except Error as e: + print(e) + + +def query(conn, sql, tup): + """ generic query + """ + with conn: + c = conn.cursor() + c.execute(sql, tup) + return [ x[0] for x in c.fetchall() ] + + +def crawl(g, xl, basedir): + """ Crawl base directory for all netcdf files + """ + # get stats for all files not in db yet + tsfmt = '%FT%T' + file_list = [] + for f in g: + dn, fn = os.path.split(f) + if not fn in xl: + s = os.stat(f) + l = dn.replace(basedir + '/', '') + ts = datetime.fromtimestamp(s.st_mtime).strftime(tsfmt) + file_list.append( (fn, l, ts, s.st_size) ) + return file_list + + +def set_query(st, var, tstep, yr='____', mn='%'): + """ Set up the sql query based on constraints + """ + if st in ['wfde5','cems_fire','agera5']: + fname = f'{var}_%_{yr}{mn}01_%.nc' + location = f'{st}/{var}' + elif tstep == 'hr': + fname = 
f'{var}_%_{yr}{mn}01_%.nc' + location = f'{st}/{var}/{yr}' + else: + fname = f'{var}_%_mon_%_{yr}{mn}.nc' + location = f'{st}/{var}/monthly' + return fname, location + + +def get_basedir(cfg, stream): + """ Return base directory base don stream + """ + if stream in ['cems_fire', 'agera5', 'wfde5']: + return cfg['derivdir'] + else: + return cfg['datadir'] + + +def list_files(basedir, match): + """ List all matching files for given base dir, stream and tstep + """ + fsmatch = match.replace("____", "????") + fsmatch = fsmatch.replace("%", "*") + d = os.path.join(basedir, fsmatch, "*.nc") + print(f'Searching on filesystem: {d} ...') + g = glob(d) + print(f'Found {len(g)} files.') + return g + + +def exp_files(stream, tstep): + """ Return expected number of files to date for stream/tstep + """ + #wfde5 and agera5 have fixed lengths + if stream in ['wfde5', 'agera5']: + return 40 + # assume start year as 1979 and no delay + delay = 0 + start_yr = 1979 + # work out current month and delay from date, after 15th of month we should have downloaded last month unless delay + today = datetime.today().strftime('%Y-%m-%d') + yr, mn, day = today.split('-') + end_yr = int(yr) + if int(day) <= 15: + end_mn = int(mn) - 2 + else: + end_mn = int(mn) - 1 + # land starts currently from 1981 + if stream in ['land']: + start_yr = 1981 + # cems_fire, land and all the monthly are delayed an extra month + if stream in ['cems_fire', 'land'] or tstep == 'mon': + delay = -1 + if tstep == 'hr': + # should have 12 files x year + monthly for current year + nfiles = (end_yr - start_yr)*12 + end_mn + delay + elif tstep == 'mon': + # should have in total 1 file for past years + 1 for each month + nfiles = 1 + end_mn + delay + else: + # should have 1 file x year + monthly for current year + nfiles = end_yr - start_yr + end_mn + delay + return nfiles + + +def compare(conn, basedir, match, var, nfiles): + """ + """ + # get a list of matching files on filesystem + fs_list = list_files(basedir, match) + 
total = len(fs_list) + #print(f'Found {total} files for {var}.') + # up to here + if nfiles == total: + print(f'All expected files are present for {var}\n') + elif nfiles > total: + print(f'{nfiles-total} files are missing for {var}\n') + else: + print(f'{total-nfiles} extra files than expected for {var}\n') + # get a list of matching files in db + sql = f"SELECT filename FROM file AS t WHERE t.location LIKE '{match}' ORDER BY filename ASC" + xl = query(conn, sql, ()) + print(f'Records already in db: {len(xl)}') + + +def update_db(cfg, stream, tstep, var): + # read configuration and open ERA5 files database + conn = db_connect(cfg) + create_table(conn) + + # List all netcdf files in datadir and derivdir + if not stream: + sql = 'SELECT filename FROM FILE ORDER BY filename ASC' + fs_list = list_files(cfg['datadir'],'*/*/*') + fs_list2 = list_files(cfg['derivdir'],'*/*') + else: + fs_list = [] + if not var: + var=['%'] + for v in var: + fname, location = set_query(stream, v, tstep) + basedir = get_basedir(cfg, stream) + sql = f"SELECT filename FROM file AS t WHERE t.location LIKE '{location}' ORDER BY filename ASC" + fs_list.extend(list_files(basedir, location)) + + xl = query(conn, sql, ()) + if not stream: + stats_list = crawl(fs_list, xl, cfg['datadir']) + stats_list.extend( crawl(fs_list2, xl, cfg['derivdir'])) + fs_list.extend(fs_list2) + else: + stats_list = crawl(fs_list, xl, basedir) + print(f'Records already in db: {len(xl)}') + print(f'New files found: {len(fs_list)-len(xl)}') + # insert into db + if len(stats_list) > 0: + print('Updating db ...') + with conn: + c = conn.cursor() + sql = 'INSERT OR IGNORE INTO file (filename, location, ncidate, size) values (?,?,?,?)' + #debug(sql) + c.executemany(sql, stats_list) + c.execute('select total_changes()') + print('Rows modified:', c.fetchall()[0][0]) + print('--- Done ---') + + +def variables_stats(cfg, stream, tstep, varlist=[]): + """ + """ + # read configuration and open ERA5 files database + conn = 
db_connect(cfg) + + # If no variable is selected list all variables in stream json file + dsargs = define_args(stream, tstep) + if not any(varlist): + varlist=[] + vardict = read_vars(stream) + print('Variables currently updated for this stream are:\n') + for code in dsargs['params']: + print(f'{vardict[code][0]} - {vardict[code][1]} - {code}') + varlist.append(vardict[code][0]) + # calculate expected number of files + nfiles = exp_files(stream, tstep) + basedir = get_basedir(cfg, stream) + + for var in varlist: + fname, location = set_query(stream, var, tstep) + compare(conn, basedir, location, var, nfiles) + + +def delete_record(cfg, st, var, yr, mn, tstep): + # connect to db + conn = db_connect(cfg) + mn, yr, var = tuple(['%'] if not x else x for x in [mn, yr, var] ) + + # Set up query + for v in var: + for y in yr: + for m in mn: + fname, location = set_query(st, v, tstep, y, m) + sql = f'SELECT filename FROM file WHERE file.location="{location}" AND file.filename LIKE "{fname}"' + print(sql) + xl = query(conn, sql, ()) + print(f'Selected records in db: {xl}') + # Delete from db + if len(xl) > 0: + confirm = input('Confirm deletion from database: Y/N ') + if confirm == 'Y': + print('Updating db ...') + for fname in xl: + with conn: + c = conn.cursor() + sql = f'DELETE from file where filename="{fname}" AND location="{location}"' + c.execute(sql) + c.execute('select total_changes()') + print('Rows modified:', c.fetchall()[0][0]) + return diff --git a/era5_delete_record.py b/era5/era5_delete_record.py similarity index 98% rename from era5_delete_record.py rename to era5/era5_delete_record.py index ef5735e..9ddd1a9 100644 --- a/era5_delete_record.py +++ b/era5/era5_delete_record.py @@ -26,7 +26,7 @@ from glob import glob from datetime import datetime -from era5_functions import read_config +from era5.era5_functions import read_config def parse(): diff --git a/era5_functions.py b/era5/era5_functions.py similarity index 65% rename from era5_functions.py rename to 
era5/era5_functions.py index a98d286..2c064b5 100644 --- a/era5_functions.py +++ b/era5/era5_functions.py @@ -21,11 +21,11 @@ import logging import json -import cdsapi -from calendar import monthrange -from datetime import datetime import os +import pkg_resources import subprocess as sp +from calendar import monthrange +from datetime import datetime def config_log(debug): @@ -76,7 +76,8 @@ def read_config(): Read config from config.json file """ try: - with open('config.json','r') as fj: + cfg_file = pkg_resources.resource_filename(__name__, 'data/config.json') + with open(cfg_file,'r') as fj: cfg = json.load(fj) except FileNotFoundError: print(f"Can't find file config.json in {os.getcwd()}") @@ -109,15 +110,20 @@ def define_args(stream, tstep): ''' Return parameters and levels lists and step, time depending on stream type''' # this import the stream_dict dictionary : ['time','step','params','levels'] # I'm getting the information which is common to all pressure/surface/wave variables form here, plus a list of the variables we download for each stream - with open(f'era5_{stream}_{tstep}.json', 'r') as fj: + stream_file = pkg_resources.resource_filename(__name__, f'data/era5_{stream}_{tstep}.json') + with open(stream_file, 'r') as fj: dsargs = json.load(fj) return dsargs -def read_vars(): +def read_vars(stream): """Read parameters info from era5_vars.json file """ - with open('era5_vars.json','r') as fj: + if stream in ['cems_fire', 'wfde5']: + var_file = pkg_resources.resource_filename(__name__, 'data/era5_derived.json') + else: + var_file = pkg_resources.resource_filename(__name__, 'data/era5_vars.json') + with open(var_file,'r') as fj: vardict = json.load(fj) return vardict @@ -139,19 +145,54 @@ def build_dict(dsargs, yr, mn, var, daylist, oformat, tstep, back): 'area' : dsargs['area']} if 'product_type' in dsargs.keys(): rdict['product_type'] = dsargs['product_type'] + if 'version' in dsargs.keys(): + rdict['version'] = dsargs['version'] + if 'dataset' in 
dsargs.keys(): + rdict['dataset'] = dsargs['dataset'] + if 'reference_dataset' in dsargs.keys(): + rdict['reference_dataset'] = dsargs['reference_dataset'] if dsargs['levels'] != []: rdict['pressure_level']= dsargs['levels'] if tstep == 'mon': rdict['time'] = '00:00' - if back: - rdict['month'] = ["%.2d" % i for i in range(1,13)] - if dsargs['dsid'] == 'reanalysis-era5-land-monthly-means': - rdict['year'] = ["%.2d" % i for i in range(1981,2019)] - elif dsargs['dsid'] == 'reanalysis-era5-single-levels-monthly-means': - rdict['year'] = ["%.2d" % i for i in range(1979,2020)] - else: + elif tstep == 'day': + rdict['day'] = daylist + elif tstep == 'hr' and dsargs['dsid'][:12] != 'derived-near': rdict['day'] = daylist rdict['time'] = timelist + # for pressure mon, fire and agera5 daily download a yera at the time + # for surface monthly donwload all years fully available + if back: + rdict['month'] = ["%.2d" % i for i in range(1,13)] + if dsargs['dsid'] == 'reanalysis-era5-land-monthly-means': + rdict['year'] = ["%.2d" % i for i in range(1981,2020)] + elif dsargs['dsid'] == 'reanalysis-era5-single-levels-monthly-means': + rdict['year'] = ["%.2d" % i for i in range(1979,2020)] + return rdict + +def build_mars(dsargs, yr, mn, param, oformat, tstep, back): + """ Create request for MARS """ + rdict={ 'param' : param, + 'levtype': 'pl', + 'type': 'an', + 'grid' : '0.25/0.25', + 'format' : oformat, + 'area' : dsargs['area']} + datestr = f'{yr}-{mn}-01/to/{yr}-{mn}-{monthrange(int(yr),int(mn))[1]}' + if tstep == 'mon': + rdict['time'] = '00:00' + rdict['stream'] = 'moda' + if back: + datestr = '' + for m in range(1,13): + datestr = (datestr+yr+(str(m)).zfill(2)+'01/') + datestr = datestr[:-1] + else: + rdict['stream'] = 'oper' + rdict['time'] = '00:00:00/01:00:00/02:00:00/03:00:00/04:00:00/05:00:00/06:00:00/07:00:00/08:00:00/09:00:00/10:00:00/11:00:00/12:00:00/13:00:00/14:00:00/15:00:00/16:00:00/17:00:00/18:00:00/19:00:00/20:00:00/21:00:00/22:00:00/23:00:00' + if 
dsargs['levels'] != []: + rdict['levelist']= dsargs['levels'] + rdict['date'] = datestr return rdict @@ -185,34 +226,46 @@ def file_down(url, tempfn, size, era5log): return False -def target(stream, var, yr, mn, dsargs, tstep, back): +def target(stream, var, yr, mn, dsargs, tstep, back, oformat): """Build output paths and filename, build list of days to process based on year and month """ + # temporary fix to go from netcdf to nc + if oformat == 'netcdf': oformat='nc' # did is era5land for land stream and era5 for anything else did = 'era5' - if stream == 'land': - did+='land' + if stream in ['cems_fire','agera5', 'wfde5']: + did=stream + elif stream == 'land': + did+=stream # set output path + ydir = yr + # if monthly data change ydir and create empty daylist if tstep == 'mon': - ydir = 'monthly' - fname = f"{var}_{did}_mon_{dsargs['grid']}_{yr}{mn}.nc" daylist = [] + ydir = 'monthly' + else: + daylist = define_dates(yr,mn) + if tstep in ['mon','day'] or stream == 'wfde5': + fname = f"{var}_{did}_{tstep}_{dsargs['grid']}_{yr}{mn}.{oformat}" if back: if stream == 'land': - fname = f"{var}_{did}_mon_{dsargs['grid']}_198101_201812.nc" - if stream == 'pressure': - fname = f"{var}_{did}_mon_{dsargs['grid']}_{yr}01_{yr}12.nc" + fname = f"{var}_{did}_{tstep}_{dsargs['grid']}_198101_201912.{oformat}" + elif stream == 'pressure': + fname = f"{var}_{did}_{tstep}_{dsargs['grid']}_{yr}01_{yr}12.{oformat}" + elif stream in ['cems_fire', 'agera5', 'wfde5']: + fname = f"{var}_{did}_{tstep}_{dsargs['grid']}_{yr}0101_{yr}1231.{oformat}" else: - fname = f"{var}_{did}_mon_{dsargs['grid']}_197901_201912.nc" + fname = f"{var}_{did}_{tstep}_{dsargs['grid']}_197901_201912.{oformat}" else: - ydir = yr # define filename based on var, yr, mn and stream attributes startmn=mn - daylist = define_dates(yr,mn) - fname = f"{var}_{did}_{dsargs['grid']}_{yr}{startmn}{daylist[0]}_{yr}{mn}{daylist[-1]}.nc" + fname = 
f"{var}_{did}_{dsargs['grid']}_{yr}{startmn}{daylist[0]}_{yr}{mn}{daylist[-1]}.{oformat}" stagedir = os.path.join(cfg['staging'],stream, var,ydir) - destdir = os.path.join(cfg['datadir'],stream,var,ydir) + if stream in ['cems_fire','agera5','wfde5']: + destdir = os.path.join(cfg['derivdir'],stream,var) + else: + destdir = os.path.join(cfg['datadir'],stream,var,ydir) # create path if required if not os.path.exists(stagedir): os.makedirs(stagedir) @@ -221,13 +274,15 @@ def target(stream, var, yr, mn, dsargs, tstep, back): return stagedir, destdir, fname, daylist -def dump_args(up, of, st, ps, yr, mns, tstep, back): +def dump_args(of, st, ps, yr, mns, tstep, back, urgent): """ Create arguments dictionary and dump to json file """ tstamp = datetime.now().strftime("%Y%m%d%H%M%S") fname = f'era5_request_{tstamp}.json' + requestdir = cfg['requestdir'] + if urgent: + requestdir += 'Urgent/' args = {} - args['update'] = up args['format'] = of args['stream'] = st args['params'] = ps @@ -235,7 +290,7 @@ def dump_args(up, of, st, ps, yr, mns, tstep, back): args['months'] = mns args['timestep'] = tstep args['back'] = back - with open(fname, 'w+') as fj: + with open(requestdir + fname, 'w+') as fj: json.dump(args, fj) return diff --git a/era5_update_db.py b/era5/era5_update_db.py similarity index 77% rename from era5_update_db.py rename to era5/era5_update_db.py index 499042a..cec4897 100644 --- a/era5_update_db.py +++ b/era5/era5_update_db.py @@ -23,7 +23,7 @@ import os from datetime import datetime import sqlite3 -from era5_functions import read_config +from era5.era5_functions import read_config def db_connect(cfg): """ connect to ERA5 files sqlite db @@ -49,45 +49,52 @@ def query(conn, sql, tup): c.execute(sql, tup) return [ x[0] for x in c.fetchall() ] +def crawl(g, xl, datadir): + """ crawl base directory for all netcdf files + """ + # get stats for all files not in db yet + tsfmt = '%FT%T' + file_list = [] + for f in g: + dn, fn = os.path.split(f) + if not fn in xl: + s = 
os.stat(f) + l = dn.replace(datadir + '/', '') + ts = datetime.fromtimestamp(s.st_mtime).strftime(tsfmt) + file_list.append( (fn, l, ts, s.st_size) ) + return file_list + def main(): # read configuration and open ERA5 files database cfg = read_config() conn = db_connect(cfg) create_table(conn) - # List all netcdf files in datadir - d = os.path.join(cfg['datadir'], '*/*/*/*.nc') - print(f'Searching: {d} ...') - g = glob(d) - print(f'Found {len(g)} files.') - # get a list of files already in db sql = 'select filename from file order by filename asc' xl = query(conn, sql, ()) print(f'Records already in db: {len(xl)}') - # get stats for all files not in db yet - tsfmt = '%FT%T' - fl = [] - for f in g: - dn, fn = os.path.split(f) - if not fn in xl: - s = os.stat(f) - l = dn.replace(cfg['datadir'] + '/', '') - ts = datetime.fromtimestamp(s.st_mtime).strftime(tsfmt) - fl.append( (fn, l, ts, s.st_size) ) - - print(f'New files found: {len(fl)}') + # List all netcdf files in datadir and derivdir + file_list = [] + d1 = os.path.join(cfg['datadir'], '*/*/*/*.nc') + d2 = os.path.join(cfg['derivdir'], '*/*/*.nc') + for d,datadir in [(d1,cfg['datadir']), (d2,cfg['derivdir'])]: + print(f'Searching: {d} ...') + g = glob(d) + print(f'Found {len(g)} files.') + file_list.extend(crawl(g,xl,datadir)) + print(f'New files found: {len(file_list)}') # insert into db - if len(fl) > 0: + if len(file_list) > 0: print('Updating db ...') with conn: c = conn.cursor() #sql = 'insert or replace into file (filename, location, ncidate, size) values (?,?,?,?)' sql = 'insert or ignore into file (filename, location, ncidate, size) values (?,?,?,?)' #print(sql) - c.executemany(sql, fl) + c.executemany(sql, file_list) c.execute('select total_changes()') print('Rows modified:', c.fetchall()[0][0]) diff --git a/era5_wrapper.sh b/era5_wrapper.sh.temp similarity index 71% rename from era5_wrapper.sh rename to era5_wrapper.sh.temp index 9217476..b1aa0f0 100755 --- a/era5_wrapper.sh +++ b/era5_wrapper.sh.temp 
@@ -2,16 +2,17 @@ # # Wrapper script (called by cron) # -# Usage: era5_wrapper.sh +# Usage: era5_wrapper.sh +# must correspond to an existing priority directory # set some vars LOCKFILE="/tmp/era5_wrapper.lock" TSTAMP=$(date -u +%Y%m%dT%H%M%S) -#ERRORLOG="/g/data/ub4/Work/Logs/ERA5/era5_wrapper_error.log" SCRIPTDIR=$(dirname $0) cd $SCRIPTDIR -ERRORLOG="../log/era5_wrapper_error.log" -REQUESTDIR="/mnt/ub4/Work/Scripts/Requests" +ERRORLOG="....../era5/log/era5_wrapper_error.log" +REQUESTDIR="....../era5/Requests/${1}" +COMPLETEDIR="...../era5/Requests/Completed" echo "--- Starting $0 ($TSTAMP) ---" @@ -34,21 +35,20 @@ fi # refresh requests # couple of options: git pull; or rsync ; or nothing -echo "Checking for new requests ..." +echo "Checking for new requests, priority: $1" REQUESTS="$(ls $REQUESTDIR/era5_request*.json)" # loop through list of request files and run the download command echo "Starting download ..." for J in $REQUESTS ; do echo " $J" - python3 cli.py scan -f $J 1>/dev/null 2>>$ERRORLOG + era5 scan -f $J 1>/dev/null 2>>$ERRORLOG echo " Finished , moving request $J" - mv $J ${REQUESTDIR}/Completed/$(basename "$J") - #python3 cli.py scan -f $J + mv $J ${COMPLETEDIR}/$(basename "$J") done # update sqlite db echo "Updating database ..." -python3 era5_update_db.py +era5 db echo "--- Done ---" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4e4f54b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +# Add general dependencies here +# Optional dependencies e.g. 
[dev] are added in `setup.cfg` +click +#sqlalchemy diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..ba988e5 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,46 @@ +[metadata] +name = era5 +author = Paola Petrelli +author-email = paola.petrelli@utas.edu.au +summary = "Download ERA5 from CDS Data Store using their cdsapi" +description-file = README.rst +licence = Apache 2.0 +classifier = + Development Status :: 3 - Alpha + Environment :: Console + Intended Audience :: Science/Research + License :: OSI Approved :: Apache Software License + Operating System :: POSIX :: Linux + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + +[files] +packages = + era5 +package-data = + era5 = data/*json + +[pbr] +autodoc_tree_index_modules = True +autodoc_tree_excludes = + setup.py + conftest.py + test + +[extras] +# Optional dependencies +dev = + pytest + sphinx + +[entry_points] +console_scripts = + era5 = era5.cli:era5_catch +# arx = clef.cli:arx + +[build_sphinx] +source-dir = docs +build-dir = docs/_build + +[tool:pytest] +addopts = --doctest-modules --doctest-glob='*.rst' --ignore setup.py --ignore conftest.py --ignore docs/conf.py diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..2107a41 --- /dev/null +++ b/setup.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +# +# Uses Python Build Reasonableness https://docs.openstack.org/developer/pbr/ +# Add configuration to `setup.cfg` + +from setuptools import setup + +setup( + setup_requires=['pbr>=1.9', 'setuptools>=17.1'], + pbr=True, + )
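The database layer introduced in `era5/era5_db.py` boils down to one `file` table keyed by filename, with `INSERT OR IGNORE` making repeated crawls idempotent. A minimal sketch of that pattern, using an in-memory database and made-up file records (the filenames and sizes below are hypothetical, not from the real `era5.sqlite`):

```python
import sqlite3

# Same schema as create_table() in era5/era5_db.py
conn = sqlite3.connect(':memory:', isolation_level=None)
conn.execute('CREATE TABLE IF NOT EXISTS file('
             ' filename TEXT PRIMARY KEY, location TEXT,'
             ' ncidate TEXT, size INT);')

# Hypothetical crawl results: (filename, location, ncidate, size)
records = [
    ('2t_era5_global_20180101_20180131.nc', 'surface/2t/2018',
     '2020-04-28T00:00:00', 123456),
    ('2t_era5_global_20180201_20180228.nc', 'surface/2t/2018',
     '2020-04-28T00:00:00', 120000),
]

sql = ('INSERT OR IGNORE INTO file (filename, location, ncidate, size) '
       'values (?,?,?,?)')
conn.executemany(sql, records)
# Re-inserting the same records is a no-op thanks to OR IGNORE,
# so a crawler can safely re-submit everything it finds on disk
conn.executemany(sql, records)

count = conn.execute('SELECT count(*) FROM file').fetchone()[0]
print(count)  # 2 rows; duplicates were ignored
```

Keying on `filename` alone is what lets `update_db()` pass every file found on the filesystem straight to `executemany` without first diffing against the db; the primary-key constraint does the deduplication.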