Source code for pyH2A.Utilities.plugin_input_output_processing

from pathlib import Path

from pyH2A.Utilities.input_modification import insert, convert_input_to_dictionary, check_for_meta_module, import_plugin, merge, parse_parameter
from pyH2A.Discounted_Cash_Flow import Discounted_Cash_Flow

[docs]def is_parameter_or_output(line, spaces_for_tab = 4, spaces_cutoff = 5): '''Detection of parameters and output values in line based on presence of more than `spaces_cuttoff` spaces (tabs are converted to four spaces). ''' line = line.replace('\t', ' ' * spaces_for_tab) # replace tabs with given number of spaces return line[:spaces_cutoff] != ' ' * spaces_cutoff
[docs]def process_single_line(line, output_dict, origin, variable_string, **kwargs): '''Process single line to extract parameter/output information and comments''' if is_parameter_or_output(line, **kwargs): # is a line containing parameter/output information complete_string = line.split(':') variable_string = complete_string[0].strip(', \t') variable_type = complete_string[1].strip(', \t') output_dict[variable_string] = {'Type': variable_type, 'Origin': origin} return variable_string else: # is a line containing comment for parameter/output comment = line.strip(' \t') bottom_key = parse_parameter(variable_string)[-1] comment_string = f'Comment {bottom_key}' if comment_string in output_dict[variable_string]: output_dict[variable_string][comment_string] += ' ' + comment else: output_dict[variable_string][comment_string] = comment return variable_string
[docs]def extract_input_output_from_docstring(target, **kwargs): '''Convert docstring to structured dictionary.''' doc_string = target.__doc__.split('\n') parameters_dict = {} output_dict = {} parameters = False output = False variable_string = None for counter, line in enumerate(doc_string): if '---' in line and 'Parameters' in doc_string[counter-1]: parameters = True if '---' in line and 'Returns' in doc_string[counter-1]: parameters = False output = True if line.strip(' \t') == '': parameters = False output = False if parameters and '---' not in line: variable_string = process_single_line(line, parameters_dict, target.__name__, variable_string, **kwargs) if output and '---' not in line: variable_string = process_single_line(line, output_dict, target.__name__, variable_string, **kwargs) plugin_dict = {'Parameters': parameters_dict, 'Output': output_dict} return plugin_dict
[docs]def convert_inp_to_requirements(dictionary, path = None): '''Convert inp dictionary structure to requirements dictionary structure.''' output = {} for top_key, top_item in dictionary.items(): for middle_key, middle_item in top_item.items(): for bottom_key, bottom_item in middle_item.items(): path = f'{top_key} > {middle_key} > {bottom_key}' output[path] = {'Entry': bottom_item, 'Origin': 'Input'} return output
[docs]class Generate_Template_Input_File: '''Generate input file template from a minimal input file. Parameters ---------- input_file_stub : str Path to input file containing workflow and analysis specifications. output_file : str Path to file where input template is to be written. origin : bool, optional Include origin of each requested input parameter in input template file ("requested by" information). comment : bool, optional Include comments for each requested input parameter (additional information on parameter). Returns ------- Template : object Template object which contains information on requirements and output. Input template is written to specified output file. ''' def __init__(self, input_file_stub, output_file, origin = False, comment = False): if isinstance(input_file_stub, str): self.inp_stub = convert_input_to_dictionary(input_file_stub) else: self.inp_stub = input_file_stub self.inp = {} post_workflow_position = self.get_post_workflow_position() pre_workflow = {'Description': 'Functions executed before workflow.', 'Position': -1, 'Type': 'function'} post_workflow = {'Description': 'Function executed after workflow.', 'Position': post_workflow_position, 'Type': 'function'} self.inp_stub['Workflow']['pre_workflow'] = pre_workflow self.inp_stub['Workflow']['post_workflow'] = post_workflow self.get_analysis_modules(post_workflow_position) self.sorted_keys = sorted(self.inp_stub['Workflow'], key = lambda x: self.inp_stub['Workflow'][x]['Position']) self.requirements = {} self.output = {} self.provided_inp = convert_inp_to_requirements(self.inp_stub) self.generate_requirements() self.convert_requirements_to_inp(insert_origin = origin, insert_comment = comment) self.inp = merge(self.inp, convert_input_to_dictionary(input_file_stub, merge_default = False)) template_file = Template_File(self.inp) template_file.write_template_file(output_file) def get_post_workflow_position(self): max_position = max(item['Position'] for item in self.inp_stub['Workflow'].values()) return max_position + 1
[docs] def get_analysis_modules(self, post_workflow_position): '''Get analysis modules from input stub. ''' position = post_workflow_position for key in self.inp_stub: if check_for_meta_module(key): position += 1 module = {'Description': 'meta module', 'Position': position, 'Type': 'analysis'} self.inp_stub['Workflow'][key] = module
[docs] def generate_requirements(self): '''Generate dictionary with input requirements. ''' output = self.provided_inp requirements = {} for key in self.sorted_keys: data = self.get_docstring_data(key, self.inp_stub['Workflow'][key]['Type']) needed_parameters = self.check_parameters(data['Parameters'], output) requirements = merge(requirements, needed_parameters) output = merge(output, data['Output']) self.requirements = requirements
[docs] def get_docstring_data(self, target_name, target_type): '''Get parameter requirements and outputs from docstrings. ''' if target_type == 'function': target = getattr(Discounted_Cash_Flow, target_name) data = extract_input_output_from_docstring(target, spaces_cutoff = 9) else: if target_type == 'plugin': plugin_module = True else: plugin_module = False target = import_plugin(target_name, plugin_module) data = extract_input_output_from_docstring(target) return data
[docs] def check_parameters(self, data, output): '''Check if needed parameter is in output. ''' requirements = {} for key, item in data.items(): if key not in output: requirements[key] = item else: item['Fullfilled by'] = output[key]['Origin'] try: if output[key]['Type'] != item['Type']: print('Warning: type of output and requirement differs for: `{0}`. `{1}` required, `{2}` provided'.format(key, item['Type'], output[key]['Type'])) except KeyError: pass return requirements
[docs] def convert_requirements_to_inp(self, insert_origin = False, insert_comment = False): '''Convert dictionary of requirements to formatted self.inp ''' for key, item in self.requirements.items(): path = parse_parameter(key) path = ['[...]' if x == '' else x for x in path] # replacing '>>' with '> [...] >' insert(self, *path, item['Type'], None, print_info = False, add_processed = False, insert_path = False) if insert_origin: insert(self, *path[:-1], 'Requested by', item['Origin'], None, print_info = False, add_processed = False, insert_path = False) if insert_comment: for key in item: if 'Comment' in key: insert(self, *path[:-1], key, item[key], None, print_info = False, add_processed = False, insert_path = False)
[docs]class Template_File: '''Generate input file from inp dictionary. Parameters ---------- inp : dict Dictionary containing information on requested input (generated by `Generate_Template_Input_File`) Returns ------- Template_File : object Object containing formatted string for output file. ''' def __init__(self, inp): self.inp = inp self.convert_inp_to_string()
[docs] def convert_inp_to_string(self): '''Convert inp to string.''' output = '' for top_key, top_item in self.inp.items(): output += f'# {top_key}' columns = list(top_item) column_names = self.get_column_names(top_item) output += '\n\n' + self.convert_column_names_to_string(column_names) output += '\n' + '--- | ' * (len(column_names) - 1) + '---' output += self.get_row_entries(columns, column_names, top_item) self.output = output
[docs] def get_column_names(self, dictionary): '''Get names of table columns.''' column_names_array = [['Parameter']] for key, item in dictionary.items(): column_names_array.append(list(item)) column_names_flat_list = [item for sublist in column_names_array for item in sublist] column_names = list(dict.fromkeys(column_names_flat_list)) return column_names
[docs] def convert_column_names_to_string(self, names): '''Convert list of names to markdown style table string.''' string = '' for name in names[:-1]: string += f'{name} | ' string += names[-1] return string
[docs] def get_row_entries(self, columns, column_names, dictionary): '''Get entries for each row of table.''' string = '' for column in columns: string += f'\n{column} | ' string += self.get_single_row(column_names, column, dictionary) string += '\n\n' return string
[docs] def get_single_row(self, column_names, column, dictionary): '''Get entries for a single row.''' string = '' for counter, name in enumerate(column_names[1:]): try: entry = dictionary[column][name] except KeyError: entry = 'n/a' string += str(entry) if counter < len(column_names[1:]) - 1: string += ' | ' return string
[docs] def write_template_file(self, file_name): '''Write output string to file.''' with open(Path(file_name), 'w') as text_file: text_file.write(self.output)