# Source code for aquarius_webportal.main

import io
import urllib.parse

import requests
from lxml import html
import pandas as pd


class AquariusWebPortal:
    """Access data from a deployment of Aquarius Web Portal.

    Args:
        server (str): URL of the Web Portal deployment. ``https://`` is
            prepended when the value does not already start with ``http``.
        session (optional): requests.Session object to use. A new session is
            created when this is omitted.
        **kwargs: passed to ``requests.Session()`` when no session is supplied.
            NOTE(review): ``requests.Session()`` does not appear to accept
            constructor arguments, so non-empty kwargs will probably raise
            ``TypeError`` - confirm the intent before relying on this.

    The main methods to use are:

    - :meth:`aquarius_webportal.AquariusWebPortal.fetch_locations`: fetch
      metadata for all locations
    - :meth:`aquarius_webportal.AquariusWebPortal.fetch_datasets`: fetch
      metadata for datasets measuring a queried parameter
    - :meth:`aquarius_webportal.AquariusWebPortal.fetch_dataset`: fetch data
      for a single timeseries

    Relevant attributes of the ``AquariusWebPortal`` object are:

    Attributes:
        server (str): as initialised (including the URL scheme)
        params (pd.DataFrame): the available parameters. If the portal is
            disclaimer-blocked, this will be empty (see ReadTheDocs
            documentation for further details)
        session: requests.Session object

    """

    # Paging limits used by fetch_list(): at most _MAX_PAGES requests of
    # _PAGE_SIZE rows each are made for a single listing.
    _PAGE_SIZE = 5000
    _MAX_PAGES = 15

    # Only this many leading characters of an exported CSV are scanned when
    # looking for the "Timestamp (" column-header row.
    _HEADER_SCAN_CHARS = 500

    # AQWP internal API field name -> friendlier column name.
    _COLUMN_RENAMES = {
        "LocationId": "wp_loc_id",
        "DatasetId": "wp_dset_id",
        "LocX": "lon",
        "LocY": "lat",
        "Location": "loc_name",
        "LocationIdentifier": "loc_id",
        "DatasetIdentifier": "dset_name",
        "LocType": "loc_type",
        "LocationFolder": "loc_folder",
        "StartOfRecord": "dset_start",
        "EndOfRecord": "dset_end",
        "Classification": "classification",
        "Background": "bgcolor",
        "Sequence": "seq",
    }

    def __init__(self, server="water.data.sa.gov.au", session=None, **kwargs):
        if not server.startswith("http"):
            server = "https://" + server
        if session is None:
            session = requests.Session(**kwargs)
        self.session = session
        self.server = server
        # Note: this performs an HTTP request at construction time.
        self.params = self.fetch_params()

    def fetch_params(self, payload=None):
        """Fetch the list of available parameters.

        POSTs to the portal's ``/Data/List/`` page and parses the parameter
        ``<option>`` elements out of the returned HTML.

        Args:
            payload (optional): form data sent with the POST request.

        Returns:
            pd.DataFrame: a table of available parameters with these columns:

            - param_id (numeric ID, kept in the form produced by
              ``parse_params_from_html``)
            - param_name (str)
            - param_desc (str)

        """
        response = self.session.post(self.server + "/Data/List/", payload)
        return parse_params_from_html(response.text)

    def get_param(self, param_name=None, param_desc=None, param_id=None):
        """Fetch/identify a single parameter from the ``params`` attribute.

        The first supplied criterion wins, in the order ``param_name``,
        ``param_desc``, ``param_id``.

        Args:
            param_name (str): select a parameter with this name
            param_desc (str): select a parameter with the description (note
                that usually the description functions as a "long name")
            param_id (int or str): select the parameter with this ID number

        Returns:
            pd.Series: the relevant row from ``self.params`` with these
            fields (``None`` is returned when no criterion is supplied):

            - param_id
            - param_name (str)
            - param_desc (str)

        Raises:
            IndexError: if no parameter matches the supplied criterion.

        """
        params = self.params
        if param_name:
            criterion = f"param_name={param_name!r}"
            mask = params.param_name == param_name
        elif param_desc:
            criterion = f"param_desc={param_desc!r}"
            mask = params.param_desc == param_desc
        elif param_id is not None:
            criterion = f"param_id={param_id!r}"
            # IDs scraped from the HTML are strings, while callers naturally
            # pass integers; comparing string forms makes both work.
            mask = params.param_id.astype(str) == str(param_id)
        else:
            return None

        matches = params[mask]
        if matches.empty:
            raise IndexError(f"no parameter found with {criterion}")
        return matches.iloc[0]

    def fetch_locations(self):
        """Fetch a list of all locations from the portal.

        Returns:
            pd.DataFrame: a table of location metadata. The available fields
            may vary between different portals, but these may be present:

            - wp_loc_id (called "LocationId" in the AQWP internal APIs)
            - lon (called "LocX" in the AQWP internal APIs)
            - lat (called "LocY" in the AQWP internal APIs)
            - loc_name (called "Location" in the AQWP internal APIs)
            - loc_id (called "LocationIdentifier" in the AQWP internal APIs)
            - loc_type (called "LocType" in the AQWP internal APIs)
            - loc_folder (called "LocationFolder" in the AQWP internal APIs)

        """
        return self.fetch_list()

    def fetch_datasets(self, param_name=None, param_desc=None, param_id=None):
        """Fetch a list of all datasets from the portal with a given parameter

        When ``param_id`` is not supplied the parameter list is re-fetched
        from the portal and searched by name and then by description (a
        description match takes precedence if both are given and both match).

        Args:
            param_name (str): select a parameter with this name
            param_desc (str): select a parameter with the description (note
                that usually the description functions as a "long name")
            param_id (int): select the parameter with this ID number

        Returns:
            pd.DataFrame: a table of dataset metadata. The available fields
            may vary between different portals, but these may be present:

            - wp_loc_id (called "LocationId" in the AQWP internal APIs)
            - wp_dset_id (called "DatasetId" in the AQWP internal APIs)
            - lon (called "LocX" in the AQWP internal APIs)
            - lat (called "LocY" in the AQWP internal APIs)
            - loc_name (called "Location" in the AQWP internal APIs)
            - loc_id (called "LocationIdentifier" in the AQWP internal APIs)
            - dset_name (called "DatasetIdentifier" in the AQWP internal APIs)
            - loc_type (called "LocType" in the AQWP internal APIs)
            - loc_folder (called "LocationFolder" in the AQWP internal APIs)
            - dset_start (called "StartOfRecord" in the AQWP internal APIs)
            - dset_end (called "EndOfRecord" in the AQWP internal APIs)
            - param (str) - derived from dset_name
            - label (str) - derived from dset_name

        Raises:
            ValueError: if the parameter could not be identified.

        """
        if not param_id:
            params = self.fetch_params()
            if param_name is not None:
                by_name = params.loc[params.param_name == param_name, "param_id"]
                if not by_name.empty:
                    param_id = by_name.iloc[0]
            if param_desc is not None:
                by_desc = params.loc[params.param_desc == param_desc, "param_id"]
                if not by_desc.empty:
                    param_id = by_desc.iloc[0]

        if param_id is None:
            # Previously the exception object was *returned*, which callers
            # would then mistake for a result table.
            raise ValueError("failed to identify parameter")
        return self.fetch_list(param_id=param_id)

    @staticmethod
    def _split_dset_name(dset_name):
        """Split ``param.label@location`` into a ``(param, label)`` tuple.

        A part that is not present (or a non-string name) yields ``None``.
        """
        if not isinstance(dset_name, str):
            return None, None
        pieces = dset_name.split("@")[0].split(".")
        label = pieces[1] if len(pieces) > 1 else None
        return pieces[0], label

    def fetch_list(self, param_id=None):
        """Internal function that fetches list data from the /Data/Data_List
        endpoint.

        Results are requested in pages of 5000 rows; at most 15 pages are
        requested, and a warning is issued if that cap is reached before all
        of the rows reported by the portal have been received.

        Args:
            param_id (int): if not supplied, the list is of Locations. If
                supplied, the list is of Datasets/Time series.

        Returns:
            pd.DataFrame: a table of results with some columns renamed for
            convenience:

            - wp_loc_id (called "LocationId" in the AQWP internal APIs)
            - wp_dset_id (called "DatasetId" in the AQWP internal APIs)
            - lon (called "LocX" in the AQWP internal APIs)
            - lat (called "LocY" in the AQWP internal APIs)
            - loc_name (called "Location" in the AQWP internal APIs)
            - loc_id (called "LocationIdentifier" in the AQWP internal APIs)
            - dset_name (called "DatasetIdentifier" in the AQWP internal APIs)
            - loc_type (called "LocType" in the AQWP internal APIs)
            - loc_folder (called "LocationFolder" in the AQWP internal APIs)
            - dset_start (called "StartOfRecord" in the AQWP internal APIs)
            - dset_end (called "EndOfRecord" in the AQWP internal APIs)
            - classification (called "Classification" in the AQWP internal
              APIs)
            - bgcolor (called "Background" in the AQWP internal APIs)
            - seq (called "Sequence" in the AQWP internal APIs)
            - param (str) - derived from dset_name if the latter exists
            - label (str) - derived from dset_name if the latter exists

            Any other columns will not be renamed.

        """
        import warnings

        results = []
        total_results = None
        for page_no in range(1, self._MAX_PAGES + 1):
            query = {"page": page_no, "pageSize": self._PAGE_SIZE}
            if param_id is not None:
                query["parameters[0]"] = param_id
            # The query is sent both in the URL and as form data, as before.
            url = self.server + "/Data/Data_List?" + urllib.parse.urlencode(query)
            data = self.session.post(url, data=query).json()

            if total_results is None:
                # The total is taken from the first page only.
                total_results = data["Total"]
            page = data["Data"]
            results += page

            # Stop once everything has arrived; an empty page means asking
            # for further pages cannot add anything either.
            if len(results) >= total_results or not page:
                break
        else:
            warnings.warn(
                f"fetch_list stopped after {self._MAX_PAGES} pages with "
                f"{len(results)} of {total_results} results; "
                "the returned table is incomplete",
                stacklevel=2,
            )

        df = pd.DataFrame(results).rename(columns=self._COLUMN_RENAMES)
        if "dset_name" in df:
            pairs = [self._split_dset_name(name) for name in df["dset_name"]]
            df["param"] = [param for param, _ in pairs]
            df["label"] = [label for _, label in pairs]
        return df

    @staticmethod
    def _parse_export_csv(text):
        """Parse the CSV text of a bulk export into a timestamp-indexed table.

        The column-header row is the first line (within the leading 500
        characters) that starts with ``Timestamp (``; the line directly above
        it supplies the dataset name and the names of any extra columns.
        """
        header = text[: AquariusWebPortal._HEADER_SCAN_CHARS].splitlines()
        header_row = next(
            (i for i, line in enumerate(header) if line.startswith("Timestamp (")),
            None,
        )
        if not header_row:
            # Either no header row was found, or it is the very first line
            # and so there is no preceding line naming the dataset.
            raise ValueError(
                f"could not locate the CSV header in the export:\n{header}"
            )
        header_line = header[header_row - 1].split(",")

        with io.StringIO(text) as f:
            # Skip exactly the preamble that precedes the located header row
            # rather than assuming it is always four lines long.
            df = pd.read_csv(f, skiprows=header_row)

        # "Value (unit)" becomes "<param> (unit)"; remaining column names are
        # taken from the line above the header row.
        cols = [
            df.columns[0],
            df.columns[1].replace("Value", header_line[1].split(".")[0]),
        ] + header_line[2:]
        df.columns = cols

        index_col = cols[0]
        # "Timestamp (UTC+09:30)" -> "+0930"
        tz_offset = index_col.split("(UTC")[1][:-1].replace(":", "")
        if not tz_offset:
            # A bare "(UTC)" carries no offset; %z needs an explicit one.
            tz_offset = "+0000"
        df[index_col] = pd.to_datetime(
            df[index_col] + " " + tz_offset, format="%Y-%m-%d %H:%M:%S %z"
        )
        return df.set_index(index_col)

    def fetch_dataset(
        self,
        dset_name,
        date_range=None,
        extra_data_types=None,
        start=None,
        finish=None,
        session=None,
        **kwargs,
    ):
        """Fetch timeseries data for a single dataset.

        Args:
            dset_name (str): the dataset name as ``param.label@location`` -
                you can get this from the dset_name column of the table
                returned by
                :meth:`aquarius_webportal.AquariusWebPortal.fetch_datasets`
            extra_data_types (str/sequence): The additional metadata fields
                to retrieve for each data point - either "all", None, or a
                sequence of strings with one or more of "grade", "approval",
                "qualifier", and "interpolation_type". None is the default.
            date_range (str): either None (the default) or "Days7"
            start (pd.Timestamp): earliest data to retrieve - None by default
            finish (pd.Timestamp): latest data to retrieve - None by default
            session: accepted but not used by this method; requests are made
                with ``self.session``.
            **kwargs: accepted but not used by this method.

        There are three ways of querying to speed things up, and these are
        selected depending on the values of the **date_range**, **start** and
        **finish** arguments:

        (1) Entire period of record - the default - leave the **date_range**,
            **start** and **finish** arguments null.
        (2) A custom period - leave **date_range** null and provide **start**
            and **finish** arguments.
        (3) The last week - use "Days7" for **date_range** and leave
            **start** and **finish** null.

        Any other combination sends no ``DateRange`` value to the portal.

        Returns:
            pd.DataFrame: a table of timeseries data. The table has a
            DateTimeIndex with timezone-aware timestamps. The time zone is
            derived from that provided by Aquarius Web portal in the header
            of the CSV which is downloaded in the background by this
            function. The first column will be the requested parameter
            (short) name with its unit in parentheses e.g.
            "Discharge (m^3/s)". Following columns will be the
            extra_data_types if requested.

        Raises:
            ValueError: if the downloaded CSV does not contain a recognisable
                ``Timestamp (`` header row.

        """
        query = {
            "Calendar": "CALENDARYEAR",
            "Interval": "PointsAsRecorded",
            "Step": 1,
            "ExportFormat": "csv",
            "TimeAligned": True,
            "RoundData": True,
            "Datasets[0].DatasetName": dset_name,
        }

        if date_range is None and start is None and finish is None:
            query["DateRange"] = "EntirePeriodOfRecord"
        elif start and finish:
            query["DateRange"] = "Custom"
            query["StartTime"] = pd.Timestamp(start).strftime("%Y-%m-%d %H:%M")
            query["EndTime"] = pd.Timestamp(finish).strftime("%Y-%m-%d %H:%M")
        elif date_range == "Days7":
            query["DateRange"] = "Days7"

        if extra_data_types == "all":
            extra_data_types = ["grade", "approval", "qualifier", "interpolation_type"]
        elif not extra_data_types:
            extra_data_types = []

        # Plain booleans: these used to be accidental one-element tuples,
        # which form-encode to the same request.
        query["IncludeGradeCodes"] = "grade" in extra_data_types
        query["IncludeApprovalLevels"] = "approval" in extra_data_types
        query["IncludeQualifiers"] = "qualifier" in extra_data_types
        query["IncludeInterpolationTypes"] = "interpolation_type" in extra_data_types

        url = self.server + "/Export/BulkExport"
        resp = self.session.get(url, data=query)
        return self._parse_export_csv(resp.text)
