270 lines
8.8 KiB
Python
270 lines
8.8 KiB
Python
|
from pathlib import Path
|
||
|
from typing import Any
|
||
|
from unittest import TestCase
|
||
|
|
||
|
import warnings
|
||
|
import numpy as np
|
||
|
|
||
|
import shutil
|
||
|
|
||
|
import unittest
|
||
|
import tempfile
|
||
|
import subprocess
|
||
|
import itertools
|
||
|
import argparse
|
||
|
|
||
|
|
||
|
class GrayTest:
|
||
|
inputs: Path = None # directory of the input files
|
||
|
reference: Path = None # directory of the reference outputs
|
||
|
candidate: Path = None # directory of the candidate outputs
|
||
|
|
||
|
# Extra parameters to pass to gray
|
||
|
gray_params: dict[str, Any] = {}
|
||
|
|
||
|
@classmethod
|
||
|
def setUpClass(cls):
|
||
|
'''
|
||
|
Sets up the test case
|
||
|
'''
|
||
|
# directory of the test case
|
||
|
base = Path().joinpath(*cls.__module__.split('.'))
|
||
|
|
||
|
if cls.inputs is None:
|
||
|
cls.inputs = base / 'inputs'
|
||
|
if cls.reference is None:
|
||
|
cls.reference = base / 'outputs'
|
||
|
|
||
|
# temporary directory holding the candidate outputs
|
||
|
cls._tempdir = tempfile.mkdtemp(prefix=f'gray-test-{base.name}.')
|
||
|
cls.candidate = Path(cls._tempdir)
|
||
|
|
||
|
# replace reference with candidate
|
||
|
if options.update:
|
||
|
print()
|
||
|
print('Setting new reference for ' + cls.__module__)
|
||
|
cls.candidate = cls.reference
|
||
|
|
||
|
# run gray to generate the candidate outputs
|
||
|
proc = run_gray(cls.inputs, cls.candidate, params=cls.gray_params,
|
||
|
binary=options.binary)
|
||
|
assert proc.returncode == 0, \
|
||
|
f"gray failed with exit code {proc.returncode}"
|
||
|
|
||
|
# store the stderr for manual inspection
|
||
|
with open(str(cls.candidate / 'log'), 'w') as log:
|
||
|
log.write(proc.stderr)
|
||
|
|
||
|
@classmethod
|
||
|
def tearDownClass(cls):
|
||
|
'''
|
||
|
Clean up after all tests
|
||
|
'''
|
||
|
# remove temporary directory
|
||
|
if cls._passed or not options.keep_failed:
|
||
|
shutil.rmtree(cls._tempdir)
|
||
|
else:
|
||
|
print()
|
||
|
print('Some tests failed: preserving outputs in', cls._tempdir)
|
||
|
|
||
|
def run(self, result: unittest.runner.TextTestResult):
|
||
|
'''
|
||
|
Override to store the test results for tearDownClass
|
||
|
'''
|
||
|
TestCase.run(self, result)
|
||
|
self.__class__._passed = result.failures == []
|
||
|
|
||
|
def test_eccd_values(self):
|
||
|
'''
|
||
|
Comparing the ECCD values
|
||
|
'''
|
||
|
from collections import defaultdict
|
||
|
|
||
|
ref = load_unit(self.reference / 'fort.7')
|
||
|
cand = load_unit(self.candidate / 'fort.7')
|
||
|
|
||
|
# precision as number of decimal places
|
||
|
prec = defaultdict(lambda: 3, [
|
||
|
('dPdVp', -2), ('dPdVmx', -2),
|
||
|
('Jphip', -2), ('Jphimx', -2),
|
||
|
('stmx', -1),
|
||
|
('chipol', -1), ('psipol', -1),
|
||
|
])
|
||
|
|
||
|
for val in ref.dtype.names:
|
||
|
with self.subTest(value=val):
|
||
|
for i, ray in enumerate(ref['index_rt']):
|
||
|
self.assertAlmostEqual(
|
||
|
ref[val][i], cand[val][i], prec[val],
|
||
|
msg=f"{val} changed (ray {int(ray)})")
|
||
|
|
||
|
def test_final_position(self):
|
||
|
'''
|
||
|
Comparing the final position of the central ray
|
||
|
'''
|
||
|
ref = load_unit(self.reference / 'fort.4')
|
||
|
cand = load_unit(self.candidate / 'fort.4')
|
||
|
|
||
|
# coordinates
|
||
|
self.assertAlmostEqual(ref['R'][-1], cand['R'][-1], 1)
|
||
|
self.assertAlmostEqual(ref['z'][-1], cand['z'][-1], 1)
|
||
|
self.assertAlmostEqual(ref['phi'][-1], cand['phi'][-1], 2)
|
||
|
|
||
|
# optical path length
|
||
|
self.assertAlmostEqual(ref['sst'][-1], cand['sst'][-1], 1)
|
||
|
|
||
|
def test_final_direction(self):
|
||
|
'''
|
||
|
Comparing the final direction of the central ray
|
||
|
'''
|
||
|
ref = load_unit(self.reference / 'fort.4')
|
||
|
cand = load_unit(self.candidate / 'fort.4')
|
||
|
|
||
|
self.assertAlmostEqual(ref['Npl'][-1], cand['Npl'][-1], 1)
|
||
|
self.assertAlmostEqual(ref['Nperp'][-1], cand['Nperp'][-1], 1)
|
||
|
|
||
|
def test_beam_shape(self):
|
||
|
'''
|
||
|
Comparing the final beam shape
|
||
|
'''
|
||
|
ref = load_unit(self.reference / 'fort.9')
|
||
|
cand = load_unit(self.candidate / 'fort.9')
|
||
|
if ref.size == 0 or cand.size == 0:
|
||
|
raise unittest.SkipTest("Beam shape info not available")
|
||
|
|
||
|
if options.visual:
|
||
|
import matplotlib.pyplot as plt
|
||
|
|
||
|
plt.subplot(aspect='equal')
|
||
|
plt.title(self.__module__ + '.test_beam_shape')
|
||
|
plt.xlabel('$x$ / cm')
|
||
|
plt.ylabel('$y$ / cm')
|
||
|
plt.scatter(ref['xt'], ref['yt'], c='red',
|
||
|
marker='_', label='reference')
|
||
|
plt.scatter(cand['xt'], cand['yt'], c='green',
|
||
|
alpha=0.6, marker='+', label='candidate')
|
||
|
plt.legend()
|
||
|
plt.show()
|
||
|
|
||
|
for ref, cand in zip(ref, cand):
|
||
|
with self.subTest(ray=(int(ref['j']), int(ref['k']))):
|
||
|
self.assertAlmostEqual(ref['xt'], cand['xt'], 1)
|
||
|
self.assertAlmostEqual(ref['yt'], cand['yt'], 1)
|
||
|
|
||
|
def test_error_biased(self):
|
||
|
'''
|
||
|
Test for a proportionality between Λ and any of X, Y, N∥
|
||
|
'''
|
||
|
|
||
|
data = load_unit(self.candidate / 'fort.4')
|
||
|
|
||
|
# restrict to within the plasma, half of the first pass
|
||
|
in_plasma = data['Xg'] > 0
|
||
|
first_pass = data['index_rt'] == data['index_rt'].min()
|
||
|
data = data[in_plasma & first_pass]
|
||
|
data = data[:int(data.size // 2)]
|
||
|
|
||
|
if data.size < 2:
|
||
|
self.skipTest("There is no plasma")
|
||
|
|
||
|
if options.visual:
|
||
|
import matplotlib.pyplot as plt
|
||
|
|
||
|
left = plt.subplot()
|
||
|
plt.title(self.__module__ + '.test_error_biased')
|
||
|
left.set_xlabel('$s$ / cm')
|
||
|
left.set_ylabel('$Λ$', color='xkcd:ocean blue')
|
||
|
left.tick_params(axis='y', labelcolor='xkcd:ocean blue')
|
||
|
left.plot(data['sst'], data['ddr'], color='xkcd:ocean blue')
|
||
|
|
||
|
right1 = left.twinx()
|
||
|
right1.set_ylabel('$X$', color='xkcd:orange')
|
||
|
right1.tick_params(axis='y', labelcolor='xkcd:orange')
|
||
|
right1.plot(data['sst'], data['Xg'], color='xkcd:orange')
|
||
|
|
||
|
right2 = left.twinx()
|
||
|
right2.set_ylabel('$Y$', color='xkcd:vermillion')
|
||
|
right2.tick_params(axis='y', labelcolor='xkcd:vermillion')
|
||
|
right2.plot(data['sst'], data['Yg'], color='xkcd:vermillion')
|
||
|
right2.spines["right"].set_position(("axes", 1.1))
|
||
|
|
||
|
right3 = left.twinx()
|
||
|
right3.set_ylabel('$N_∥$', color='xkcd:green')
|
||
|
right3.tick_params(axis='y', labelcolor='xkcd:green')
|
||
|
right3.spines["right"].set_position(("axes", 1.2))
|
||
|
right3.plot(data['sst'], data['Npl'], color='xkcd:green')
|
||
|
|
||
|
plt.subplots_adjust(right=0.78)
|
||
|
plt.show()
|
||
|
|
||
|
err = data['ddr'].var() / 10
|
||
|
self.assertGreater(err, 0, msg="Λ is exactly constant")
|
||
|
|
||
|
def χ2(k, var):
|
||
|
'''
|
||
|
Reduced χ² for the curve fit: Λ(s) = k⋅var(s)
|
||
|
'''
|
||
|
res = (data['ddr'] - k*data[var]) / err
|
||
|
return np.sum(res**2) / (data.size - 1)
|
||
|
|
||
|
import scipy.optimize
|
||
|
|
||
|
for var in ['Xg', 'Yg', 'Npl']:
|
||
|
k_best = scipy.optimize.minimize(χ2, x0=1, args=var).x[0]
|
||
|
with self.subTest(var=var):
|
||
|
self.assertGreater(χ2(k_best, var), 1)
|
||
|
|
||
|
|
||
|
# Command line options
|
||
|
options = argparse.Namespace()
|
||
|
|
||
|
|
||
|
def get_basedir(module: str) -> Path:
|
||
|
"""
|
||
|
Given a module name (es. tests.03-TCV) returns its
|
||
|
base directory as a path (es. tests/03-TCV).
|
||
|
"""
|
||
|
return Path().joinpath(*module.split('.'))
|
||
|
|
||
|
|
||
|
def run_gray(inputs: Path, outputs: Path,
|
||
|
# extra gray parameters
|
||
|
params: dict[str, Any] = {},
|
||
|
# which units to generate
|
||
|
units: list[int] = [4, 7, 8, 9, 48, 33, 70, 71],
|
||
|
# which gray binary to use
|
||
|
binary: str = 'gray'
|
||
|
) -> subprocess.CompletedProcess:
|
||
|
'''
|
||
|
Runs gray on the inputs from the `inputs` directory and storing the results
|
||
|
in the `outputs` directory.
|
||
|
'''
|
||
|
outputs.mkdir(exist_ok=True, parents=True)
|
||
|
|
||
|
params = [['-g', f'{k}={v}'] for k, v in params.items()]
|
||
|
args = [
|
||
|
binary,
|
||
|
'-p', str(inputs / 'gray_params.data'),
|
||
|
'-u', ','.join(map(str, units)),
|
||
|
'-o', str(outputs),
|
||
|
'-v'
|
||
|
] + list(itertools.chain(*params))
|
||
|
proc = subprocess.run(args, capture_output=True, text=True)
|
||
|
|
||
|
if proc.returncode != 0:
|
||
|
# show the log on errors
|
||
|
print(proc.stderr)
|
||
|
print(proc.stdout)
|
||
|
return proc
|
||
|
|
||
|
|
||
|
def load_unit(fname: Path) -> np.array:
|
||
|
'''
|
||
|
Loads a GRAY output unit file as a structured numpy array
|
||
|
(columns are named as in the file header)
|
||
|
'''
|
||
|
# ignore warnings about empty files
|
||
|
with warnings.catch_warnings():
|
||
|
warnings.simplefilter("ignore")
|
||
|
return np.genfromtxt(fname, names=True, skip_header=21, ndmin=1)
|