flopy/flopy/pakbase.py

1165 lines
38 KiB
Python

"""
pakbase module
This module contains the base package class from which
all of the other packages inherit from.
"""
from __future__ import print_function
import abc
import os
import webbrowser as wb
import numpy as np
from numpy.lib.recfunctions import stack_arrays
from .modflow.mfparbc import ModflowParBc as mfparbc
from .utils import Util2d, Util3d, Transient2d, MfList, check
from .utils import OptionBlock
from .utils.flopy_io import ulstrd
class PackageInterface:
@property
@abc.abstractmethod
def name(self):
raise NotImplementedError(
"must define name in child " "class to use this base class"
)
@name.setter
@abc.abstractmethod
def name(self, name):
raise NotImplementedError(
"must define name in child " "class to use this base class"
)
@property
@abc.abstractmethod
def parent(self):
raise NotImplementedError(
"must define parent in child " "class to use this base class"
)
@parent.setter
@abc.abstractmethod
def parent(self, name):
raise NotImplementedError(
"must define parent in child " "class to use this base class"
)
@property
@abc.abstractmethod
def package_type(self):
raise NotImplementedError(
"must define package_type in child " "class to use this base class"
)
@property
@abc.abstractmethod
def data_list(self):
# [data_object, data_object, ...]
raise NotImplementedError(
"must define data_list in child " "class to use this base class"
)
@abc.abstractmethod
def export(self, f, **kwargs):
raise NotImplementedError(
"must define export in child " "class to use this base class"
)
@property
@abc.abstractmethod
def plottable(self):
raise NotImplementedError(
"must define plottable in child " "class to use this base class"
)
@property
def has_stress_period_data(self):
return self.__dict__.get("stress_period_data", None) is not None
@staticmethod
def _check_thresholds(chk, array, active, thresholds, name):
"""Checks array against min and max threshold values."""
mn, mx = thresholds
chk.values(
array,
active & (array < mn),
"{} values below checker threshold of {}".format(name, mn),
"Warning",
)
chk.values(
array,
active & (array > mx),
"{} values above checker threshold of {}".format(name, mx),
"Warning",
)
@staticmethod
def _confined_layer_check(chk):
return
def _other_xpf_checks(self, chk, active):
# check for negative hani
chk.values(
self.__dict__["hani"].array,
active & (self.__dict__["hani"].array < 0),
"negative horizontal anisotropy values",
"Error",
)
# check vkcb if there are any quasi-3D layers
if self.parent.dis.laycbd.sum() > 0:
# pad non-quasi-3D layers in vkcb array with ones so
# they won't fail checker
vkcb = self.vkcb.array.copy()
for l in range(self.vkcb.shape[0]):
if self.parent.dis.laycbd[l] == 0:
# assign 1 instead of zero as default value that
# won't violate checker
# (allows for same structure as other checks)
vkcb[l, :, :] = 1
chk.values(
vkcb,
active & (vkcb <= 0),
"zero or negative quasi-3D confining bed Kv values",
"Error",
)
self._check_thresholds(
chk,
vkcb,
active,
chk.property_threshold_values["vkcb"],
"quasi-3D confining bed Kv",
)
@staticmethod
def _get_nan_exclusion_list():
return []
def _get_check(self, f, verbose, level, checktype):
if checktype is not None:
return checktype(self, f=f, verbose=verbose, level=level)
else:
return check(self, f=f, verbose=verbose, level=level)
def _check_oc(self, f=None, verbose=True, level=1, checktype=None):
spd_inds_valid = True
chk = self._get_check(f, verbose, level, checktype)
spd = getattr(self, "stress_period_data")
nan_exclusion_list = self._get_nan_exclusion_list()
for per in spd.data.keys():
if isinstance(spd.data[per], np.recarray):
spdata = self.stress_period_data.data[per]
inds = chk._get_cell_inds(spdata)
# General BC checks
# check for valid cell indices
spd_inds_valid = chk._stress_period_data_valid_indices(spdata)
# first check for and list nan values
chk._stress_period_data_nans(spdata, nan_exclusion_list)
if spd_inds_valid:
# next check for BCs in inactive cells
chk._stress_period_data_inactivecells(spdata)
# More specific BC checks
# check elevations in the ghb, drain, and riv packages
if self.name[0] in check.bc_stage_names.keys():
# check that bc elevations are above model
# cell bottoms -- also checks for nan values
elev_name = chk.bc_stage_names[self.name[0]]
mg = self.parent.modelgrid
botms = mg.botm[inds]
test = spdata[elev_name] < botms
en = "BC elevation below cell bottom"
chk.stress_period_data_values(
spdata,
test,
col=elev_name,
error_name=en,
error_type="Error",
)
chk.summarize()
return chk
def _get_kparams(self):
# build model specific parameter lists
kparams_all = {
"hk": "horizontal hydraulic conductivity",
"vka": "vertical hydraulic conductivity",
"k": "horizontal hydraulic conductivity",
"k22": "hydraulic conductivity second axis",
"k33": "vertical hydraulic conductivity",
}
kparams = {}
vka_param = None
for kp, name in kparams_all.items():
if kp in self.__dict__:
kparams[kp] = name
if "hk" in self.__dict__:
hk = self.hk.array.copy()
else:
hk = self.k.array.copy()
if "vka" in self.__dict__ and self.layvka.sum() > 0:
vka = self.vka.array
vka_param = kparams.pop("vka")
elif "k33" in self.__dict__:
vka = self.k33.array
vka_param = kparams.pop("k33")
else:
vka = None
if vka is not None:
vka = vka.copy()
return kparams, hk, vka, vka_param
def _check_flowp(self, f=None, verbose=True, level=1, checktype=None):
chk = self._get_check(f, verbose, level, checktype)
active = chk.get_active()
# build model specific parameter lists
kparams, hk, vka, vka_param = self._get_kparams()
# check for zero or negative values of hydraulic conductivity,
# anisotropy, and quasi-3D confining beds
for kp, name in kparams.items():
if self.__dict__[kp].array is not None:
chk.values(
self.__dict__[kp].array,
active & (self.__dict__[kp].array <= 0),
"zero or negative {} values".format(name),
"Error",
)
if "hani" in self.__dict__:
self._other_xpf_checks(chk, active)
# check for unusually high or low values of hydraulic conductivity
# convert vertical anisotropy to Kv for checking
if vka is not None:
if "layvka" in self.__dict__:
for l in range(vka.shape[0]):
vka[l] *= hk[l] if self.layvka.array[l] != 0 else 1
self._check_thresholds(
chk,
vka,
active,
chk.property_threshold_values["vka"],
vka_param,
)
for kp, name in kparams.items():
if self.__dict__[kp].array is not None:
self._check_thresholds(
chk,
self.__dict__[kp].array,
active,
chk.property_threshold_values[kp],
name,
)
if self.name[0] in ["UPW", "LPF"]:
storage_coeff = "STORAGECOEFFICIENT" in self.options or (
"storagecoefficient" in self.__dict__
and self.storagecoefficient.get_data()
)
self._check_storage(chk, storage_coeff)
chk.summarize()
return chk
def check(self, f=None, verbose=True, level=1, checktype=None):
"""
Check package data for common errors.
Parameters
----------
f : str or file handle
String defining file name or file handle for summary file
of check method output. If a sting is passed a file handle
is created. If f is None, check method does not write
results to a summary file. (default is None)
verbose : bool
Boolean flag used to determine if check method results are
written to the screen
level : int
Check method analysis level. If level=0, summary checks are
performed. If level=1, full checks are performed.
checktype : check
Checker type to be used. By default class check is used from
check.py.
Returns
-------
None
Examples
--------
>>> import flopy
>>> m = flopy.modflow.Modflow.load('model.nam')
>>> m.dis.check()
"""
chk = None
if (
self.has_stress_period_data
and self.name[0] != "OC"
and self.package_type.upper() != "OC"
):
chk = self._check_oc(f, verbose, level, checktype)
# check property values in upw and lpf packages
elif self.name[0] in ["UPW", "LPF"] or self.package_type.upper() in [
"NPF"
]:
chk = self._check_flowp(f, verbose, level, checktype)
elif self.package_type.upper() in ["STO"]:
chk = self._get_check(f, verbose, level, checktype)
storage_coeff = self.storagecoefficient.get_data()
if storage_coeff is None:
storage_coeff = False
self._check_storage(chk, storage_coeff)
else:
txt = "check method not implemented for " + "{} Package.".format(
self.name[0]
)
if f is not None:
if isinstance(f, str):
pth = os.path.join(self.parent.model_ws, f)
f = open(pth, "w")
f.write(txt)
f.close()
if verbose:
print(txt)
return chk
def _check_storage(self, chk, storage_coeff):
# only check storage if model is transient
if not np.all(self.parent.modeltime.steady_state):
active = chk.get_active()
# do the same for storage if the model is transient
sarrays = {"ss": self.ss.array, "sy": self.sy.array}
# convert to specific for checking
if storage_coeff:
desc = (
"\r STORAGECOEFFICIENT option is "
+ "activated, storage values are read "
+ "storage coefficients"
)
chk._add_to_summary(type="Warning", desc=desc)
chk.values(
sarrays["ss"],
active & (sarrays["ss"] < 0),
"zero or negative specific storage values",
"Error",
)
self._check_thresholds(
chk,
sarrays["ss"],
active,
chk.property_threshold_values["ss"],
"specific storage",
)
# only check specific yield for convertible layers
if "laytyp" in self.__dict__:
inds = np.array(
[
True
if l > 0 or l < 0 and "THICKSTRT" in self.options
else False
for l in self.laytyp
]
)
sarrays["sy"] = sarrays["sy"][inds, :, :]
active = active[inds, :, :]
else:
iconvert = self.iconvert.array
for ishape in np.ndindex(active.shape):
if active[ishape]:
active[ishape] = (
iconvert[ishape] > 0 or iconvert[ishape] < 0
)
chk.values(
sarrays["sy"],
active & (sarrays["sy"] < 0),
"zero or negative specific yield values",
"Error",
)
self._check_thresholds(
chk,
sarrays["sy"],
active,
chk.property_threshold_values["sy"],
"specific yield",
)
class Package(PackageInterface):
"""
Base package class from which most other packages are derived.
"""
def __init__(
self,
parent,
extension="glo",
name="GLOBAL",
unit_number=1,
extra="",
filenames=None,
allowDuplicates=False,
):
"""
Package init
"""
# To be able to access the parent model object's attributes
self.parent = parent
if not isinstance(extension, list):
extension = [extension]
self.extension = []
self.file_name = []
for idx, e in enumerate(extension):
self.extension.append(e)
file_name = self.parent.name + "." + e
if filenames is not None:
if idx < len(filenames):
if filenames[idx] is not None:
file_name = filenames[idx]
self.file_name.append(file_name)
self.fn_path = os.path.join(self.parent.model_ws, self.file_name[0])
if not isinstance(name, list):
name = [name]
self._name = name
if not isinstance(unit_number, list):
unit_number = [unit_number]
self.unit_number = unit_number
if not isinstance(extra, list):
self.extra = len(self.unit_number) * [extra]
else:
self.extra = extra
self.url = "index.html"
self.allowDuplicates = allowDuplicates
self.acceptable_dtypes = [int, np.float32, str]
return
def __repr__(self):
s = self.__doc__
exclude_attributes = ["extension", "heading", "name", "parent", "url"]
for attr, value in sorted(self.__dict__.items()):
if not (attr in exclude_attributes):
if isinstance(value, list):
if len(value) == 1:
s += " {:s} = {:s}\n".format(attr, str(value[0]))
else:
s += " {:s} ".format(
attr
) + "(list, items = {:d})\n".format(len(value))
elif isinstance(value, np.ndarray):
s += " {:s} (array, shape = ".format(
attr
) + "{:s})\n".format(value.shape.__str__()[1:-1])
else:
s += (
" {:s} = ".format(attr)
+ "{:s} ".format(str(value))
+ "({:s})\n".format(str(type(value))[7:-2])
)
return s
def __getitem__(self, item):
if hasattr(self, "stress_period_data"):
# added this check because stress_period_data also used in Oc and
# Oc88 but is not a MfList
spd = getattr(self, "stress_period_data")
if isinstance(item, MfList):
if not isinstance(item, list) and not isinstance(item, tuple):
msg = (
"package.__getitem__() kper "
+ str(item)
+ " not in data.keys()"
)
assert item in list(spd.data.keys()), msg
return spd[item]
if item[1] not in self.dtype.names:
msg = (
"package.__getitem(): item "
+ str(item)
+ " not in dtype names "
+ str(self.dtype.names)
)
raise Exception(msg)
msg = (
"package.__getitem__() kper "
+ str(item[0])
+ " not in data.keys()"
)
assert item[0] in list(spd.data.keys()), msg
if spd.vtype[item[0]] == np.recarray:
return spd[item[0]][item[1]]
def __setitem__(self, key, value):
raise NotImplementedError("package.__setitem__() not implemented")
def __setattr__(self, key, value):
var_dict = vars(self)
if key in list(var_dict.keys()):
old_value = var_dict[key]
if isinstance(old_value, Util2d):
value = Util2d(
self.parent,
old_value.shape,
old_value.dtype,
value,
name=old_value.name,
fmtin=old_value.format.fortran,
locat=old_value.locat,
array_free_format=old_value.format.array_free_format,
)
elif isinstance(old_value, Util3d):
value = Util3d(
self.parent,
old_value.shape,
old_value.dtype,
value,
name=old_value.name_base,
fmtin=old_value.fmtin,
locat=old_value.locat,
array_free_format=old_value.array_free_format,
)
elif isinstance(old_value, Transient2d):
value = Transient2d(
self.parent,
old_value.shape,
old_value.dtype,
value,
name=old_value.name_base,
fmtin=old_value.fmtin,
locat=old_value.locat,
)
elif isinstance(old_value, MfList):
value = MfList(self, dtype=old_value.dtype, data=value)
elif isinstance(old_value, list):
if len(old_value) > 0:
if isinstance(old_value[0], Util3d):
new_list = []
for vo, v in zip(old_value, value):
new_list.append(
Util3d(
self.parent,
vo.shape,
vo.dtype,
v,
name=vo.name_base,
fmtin=vo.fmtin,
locat=vo.locat,
)
)
value = new_list
elif isinstance(old_value[0], Util2d):
new_list = []
for vo, v in zip(old_value, value):
new_list.append(
Util2d(
self.parent,
vo.shape,
vo.dtype,
v,
name=vo.name,
fmtin=vo.fmtin,
locat=vo.locat,
)
)
value = new_list
super().__setattr__(key, value)
@property
def name(self):
return self._name
@name.setter
def name(self, name):
self._name = name
@property
def parent(self):
return self._parent
@parent.setter
def parent(self, parent):
self._parent = parent
@property
def package_type(self):
if len(self.name) > 0:
return self.name[0].lower()
@property
def plottable(self):
return True
@property
def data_list(self):
# return [data_object, data_object, ...]
dl = []
attrs = dir(self)
if "sr" in attrs:
attrs.remove("sr")
if "start_datetime" in attrs:
attrs.remove("start_datetime")
for attr in attrs:
if "__" in attr or "data_list" in attr:
continue
dl.append(self.__getattribute__(attr))
return dl
def export(self, f, **kwargs):
"""
Method to export a package to netcdf or shapefile based on the
extension of the file name (.shp for shapefile, .nc for netcdf)
Parameters
----------
f : str
filename
kwargs : keyword arguments
modelgrid : flopy.discretization.Grid instance
user supplied modelgrid which can be used for exporting
in lieu of the modelgrid associated with the model object
Returns
-------
None or Netcdf object
"""
from flopy import export
return export.utils.package_export(f, self, **kwargs)
@staticmethod
def add_to_dtype(dtype, field_names, field_types):
"""
Add one or more fields to a structured array data type
Parameters
----------
dtype : numpy.dtype
Input structured array datatype to add to.
field_names : str or list
One or more field names.
field_types : numpy.dtype or list
One or more data types. If one data type is supplied, it is
repeated for each field name.
"""
if not isinstance(field_names, list):
field_names = [field_names]
if not isinstance(field_types, list):
field_types = [field_types] * len(field_names)
newdtypes = dtype.descr
for field_name, field_type in zip(field_names, field_types):
newdtypes.append((str(field_name), field_type))
return np.dtype(newdtypes)
@staticmethod
def _get_sfac_columns():
"""
This should be overriden for individual packages that support an
sfac multiplier for individual list columns
"""
return []
def _confined_layer_check(self, chk):
# check for confined layers above convertible layers
confined = False
thickstrt = False
for option in self.options:
if option.lower() == "thickstrt":
thickstrt = True
for i, l in enumerate(self.laytyp.array.tolist()):
if l == 0 or l < 0 and thickstrt:
confined = True
continue
if confined and l > 0:
desc = (
"\r LAYTYP: unconfined (convertible) "
+ "layer below confined layer"
)
chk._add_to_summary(type="Warning", desc=desc)
def level1_arraylist(self, idx, v, name, txt):
ndim = v.ndim
if ndim == 3:
kon = -1
for [k, i, j] in idx:
if k > kon:
kon = k
tag = name[k].lower().replace(" layer ", "")
txt += (
" {:>10s}".format("layer")
+ "{:>10s}".format("row")
+ "{:>10s}".format("column")
+ "{:>15s}\n".format(tag)
)
txt += " {:10d}{:10d}{:10d}{:15.7g}\n".format(
k + 1, i + 1, j + 1, v[k, i, j]
)
elif ndim == 2:
tag = name[0].lower().replace(" layer ", "")
txt += (
" {:>10s}".format("row")
+ "{:>10s}".format("column")
+ "{:>15s}\n".format(tag)
)
for [i, j] in idx:
txt += " {:10d}{:10d}{:15.7g}\n".format(
i + 1, j + 1, v[i, j]
)
elif ndim == 1:
txt += " {:>10s}{:>15s}\n".format("number", name[0])
for i in idx:
txt += " {:10d}{:15.7g}\n".format(i + 1, v[i])
return txt
def plot(self, **kwargs):
"""
Plot 2-D, 3-D, transient 2-D, and stress period list (MfList)
package input data
Parameters
----------
**kwargs : dict
filename_base : str
Base file name that will be used to automatically generate file
names for output image files. Plots will be exported as image
files if file_name_base is not None. (default is None)
file_extension : str
Valid matplotlib.pyplot file extension for savefig(). Only used
if filename_base is not None. (default is 'png')
mflay : int
MODFLOW zero-based layer number to return. If None, then all
all layers will be included. (default is None)
kper : int
MODFLOW zero-based stress period number to return. (default is
zero)
key : str
MfList dictionary key. (default is None)
Returns
----------
axes : list
Empty list is returned if filename_base is not None. Otherwise
a list of matplotlib.pyplot.axis are returned.
See Also
--------
Notes
-----
Examples
--------
>>> import flopy
>>> ml = flopy.modflow.Modflow.load('test.nam')
>>> ml.dis.plot()
"""
from flopy.plot import PlotUtilities
if not self.plottable:
raise TypeError("Package {} is not plottable".format(self.name))
axes = PlotUtilities._plot_package_helper(self, **kwargs)
return axes
def to_shapefile(self, filename, **kwargs):
"""
Export 2-D, 3-D, and transient 2-D model data to shapefile (polygons).
Adds an attribute for each layer in each data array
Parameters
----------
filename : str
Shapefile name to write
Returns
----------
None
See Also
--------
Notes
-----
Examples
--------
>>> import flopy
>>> ml = flopy.modflow.Modflow.load('test.nam')
>>> ml.lpf.to_shapefile('test_hk.shp')
"""
import warnings
warnings.warn("to_shapefile() is deprecated. use .export()")
self.export(filename)
def webdoc(self):
if self.parent.version == "mf2k":
wa = (
"http://water.usgs.gov/nrp/gwsoftware/modflow2000/Guide/"
+ self.url
)
elif self.parent.version == "mf2005":
wa = (
"http://water.usgs.gov/ogw/modflow/MODFLOW-2005-Guide/"
+ self.url
)
elif self.parent.version == "ModflowNwt":
wa = (
"http://water.usgs.gov/ogw/modflow-nwt/MODFLOW-NWT-Guide/"
+ self.url
)
else:
wa = None
# open the web address
if wa is not None:
wb.open(wa)
def write_file(self, check=False):
"""
Every Package needs its own write_file function
"""
print("IMPLEMENTATION ERROR: write_file must be overloaded")
return
@staticmethod
def load(f, model, pak_type, ext_unit_dict=None, **kwargs):
"""
Default load method for standard boundary packages.
"""
# parse keywords
if "nper" in kwargs:
nper = kwargs.pop("nper")
else:
nper = None
if "unitnumber" in kwargs:
unitnumber = kwargs.pop("unitnumber")
else:
unitnumber = None
if "check" in kwargs:
check = kwargs.pop("check")
else:
check = True
# open the file if not already open
openfile = not hasattr(f, "read")
if openfile:
filename = f
f = open(filename, "r")
elif hasattr(f, "name"):
filename = f.name
else:
filename = "?"
# set string from pak_type
pak_type_str = str(pak_type).lower()
# dataset 0 -- header
while True:
line = f.readline()
if line[0] != "#":
break
# check for mfnwt version 11 option block
nwt_options = None
if model.version == "mfnwt" and "options" in line.lower():
nwt_options = OptionBlock.load_options(f, pak_type)
line = f.readline()
# check for parameters
nppak = 0
if "parameter" in line.lower():
t = line.strip().split()
nppak = int(t[1])
mxl = 0
if nppak > 0:
mxl = int(t[2])
if model.verbose:
msg = (
3 * " "
+ "Parameters detected. Number of "
+ "parameters = {}".format(nppak)
)
print(msg)
line = f.readline()
# dataset 2a
t = line.strip().split()
imax = 2
ipakcb = 0
try:
ipakcb = int(t[1])
except:
if model.verbose:
msg = 3 * " " + "implicit ipakcb in {}".format(filename)
print(msg)
if "modflowdrt" in pak_type_str:
try:
nppak = int(t[2])
imax += 1
except:
if model.verbose:
msg = 3 * " " + "implicit nppak in {}".format(filename)
print(msg)
if nppak > 0:
mxl = int(t[3])
imax += 1
if model.verbose:
msg = (
3 * " "
+ "Parameters detected. Number of "
+ "parameters = {}".format(nppak)
)
print(msg)
options = []
aux_names = []
if len(t) > imax:
it = imax
while it < len(t):
toption = t[it]
if toption.lower() == "noprint":
options.append(toption.lower())
elif "aux" in toption.lower():
options.append(" ".join(t[it : it + 2]))
aux_names.append(t[it + 1].lower())
it += 1
it += 1
# add auxillary information to nwt options
if nwt_options is not None and options:
if options[0] == "noprint":
nwt_options.noprint = True
if len(options) > 1:
nwt_options.auxillary = options[1:]
else:
nwt_options.auxillary = options
options = nwt_options
# set partype
# and read phiramp for modflow-nwt well package
partype = ["cond"]
if "modflowwel" in pak_type_str:
partype = ["flux"]
# check for "standard" single line options from mfnwt
if "nwt" in model.version.lower():
if "flopy.modflow.mfwel.modflowwel".lower() in pak_type_str:
ipos = f.tell()
line = f.readline()
# test for specify keyword if a NWT well file
if "specify" in line.lower():
nwt_options = OptionBlock(
line.lower().strip(), pak_type, block=False
)
if options:
if options[0] == "noprint":
nwt_options.noprint = True
if len(options) > 1:
nwt_options.auxillary = options[1:]
else:
nwt_options.auxillary = options
options = nwt_options
else:
f.seek(ipos)
elif "flopy.modflow.mfchd.modflowchd".lower() in pak_type_str:
partype = ["shead", "ehead"]
# get the list columns that should be scaled with sfac
sfac_columns = pak_type._get_sfac_columns()
# read parameter data
if nppak > 0:
dt = pak_type.get_empty(
1, aux_names=aux_names, structured=model.structured
).dtype
pak_parms = mfparbc.load(
f, nppak, dt, model, ext_unit_dict, model.verbose
)
if nper is None:
nrow, ncol, nlay, nper = model.get_nrow_ncol_nlay_nper()
# read data for every stress period
bnd_output = None
stress_period_data = {}
current = None
for iper in range(nper):
if model.verbose:
msg = (
" loading "
+ str(pak_type)
+ " for kper {:5d}".format(iper + 1)
)
print(msg)
line = f.readline()
if line == "":
break
t = line.strip().split()
itmp = int(t[0])
itmpp = 0
try:
itmpp = int(t[1])
except:
if model.verbose:
print(" implicit itmpp in {}".format(filename))
if itmp == 0:
bnd_output = None
current = pak_type.get_empty(
itmp, aux_names=aux_names, structured=model.structured
)
elif itmp > 0:
current = pak_type.get_empty(
itmp, aux_names=aux_names, structured=model.structured
)
current = ulstrd(
f, itmp, current, model, sfac_columns, ext_unit_dict
)
if model.structured:
current["k"] -= 1
current["i"] -= 1
current["j"] -= 1
else:
current["node"] -= 1
bnd_output = np.recarray.copy(current)
else:
if current is None:
bnd_output = None
else:
bnd_output = np.recarray.copy(current)
for iparm in range(itmpp):
line = f.readline()
t = line.strip().split()
pname = t[0].lower()
iname = "static"
try:
tn = t[1]
c = tn.lower()
instance_dict = pak_parms.bc_parms[pname][1]
if c in instance_dict:
iname = c
else:
iname = "static"
except:
if model.verbose:
print(
" implicit static instance for "
+ "parameter {}".format(pname)
)
par_dict, current_dict = pak_parms.get(pname)
data_dict = current_dict[iname]
par_current = pak_type.get_empty(
par_dict["nlst"], aux_names=aux_names
)
# get appropriate parval
if model.mfpar.pval is None:
parval = float(par_dict["parval"])
else:
try:
parval = float(model.mfpar.pval.pval_dict[pname])
except:
parval = float(par_dict["parval"])
# fill current parameter data (par_current)
for ibnd, t in enumerate(data_dict):
t = tuple(t)
par_current[ibnd] = tuple(
t[: len(par_current.dtype.names)]
)
if model.structured:
par_current["k"] -= 1
par_current["i"] -= 1
par_current["j"] -= 1
else:
par_current["node"] -= 1
for ptype in partype:
par_current[ptype] *= parval
if bnd_output is None:
bnd_output = np.recarray.copy(par_current)
else:
bnd_output = stack_arrays(
(bnd_output, par_current),
asrecarray=True,
usemask=False,
)
if bnd_output is None:
stress_period_data[iper] = itmp
else:
stress_period_data[iper] = bnd_output
dtype = pak_type.get_empty(
0, aux_names=aux_names, structured=model.structured
).dtype
if openfile:
f.close()
# set package unit number
filenames = [None, None]
if ext_unit_dict is not None:
unitnumber, filenames[0] = model.get_ext_dict_attr(
ext_unit_dict, filetype=pak_type._ftype()
)
if ipakcb > 0:
iu, filenames[1] = model.get_ext_dict_attr(
ext_unit_dict, unit=ipakcb
)
model.add_pop_key_list(ipakcb)
pak = pak_type(
model,
ipakcb=ipakcb,
stress_period_data=stress_period_data,
dtype=dtype,
options=options,
unitnumber=unitnumber,
filenames=filenames,
)
if check:
pak.check(
f="{}.chk".format(pak.name[0]),
verbose=pak.parent.verbose,
level=0,
)
return pak