flopy/autotest/t039_test.py

386 lines
12 KiB
Python

"""
Test zonbud utility
"""
import os
import numpy as np
from flopy.utils import CellBudgetFile, ZoneBudget, \
MfListBudget, read_zbarray, write_zbarray
loadpth = os.path.join('..', 'examples', 'data', 'zonbud_examples')
outpth = os.path.join('temp', 't039')
cbc_f = os.path.join(loadpth, 'freyberg.gitcbc')
zon_f = os.path.join(loadpth, 'zonef_mlt.zbr')
zbud_f = os.path.join(loadpth, 'freyberg_mlt.csv')
if not os.path.isdir(outpth):
os.makedirs(outpth)
def read_zonebudget_file(fname):
with open(fname, 'r') as f:
lines = f.readlines()
rows = []
for line in lines:
items = line.split(',')
# Read time step information for this block
if "Time Step" in line:
kstp, kper, totim = int(items[1]) - 1, int(items[3]) - 1, \
float(items[5])
continue
# Get names of zones
elif 'ZONE' in items[1]:
zonenames = [i.strip() for i in items[1:-1]]
zonenames = ['_'.join(z.split()) for z in zonenames]
continue
# Set flow direction flag--inflow
elif 'IN' in items[1]:
flow_dir = 'FROM'
continue
# Set flow direction flag--outflow
elif 'OUT' in items[1]:
flow_dir = 'TO'
continue
# Get mass-balance information for this block
elif 'Total' in items[0] or 'IN-OUT' in items[0] or 'Percent Error' in \
items[0]:
continue
# End of block
elif items[0] == '' and items[1] == '\n':
continue
record = '{}_'.format(flow_dir) + '_'.join(items[0].strip().split())
if record.startswith(('FROM_', 'TO_')):
record = '_'.join(record.split('_')[1:])
vals = [float(i) for i in items[1:-1]]
row = (totim, kstp, kper, record,) + tuple(v for v in vals)
rows.append(row)
dtype_list = [('totim', float),
('time_step', int),
('stress_period', int),
('name', '<U50')] + [(z, '<f8') for z in zonenames]
dtype = np.dtype(dtype_list)
return np.array(rows, dtype=dtype)
def test_compare2zonebudget(rtol=1e-2):
"""
t039 Compare output from zonbud.exe to the budget calculated by zonbud
utility using the multilayer transient freyberg model.
"""
zba = read_zonebudget_file(zbud_f)
zonenames = [n for n in zba.dtype.names if 'ZONE' in n]
times = np.unique(zba['totim'])
zon = read_zbarray(zon_f)
zb = ZoneBudget(cbc_f, zon, totim=times, verbose=False)
fpa = zb.get_budget()
for time in times:
zb_arr = zba[zba['totim'] == time]
fp_arr = fpa[fpa['totim'] == time]
for name in fp_arr['name']:
r1 = np.where((zb_arr['name'] == name))
r2 = np.where((fp_arr['name'] == name))
if r1[0].shape[0] < 1 or r2[0].shape[0] < 1:
continue
if r1[0].shape[0] != r2[0].shape[0]:
continue
a1 = np.array([v for v in zb_arr[zonenames][r1[0]][0]])
a2 = np.array([v for v in fp_arr[zonenames][r2[0]][0]])
allclose = np.allclose(a1, a2, rtol)
mxdiff = np.abs(a1 - a2).max()
idxloc = np.argmax(np.abs(a1 - a2))
# txt = '{}: {} - Max: {} a1: {} a2: {}'.format(time,
# name,
# mxdiff,
# a1[idxloc],
# a2[idxloc])
# print(txt)
s = 'Zonebudget arrays do not match at time {0} ({1}): {2}.' \
.format(time, name, mxdiff)
assert allclose, s
return
# def test_compare2mflist_mlt(rtol=1e-2):
#
# loadpth = os.path.join('..', 'examples', 'data', 'zonbud_examples', 'freyberg_mlt')
#
# list_f = os.path.join(loadpth, 'freyberg.list')
# mflistbud = MfListBudget(list_f)
# print(help(mflistbud))
# mflistrecs = mflistbud.get_data(idx=-1, incremental=True)
# print(repr(mflistrecs))
#
# zon = np.ones((3, 40, 20), dtype=int)
# cbc_fname = os.path.join(loadpth, 'freyberg.cbc')
# kstp, kper = CellBudgetFile(cbc_fname).get_kstpkper()[-1]
# zb = ZoneBudget(cbc_fname, zon, kstpkper=(kstp, kper))
# zbrecs = zb.get_budget()
# print(repr(zbrecs))
# return
def test_zonbud_get_record_names():
"""
t039 Test zonbud get_record_names method
"""
zon = read_zbarray(zon_f)
zb = ZoneBudget(cbc_f, zon, kstpkper=(0, 0))
recnames = zb.get_record_names()
assert len(recnames) > 0, 'No record names returned.'
recnames = zb.get_record_names(stripped=True)
assert len(recnames) > 0, 'No record names returned.'
return
def test_zonbud_aliases():
"""
t039 Test zonbud aliases
"""
zon = read_zbarray(zon_f)
aliases = {1: 'Trey', 2: 'Mike', 4: 'Wilson', 0: 'Carini'}
zb = ZoneBudget(cbc_f, zon, kstpkper=(0, 1096), aliases=aliases, verbose=True)
bud = zb.get_budget()
assert bud[bud['name'] == 'FROM_Mike'].shape[0] > 0, 'No records returned.'
return
def test_zonbud_to_csv():
"""
t039 Test zonbud export to csv file method
"""
zon = read_zbarray(zon_f)
zb = ZoneBudget(cbc_f, zon, kstpkper=[(0, 1094), (0, 1096)])
f_out = os.path.join(outpth, 'test.csv')
zb.to_csv(f_out)
with open(f_out, 'r') as f:
lines = f.readlines()
assert len(lines) > 0, 'No data written to csv file.'
return
def test_zonbud_math():
"""
t039 Test zonbud math methods
"""
zon = read_zbarray(zon_f)
cmd = ZoneBudget(cbc_f, zon, kstpkper=(0, 1096))
cmd / 35.3147
cmd * 12.
cmd + 1e6
cmd - 1e6
return
def test_zonbud_copy():
"""
t039 Test zonbud copy
"""
zon = read_zbarray(zon_f)
cfd = ZoneBudget(cbc_f, zon, kstpkper=(0, 1096))
cfd2 = cfd.copy()
assert cfd is not cfd2, 'Copied object is a shallow copy.'
return
def test_zonbud_readwrite_zbarray():
"""
t039 Test zonbud read write
"""
x = np.random.randint(100, 200, size=(5, 150, 200))
write_zbarray(os.path.join(outpth, 'randint'), x)
write_zbarray(os.path.join(outpth, 'randint'), x, fmtin=35, iprn=2)
z = read_zbarray(os.path.join(outpth, 'randint'))
assert np.array_equal(x, z), 'Input and output arrays do not match.'
return
def test_dataframes():
try:
import pandas
zon = read_zbarray(zon_f)
cmd = ZoneBudget(cbc_f, zon, totim=1095.)
df = cmd.get_dataframes()
assert len(df) > 0, 'Output DataFrames empty.'
except ImportError as e:
print('Skipping DataFrames test, pandas not installed.')
print(e)
return
def test_get_budget():
zon = read_zbarray(zon_f)
aliases = {1: 'Trey', 2: 'Mike', 4: 'Wilson', 0: 'Carini'}
zb = ZoneBudget(cbc_f, zon, kstpkper=(0, 0), aliases=aliases)
zb.get_budget(names='FROM_CONSTANT_HEAD', zones=1)
zb.get_budget(names=['FROM_CONSTANT_HEAD'], zones=[1, 2])
zb.get_budget(net=True)
return
def test_get_model_shape():
ZoneBudget(cbc_f, read_zbarray(zon_f), kstpkper=(0, 0), verbose=True).get_model_shape()
return
def test_zonebudget_output_to_netcdf():
from flopy.utils import HeadFile, ZoneBudgetOutput
from flopy.modflow import Modflow
from flopy.mf6 import MFSimulation
from flopy.export.utils import output_helper
model_ws = os.path.join("..", "examples", "data",
"freyberg_multilayer_transient")
zb_ws = os.path.join("..", "examples", "data", "zonbud_examples")
hds = "freyberg.hds"
nam = "freyberg.nam"
zon = "zonef_mlt.zbr"
hds = HeadFile(os.path.join(model_ws, hds))
ml = Modflow.load(nam, model_ws=model_ws)
zone_array = read_zbarray(os.path.join(zb_ws, zon))
# test with standard zonebudget output
zbout = "freyberg_mlt.txt"
ncf_name = zbout + ".nc"
zb = ZoneBudgetOutput(os.path.join(zb_ws, zbout), ml.dis, zone_array)
vdf = zb.volumetric_flux()
netobj = zb.dataframe_to_netcdf_fmt(vdf, flux=False)
export_dict = {"hds": hds,
"zonebud": netobj}
output_helper(os.path.join(outpth, ncf_name), ml, export_dict)
# test with zonebudget csv 1 output
zbout = "freyberg_mlt.1.csv"
ncf_name = zbout + ".nc"
zb = ZoneBudgetOutput(os.path.join(zb_ws, zbout), ml.dis, zone_array)
netobj = zb.dataframe_to_netcdf_fmt(zb.dataframe)
export_dict = {"hds": hds,
"zonebud": netobj}
output_helper(os.path.join(outpth, ncf_name), ml, export_dict)
# test with zonebudget csv 2 output
zbout = "freyberg_mlt.2.csv"
ncf_name = zbout + ".nc"
zb = ZoneBudgetOutput(os.path.join(zb_ws, zbout), ml.dis, zone_array)
vdf = zb.volumetric_flux(extrapolate_kper=True)
netobj = zb.dataframe_to_netcdf_fmt(vdf, flux=False)
export_dict = {"hds": hds,
"zonebud": netobj}
output_helper(os.path.join(outpth, ncf_name), ml, export_dict)
# test built in export function
zbout = "freyberg_mlt.2.csv"
ncf_name = zbout + ".bi1" + ".nc"
zb = ZoneBudgetOutput(os.path.join(zb_ws, zbout), ml.dis, zone_array)
zb.export(os.path.join(outpth, ncf_name), ml)
# test built in export function with NetCdf output object
zbout = "freyberg_mlt.2.csv"
ncf_name = zbout + ".bi2" + ".nc"
zb = ZoneBudgetOutput(os.path.join(zb_ws, zbout), ml.dis, zone_array)
export_dict = {"hds": hds}
ncfobj = output_helper(os.path.join(outpth, ncf_name), ml, export_dict)
zb.export(ncfobj, ml)
# test with modflow6/zonebudget6
sim_ws = os.path.join("..", "examples", 'data',
'mf6', 'test005_advgw_tidal')
hds = "advgw_tidal.hds"
nam = "mfsim"
zon = "zonebudget6.csv"
ncf_name = zon + ".nc"
zone_array = np.ones((3, 15, 10), dtype=int)
zone_array = np.add.accumulate(zone_array, axis=0)
sim = MFSimulation.load(nam, sim_ws=sim_ws, exe_name='mf6')
sim.set_sim_path(outpth)
sim.write_simulation()
sim.run_simulation()
hds = HeadFile(os.path.join(outpth, hds))
ml = sim.get_model("gwf_1")
zb = ZoneBudgetOutput(os.path.join(zb_ws, zon), sim.tdis, zone_array)
vdf = zb.volumetric_flux()
netobj = zb.dataframe_to_netcdf_fmt(vdf, flux=False)
export_dict = {"hds": hds,
"zbud": netobj}
output_helper(os.path.join(outpth, ncf_name), ml, export_dict)
def test_zonbud_active_areas_zone_zero(rtol=1e-2):
try:
import pandas as pd
except ImportError:
'Pandas is not available'
# Read ZoneBudget executable output and reformat
zbud_f = os.path.join(loadpth, 'zonef_mlt_active_zone_0.2.csv')
zbud = pd.read_csv(zbud_f)
zbud.columns = [c.strip() for c in zbud.columns]
zbud.columns = ['_'.join(c.split()) for c in zbud.columns]
zbud.index = pd.Index(['ZONE_{}'.format(z) for z in zbud.ZONE.values],
name='name')
cols = [c for c in zbud.columns if 'ZONE_' in c]
zbud = zbud[cols]
# Run ZoneBudget utility and reformat output
zon_f = os.path.join(loadpth, 'zonef_mlt_active_zone_0.zbr')
zon = read_zbarray(zon_f)
zb = ZoneBudget(cbc_f, zon, kstpkper=(0, 1096))
fpbud = zb.get_dataframes().reset_index()
fpbud = fpbud[['name'] + [c for c in fpbud.columns if 'ZONE' in c]]
fpbud = fpbud.set_index('name').T
fpbud = fpbud[[c for c in fpbud.columns if 'ZONE' in c]]
fpbud = fpbud.loc[['ZONE_{}'.format(z) for z in range(1, 4)]]
# Test for equality
allclose = np.allclose(zbud, fpbud, rtol)
s = 'Zonebudget arrays do not match.'
assert allclose, s
return
if __name__ == '__main__':
# test_compare2mflist_mlt()
test_compare2zonebudget()
test_zonbud_aliases()
test_zonbud_to_csv()
test_zonbud_math()
test_zonbud_copy()
test_zonbud_readwrite_zbarray()
test_zonbud_get_record_names()
test_dataframes()
test_get_budget()
test_get_model_shape()
test_zonebudget_output_to_netcdf()
test_zonbud_active_areas_zone_zero()