import subprocess
import tempfile
import os
import time
from arch_xml_modification import *
from file_handling import *
from script_editing import *
from printing import print_verbose
import shutil
import sys
and2_blif_path = "/benchmarks/and2/and2.blif"
[docs]
def run_command_in_temp_dir(command, original_dir, handle_error=True, verbose=False):
"""
Run a command in a temporary directory.
Args:
command (list): Command to execute
original_dir (str): Original working directory
Returns:
subprocess.CompletedProcess: Result of the command execution
"""
if verbose:
print_verbose("Running Command: ")
command_string = " ".join(command)
print_verbose(f"{command_string}")
# Create a temporary directory
with tempfile.TemporaryDirectory() as temporary_dir:
try:
# Change to the temporary directory
os.chdir(temporary_dir)
# Execute the command
result = subprocess.run(command,
capture_output=True,
text=True,
check=True)
if verbose:
print_verbose("Command output:")
print_verbose(result.stdout)
return result
except subprocess.CalledProcessError as e:
if verbose:
print_verbose(f"Command failed with error code {e.returncode}")
print_verbose("Error output:")
print_verbose(e.stdout)
print_verbose(e.stderr)
if handle_error:
raise
except FileNotFoundError:
print_verbose(os.getcwd())
print_verbose("Command not found. Please check the path and permissions.")
raise
finally:
# Always change back to the original directory
os.chdir(original_dir)
[docs]
def run_task(original_dir, temp_dir=""):
run_command = ["python3",
original_dir + "/OpenFPGA/openfpga_flow/scripts/run_fpga_task.py",
temp_dir]
run_command_in_temp_dir(run_command, original_dir, handle_error=False, verbose=False)
[docs]
def run_flow(original_dir, width, height, channel_width, benchmark_name="", temp_dir ="", type_sb="full", arch_path="", rrg_3d_path="", percent_connectivity=0.5, place_algorithm="cube_bb", connection_type="subset", run_num=1, output_additional_info=""):
print_verbose(f"Temp Dir for benchmark {benchmark_name}: {temp_dir}")
start_time = time.time()
run_task(original_dir, temp_dir)
end_time = time.time()
run_time = (end_time - start_time) * 1000
print_verbose(f"Running OpenFPGA task for benchmark {benchmark_name} took {run_time:0.2f} ms")
# Now copy results into somewhere else maybe?
# Feels a bit sketch should automate it so the reuslts are auto placed into nice named area but oh well
task_result_path = "/run001/task_result.csv"
results_path = "/results/3d_" + type_sb + "_cw_" + output_file_name(channel_width=channel_width, width=width, height=height, percent_connectivity=percent_connectivity, place_algorithm=place_algorithm, connection_type=connection_type, run_num=run_num, additional_info=output_additional_info) + "/"
result_file_name = benchmark_name + "_results_cw_" + output_file_name(channel_width=channel_width, width=width, height=height, percent_connectivity=percent_connectivity, place_algorithm=place_algorithm, connection_type=connection_type, run_num=run_num, additional_info=output_additional_info) + ".csv"
print_verbose(f"Trying to copy for benchmark {benchmark_name} the results to {original_dir + results_path + result_file_name} from {temp_dir + task_result_path}")
# Make sure the results directory exists
os.makedirs(original_dir + "/results", exist_ok=True)
if os.path.exists(temp_dir + task_result_path):
start_time = time.time()
copy_results(original_dir, task_result_path, results_path, result_file_name, temp_dir)
end_time = time.time()
run_time = (end_time - start_time) * 1000
print_verbose(f"Copying the results for benchmark {benchmark_name} to {original_dir + results_path + result_file_name} took {run_time:0.2f} ms")
else:
start_time = time.time()
generate_empty_results(original_dir, results_path, result_file_name, benchmark_name)
end_time = time.time()
run_time = (end_time - start_time) * 1000
print_verbose(f"Generating Empty Results file and writing to {original_dir + results_path + result_file_name} took {run_time:0.2f} ms")
[docs]
def setup_flow(original_dir, width, height, channel_width, type_sb="full", percent_connectivity=0.5, place_algorithm="cube_bb", is_verilog_benchmarks=False, connection_type="subset", arch_file="", random_seed=1, run_num=1, extra_vpr_options="", output_additional_info="", temp_dir="", vertical_connectivity=1, sb_switch_name="", sb_segment_name="", sb_input_pattern=[], sb_output_pattern=[], sb_location_pattern="repeated_interval", sb_grid_csv_path="", vertical_delay_ratio=1, sb_3d_switch_name="3D_SB_switch", base_delay_switch="", switch_interlayer_pairs={}, update_arch_delay=False):
# Copy the task directory to the temp directory and work on it from there
shutil.copytree(original_dir + "/task", temp_dir + "/task", dirs_exist_ok=True)
# copy template script to designs
# TODO: This is a bit sketchy, should make it more robust in the future to allow for multiple arch runs at the same time, no hard coding values
command = ["cp", temp_dir + "/task/config_templates/bitstream_script_template.openfpga", temp_dir + "/task/designs/bitstream_script.openfpga"]
run_command_in_temp_dir(command, original_dir)
script_path ="/designs/bitstream_script.openfpga"
# Set the base arch file and output directory based on the type of switch block
# Base arch layout will be modified to have the correct dimensions and then saved to the output directory
if type_sb == "3d_cb":
arch_base_file = original_dir + "/arch_files/templates/basic/vtr_3d_cb_arch.xml"
arch_output_dir = original_dir + "/arch_files/3d_cb_arch/"
elif type_sb == "2d":
arch_base_file = original_dir + "/arch_files/templates/basic/vtr_2d_arch.xml"
arch_output_dir = original_dir + "/arch_files/2d_arch/"
elif type_sb == "3d_cb_out_only":
arch_base_file = original_dir + "/arch_files/templates/basic/vtr_3d_cb_out_only_arch.xml"
arch_output_dir = original_dir + "/arch_files/3d_cb_arch/"
elif type_sb == "hybrid_cb":
arch_base_file = original_dir + "/arch_files/templates/basic/vtr_3d_cb_arch.xml"
arch_output_dir = original_dir + "/arch_files/3d_cb_arch/vtr_3d_hybrid_cb_arch_" + str(width) + "x" + str(height) + ".xml"
elif type_sb == "hybrid_cb_out":
arch_base_file = original_dir + "/arch_files/templates/basic/vtr_3d_cb_out_only_arch.xml"
arch_output_dir = original_dir + "/arch_files/3d_cb_arch/vtr_3d_hybrid_cb_out_arch_" + str(width) + "x" + str(height) + ".xml"
else:
arch_base_file = original_dir + "/arch_files/templates/basic/vtr_arch.xml"
arch_output_dir = original_dir + "/arch_files/3d_arch/"
if arch_file != "":
arch_base_file = arch_file
arch_output_dir = original_dir + "/arch_files/"
delay_ratio_string = ""
if update_arch_delay:
delay_ratio_string = "_delay_ratio_" + str(vertical_delay_ratio)
arch_output_file_name = os.path.splitext(os.path.basename(arch_base_file))[0] + "_" + str(width) + "x" + str(height) + delay_ratio_string + ".xml"
arch_output_file_path = arch_output_dir + arch_output_file_name
# Check if the modified Arch XML already exists, if not make it
if not os.path.exists(original_dir + arch_output_file_path):
start_time = time.time()
tree, root = load_xml(arch_base_file)
end_time = time.time()
run_time = (end_time - start_time) * 1000
print_verbose(f"Loading Base Arch XML took {run_time:0.2f} ms")
start_time = time.time()
set_fixed_layout_dimensions(root, width=width, height=height)
if update_arch_delay:
update_vertical_delay_ratio(root, vertical_delay_ratio=vertical_delay_ratio, sb_3d_switch_name=sb_3d_switch_name, base_delay_switch=base_delay_switch, switch_interlayer_pairs=switch_interlayer_pairs)
end_time = time.time()
run_time = (end_time - start_time) * 1000
print_verbose(f"Modifiying Base Arch XML took {run_time:0.2f} ms")
start_time = time.time()
save_xml(tree, arch_output_file_path)
end_time = time.time()
run_time = (end_time - start_time) * 1000
print_verbose(f"Saving the modified Arch XML took {run_time:0.2f} ms")
else:
print_verbose(f"Modified Arch XML previously generated")
# copy the modified arch file into the task
# TODO: This is a bit sketchy, should make it more robust in the future to allow for multiple arch runs at the same time, no hard coding values
command = ["cp", arch_output_file_path, temp_dir + "/task/designs/vtr_arch.xml"]
run_command_in_temp_dir(command, original_dir)
relative_arch_path = os.path.relpath(arch_output_file_path, original_dir)
relative_arch_path = "/" + relative_arch_path
start_time = time.time()
append_cw_to_script(temp_dir + "/task" + script_path, str(channel_width))
end_time = time.time()
run_time = (end_time - start_time) * 1000
print_verbose(f"Adding Channel Width to execution script took {run_time:0.2f} ms")
start_time = time.time()
append_place_algorithm_to_script(temp_dir + "/task" + script_path, place_algorithm)
end_time = time.time()
run_time = (end_time - start_time) * 1000
print_verbose(f"Adding Placement Algorithm to execution script took {run_time:0.2f} ms")
rrg_path = "/base_rrg/rrg_cw_" + str(channel_width) + "_" + arch_output_file_name
vertical_connectivity_string = "vp_" + str(vertical_connectivity) + "_"
if vertical_connectivity == 1:
vertical_connectivity_string = ""
sb_pattern_string = ""
# if the connection type is custom, add its pattern to the path
if connection_type == "custom":
if sb_input_pattern != []:
sb_input_pattern = [str(x) for x in sb_input_pattern]
sb_pattern_string = "_input_" + "_".join(sb_input_pattern)
else:
print("ERROR: No input pattern provided for custom connection type.")
exit(1)
if sb_output_pattern != []:
sb_output_pattern = [str(x) for x in sb_output_pattern]
sb_pattern_string += "_output_" + "_".join(sb_output_pattern)
else:
print("ERROR: No output pattern provided for custom connection type.")
exit(1)
# if the location pattern is custom, add its pattern to the path
if sb_location_pattern != "repeated_interval":
if sb_location_pattern == "custom":
if sb_grid_csv_path == "":
print("ERROR: No custom SB grid CSV path provided.")
exit(1)
else:
sb_pattern_string += "_" + sb_location_pattern + "_" + os.path.splitext(os.path.basename(sb_grid_csv_path))[0]
else:
sb_pattern_string += "_" + sb_location_pattern
rrg_3d_path = "/rrg_3d/rrg_3d_" + type_sb + "_cw_" + str(channel_width) + "_" + str(int(percent_connectivity * 100)) + "percent_" + connection_type + sb_pattern_string + "_" + vertical_connectivity_string + arch_output_file_name
# if base rrg does not exist, create it (AKA run VTR)
if not os.path.exists(original_dir + rrg_path):
start_time = time.time()
create_base_rrg(original_dir, relative_arch_path, channel_width=channel_width, path_to_write_rrg=rrg_path)
end_time = time.time()
run_time = (end_time - start_time) * 1000
print_verbose(f"Generating Base RRG took {run_time:0.2f} ms")
else:
print_verbose(f"Base RRG previously generated at {original_dir + rrg_path}")
# if 3d rrg does not exist, create it
if not os.path.exists(original_dir + rrg_3d_path) and type_sb != "3d_cb" and type_sb != "2d" and type_sb != "3d_cb_out_only":
start_time = time.time()
create_custom_3d_rrg(rrg_path, rrg_3d_path, original_dir, percent_connectivity, connection_type, arch_file=arch_base_file, vertical_connectivity=vertical_connectivity, sb_switch_name=sb_switch_name, sb_segment_name=sb_segment_name, sb_input_pattern=sb_input_pattern, sb_output_pattern=sb_output_pattern, sb_location_pattern=sb_location_pattern, sb_grid_csv_path=sb_grid_csv_path)
end_time = time.time()
run_time = (end_time - start_time) * 1000
print_verbose(f"Generating 3D RRG from Base RRG took {run_time:0.2f} ms")
else:
if type_sb == "3d_cb" or type_sb == "2d" or type_sb == "3d_cb_out_only":
print_verbose(f"3D RRG not generated since type {type_sb} does not need a custom RRG, using base RRG")
else:
print_verbose(f"3D RRG previously generated at {original_dir + rrg_3d_path}")
if type_sb == "3d_cb" or type_sb == "2d" or type_sb == "3d_cb_out_only":
# Copy Base RRG path into task script (only need base since 3D SBs are not used)
append_rrg_to_script(temp_dir + "/task" + script_path, original_dir + rrg_path)
else:
# Copy 3D RRG path into task script
append_rrg_to_script(temp_dir + "/task" + script_path, original_dir + rrg_3d_path)
if is_verilog_benchmarks:
# TODO: This is a bit sketchy, should make it more robust in the future to allow for multiple arch runs at the same time, no hard coding values
command = ["cp", temp_dir + "/task/config_templates/verilog_task.conf", temp_dir + "/task/config/task.conf"]
run_command_in_temp_dir(command, original_dir)
else:
# TODO: This is a bit sketchy, should make it more robust in the future to allow for multiple arch runs at the same time, no hard coding values
command = ["cp", temp_dir + "/task/config_templates/blif_task.conf", temp_dir + "/task/config/task.conf"]
run_command_in_temp_dir(command, original_dir)
#append random seed to script
append_random_seed_to_script(temp_dir + "/task" + script_path, random_seed)
#append extra vpr options to script
if extra_vpr_options != "":
append_extra_vpr_option_to_script(temp_dir + "/task" + script_path, extra_vpr_options)
# Make tasks_run directory
#Output folder name based on parameters and time of run
curr_time = time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime())
folder_name = "3d_" + type_sb + "_cw_" + output_file_name(channel_width=channel_width, width=width, height=height, percent_connectivity=percent_connectivity, place_algorithm=place_algorithm, connection_type=connection_type, run_num=run_num, additional_info=output_additional_info) + "_" + curr_time
os.makedirs(original_dir + "/tasks_run/" + folder_name, exist_ok=True)
return original_dir + "/tasks_run/" + folder_name
[docs]
def create_base_rrg(original_dir:str, path_to_arch:str, channel_width=2, path_to_write_rrg="/base_rrg/rr_graph.xml", path_to_benchmark=and2_blif_path):
# Make sure the output directory exists
os.makedirs(os.path.dirname(original_dir + path_to_write_rrg), exist_ok=True)
command = [original_dir + "/OpenFPGA/build/vtr-verilog-to-routing/vpr/vpr",
original_dir + path_to_arch,
original_dir + path_to_benchmark,
"--route_chan_width",
str(channel_width),
"--write_rr_graph",
original_dir + path_to_write_rrg,
"--clock_modeling",
"route",
"--pack"]
print_verbose(f"Creating Base RRG with command: {' '.join(command)}")
run_command_in_temp_dir(command, original_dir, handle_error=False, verbose=False)
[docs]
def create_custom_3d_rrg(base_arch_path, output_file_path, original_dir, percent_connectivity=0.5, connection_type="subset", arch_file ="", vertical_connectivity=1, sb_switch_name="", sb_segment_name="", sb_input_pattern=[], sb_output_pattern=[], sb_location_pattern="repeated_interval", sb_grid_csv_path=""):
# Make sure the output directory exists
os.makedirs(os.path.dirname(original_dir + output_file_path), exist_ok=True)
command = ["python3",
original_dir + "/scripts/3d_sb_creator.py",
"-f", original_dir + base_arch_path,
"-o", original_dir + output_file_path,
"-p", str(percent_connectivity),
"-c", connection_type,
"-a", arch_file,
"-vp", str(vertical_connectivity),
"--sb_location_pattern", sb_location_pattern,]
sb_grid_csv_string = ""
if sb_location_pattern == "custom":
if sb_grid_csv_path == "":
print("ERROR: No custom SB grid CSV path provided.")
exit(1)
else:
command.append("--sb_grid_csv")
command.append(sb_grid_csv_path)
sb_input_pattern_string = ""
sb_output_pattern_string = ""
if connection_type == "custom":
if sb_input_pattern != []:
sb_input_pattern = [str(x) for x in sb_input_pattern]
command.append("--sb_input_pattern")
for x in sb_input_pattern:
command.append(str(x))
else:
print("ERROR: No input pattern provided for custom connection type.")
exit(1)
if sb_output_pattern != []:
sb_output_pattern = [str(x) for x in sb_output_pattern]
command.append("--sb_output_pattern")
for x in sb_output_pattern:
command.append(str(x))
else:
print("ERROR: No output pattern provided for custom connection type.")
exit(1)
if sb_switch_name != "":
command.append("--sb_3d_switch")
command.append(sb_switch_name)
if sb_segment_name != "":
command.append("--sb_3d_segment")
command.append(sb_segment_name)
run_command_in_temp_dir(command, original_dir, verbose=True)
[docs]
def copy_results(original_dir, task_result_path, results_path, result_name, temp_dir=""):
os.makedirs(os.path.dirname(original_dir + results_path), exist_ok=True)
command = ["cp",
temp_dir + task_result_path,
original_dir + results_path + result_name]
run_command_in_temp_dir(command, original_dir)