"""
Script runs ACCURATE pipeline, converted from Denis's Bash script.
Author: Denis Schenk, ARTORG Center for Biomedical Engineering Research, SITEM Insel, University of Bern
Maintained by: Simone Poncioni, ARTORG Center for Biomedical Engineering Research, SITEM Insel, University of Bern
Date: April 2021
Latest update: 16.11.2023
UPDATES:
- Updated to run multiple simulations independently in parallel (POS)
"""
import logging
import os
import sys
from pathlib import Path
from shutil import move
from time import time
import hfe_abq.aim2fe as aim2fe
import hfe_abq.simulation as simulation
import hfe_accurate.postprocessing as postprocessing
import hfe_utils.imutils as imutils
import hfe_utils.print_optim_report as por
import numpy as np
import yaml
from hfe_utils.f_decomposition import decomposition_to_vtu
from hfe_utils.io_utils import print_mem_usage, write_timing_summary
from hfe_utils.odb2vtk_wrapper import Odb2VtkWrapper
os.environ["NUMEXPR_MAX_THREADS"] = "16"
LOGGING_NAME = "HFE-ACCURATE"
logger = logging.getLogger(LOGGING_NAME)
logger.propagate = False
# flake8: noqa: E402, W503
"""
# TODO: reactivate this for mesh sensitivity analysis
# n_sim = int(15) # has to match sweep in config
# min = 5, 5, 2, 7
# max = 20, 50, 10, 50 did not work, reducing to 20, 40, 10, 40
n_elms_longitudinal = np.linspace(1, 20, n_sim, dtype=int)
n_elms_transverse_trab = np.linspace(3, 50, n_sim, dtype=int)
n_elms_transverse_cort = np.linspace(1, 10, n_sim, dtype=int)
n_radial = np.linspace(3, 50, n_sim, dtype=int)
# update meshing settings with sweep factor for sensitivity analysis
sweep = cfg.meshing_settings.sweep_factor
cfg.meshing_settings.n_elms_longitudinal = int(
n_elms_longitudinal[sweep - 1].item()
)
cfg.meshing_settings.n_elms_transverse_trab = int(
n_elms_transverse_trab[sweep - 1].item()
)
cfg.meshing_settings.n_elms_transverse_cort = int(
n_elms_transverse_cort[sweep - 1].item()
)
cfg.meshing_settings.n_elms_radial = int(n_radial[sweep - 1].item())
"""
[docs]
def pipeline_hfe(cfg, folder_id, grayscale_filename):
"""
Executes the homogenized finite elements (HFE) pipeline for a given sample.
Args:
cfg: Configuration object containing all necessary settings.
folder_id (str): Identifier for the folder containing the sample data.
grayscale_filename (str): Filename of the grayscale image to be processed.
Returns:
tuple: A tuple containing the time record dictionary and the summary path.
"""
# timing
time_record = {}
start_full = time()
start_sample = time()
print_mem_usage()
# Sets paths
workdir = cfg.socket_paths.workdir
feadir = Path(workdir, cfg.paths.feadir)
umat = Path(workdir, cfg.abaqus.umat)
sumdir = Path(workdir, cfg.paths.sumdir)
sumdir.mkdir(parents=True, exist_ok=True)
feadir.mkdir(parents=True, exist_ok=True)
current_version = cfg.version.current_version
sampledir = Path(feadir) / cfg.simulations.folder_id[grayscale_filename]
inputfilename = f"{grayscale_filename}.inp".format(
grayscale_filename, current_version
)
inputfile = sampledir / inputfilename
sampledir.mkdir(parents=True, exist_ok=True)
bone = {}
bone, abq_inp_path = aim2fe.aim2fe_psl(cfg, grayscale_filename)
if cfg.image_processing.BVTVd_comparison is True:
bone = imutils.compute_bvtv_d_seg(bone, grayscale_filename)
if cfg.mesher.meshing == "spline":
inputfile = str(abq_inp_path.resolve())
# 3.2) FZ_MAX
cogs_dict = bone["cogs"]
DIMZ = 0.0
for arr in cogs_dict.values():
for arr2 in arr.values():
if arr2[-1] > DIMZ:
DIMZ = arr2[-1]
# create_loadcases.create_loadcase_fz_max(cfg, grayscale_filename, "FZ_MAX")
start_simulation = time()
try:
odb_path = simulation.simulate_loadcase(
cfg, grayscale_filename, inputfile, umat, ""
)
end_simulation = time()
except Exception:
logger.error("Simulation of FZ_MAX loadcase resulted in error")
logger.error(sys.stderr)
end_simulation = time()
pass
else:
end_simulation = time()
time_record["simulation"] = end_simulation - start_simulation
optim = {}
optim = postprocessing.datfilereader_psl(cfg, grayscale_filename, optim, "FZ_MAX")
# timing
end_sample = time()
optim["processing_time"] = end_sample - start_sample
time_record[grayscale_filename] = end_sample - start_sample
path2dat = Path(inputfile).parent / (
grayscale_filename + "_" + cfg.version.current_version[0:2] + ".dat"
)
thickness = max(val[2] for val in bone["nodes"].values()) - min(
val[2] for val in bone["nodes"].values()
)
optim = por.compute_optim_report_variables(optim, path2dat, thickness)
bone = por.compute_bone_report_variables_no_psl(bone)
# only for sensitivity analysis
mesh_parameters_dict = {
"n_elms_longitudinal": cfg.meshing_settings.n_elms_longitudinal,
"n_elms_transverse_trab": cfg.meshing_settings.n_elms_transverse_trab,
"n_elms_transverse_cort": cfg.meshing_settings.n_elms_transverse_cort,
"n_elms_radial": cfg.meshing_settings.n_elms_radial,
}
postprocessing.write_data_summary(
cfg,
optim,
bone,
grayscale_filename,
mesh_parameters_dict,
DOFs=bone["degrees_of_freedom"],
time_sim=time_record[grayscale_filename],
)
if cfg.strain_localisation.strain_analysis is True:
odb2vtkpath = cfg.socket_paths.odb2vtk
odb_path = odb_path
abq_path = cfg.solver.abaqus
odb2vtk_wrapper = Odb2VtkWrapper(
odb2vtkpath, odb_path, abq_path, only_last_frame=True
)
vtk_path = odb2vtk_wrapper.convert()
print(f"ODB to VTK file written to {vtk_path}")
decomposition_to_vtu(vtk_path)
if cfg.abaqus.delete_odb is True:
odbfilename = "{}_FZ_MAX_{}.odb".format(
grayscale_filename, current_version[0:2]
)
odbfile = os.path.join(feadir, folder_id, odbfilename)
os.remove(odbfile)
sampledir_parent = Path(sampledir).parent
try:
# move whole content of feadir to sampledir except subdirectories
for file in os.listdir(sampledir_parent):
if os.path.isfile(os.path.join(sampledir_parent, file)):
move(os.path.join(sampledir_parent, file), sampledir)
except Exception:
current_time = time.strftime("%Y%m%d-%H%M%S")
child_dir_time_path = (
Path(sampledir) / f"simulation_current_time_{current_time}"
)
child_dir_time_path.mkdir(parents=True, exist_ok=True)
logger.info(
f"File in this location already exists, moving to {child_dir_time_path}"
)
for file in os.listdir(sampledir_parent):
if os.path.isfile(os.path.join(sampledir_parent, file)):
move(os.path.join(sampledir_parent, file), child_dir_time_path)
end_full = time()
time_record["full"] = end_full - start_full
summary_path = Path(
sumdir / str(grayscale_filename + "_V_" + current_version + "_summary.txt")
)
print(yaml.dump(time_record, default_flow_style=False))
write_timing_summary(cfg, grayscale_filename, time_record)
return time_record, summary_path