Source code for gfs_dynamical_core.component

from climt._core import ensure_contiguous_state
from sympl import (
    get_constant, TendencyStepper, initialize_numpy_arrays_with_properties,
    get_numpy_arrays_with_properties, ImplicitTendencyComponentComposite,
    get_tracer_names, restore_data_arrays_with_properties,
)
import numpy as np
import sys
import logging
try:
    from . import _gfs_dynamics
except ImportError as error:
    logging.warning(
        'Import failed. GFS dynamical core is likely not compiled and will not '
        'be available.'
    )
    print(error)


class GFSError(Exception):
    pass


def get_valid_properties(gfs_properties, prognostic_properties, property_type):
    return_dict = {}
    for name, properties in prognostic_properties.items():
        if name not in gfs_properties:
            return_dict[name] = properties
        elif 'dims' in prognostic_properties.keys():
            extra_dims = set(prognostic_properties['dims']).difference(['*'] + list(gfs_properties[name]['dims']))
            if len(extra_dims) != 0:
                raise GFSError(
                    'Cannot handle TendencyComponent with {} {} '
                    'that has extra dimensions {} not used by GFS'.format(
                        property_type, name, extra_dims)
                )
    return return_dict


[docs] class GFSDynamicalCore(TendencyStepper): """ Climt interface to the GFS dynamical core. The GFS code is available on `github`_. .. _github: https://github.com/jswhit/gfs-dycore """ _gfs_input_properties = { 'latitude': { 'units': 'degrees_N', 'dims': ['lat', 'lon'], }, 'longitude': { 'units': 'degrees_E', 'dims': ['lat', 'lon'], }, 'air_temperature': { 'units': 'degK', 'dims': ['mid_levels', 'lat', 'lon'], }, 'atmosphere_hybrid_sigma_pressure_a_coordinate_on_interface_levels': { 'units': 'dimensionless', 'dims': ['interface_levels'], 'alias': 'a_coord', }, 'atmosphere_hybrid_sigma_pressure_b_coordinate_on_interface_levels': { 'units': 'dimensionless', 'dims': ['interface_levels'], 'alias': 'b_coord', }, 'air_pressure': { 'dims': ['mid_levels', 'lat', 'lon'], 'units': 'Pa' }, 'air_pressure_on_interface_levels': { 'dims': ['interface_levels', 'lat', 'lon'], 'units': 'Pa' }, 'surface_air_pressure': { 'units': 'Pa', 'dims': ['lat', 'lon'], }, 'eastward_wind': { 'units': 'm s^-1', 'dims': ['mid_levels', 'lat', 'lon'], }, 'northward_wind': { 'units': 'm s^-1', 'dims': ['mid_levels', 'lat', 'lon'], }, 'divergence_of_wind': { 'units': 's^-1', 'dims': ['mid_levels', 'lat', 'lon'], }, 'atmosphere_relative_vorticity': { 'units': 's^-1', 'dims': ['mid_levels', 'lat', 'lon'], }, 'surface_geopotential': { 'units': 'm^2 s^-2', 'dims': ['lat', 'lon'], }, } _gfs_output_properties = { 'air_temperature': {'units': 'degK'}, 'air_pressure': { 'dims': ['mid_levels', 'lat', 'lon'], 'units': 'Pa' }, 'air_pressure_on_interface_levels': { 'dims': ['interface_levels', 'lat', 'lon'], 'units': 'Pa' }, 'surface_air_pressure': {'units': 'Pa'}, 'eastward_wind': {'units': 'm s^-1'}, 'northward_wind': {'units': 'm s^-1'}, 'divergence_of_wind': {'units': 's^-1'}, 'atmosphere_relative_vorticity': {'units': 's^-1'}, } _gfs_diagnostic_properties = {} input_properties = None output_properties = None diagnostic_properties = None uses_tracers = True tracer_dims = ('tracer', 'mid_levels', 'lat', 'lon') prepend_tracers = (('specific_humidity', 'kg/kg'),) @property def spectral_names(self): return ( 'eastward_wind', 'northward_wind', 'air_temperature', 'surface_air_pressure') + get_tracer_names()
[docs] def __init__( self, tendency_component_list=None, number_of_damped_levels=0, damping_timescale=2.*86400, moist=True, zero_negative_moisture=True, ): """ Initialise the GFS dynamical core. Args: tendency_component_list (list of TendencyComponent, optional): The TendencyComponent objects to use for spectral time stepping. number_of_damped_levels (int, optional): The number of levels from the model top which are Rayleigh damped. damping_timescale (float, optional): The damping timescale in :math:`s` to use for top-of-model Rayleigh damping. zero_negative_moisture (bool, optional): If True, all negative values of moisture will be set to zero before tracers are returned. """ tendency_component_list = tendency_component_list or [] self._tendency_component = ImplicitTendencyComponentComposite(*tendency_component_list) bad_diagnostics = set( self._tendency_component.diagnostic_properties.keys()).intersection( self.spectral_names) if len(bad_diagnostics) > 0: raise GFSError( 'Cannot use TendencyComponent components that produce {} as diagnostic ' 'output as these must only be stepped spectrally.'.format( bad_diagnostics, ) ) bad_diagnostics = set( self._tendency_component.diagnostic_properties.keys()).intersection( self._gfs_diagnostic_properties.keys()) if len(bad_diagnostics) > 0: raise GFSError( 'Cannot use TendencyComponent components that produce {} as diagnostic' ' output as these are already diagnosed by GFS.'.format( bad_diagnostics, ) ) self._update_constants() self._damping_levels = number_of_damped_levels self._tau_damping = damping_timescale self._zero_negative_moisture = zero_negative_moisture self.initialized = False self.input_properties = self._gfs_input_properties.copy() self.output_properties = self._gfs_output_properties.copy() self.diagnostic_properties = self._gfs_diagnostic_properties.copy() super(GFSDynamicalCore, self).__init__() self.input_properties.update( get_valid_properties( self._gfs_input_properties, self._tendency_component.input_properties, 'input') ) self.output_properties.update( get_valid_properties( self._gfs_output_properties, self._tendency_component.tendency_properties, 'output') ) self.diagnostic_properties.update( get_valid_properties( self._gfs_diagnostic_properties, self._tendency_component.diagnostic_properties, 'diagnostic') )
def _update_constants(self): self._radius = get_constant('planetary_radius', 'm') self._omega = get_constant('planetary_rotation_rate', 's^-1') self._R = get_constant('universal_gas_constant', 'J/mole/K') self._Rd = get_constant('gas_constant_of_dry_air', 'J/kg/K') self._Rv = get_constant('gas_constant_of_vapor_phase', 'J/kg/K') self._g = get_constant('gravitational_acceleration', 'm/s^2') self._Cp = get_constant('heat_capacity_of_dry_air_at_constant_pressure', 'J/kg/K') self._Cvap = get_constant('heat_capacity_of_vapor_phase', 'J/kg/K') self._fvirt = (1 - self._Rd/self._Rv)/(self._Rd/self._Rv) self._dry_pressure = get_constant('reference_air_pressure', 'Pa') self._toa_pressure = get_constant('top_of_model_pressure', 'Pa') def _initialize_model(self, state, timestep): assert not self.initialized self._time_step = timestep self._num_tracers = state['tracers'].shape[0] self._num_levs, self._num_lats, self._num_lons = state['air_temperature'].shape self._truncation = int(self._num_lons/3 - 2) self._spectral_dim = int( (self._truncation + 1)*(self._truncation + 2)/2) _gfs_dynamics.set_time_step( self._time_step.total_seconds()) _gfs_dynamics.set_constants( self._radius, self._omega, self._R, self._Rd, self._Rv, self._g, self._Cp, self._Cvap) _gfs_dynamics.set_model_grid( self._num_lats, self._num_lons, self._num_levs, self._truncation, self._spectral_dim, self._num_tracers, state['a_coord'][::-1], state['b_coord'][::-1], ) logging.info('Initialising dynamical core, this could take some time...') gaussian_weights, area_weights, latitudes, longitudes = \ _gfs_dynamics.init_model( self._dry_pressure, self._damping_levels, self._tau_damping, self._toa_pressure) np.testing.assert_allclose(latitudes*180./np.pi, state['latitude']) np.testing.assert_allclose(longitudes*180./np.pi, state['longitude']) logging.info('Done!') self._hash_u = 1000 self._hash_v = 1000 self._hash_temperature = 1000 self._hash_surface_pressure = 1000 self._hash_tracers = 1000 self.initialized = True
[docs] def __call__(self, state, timestep): """ Gets diagnostics from the current model state and steps the state forward in time according to the timestep. Args ---- state : dict A model state dictionary satisfying the input_properties of this object. timestep : timedelta The amount of time to step forward. Returns ------- diagnostics : dict Diagnostics from the timestep of the input state. new_state : dict A dictionary whose keys are strings indicating state quantities and values are the value of those quantities at the timestep after input state. Raises ------ KeyError If a required quantity is missing from the state. InvalidStateError If state is not a value input for the Stepper instance for other reasons. """ self._check_self_is_initialized() self._input_checker.check_inputs(state) raw_state = get_numpy_arrays_with_properties(state, self.input_properties) if self.uses_tracers: raw_state['tracers'] = self._tracer_packer.pack(state) raw_state['time'] = state['time'] tendencies, diagnostics = self._tendency_component(state, timestep) for name, value in tendencies.items(): if name in self.input_properties.keys(): tendencies[name] = value.to_units(self.input_properties[name]['units'] + ' s^-1') raw_diagnostics, raw_new_state = self.array_call( raw_state, timestep, prognostic_tendencies=tendencies) if self.uses_tracers: new_state = self._tracer_packer.unpack(raw_new_state.pop('tracers'), state) else: new_state = {} if self.tendencies_in_diagnostics: self._insert_tendencies_to_diagnostics( raw_state, raw_new_state, timestep, raw_diagnostics) diagnostics.update(restore_data_arrays_with_properties( raw_diagnostics, self.diagnostic_properties, state, self.input_properties, ignore_missing=True)) new_state.update(restore_data_arrays_with_properties( raw_new_state, self.output_properties, state, self.input_properties, ignore_missing=True)) gfs_output_quantities = list(self._gfs_output_properties.keys()) for tracer in self.prepend_tracers: gfs_output_quantities.append(tracer[0]) remaining = set(tendencies.keys()).difference(gfs_output_quantities) for name in remaining: new_state[name] = state[name] + tendencies[name]*timestep.total_seconds() for key in state.keys(): if key not in new_state: new_state[key] = state[key] check_new_state = { name: quantity for (name, quantity) in new_state.items() if name in self.output_properties.keys() } self._diagnostic_checker.check_diagnostics(diagnostics) self._output_checker.check_outputs(check_new_state) return diagnostics, new_state
@ensure_contiguous_state def array_call(self, state, timestep, prognostic_tendencies=None): """ Step the dynamical core by one step Args: state (dict): The state dictionary of numpy arrays. Returns: new_state, diagnostics (dict): The new state and associated diagnostics. """ prognostic_tendencies = prognostic_tendencies or {} self._update_constants() nlev, nlat, nlon = state['air_temperature'].shape if nlat < 16: raise GFSError('GFS requires at least 16 latitudes.') if nlon < 12: raise GFSError('GFS requires at least 12 longitudes') if not self.initialized: self._initialize_model(state, timestep) if nlev != self._num_levs: raise GFSError( 'Number of vertical levels may not change between successive ' 'calls to GFS. Last time was {}, this time is {}'.format( self._num_levs, nlev) ) if nlat != self._num_lats: raise GFSError( 'Number of latitudes may not change between successive ' 'calls to GFS. Last time was {}, this time is {}'.format( self._num_lats, nlat) ) if nlon != self._num_lons: raise GFSError( 'Number of longitudes may not change between successive ' 'calls to GFS. Last time was {}, this time is {}'.format( self._num_lons, nlon) ) if timestep.total_seconds() != self._time_step.total_seconds(): raise GFSError( 'GFSDynamicalCore can only be run with a constant timestep.' ) outputs = initialize_numpy_arrays_with_properties( self.output_properties, state, self.input_properties, prepend_tracers=self.prepend_tracers, tracer_dims=self.tracer_dims, ) lnsp = np.log(state['surface_air_pressure']) t_virt = state['air_temperature']*( 1 + self._fvirt*state['tracers'][0, :, :, :]) outputs['air_pressure_on_interface_levels'][:] = ( state['air_pressure_on_interface_levels'][::-1, :, :]) for name in ( 'air_pressure', 'tracers', 'eastward_wind', 'northward_wind', 'divergence_of_wind', 'atmosphere_relative_vorticity', 'surface_air_pressure',): if np.product(outputs[name].shape) > 0: outputs[name][:] = state[name] _gfs_dynamics.assign_grid_arrays( outputs['eastward_wind'], outputs['northward_wind'], t_virt, lnsp, outputs['tracers'], outputs['atmosphere_relative_vorticity'], outputs['divergence_of_wind']) _gfs_dynamics.assign_pressure_arrays( outputs['surface_air_pressure'], outputs['air_pressure'], outputs['air_pressure_on_interface_levels']) _gfs_dynamics.set_topography(state['surface_geopotential']) _gfs_dynamics.calculate_pressure() np.testing.assert_allclose(outputs['air_pressure'], state['air_pressure']) np.testing.assert_allclose( outputs['air_pressure_on_interface_levels'][::-1, :, :], state['air_pressure_on_interface_levels'] ) self._update_spectral_arrays(state) tendency_arrays = \ self._get_tendency_arrays( prognostic_tendencies, state['air_temperature'].shape) # see Pg. 12 in gfsModelDoc.pdf virtual_temp_tend = tendency_arrays['air_temperature']*( 1 + self._fvirt*state['tracers'][0, :, :, :]) + \ self._fvirt*t_virt*tendency_arrays['tracers'][0, :, :, :] # dlnps/dt = (1/ps)*dps/dt lnps_tend = ((1. / state['surface_air_pressure']) * tendency_arrays['surface_air_pressure']) _gfs_dynamics.assign_tendencies( tendency_arrays['eastward_wind'], tendency_arrays['northward_wind'], virtual_temp_tend, lnps_tend, tendency_arrays['tracers']) _gfs_dynamics.take_one_step() _gfs_dynamics.convert_to_grid() _gfs_dynamics.calculate_pressure() if self._zero_negative_moisture: set_negatives_to_zero(outputs['tracers'][0, :, :, :]) outputs['air_temperature'][:] = t_virt/( 1 + self._fvirt*outputs['tracers'][0, :, :, :]) outputs['air_pressure_on_interface_levels'][:] = \ outputs['air_pressure_on_interface_levels'][::-1, :, :] outputs['time'] = state['time'] return {}, outputs def _get_tendency_arrays(self, tendencies, T_shape): out_arrays = {} out_arrays.update(self._get_3d_tendencies(tendencies, T_shape)) out_arrays.update(self._get_2d_tendencies(tendencies, T_shape)) out_arrays['tracers'] = self._get_tracer_tendencies(tendencies, T_shape) return out_arrays def _get_3d_tendencies(self, tendencies, T_shape): return self._get_tendencies( tendencies, ['air_temperature', 'eastward_wind', 'northward_wind'], ['mid_levels', 'lat', 'lon'], T_shape ) def _get_2d_tendencies(self, tendencies, T_shape): return self._get_tendencies( tendencies, ['surface_air_pressure'], ['lat', 'lon'], T_shape[1:] ) def _get_tendencies(self, tendencies, quantity_names, dims, shape): out_arrays = {} for name in quantity_names: if name in tendencies: property_dict = { name: { 'dims': dims, 'units': tendencies[name].attrs['units'], } } out_arrays.update( get_numpy_arrays_with_properties(tendencies, property_dict)) else: out_arrays[name] = np.zeros(shape) # must broadcast any singleton dimensions for name, current_array in out_arrays.items(): if current_array.shape != shape: new_array = np.empty(shape) new_array[:] = current_array out_arrays[name] = new_array return out_arrays def _get_tracer_tendencies(self, tendencies, T_shape): return_array = np.empty([self._num_tracers] + list(T_shape)) for i, name in enumerate(self._tracer_packer.tracer_names): if name in tendencies: property_dict = { name: { 'dims': ['mid_levels', 'lat', 'lon'], 'units': tendencies[name].attrs['units'], } } tend = get_numpy_arrays_with_properties( tendencies, property_dict)[name] return_array[i, :, :, :] = tend else: return_array[i, :, :, :] = 0. return return_array def _update_spectral_arrays(self, state): """ Checks the state to see if any arrays with spectral counterparts have been modified since they were last returned, and if they have will update the specctral counterpart to reflect the new state array. """ u_hash = get_hash(state['eastward_wind']) v_hash = get_hash(state['northward_wind']) if (u_hash != self._hash_u or v_hash != self._hash_v): _gfs_dynamics.vrt_div_to_spectral() self._hash_u = u_hash self._hash_v = v_hash T_hash = get_hash(state['air_temperature']) if T_hash != self._hash_temperature: _gfs_dynamics.virtemp_to_spectral() self._hash_temperature = T_hash tracer_hash = get_hash(state['tracers']) if tracer_hash != self._hash_tracers: _gfs_dynamics.tracer_to_spectral() self._hash_tracers = tracer_hash p_surf_hash = get_hash(state['surface_air_pressure']) if p_surf_hash != self._hash_surface_pressure: _gfs_dynamics.lnps_to_spectral() self._hash_surface_pressure = p_surf_hash def __del__(self): """ call shutdown in fortran code """ logging.info("Cleaning up dynamical core...") _gfs_dynamics.shut_down_model() logging.info("Done!")
def get_hash(array): if sys.version_info > (3, 0): return hash(array.data.tobytes()) else: array.flags.writeable = False return hash(array.data) def set_negatives_to_zero(array): array[array < 0] = 0 return array ''' def return_tendency_arrays_or_zeros(quantity_list, state, tendencies): tendency_list = [] for quantity in quantity_list: if quantity in tendencies.keys(): tendency_list.append(np.asfortranarray(tendencies[quantity].values)) elif quantity in state.keys(): tendency_list.append(np.zeros(state[quantity].shape, order='F')) else: raise IndexError("{} not found in input state or tendencies".format(quantity)) return tendency_list '''