Module ehfheatwaves.ehfheatwaves

Main heatwave calculation module. The main program routine is executed from this module.

The procedure is:

  • parse arguments
  • load the base period data
  • calculate the percentile thresholds
  • load the remaining data
  • calculate heatwave indices
  • calculate seasonal characteristics for each hemisphere and join them back together
  • save to netCDF files.

The EHF Index is defined from Nairn et al., (2009) and Nairn and Fawcett, (2013).

EHI_{sig} = \frac{(T_i + T_{i-1} + T_{i-2})}{3} - T_{90}

EHI_{accl} = \frac{(T_i + T_{i-1} + T_{i-2})}{3} - \frac{(T_i + ... + T_{-30})}{30}

EHF = EHI_{sig} \cdot max(1, EHI_{accl})

Nairn J, Fawcett R. 2013. Defining heatwaves: heatwave defined as a heat-impact event servicing all community and business sectors in Australia. CAWCR Tech. Rep. 60: 10–15.

Nairn J, Fawcett R, Ray D. 2009. Defining and predicting excessive heat events, a national system. CAWCR Tech. Rep. 60: 83–86.

Expand source code
# -*- coding: utf-8 -*-
"""Main heatwave calculation module. The main program routine is executed from this module.

The procedure is:

- parse arguments
- load the base period data
- calculate the percentile thresholds
- load the remaining data
- calculate heatwave indices
- calculate seasonal characteristics for each hemisphere and join them back together
- save to netCDF files.

The EHF Index is defined from Nairn et al., (2009) and Nairn and Fawcett, (2013).

$$ EHI_{sig} = \\frac{(T_i + T_{i-1} + T_{i-2})}{3} - T_{90} $$

$$ EHI_{accl} = \\frac{(T_i + T_{i-1} + T_{i-2})}{3} - \\frac{(T_i + ... + T_{-30})}{30} $$

$$ EHF = EHI_{sig} \cdot max(1, EHI_{accl}) $$

Nairn J, Fawcett R. 2013. Defining heatwaves: heatwave defined as a heat-impact event servicing
all community and business sectors in Australia. CAWCR Tech. Rep. 60: 10–15.

Nairn J, Fawcett R, Ray D. 2009. Defining and predicting excessive heat events, a national system.
CAWCR Tech. Rep. 60: 83–86.
"""
import sys
import warnings

import numpy as np

import ehfheatwaves.constants as const
import ehfheatwaves.getoptions as getoptions
import ehfheatwaves.ncio as ncio
import ehfheatwaves.qtiler as qtiler
from ehfheatwaves.getoptions import options

warnings.simplefilter('ignore', category=RuntimeWarning)


class GridDescription(object):
    """Description of grid."""

    def __init__(self, lats=np.array([])):
        """## Arguments:

        - lats : Latitudes.
        """
        self.lats = lats


def window_percentile(temp:np.ndarray, daysinyear:int=365, wsize:int=15)->np.ndarray:
    """Calculate a day-of-year moving window percentile.

    ## Arguments:

    - temp : Temperature data.
    - daysinyear : Number of days in a year.
    - wsize : Number of days in moving window.
    """
    # Initialise array.
    pctl = np.ones(((daysinyear,)+temp.shape[1:]))*const.FILL_VAL

    # Construct the window.
    window = np.zeros(daysinyear, dtype=bool)
    window[-np.floor(wsize/2.).astype(int):] = 1
    window[:np.ceil(wsize/2.).astype(int)] = 1
    window = np.tile(window, options.bpend + 1 - options.bpstart)

    # Select the interpolation method.
    if options.qtilemethod=='python':
        percentile = np.percentile
        parameter = 0
    elif options.qtilemethod=='zhang':
        percentile = qtiler.quantile_zhang_fast
        parameter = False
    elif options.qtilemethod=='matlab':
        percentile = qtiler.quantile_R
        parameter = 5
    elif options.qtilemethod=='climpact':
        percentile = qtiler.quantile_climpact
        parameter = False

    # Set the percentile for each day of year.
    for day in range(daysinyear):
        pctl[day,...] = percentile(temp[window,...], options.pcntl, parameter)
        window = np.roll(window, 1)

    # Remaining nans are missing data.
    pctl[np.isnan(pctl)] = const.MISSING_VAL

    return pctl


def identify_hw(ehfs:np.ndarray)->tuple:
    """Locate heatwaves from EHF and returns an event indicator and a duration indicator.

    ## Arguments:

    - ehfs : EHF values.

    ## Returns:
    - events : array of bools for heatwave events.
    - endss : array of integers for heatwave duration.
    """
    # Agregate consecutive days with EHF>0
    # First day contains duration
    if np.isnan(ehfs).any(): # then ehfs is a view to tmax or tmin
        # This is handled differently to EHFs because tmax or tmin could theoretically hold -ve
        # values. Therefore we identify values of the exceedence index that are not nan values as
        # heatwaves.
        events = np.logical_not(np.isnan(ehfs)).astype(int)
    else: # ehfs is a view to actual EHF values
        events = (ehfs>0.0).astype(int)
        events[events.mask==True] = 0
    for i in range(events.shape[0] - 2, -1, -1):
         events[i,events[i,...]>0] = events[i+1,events[i,...]>0]+1

    # Identify when heatwaves start with duration
    # Given that first day contains duration
    diff = np.zeros(events.shape)
    # Insert the first diff value as np.diff doesn't catch it because
    # there is no pevious value to compare to.
    diff[0,...] = events[0,...]
    diff[1:,...] = np.diff(events, axis=0)
    endss = np.ma.zeros(ehfs.shape, dtype=int)
    endss[diff>2] = events[diff>2]

    # Remove events less than 3 days
    events[diff==2] = 0
    events[np.roll(diff==2, 1, axis=0)] = 0
    events[diff==1] = 0
    del diff
    events[events>0] = 1
    events = events.astype(bool)
    endss[endss<3] = 0
    return events, endss


