gray/tests/__init__.py
2024-02-09 11:16:19 +01:00

273 lines
8.9 KiB
Python

from pathlib import Path
from typing import Any
from unittest import TestCase
import warnings
import numpy as np
import shutil
import unittest
import tempfile
import subprocess
import itertools
import argparse
class GrayTest:
    '''
    Regression tests shared by every gray test case.

    The class does not derive from `TestCase` itself but relies on its
    API (`assert*`, `subTest`, `run`), so a concrete test class is expected
    to inherit from both `GrayTest` and `unittest.TestCase`.

    gray is run once per class (see `setUpClass`); each test then compares
    the freshly generated "candidate" outputs with the stored "reference"
    outputs.
    '''
    inputs: Path = None     # directory of the input files
    reference: Path = None  # directory of the reference outputs
    candidate: Path = None  # directory of the candidate outputs

    # Extra parameters to pass to gray.
    # NOTE: this dict is shared by all subclasses that do not override it:
    # assign a new dict in the subclass instead of mutating this one.
    gray_params: dict[str, Any] = {}

    # False as soon as one test of the class has failed or errored
    _passed: bool = True

    @classmethod
    def setUpClass(cls):
        '''
        Sets up the test case: locates the inputs and reference outputs,
        then runs gray once to generate the candidate outputs.

        Raises an AssertionError if gray exits with a non-zero code.
        '''
        # directory of the test case
        base = get_basedir(cls.__module__)
        if cls.inputs is None:
            cls.inputs = base / 'inputs'
        if cls.reference is None:
            cls.reference = base / 'outputs'

        # no test of this class has failed yet
        cls._passed = True

        # temporary directory holding the candidate outputs
        cls._tempdir = tempfile.mkdtemp(prefix=f'gray-test-{base.name}.')
        cls.candidate = Path(cls._tempdir)

        # replace reference with candidate
        if options.update:
            print()
            print('Setting new reference for ' + cls.__module__)
            cls.candidate = cls.reference

        try:
            # run gray to generate the candidate outputs
            proc = run_gray(cls.inputs, cls.candidate,
                            params=cls.gray_params,
                            binary=options.binary)
            # explicit raise: a plain `assert` is stripped under `python -O`
            if proc.returncode != 0:
                raise AssertionError(
                    f"gray failed with exit code {proc.returncode}")

            # store the stderr for manual inspection
            with open(cls.candidate / 'log', 'w') as log:
                log.write(proc.stderr)
        except BaseException:
            # unittest does not call tearDownClass when setUpClass raises,
            # so the temporary directory must be removed here
            if not getattr(options, 'keep_failed', False):
                shutil.rmtree(cls._tempdir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        '''
        Clean up after all tests: the temporary directory is removed
        unless some test failed and `options.keep_failed` is set.
        '''
        # remove temporary directory
        if cls._passed or not options.keep_failed:
            shutil.rmtree(cls._tempdir)
        else:
            print()
            print('Some tests failed: preserving outputs in', cls._tempdir)

    @staticmethod
    def _problem_count(result) -> int:
        '''
        Number of failures and errors recorded so far in `result`
        '''
        return len(result.failures) + len(result.errors)

    def run(self, result: unittest.TestResult = None):
        '''
        Override to store the test results for tearDownClass.

        Only the failures and errors added while running *this* test are
        considered: `result` is shared with the other test classes, whose
        failures must not mark this class as failed.

        Returns whatever `TestCase.run` returns (the result object).
        '''
        before = 0 if result is None else self._problem_count(result)
        returned = TestCase.run(self, result)

        # TestCase.run creates its own result when given None; older
        # Python versions return None for tests skipped by a decorator
        final = result if returned is None else returned
        if final is not None and self._problem_count(final) > before:
            type(self)._passed = False
        return returned

    def test_eccd_values(self):
        '''
        Comparing the ECCD values
        '''
        ref = load_unit(self.reference / 'fort.7')
        cand = load_unit(self.candidate / 'fort.7')

        if ref.size == 0 or cand.size == 0:
            raise unittest.SkipTest("ECCD results not available")

        # rows are compared by position: a different row count means the
        # results changed, and must not go unnoticed
        self.assertEqual(ref.size, cand.size,
                         msg="number of ECCD results changed")

        # precision as number of decimal places
        # (negative values round to tens, hundreds, ...)
        default_places = 3
        places = {
            'dPdVp': -2, 'dPdVmx': -2,
            'Jphip': -2, 'Jphimx': -2,
            'stmx': -1,
            'chipol': -1, 'psipol': -1,
        }

        for val in ref.dtype.names:
            with self.subTest(value=val):
                for i, ray in enumerate(ref['index_rt']):
                    self.assertAlmostEqual(
                        ref[val][i], cand[val][i],
                        places.get(val, default_places),
                        msg=f"{val} changed (ray {int(ray)})")

    def test_final_position(self):
        '''
        Comparing the final position of the central ray
        '''
        ref = load_unit(self.reference / 'fort.4')
        cand = load_unit(self.candidate / 'fort.4')

        # coordinates
        self.assertAlmostEqual(ref['R'][-1], cand['R'][-1], 1)
        self.assertAlmostEqual(ref['z'][-1], cand['z'][-1], 1)
        self.assertAlmostEqual(ref['phi'][-1], cand['phi'][-1], 2)

        # optical path length
        self.assertAlmostEqual(ref['sst'][-1], cand['sst'][-1], 1)

    def test_final_direction(self):
        '''
        Comparing the final direction of the central ray
        '''
        ref = load_unit(self.reference / 'fort.4')
        cand = load_unit(self.candidate / 'fort.4')

        self.assertAlmostEqual(ref['Npl'][-1], cand['Npl'][-1], 1)
        self.assertAlmostEqual(ref['Nperp'][-1], cand['Nperp'][-1], 1)

    def test_beam_shape(self):
        '''
        Comparing the final beam shape
        '''
        ref = load_unit(self.reference / 'fort.9')
        cand = load_unit(self.candidate / 'fort.9')

        if ref.size == 0 or cand.size == 0:
            raise unittest.SkipTest("Beam shape info not available")

        if options.visual:
            # imported lazily: only needed for the optional plots
            import matplotlib.pyplot as plt

            plt.subplot(aspect='equal')
            plt.title(self.__module__ + '.test_beam_shape')
            plt.xlabel('$x$ / cm')
            plt.ylabel('$y$ / cm')
            plt.scatter(ref['xt'], ref['yt'], c='red',
                        marker='_', label='reference')
            plt.scatter(cand['xt'], cand['yt'], c='green',
                        alpha=0.6, marker='+', label='candidate')
            plt.legend()
            plt.show()

        # zip() stops at the shortest input: check the sizes explicitly
        # so that missing or extra rays are reported
        self.assertEqual(ref.size, cand.size,
                         msg="number of rays changed")

        for ref_ray, cand_ray in zip(ref, cand):
            with self.subTest(ray=(int(ref_ray['j']), int(ref_ray['k']))):
                self.assertAlmostEqual(ref_ray['xt'], cand_ray['xt'], 1)
                self.assertAlmostEqual(ref_ray['yt'], cand_ray['yt'], 1)

    def test_error_biased(self):
        '''
        Test for a proportionality between Λ and any of X, Y, N∥

        For each variable the best fit Λ(s) = k⋅var(s) is computed and
        the test requires its reduced χ² to be greater than 1, i.e. that
        Λ is not well described as proportional to the variable.
        '''
        data = load_unit(self.candidate / 'fort.4')

        # restrict to within the plasma, half of the first pass
        in_plasma = data['Xg'] > 0
        first_pass = data['index_rt'] == data['index_rt'].min()
        data = data[in_plasma & first_pass]
        data = data[:int(data.size // 2)]

        if data.size < 2:
            self.skipTest("There is no plasma")

        if options.visual:
            # imported lazily: only needed for the optional plots
            import matplotlib.pyplot as plt

            left = plt.subplot()
            plt.title(self.__module__ + '.test_error_biased')
            left.set_xlabel('$s$ / cm')
            left.set_ylabel('$Λ$', color='xkcd:ocean blue')
            left.tick_params(axis='y', labelcolor='xkcd:ocean blue')
            left.plot(data['sst'], data['ddr'], color='xkcd:ocean blue')

            right1 = left.twinx()
            right1.set_ylabel('$X$', color='xkcd:orange')
            right1.tick_params(axis='y', labelcolor='xkcd:orange')
            right1.plot(data['sst'], data['Xg'], color='xkcd:orange')

            right2 = left.twinx()
            right2.set_ylabel('$Y$', color='xkcd:vermillion')
            right2.tick_params(axis='y', labelcolor='xkcd:vermillion')
            right2.plot(data['sst'], data['Yg'], color='xkcd:vermillion')
            right2.spines["right"].set_position(("axes", 1.1))

            right3 = left.twinx()
            right3.set_ylabel('$N_∥$', color='xkcd:green')
            right3.tick_params(axis='y', labelcolor='xkcd:green')
            right3.spines["right"].set_position(("axes", 1.2))
            right3.plot(data['sst'], data['Npl'], color='xkcd:green')

            plt.subplots_adjust(right=0.78)
            plt.show()

        # scale used to normalise the residuals of the fit
        err = data['ddr'].var() / 10
        self.assertGreater(err, 0, msg="Λ is exactly constant")

        def chi2(k, var):
            '''
            Reduced χ² for the curve fit: Λ(s) = k⋅var(s)
            '''
            res = (data['ddr'] - k*data[var]) / err
            return np.sum(res**2) / (data.size - 1)

        # imported lazily, as in the original: only this test needs scipy
        import scipy.optimize

        for var in ['Xg', 'Yg', 'Npl']:
            # `args` is documented as a tuple of extra arguments
            k_best = scipy.optimize.minimize(chi2, x0=1, args=(var,)).x[0]
            with self.subTest(var=var):
                self.assertGreater(chi2(k_best, var), 1)
# Command line options, shared by all the test cases.
# The namespace starts empty; this module reads the attributes `update`,
# `binary`, `keep_failed` and `visual` from it, so they are presumably
# set by whatever launches the tests before any test runs (TODO confirm).
options = argparse.Namespace()
def get_basedir(module: str) -> Path:
    """
    Map a dotted module name to the matching relative directory.

    Each dot-separated component becomes one path component,
    e.g. `tests.03-TCV` gives `tests/03-TCV`.
    """
    components = module.split('.')
    return Path(*components)
def run_gray(inputs: Path, outputs: Path,
             # extra gray parameters
             params: dict[str, Any] = None,
             # which units to generate
             units: list[int] = None,
             # which gray binary to use
             binary: str = 'gray'
             ) -> subprocess.CompletedProcess:
    '''
    Runs gray on the inputs from the `inputs` directory and stores the
    results in the `outputs` directory (created if missing).

    Parameters:
      inputs:  directory containing the `gray_params.data` file
      outputs: directory passed to gray with `-o`
      params:  extra parameters, each passed as `-g key=value`
               (default: none)
      units:   unit numbers passed, comma-separated, with `-u`
               (default: 4, 7, 8, 9, 48, 33, 70, 71)
      binary:  name or path of the gray executable

    Returns the completed process, with stdout and stderr captured as
    text. A non-zero exit code does not raise: the captured output is
    printed and the caller is expected to check `returncode`.
    '''
    # None is used as default to avoid sharing mutable default arguments
    if params is None:
        params = {}
    if units is None:
        units = (4, 7, 8, 9, 48, 33, 70, 71)

    outputs.mkdir(exist_ok=True, parents=True)

    # each extra parameter becomes a pair of arguments: -g key=value
    overrides = itertools.chain.from_iterable(
        ('-g', f'{k}={v}') for k, v in params.items())

    args = [
        binary,
        '-p', str(inputs / 'gray_params.data'),
        '-u', ','.join(map(str, units)),
        '-o', str(outputs),
        '-v',
        *overrides,
    ]

    proc = subprocess.run(args, capture_output=True, text=True)
    if proc.returncode != 0:
        # show the log on errors
        print(proc.stderr)
        print(proc.stdout)
    return proc
def load_unit(fname: Path) -> np.ndarray:
    '''
    Loads a GRAY output unit file as a structured numpy array
    (columns are named as in the file header).

    The first 21 lines of the file are skipped and the following line
    is read as the column names. The result always has at least one
    dimension, so a file with a single row can still be indexed by row.
    '''
    # ignore warnings about empty files
    # NOTE: this silences every warning raised while parsing the file
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        table = np.genfromtxt(fname, names=True, skip_header=21, ndmin=1)
    return table