def parse_params_from_html(source):
    """Extract the parameter list embedded in a Web Portal HTML page.

    Every ``<option>`` element that carries a ``data-code`` attribute is
    treated as one parameter (either the List or Map pages will work). Rows
    whose ``value`` attribute cannot be interpreted as a number are dropped,
    as are exact duplicate rows.

    Args:
        source (str): HTML source of the page.

    Returns:
        pd.DataFrame: a table of available parameters with these columns:

        - param_id: the option's ``value`` attribute. Only numeric-looking
          values are kept, but they are left as read from the HTML (i.e. as
          strings) rather than converted.
        - param_name (str): the option's ``data-code`` attribute
        - param_desc (str): the option's text

    """
    document = html.document_fromstring(source)
    records = [
        {
            "param_id": option.attrib["value"],
            "param_name": option.attrib["data-code"],
            "param_desc": option.text,
        }
        for option in document.xpath("//option[@data-code]")
    ]
    table = pd.DataFrame(records, columns=["param_id", "param_name", "param_desc"])

    # Values that fail numeric conversion are coerced to NaN and then
    # filtered out.
    as_numbers = table["param_id"].apply(
        lambda value: pd.to_numeric(value, errors="coerce")
    )
    table = table[as_numbers.notna()]
    return table.drop_duplicates()