def identify_semi_hw(ehfs:np.ndarray)->tuple:
    """identify_hw locates heatwaves from EHF and returns an event indicator and a duration
    indicator. This function does not exclude events less than three days in duration.

    ## Arguments:

    - ehfs : EHF values.

    ## Returns:

    - events : array of bools for heatwave events.
    - endss -- array of integers for heatwave duration.
    """
    # Agregate consecutive days with EHF>0
    # First day contains duration
    if np.isnan(ehfs).any(): # then ehfs is a view to tmax or tmin
        # This is handled differently to EHFs because tmax or tmin could theoretically hold -ve
        # values. Therefore we identify values of the exceedence index that are not nan values as
        # heatwaves.
        events = np.logical_not(np.isnan(ehfs)).astype(int)
    else: # ehfs is a view to actual EHF values
        events = (ehfs>0.0).astype(int)
        events[events.mask==True] = 0
    for i in range(events.shape[0] - 2, -1, -1):
         events[i,events[i,...]>0] = events[i+1,events[i,...]>0]+1

    # Identify when heatwaves start with duration
    # Given that first day contains duration
    diff = np.zeros(events.shape)
    # Insert the first diff value as np.diff doesn't catch it because
    # there is no pevious value to compare to.
    diff[0,...] = events[0,...]
    diff[1:,...] = np.diff(events, axis=0)
    endss = np.ma.zeros(ehfs.shape, dtype=int)
    endss[diff>0] = events[diff>0]
    del diff
    events[events>0] = 1
    events = events.astype(bool)
    return events, endss


def hw_aspects(EHF:np.ndarray, season:str, hemisphere:str)->tuple:
    """Call `identify_hw` and/or `identify_semi_hw` and calculate seasonal aspects.

    ## Arguments:

    - EHF : EHF values.
    - season : The season in which to calculate aspects for.
    - hemisphere : The hemisphere to calculate heatwaves for.

    ## Returns:

    - HWA : Amplitude
    - HWM : Magnitude
    - HWN : Number
    - HWF : Frequency
    - HWD : Maximum duration
    - HWT : Timing
    """
    global timedata
    # Select indices depending on calendar season and hemisphere
    if season=='summer':
        if hemisphere=='south':
            startday = timedata.SHS[0]
            endday = timedata.SHS[1]
        else:
            startday = timedata.SHW[0]
            endday = timedata.SHW[1]
    elif season=='winter':
        if hemisphere=='south':
            startday = timedata.SHW[0]
            endday = timedata.SHW[1]
        else:
            startday = timedata.SHS[0]
            endday = timedata.SHS[1]
    # Initialize arrays
    HWA = np.ones(((timedata.nyears,)+(EHF.shape[1],)))*const.FILL_VAL
    HWM = HWA.copy()
    HWN = HWA.copy()
    HWF = HWA.copy()
    HWD = HWA.copy()
    HWT = HWA.copy()
    # Loop over years
    for iyear, year in enumerate(range(timedata.first_year, timedata.daylast.year)):
        if options.oldmethod:
            if (year==timedata.daylast.year): continue # Incomplete yr
            # Select this years season
            allowance = 14 # For including heawave days after the end of the season
            ifrom = startday + timedata.daysinyear*iyear - 1 # -1 to include Oct 31st
            ito = endday + timedata.daysinyear*iyear + allowance
            EHF_i = EHF[ifrom:ito,...]
            event_i, duration_i = identify_hw(EHF_i)
            # Identify heatwaves that span the entire season
            perpetual = event_i[:-allowance,...].all(axis=0)
            perphw = duration_i[0,perpetual] - 1 # -1 to exclude Oct 31st
            # Remove events that start after the end of the season and before start
            EHF_i = EHF_i[1:,...]
            duration_i = duration_i[1:-allowance,...]
            event_i = event_i[1:-allowance,...]
            # Indicate perpetual heatwaves if they occur.
            if perpetual.any(): duration_i[0,perpetual] = perphw
        else:
            # Select this years season
            allowance = 14 # For including heawave days after the end of the season
            ifrom = startday + timedata.daysinyear*iyear - 2
            ito = endday + timedata.daysinyear*iyear + allowance
            EHF_i = EHF[ifrom:ito,...]
            event_i, duration_i = identify_hw(EHF_i)
            # Remove EHF values in pre season
            EHF_i = EHF_i[2:,...]
            # Identify semi heatwaves that overlap the start of season only including days within the season.
            event_i = event_i[2:,...]
            event_i, duration_i = identify_semi_hw(event_i)
            # Identify heatwaves that span the entire season
            perpetual = event_i[:-allowance,...].all(axis=0)
            perphw = duration_i[0,perpetual] - 1 # -1 to exclude Oct 31st
            # Indicate locations of perpetual heatwaves if they occur.
            if perpetual.any(): duration_i[0,perpetual] = perphw
            # Remove events that start after the end of the season
            duration_i = duration_i[:-allowance,...]
        # Calculate metrics
        HWN[iyear,...] = (duration_i>0).sum(axis=0)
        HWF[iyear,...] = duration_i.sum(axis=0)
        HWD[iyear,...] = duration_i.max(axis=0)
        HWT[iyear,...] = np.argmax(event_i, axis=0)
        # HWM and HWA must be done on each gridcell
        for x in range(EHF_i.shape[1]):
            hw_mag = []
            # retrieve indices where heatwaves start.
            i = np.where(duration_i[:,x]>0)[0] # time
            d = duration_i[i,x] # duration
            if (d==0).all(): continue
            for hw in range(len(d)):
                # retireve this heatwave's EHF values and mean magnitude
                hwdat = EHF_i[i[hw]:i[hw]+d[hw],x]
                hw_mag.append(np.nanmean(hwdat))
            HWM[iyear,x] = np.nanmean(hw_mag)
            # Find the hottest heatwave magnitude
            idex = np.where(hw_mag==max(hw_mag))[0][0]
            # Find that heatwave's hottest day as EHF value.
            HWA[iyear,x] = EHF_i[i[idex]:i[idex]+d[idex],x].max()
        # Locate invalid values or misisng values
        missing = EHF_i.mask.all(axis=0)
        if missing.any():
            HWT[iyear,missing] = const.MISSING_VAL
            HWN[iyear,missing] = const.MISSING_VAL
            HWF[iyear,missing] = const.MISSING_VAL
            HWD[iyear,missing] = const.MISSING_VAL
            HWA[iyear,missing] = const.MISSING_VAL
            HWM[iyear,missing] = const.MISSING_VAL
        invalid = HWN[iyear,...]==0
        HWT[iyear,invalid] = const.INVALID_VAL
        HWD[iyear,invalid] = const.INVALID_VAL
        HWA[iyear,invalid] = const.INVALID_VAL
        HWM[iyear,invalid] = const.INVALID_VAL
    return HWA, HWM, HWN, HWF, HWD, HWT


# Calculate metrics year by year
def split_hemispheres(EHF:np.ndarray, north:bool, south:bool)->tuple:
    """Split the input data by hemispheres, and glue them back together after heatwave
    calculations.

    The EHF spatial axes are reshaped into a single dimension.
    The output arrays are 2D. When saving, data should be reshaped or indexed with a land-sea mask.

    ## Arguments:

    - EHF : EHF values.
    - north : Calculate heatwaves for the northern hemisphere.
    - south : Calculate heatwaves for the southern hemisphere.

    ## Returns:

    - HWA : Amplitude
    - HWM : Magnitude
    - HWN : Number
    - HWF : Frequency
    - HWD : Maximum duration
    - HWT : Timing
    """
    lats = grid.lats
    if south:
        if options.maskfile:
            EHF_s = EHF[:,:(mask[lats<=0]>0).sum()]
        else:
            EHF_s = EHF[:,lats<=0,...]
        # Reshape to 2D
        space = EHF_s.shape[1:]
        if len(space)>1:
            EHF_s = EHF_s.reshape(EHF_s.shape[0], space[0]*space[1])
        # Southern hemisphere aspects
        HWA_s, HWM_s, HWN_s, HWF_s, HWD_s, HWT_s = hw_aspects(EHF_s, options.season, 'south')
        del EHF_s
    if north:
        if options.maskfile:
            EHF_n = EHF[:,(mask[lats<=0]>0).sum():]
        else:
            EHF_n = EHF[:,lats>0,...]
        # Reshape to 2D
        space = EHF_n.shape[1:]
        if len(space)>1:
            EHF_n = EHF_n.reshape(EHF_n.shape[0],space[0]*space[1])
        # Northern hemisphere aspects
        HWA_n, HWM_n, HWN_n, HWF_n, HWD_n, HWT_n = hw_aspects(EHF_n, options.season, 'north')
        del EHF_n
    # Glue hemispheres back together
    if north and south:
        HWA = np.append(HWA_s, HWA_n, axis=1)
        HWM = np.append(HWM_s, HWM_n, axis=1)
        HWN = np.append(HWN_s, HWN_n, axis=1)
        HWF = np.append(HWF_s, HWF_n, axis=1)
        HWD = np.append(HWD_s, HWD_n, axis=1)
        HWT = np.append(HWT_s, HWT_n, axis=1)
    elif north:
        HWA = HWA_n
        HWM = HWM_n
        HWN = HWN_n
        HWF = HWF_n
        HWD = HWD_n
        HWT = HWT_n
    elif south:
        HWA = HWA_s
        HWM = HWM_s
        HWN = HWN_s
        HWF = HWF_s
        HWD = HWD_s
        HWT = HWT_s
    return HWA, HWM, HWN, HWF, HWD, HWT


def main():
    """Main function that is called by the entry point script.

    See the following documentation for more information on entry points.
    https://setuptools.pypa.io/en/latest/userguide/entry_point.html?highlight=entry

    This function parses the command line arguments and options then calculates the heatwaves and
    saves them to netcdf files.
    """
    global grid, timedata, mask, options
    # Get the options and variables
    options = getoptions.parse_arguments(sys.argv[1:])

    # Load time data
    if options.verbose: print("Loading data")
    if options.tmaxfile:
        filename = options.tmaxfile
    elif options.tminfile:
        filename = options.tminfile
    timedata = ncio.TimeData(filename, options.timevname)
    grid = GridDescription()

    # Load land-sea mask
    if options.maskfile: mask = ncio.get_mask(options)
    else: mask = None

    # Load the temperature data over the base period
    if options.keeptave or options.keeptmax:
        tmax = ncio.load_bp_data(options, timedata, variable='tmax', mask=mask)
    if options.keeptave or options.keeptmin:
        tmin = ncio.load_bp_data(options, timedata, variable='tmin', mask=mask)
    if options.keeptave:
        tave_base = (tmax + tmin)/2.

    # Caclulate percentile
    if options.verbose: print("Calculating percentiles")
    if options.keeptave:
        tpct = window_percentile(tave_base, daysinyear=timedata.daysinyear)
    if options.keeptmax:
        txpct = window_percentile(tmax, daysinyear=timedata.daysinyear)
    if options.keeptmin:
        tnpct = window_percentile(tmin, daysinyear=timedata.daysinyear)
    if not options.noehf: del tave_base

    # Load all data
    if options.verbose: print("Loading data")
    if options.keeptave or options.keeptmax:
        tmax, grid.lats = ncio.get_all_data(options.tmaxfile, options.tmaxvname, options)
        original_shape = tmax.shape
    if options.keeptave or options.keeptmin:
        tmin, grid.lats = ncio.get_all_data(options.tminfile, options.tminvname, options)
        original_shape = tmin.shape
    if options.keeptave:
        tave = (tmax + tmin)/2.

    # Remove leap days from data
    if (timedata.calendar=='gregorian')|(timedata.calendar=='proleptic_gregorian')|(timedata.calendar=='standard'):
        if options.keeptave:
            tave = tave[(timedata.dates.month!=2)|(timedata.dates.day!=29),...]
            original_shape = (tave.shape[0], original_shape[1], original_shape[2])
        if options.keeptmax or options.keeptave:
            tmax = tmax[(timedata.dates.month!=2)|(timedata.dates.day!=29),...]
            original_shape = (tmax.shape[0], original_shape[1], original_shape[2])
        if options.keeptmin or options.keeptave:
            tmin = tmin[(timedata.dates.month!=2)|(timedata.dates.day!=29),...]
            original_shape = (tmin.shape[0], original_shape[1], original_shape[2])
        timedata.calendar = '365_day'

    # Remove incomplete starting year
    timedata.first_year = timedata.dayone.year
    if (timedata.dayone.month!=1)|(timedata.dayone.day!=1):
        timedata.first_year = timedata.dayone.year+1
        start = np.argmax(timedata.dates.year==timedata.first_year)
        if options.keeptave:
            tave = tave[start:,...]
            original_shape = (tave.shape[0], original_shape[1], original_shape[2])
        if options.keeptmax:
            tmax = tmax[start:,...]
            original_shape = (tmax.shape[0], original_shape[1], original_shape[2])
        if options.keeptmin:
            tmin = tmin[start:,...]
            original_shape = (tmin.shape[0], original_shape[1], original_shape[2])

    # Apply mask
    if options.maskfile:
        if options.keeptave:
            tave = tave[:,mask]
        if options.keeptmax:
            tmax = tmax[:,mask]
        if options.keeptmin:
            tmin = tmin[:,mask]

    if options.verbose: print("Caclulating definition")
    # Calculate EHF
    if not options.noehf:
        EHF = np.ma.ones(tave.shape)*np.nan
        for i in range(32, tave.shape[0]):
            EHIaccl = tave[i-2:i+1,...].sum(axis=0)/3.0 - tave[i-32:i-2,...].sum(axis=0)/30.0
            EHIsig = tave[i-2:i+1,...].sum(axis=0)/3.0 - tpct[i-timedata.daysinyear*int((i+1)/timedata.daysinyear),...]
            EHF[i,...] = np.ma.maximum(EHIaccl, 1.0)*EHIsig
        EHF[EHF<0] = 0
    if options.ehi:
        EHIaccl = np.ma.ones(tave.shape)*np.nan
        EHIsig = np.ma.ones(tave.shape)*np.nan
        for i in range(32, tave.shape[0]):
            EHIaccl[i,...] = tave[i-2:i+1,...].sum(axis=0)/3.0 - tave[i-32:i-2,...].sum(axis=0)/30.0
            EHIsig[i,...] = tave[i-2:i+1,...].sum(axis=0)/3.0 - tpct[i-timedata.daysinyear*int((i+1)/timedata.daysinyear),...]

    # Tx90pc exceedences
    if options.keeptmin or options.keeptmax:
        if options.keeptmax:
            txexceed = np.ma.ones(tmax.shape)*np.nan
            for i in range(0, tmax.shape[0]):
                idoy = i-timedata.daysinyear*int((i+1)/timedata.daysinyear)
                txexceed[i,...] = tmax[i,...]>txpct[idoy,...]
            txexceed[txexceed>0] = tmax[txexceed>0]
            txexceed[txexceed==0] = np.nan # need nans to be identified correctly
        if options.keeptmin:
            tnexceed = np.ma.ones(tmin.shape)*np.nan
            for i in range(0, tmin.shape[0]):
                idoy = i-timedata.daysinyear*int((i+1)/timedata.daysinyear)
                tnexceed[i,...] = tmin[i,...]>tnpct[idoy,...]
            tnexceed[tnexceed>0] = tmin[tnexceed>0]
            tnexceed[tnexceed==0] = np.nan # need nans to be identified correctly

    # Calculate daily output
    if options.dailyout or options.dailyout: event, ends = identify_hw(EHF)
    if options.tx90pcd: event_tx, ends_tx = identify_hw(txexceed)
    if options.tn90pcd: event_tn, ends_tn = identify_hw(tnexceed)

    timedata.nyears = len(range(timedata.first_year, timedata.daylast.year + 1))

    # Calculate yearly output
    if options.yearlyout:
        if options.verbose: print("Calculating yearly aspects")
        # Split by latitude
        north = (grid.lats>0).any()
        south = (grid.lats<=0).any()
        if not options.noehf:
            HWA_EHF, HWM_EHF, HWN_EHF, HWF_EHF, HWD_EHF, HWT_EHF = split_hemispheres(
                    EHF,
                    north,
                    south,
                    )
        if options.tx90pc:
            HWA_tx, HWM_tx, HWN_tx, HWF_tx, HWD_tx, HWT_tx = split_hemispheres(
                    txexceed,
                    north,
                    south,
                    )
        if options.tn90pc:
            HWA_tn, HWM_tn, HWN_tn, HWF_tn, HWD_tn, HWT_tn = split_hemispheres(
                    tnexceed,
                    north,
                    south,
                    )

    if options.verbose: print("Saving")
    # Save yearly data to netcdf
    if options.yearlyout:
        if not options.noehf:
            ncio.save_yearly(
                    HWA_EHF,
                    HWM_EHF,
                    HWN_EHF,
                    HWF_EHF,
                    HWD_EHF,
                    HWT_EHF,
                    tpct,
                    "EHF",
                    timedata,
                    options,
                    mask,
                    )
        if options.tx90pc:
            ncio.save_yearly(
                    HWA_tx,
                    HWM_tx,
                    HWN_tx,
                    HWF_tx,
                    HWD_tx,
                    HWT_tx,
                    txpct,
                    "tx90pct",
                    timedata,
                    options,
                    mask,
                    )
        if options.tn90pc:
            ncio.save_yearly(
                    HWA_tn,
                    HWM_tn,
                    HWN_tn,
                    HWF_tn,
                    HWD_tn,
                    HWT_tn,
                    tnpct,
                    "tn90pct",
                    timedata,
                    options,
                    mask,
                    )

    # Save daily data to netcdf
    if options.dailyout:
        if options.keeptave:
            ncio.save_daily(
                    EHF,
                    event,
                    ends,
                    options,
                    timedata,
                    original_shape,
                    mask,
                    defn='EHF',
                    )
        if options.tx90pcd:
            ncio.save_daily(
                    txexceed,
                    event_tx,
                    ends_tx,
                    options,
                    timedata,
                    original_shape,
                    mask,
                    defn='tx90pct',
                    )
        if options.tn90pcd:
            ncio.save_daily(
                    tnexceed,
                    event_tn,
                    ends_tn,
                    options,
                    timedata,
                    original_shape,
                    mask,
                    defn='tn90pct',
                    )

    # save EHIs
    if options.ehi:
        ncio.save_ehi(EHIsig, EHIaccl, options, timedata, original_shape, mask)

Functions

def hw_aspects(EHF: numpy.ndarray, season: str, hemisphere: str) ‑> tuple

Call identify_hw() and/or identify_semi_hw() and calculate seasonal aspects.

Arguments:

  • EHF : EHF values.
  • season : The season in which to calculate aspects for.
  • hemisphere : The hemisphere to calculate heatwaves for.

Returns:

  • HWA : Amplitude
  • HWM : Magnitude
  • HWN : Number
  • HWF : Frequency
  • HWD : Maximum duration
  • HWT : Timing
Expand source code
def hw_aspects(EHF:np.ndarray, season:str, hemisphere:str)->tuple:
    """Call `identify_hw` and/or `identify_semi_hw` and calculate seasonal aspects.

    ## Arguments:

    - EHF : EHF values.
    - season : The season in which to calculate aspects for.
    - hemisphere : The hemisphere to calculate heatwaves for.

    ## Returns:

    - HWA : Amplitude
    - HWM : Magnitude
    - HWN : Number
    - HWF : Frequency
    - HWD : Maximum duration
    - HWT : Timing
    """
    global timedata
    # Select indices depending on calendar season and hemisphere
    if season=='summer':
        if hemisphere=='south':
            startday = timedata.SHS[0]
            endday = timedata.SHS[1]
        else:
            startday = timedata.SHW[0]
            endday = timedata.SHW[1]
    elif season=='winter':
        if hemisphere=='south':
            startday = timedata.SHW[0]
            endday = timedata.SHW[1]
        else:
            startday = timedata.SHS[0]
            endday = timedata.SHS[1]
    # Initialize arrays
    HWA = np.ones(((timedata.nyears,)+(EHF.shape[1],)))*const.FILL_VAL
    HWM = HWA.copy()
    HWN = HWA.copy()
    HWF = HWA.copy()
    HWD = HWA.copy()
    HWT = HWA.copy()
    # Loop over years
    for iyear, year in enumerate(range(timedata.first_year, timedata.daylast.year)):
        if options.oldmethod:
            if (year==timedata.daylast.year): continue # Incomplete yr
            # Select this years season
            allowance = 14 # For including heawave days after the end of the season
            ifrom = startday + timedata.daysinyear*iyear - 1 # -1 to include Oct 31st
            ito = endday + timedata.daysinyear*iyear + allowance
            EHF_i = EHF[ifrom:ito,...]
            event_i, duration_i = identify_hw(EHF_i)
            # Identify heatwaves that span the entire season
            perpetual = event_i[:-allowance,...].all(axis=0)
            perphw = duration_i[0,perpetual] - 1 # -1 to exclude Oct 31st
            # Remove events that start after the end of the season and before start
            EHF_i = EHF_i[1:,...]
            duration_i = duration_i[1:-allowance,...]
            event_i = event_i[1:-allowance,...]
            # Indicate perpetual heatwaves if they occur.
            if perpetual.any(): duration_i[0,perpetual] = perphw
        else:
            # Select this years season
            allowance = 14 # For including heawave days after the end of the season
            ifrom = startday + timedata.daysinyear*iyear - 2
            ito = endday + timedata.daysinyear*iyear + allowance
            EHF_i = EHF[ifrom:ito,...]
            event_i, duration_i = identify_hw(EHF_i)
            # Remove EHF values in pre season
            EHF_i = EHF_i[2:,...]
            # Identify semi heatwaves that overlap the start of season only including days within the season.
            event_i = event_i[2:,...]
            event_i, duration_i = identify_semi_hw(event_i)
            # Identify heatwaves that span the entire season
            perpetual = event_i[:-allowance,...].all(axis=0)
            perphw = duration_i[0,perpetual] - 1 # -1 to exclude Oct 31st
            # Indicate locations of perpetual heatwaves if they occur.
            if perpetual.any(): duration_i[0,perpetual] = perphw
            # Remove events that start after the end of the season
            duration_i = duration_i[:-allowance,...]
        # Calculate metrics
        HWN[iyear,...] = (duration_i>0).sum(axis=0)
        HWF[iyear,...] = duration_i.sum(axis=0)
        HWD[iyear,...] = duration_i.max(axis=0)
        HWT[iyear,...] = np.argmax(event_i, axis=0)
        # HWM and HWA must be done on each gridcell
        for x in range(EHF_i.shape[1]):
            hw_mag = []
            # retrieve indices where heatwaves start.
            i = np.where(duration_i[:,x]>0)[0] # time
            d = duration_i[i,x] # duration
            if (d==0).all(): continue
            for hw in range(len(d)):
                # retireve this heatwave's EHF values and mean magnitude
                hwdat = EHF_i[i[hw]:i[hw]+d[hw],x]
                hw_mag.append(np.nanmean(hwdat))
            HWM[iyear,x] = np.nanmean(hw_mag)
            # Find the hottest heatwave magnitude
            idex = np.where(hw_mag==max(hw_mag))[0][0]
            # Find that heatwave's hottest day as EHF value.
            HWA[iyear,x] = EHF_i[i[idex]:i[idex]+d[idex],x].max()
        # Locate invalid values or misisng values
        missing = EHF_i.mask.all(axis=0)
        if missing.any():
            HWT[iyear,missing] = const.MISSING_VAL
            HWN[iyear,missing] = const.MISSING_VAL
            HWF[iyear,missing] = const.MISSING_VAL
            HWD[iyear,missing] = const.MISSING_VAL
            HWA[iyear,missing] = const.MISSING_VAL
            HWM[iyear,missing] = const.MISSING_VAL
        invalid = HWN[iyear,...]==0
        HWT[iyear,invalid] = const.INVALID_VAL
        HWD[iyear,invalid] = const.INVALID_VAL
        HWA[iyear,invalid] = const.INVALID_VAL
        HWM[iyear,invalid] = const.INVALID_VAL
    return HWA, HWM, HWN, HWF, HWD, HWT
def identify_hw(ehfs: numpy.ndarray) ‑> tuple

Locate heatwaves from EHF and returns an event indicator and a duration indicator.

Arguments:

  • ehfs : EHF values.

Returns:

  • events : array of bools for heatwave events.
  • endss : array of integers for heatwave duration.
Expand source code
def identify_hw(ehfs:np.ndarray)->tuple:
    """Locate heatwaves from EHF and returns an event indicator and a duration indicator.

    ## Arguments:

    - ehfs : EHF values.

    ## Returns:
    - events : array of bools for heatwave events.
    - endss : array of integers for heatwave duration.
    """
    # Agregate consecutive days with EHF>0
    # First day contains duration
    if np.isnan(ehfs).any(): # then ehfs is a view to tmax or tmin
        # This is handled differently to EHFs because tmax or tmin could theoretically hold -ve
        # values. Therefore we identify values of the exceedence index that are not nan values as
        # heatwaves.
        events = np.logical_not(np.isnan(ehfs)).astype(int)
    else: # ehfs is a view to actual EHF values
        events = (ehfs>0.0).astype(int)
        events[events.mask==True] = 0
    for i in range(events.shape[0] - 2, -1, -1):
         events[i,events[i,...]>0] = events[i+1,events[i,...]>0]+1

    # Identify when heatwaves start with duration
    # Given that first day contains duration
    diff = np.zeros(events.shape)
    # Insert the first diff value as np.diff doesn't catch it because
    # there is no pevious value to compare to.
    diff[0,...] = events[0,...]
    diff[1:,...] = np.diff(events, axis=0)
    endss = np.ma.zeros(ehfs.shape, dtype=int)
    endss[diff>2] = events[diff>2]

    # Remove events less than 3 days
    events[diff==2] = 0
    events[np.roll(diff==2, 1, axis=0)] = 0
    events[diff==1] = 0
    del diff
    events[events>0] = 1
    events = events.astype(bool)
    endss[endss<3] = 0
    return events, endss
def identify_semi_hw(ehfs: numpy.ndarray) ‑> tuple

identify_hw locates heatwaves from EHF and returns an event indicator and a duration indicator. This function does not exclude events less than three days in duration.

Arguments:

  • ehfs : EHF values.

Returns:

  • events : array of bools for heatwave events.
  • endss – array of integers for heatwave duration.
Expand source code
def identify_semi_hw(ehfs:np.ndarray)->tuple:
    """identify_hw locates heatwaves from EHF and returns an event indicator and a duration
    indicator. This function does not exclude events less than three days in duration.

    ## Arguments:

    - ehfs : EHF values.

    ## Returns:

    - events : array of bools for heatwave events.
    - endss -- array of integers for heatwave duration.
    """
    # Agregate consecutive days with EHF>0
    # First day contains duration
    if np.isnan(ehfs).any(): # then ehfs is a view to tmax or tmin
        # This is handled differently to EHFs because tmax or tmin could theoretically hold -ve
        # values. Therefore we identify values of the exceedence index that are not nan values as
        # heatwaves.
        events = np.logical_not(np.isnan(ehfs)).astype(int)
    else: # ehfs is a view to actual EHF values
        events = (ehfs>0.0).astype(int)
        events[events.mask==True] = 0
    for i in range(events.shape[0] - 2, -1, -1):
         events[i,events[i,...]>0] = events[i+1,events[i,...]>0]+1

    # Identify when heatwaves start with duration
    # Given that first day contains duration
    diff = np.zeros(events.shape)
    # Insert the first diff value as np.diff doesn't catch it because
    # there is no pevious value to compare to.
    diff[0,...] = events[0,...]
    diff[1:,...] = np.diff(events, axis=0)
    endss = np.ma.zeros(ehfs.shape, dtype=int)
    endss[diff>0] = events[diff>0]
    del diff
    events[events>0] = 1
    events = events.astype(bool)
    return events, endss
def main()

Main function that is called by the entry point script.

See the following documentation for more information on entry points. https://setuptools.pypa.io/en/latest/userguide/entry_point.html?highlight=entry

This function parses the command line arguments and options then calculates the heatwaves and saves them to netcdf files.

Expand source code
def main():
    """Main function that is called by the entry point script.

    See the following documentation for more information on entry points.
    https://setuptools.pypa.io/en/latest/userguide/entry_point.html?highlight=entry

    This function parses the command line arguments and options then calculates the heatwaves and
    saves them to netcdf files.
    """
    global grid, timedata, mask, options
    # Get the options and variables
    options = getoptions.parse_arguments(sys.argv[1:])

    # Load time data
    if options.verbose: print("Loading data")
    if options.tmaxfile:
        filename = options.tmaxfile
    elif options.tminfile:
        filename = options.tminfile
    timedata = ncio.TimeData(filename, options.timevname)
    grid = GridDescription()

    # Load land-sea mask
    if options.maskfile: mask = ncio.get_mask(options)
    else: mask = None

    # Load the temperature data over the base period
    if options.keeptave or options.keeptmax:
        tmax = ncio.load_bp_data(options, timedata, variable='tmax', mask=mask)
    if options.keeptave or options.keeptmin:
        tmin = ncio.load_bp_data(options, timedata, variable='tmin', mask=mask)
    if options.keeptave:
        tave_base = (tmax + tmin)/2.

    # Caclulate percentile
    if options.verbose: print("Calculating percentiles")
    if options.keeptave:
        tpct = window_percentile(tave_base, daysinyear=timedata.daysinyear)
    if options.keeptmax:
        txpct = window_percentile(tmax, daysinyear=timedata.daysinyear)
    if options.keeptmin:
        tnpct = window_percentile(tmin, daysinyear=timedata.daysinyear)
    if not options.noehf: del tave_base

    # Load all data
    if options.verbose: print("Loading data")
    if options.keeptave or options.keeptmax:
        tmax, grid.lats = ncio.get_all_data(options.tmaxfile, options.tmaxvname, options)
        original_shape = tmax.shape
    if options.keeptave or options.keeptmin:
        tmin, grid.lats = ncio.get_all_data(options.tminfile, options.tminvname, options)
        original_shape = tmin.shape
    if options.keeptave:
        tave = (tmax + tmin)/2.

    # Remove leap days from data
    if (timedata.calendar=='gregorian')|(timedata.calendar=='proleptic_gregorian')|(timedata.calendar=='standard'):
        if options.keeptave:
            tave = tave[(timedata.dates.month!=2)|(timedata.dates.day!=29),...]
            original_shape = (tave.shape[0], original_shape[1], original_shape[2])
        if options.keeptmax or options.keeptave:
            tmax = tmax[(timedata.dates.month!=2)|(timedata.dates.day!=29),...]
            original_shape = (tmax.shape[0], original_shape[1], original_shape[2])
        if options.keeptmin or options.keeptave:
            tmin = tmin[(timedata.dates.month!=2)|(timedata.dates.day!=29),...]
            original_shape = (tmin.shape[0], original_shape[1], original_shape[2])
        timedata.calendar = '365_day'

    # Remove incomplete starting year
    timedata.first_year = timedata.dayone.year
    if (timedata.dayone.month!=1)|(timedata.dayone.day!=1):
        timedata.first_year = timedata.dayone.year+1
        start = np.argmax(timedata.dates.year==timedata.first_year)
        if options.keeptave:
            tave = tave[start:,...]
            original_shape = (tave.shape[0], original_shape[1], original_shape[2])
        if options.keeptmax:
            tmax = tmax[start:,...]
            original_shape = (tmax.shape[0], original_shape[1], original_shape[2])
        if options.keeptmin:
            tmin = tmin[start:,...]
            original_shape = (tmin.shape[0], original_shape[1], original_shape[2])

    # Apply mask
    if options.maskfile:
        if options.keeptave:
            tave = tave[:,mask]
        if options.keeptmax:
            tmax = tmax[:,mask]
        if options.keeptmin:
            tmin = tmin[:,mask]

    if options.verbose: print("Caclulating definition")
    # Calculate EHF
    if not options.noehf:
        EHF = np.ma.ones(tave.shape)*np.nan
        for i in range(32, tave.shape[0]):
            EHIaccl = tave[i-2:i+1,...].sum(axis=0)/3.0 - tave[i-32:i-2,...].sum(axis=0)/30.0
            EHIsig = tave[i-2:i+1,...].sum(axis=0)/3.0 - tpct[i-timedata.daysinyear*int((i+1)/timedata.daysinyear),...]
            EHF[i,...] = np.ma.maximum(EHIaccl, 1.0)*EHIsig
        EHF[EHF<0] = 0
    if options.ehi:
        EHIaccl = np.ma.ones(tave.shape)*np.nan
        EHIsig = np.ma.ones(tave.shape)*np.nan
        for i in range(32, tave.shape[0]):
            EHIaccl[i,...] = tave[i-2:i+1,...].sum(axis=0)/3.0 - tave[i-32:i-2,...].sum(axis=0)/30.0
            EHIsig[i,...] = tave[i-2:i+1,...].sum(axis=0)/3.0 - tpct[i-timedata.daysinyear*int((i+1)/timedata.daysinyear),...]

    # Tx90pc exceedences
    if options.keeptmin or options.keeptmax:
        if options.keeptmax:
            txexceed = np.ma.ones(tmax.shape)*np.nan
            for i in range(0, tmax.shape[0]):
                idoy = i-timedata.daysinyear*int((i+1)/timedata.daysinyear)
                txexceed[i,...] = tmax[i,...]>txpct[idoy,...]
            txexceed[txexceed>0] = tmax[txexceed>0]
            txexceed[txexceed==0] = np.nan # need nans to be identified correctly
        if options.keeptmin:
            tnexceed = np.ma.ones(tmin.shape)*np.nan
            for i in range(0, tmin.shape[0]):
                idoy = i-timedata.daysinyear*int((i+1)/timedata.daysinyear)
                tnexceed[i,...] = tmin[i,...]>tnpct[idoy,...]
            tnexceed[tnexceed>0] = tmin[tnexceed>0]
            tnexceed[tnexceed==0] = np.nan # need nans to be identified correctly

    # Calculate daily output
    if options.dailyout or options.dailyout: event, ends = identify_hw(EHF)
    if options.tx90pcd: event_tx, ends_tx = identify_hw(txexceed)
    if options.tn90pcd: event_tn, ends_tn = identify_hw(tnexceed)

    timedata.nyears = len(range(timedata.first_year, timedata.daylast.year + 1))

    # Calculate yearly output
    if options.yearlyout:
        if options.verbose: print("Calculating yearly aspects")
        # Split by latitude
        north = (grid.lats>0).any()
        south = (grid.lats<=0).any()
        if not options.noehf:
            HWA_EHF, HWM_EHF, HWN_EHF, HWF_EHF, HWD_EHF, HWT_EHF = split_hemispheres(
                    EHF,
                    north,
                    south,
                    )
        if options.tx90pc:
            HWA_tx, HWM_tx, HWN_tx, HWF_tx, HWD_tx, HWT_tx = split_hemispheres(
                    txexceed,
                    north,
                    south,
                    )
        if options.tn90pc:
            HWA_tn, HWM_tn, HWN_tn, HWF_tn, HWD_tn, HWT_tn = split_hemispheres(
                    tnexceed,
                    north,
                    south,
                    )

    if options.verbose: print("Saving")
    # Save yearly data to netcdf
    if options.yearlyout:
        if not options.noehf:
            ncio.save_yearly(
                    HWA_EHF,
                    HWM_EHF,
                    HWN_EHF,
                    HWF_EHF,
                    HWD_EHF,
                    HWT_EHF,
                    tpct,
                    "EHF",
                    timedata,
                    options,
                    mask,
                    )
        if options.tx90pc:
            ncio.save_yearly(
                    HWA_tx,
                    HWM_tx,
                    HWN_tx,
                    HWF_tx,
                    HWD_tx,
                    HWT_tx,
                    txpct,
                    "tx90pct",
                    timedata,
                    options,
                    mask,
                    )
        if options.tn90pc:
            ncio.save_yearly(
                    HWA_tn,
                    HWM_tn,
                    HWN_tn,
                    HWF_tn,
                    HWD_tn,
                    HWT_tn,
                    tnpct,
                    "tn90pct",
                    timedata,
                    options,
                    mask,
                    )

    # Save daily data to netcdf
    if options.dailyout:
        if options.keeptave:
            ncio.save_daily(
                    EHF,
                    event,
                    ends,
                    options,
                    timedata,
                    original_shape,
                    mask,
                    defn='EHF',
                    )
        if options.tx90pcd:
            ncio.save_daily(
                    txexceed,
                    event_tx,
                    ends_tx,
                    options,
                    timedata,
                    original_shape,
                    mask,
                    defn='tx90pct',
                    )
        if options.tn90pcd:
            ncio.save_daily(
                    tnexceed,
                    event_tn,
                    ends_tn,
                    options,
                    timedata,
                    original_shape,
                    mask,
                    defn='tn90pct',
                    )

    # save EHIs
    if options.ehi:
        ncio.save_ehi(EHIsig, EHIaccl, options, timedata, original_shape, mask)
def split_hemispheres(EHF: numpy.ndarray, north: bool, south: bool) ‑> tuple

Split the input data by hemispheres, and glue them back together after heatwave calculations.

The EHF spatial axes are reshaped into a single dimension. The output arrays are 2D. When saving, data should be reshaped or indexed with a land-sea mask.

Arguments:

  • EHF : EHF values.
  • north : Calculate heatwaves for the northern hemisphere.
  • south : Calculate heatwaves for the southern hemisphere.

Returns:

  • HWA : Amplitude
  • HWM : Magnitude
  • HWN : Number
  • HWF : Frequency
  • HWD : Maximum duration
  • HWT : Timing
Expand source code
def split_hemispheres(EHF:np.ndarray, north:bool, south:bool)->tuple:
    """Split the input data by hemispheres, and glue them back together after heatwave
    calculations.

    The EHF spatial axes are reshaped into a single dimension.
    The output arrays are 2D. When saving, data should be reshaped or indexed with a land-sea mask.

    ## Arguments:

    - EHF : EHF values.
    - north : Calculate heatwaves for the northern hemisphere.
    - south : Calculate heatwaves for the southern hemisphere.

    ## Returns:

    - HWA : Amplitude
    - HWM : Magnitude
    - HWN : Number
    - HWF : Frequency
    - HWD : Maximum duration
    - HWT : Timing
    """
    lats = grid.lats
    if south:
        if options.maskfile:
            EHF_s = EHF[:,:(mask[lats<=0]>0).sum()]
        else:
            EHF_s = EHF[:,lats<=0,...]
        # Reshape to 2D
        space = EHF_s.shape[1:]
        if len(space)>1:
            EHF_s = EHF_s.reshape(EHF_s.shape[0], space[0]*space[1])
        # Southern hemisphere aspects
        HWA_s, HWM_s, HWN_s, HWF_s, HWD_s, HWT_s = hw_aspects(EHF_s, options.season, 'south')
        del EHF_s
    if north:
        if options.maskfile:
            EHF_n = EHF[:,(mask[lats<=0]>0).sum():]
        else:
            EHF_n = EHF[:,lats>0,...]
        # Reshape to 2D
        space = EHF_n.shape[1:]
        if len(space)>1:
            EHF_n = EHF_n.reshape(EHF_n.shape[0],space[0]*space[1])
        # Northern hemisphere aspects
        HWA_n, HWM_n, HWN_n, HWF_n, HWD_n, HWT_n = hw_aspects(EHF_n, options.season, 'north')
        del EHF_n
    # Glue hemispheres back together
    if north and south:
        HWA = np.append(HWA_s, HWA_n, axis=1)
        HWM = np.append(HWM_s, HWM_n, axis=1)
        HWN = np.append(HWN_s, HWN_n, axis=1)
        HWF = np.append(HWF_s, HWF_n, axis=1)
        HWD = np.append(HWD_s, HWD_n, axis=1)
        HWT = np.append(HWT_s, HWT_n, axis=1)
    elif north:
        HWA = HWA_n
        HWM = HWM_n
        HWN = HWN_n
        HWF = HWF_n
        HWD = HWD_n
        HWT = HWT_n
    elif south:
        HWA = HWA_s
        HWM = HWM_s
        HWN = HWN_s
        HWF = HWF_s
        HWD = HWD_s
        HWT = HWT_s
    return HWA, HWM, HWN, HWF, HWD, HWT
def window_percentile(temp: numpy.ndarray, daysinyear: int = 365, wsize: int = 15) ‑> numpy.ndarray

Calculate a day-of-year moving window percentile.

Arguments:

  • temp : Temperature data.
  • daysinyear : Number of days in a year.
  • wsize : Number of days in moving window.
Expand source code
def window_percentile(temp:np.ndarray, daysinyear:int=365, wsize:int=15)->np.ndarray:
    """Calculate a day-of-year moving window percentile.

    ## Arguments:

    - temp : Temperature data.
    - daysinyear : Number of days in a year.
    - wsize : Number of days in moving window.
    """
    # Initialise array.
    pctl = np.ones(((daysinyear,)+temp.shape[1:]))*const.FILL_VAL

    # Construct the window.
    window = np.zeros(daysinyear, dtype=bool)
    window[-np.floor(wsize/2.).astype(int):] = 1
    window[:np.ceil(wsize/2.).astype(int)] = 1
    window = np.tile(window, options.bpend + 1 - options.bpstart)

    # Select the interpolation method.
    if options.qtilemethod=='python':
        percentile = np.percentile
        parameter = 0
    elif options.qtilemethod=='zhang':
        percentile = qtiler.quantile_zhang_fast
        parameter = False
    elif options.qtilemethod=='matlab':
        percentile = qtiler.quantile_R
        parameter = 5
    elif options.qtilemethod=='climpact':
        percentile = qtiler.quantile_climpact
        parameter = False

    # Set the percentile for each day of year.
    for day in range(daysinyear):
        pctl[day,...] = percentile(temp[window,...], options.pcntl, parameter)
        window = np.roll(window, 1)

    # Remaining nans are missing data.
    pctl[np.isnan(pctl)] = const.MISSING_VAL

    return pctl

Classes

class GridDescription (lats=array([], dtype=float64))

Description of grid.

Arguments:

  • lats : Latitudes.
Expand source code
class GridDescription(object):
    """Description of grid."""

    def __init__(self, lats=np.array([])):
        """## Arguments:

        - lats : Latitudes.
        """
        self.lats = lats