diff --git a/bubbln.py b/bubbln.py index 1eb1e06..a9d79a9 100644 --- a/bubbln.py +++ b/bubbln.py @@ -1,20 +1,13 @@ import os import pickle -from ansible_runner import run -from tenacity import retry, stop_after_attempt, wait_random_exponential -from input_validation import validate_inputs -from playbook_generation import generate_playbook -from interface_configuration import get_router_interface_configuration +from chatGPT_prompting import validate_openai_key +from parameter_input import parameter_input_function import openai -from fpdf import FPDF import time -import textwrap -import shutil from colorama import init -from pygments import highlight -from pygments.lexers import get_lexer_by_name -from pygments.formatters import TerminalFormatter -import psutil +from cryptography.fernet import Fernet +import getpass +from utility import animate_message, print_line, load_user_config, save_user_config, encrypt_key, decrypt_key # Initialize colorama init() @@ -32,200 +25,50 @@ # Move to the execution folder os.chdir(execution_folder) - -# Function to format the Network configuration report file generated during the program execution -class PDF(FPDF): - def header(self): - self.set_font('Arial', 'B', 12) - self.cell(0, 10, 'Network Configuration Report', 0, 1, 'C') - - def chapter_title(self, title): - self.set_font('Arial', 'B', 14) - self.cell(0, 10, title, 0, 1, 'L') - self.ln(5) - - def chapter_body(self, content): - self.set_font('Arial', '', 12) - self.multi_cell(0, 10, content) - self.ln(10) - - -# Function for animating the welcome message -def animate_message(message): - for char in message: - print(char, end='', flush=True) - # time.sleep(0.008) - # print() - - -# Function to get the terminal width -def get_terminal_width(): - try: - _, columns = shutil.get_terminal_size() - return columns - except: - return 80 - - -# Function to print a line with "=" based on the terminal width -def print_line(): - width = get_terminal_width() - print("=" * 80) - - -# Welcome message animation and logic to decide if it should be displayed, i.e display logic message only at -# first-time execution of the program. Do not display it afterwards. -welcome_message = ("\n\nWelcome. My name is Bubbln. I was developed to aid a research on the potentials of automating " - "a Network using Large Language Models (Specifically, ChatGPT). This research is being undertaken " - "by Olasupo Okunaiya as part of his Masters degree project on the CMP7200 course in the School of " - "Computing and Digital Technology at Birmingham City University. The project is being supervised " - "by Associate Professor Ron Austin (https://www.linkedin.com/in/ronaustin1]) during the period " - "June 2023 to 28th September, 2023. The research will be written up and submitted for assessment " - "in September, 2023 and may be used for external publication for a further 12 months. Thank you. " - "\n\n- Olasupo Okunaiya (linkedin.com/in/olasupoo)\n\n") -wrapped_welcome_message = textwrap.fill(welcome_message, width=get_terminal_width()) -existing_config_file = os.path.join(current_path, "user_config.pkl") - -if os.path.exists(existing_config_file): - with open(existing_config_file, "rb") as file: - user_config = pickle.load(file) - show_full_message = user_config.get("show_full_message", False) -else: - user_config = {} - show_full_message = False - -if not show_full_message: - print_line() - print("\n\t\t----------THIS IS A ONE-TIME WELCOME MESSAGE----------\n") - animate_message(welcome_message) - animate_message( - "\nThis is an experiment to assess the potential of using chatGPT to generate ansible playbooks in network " - "automation\n\n") - - print_line() - user_config["show_full_message"] = True - with open(existing_config_file, "wb") as file: - pickle.dump(user_config, file) - - -# This is the function to execute the generated playbooks which were dynamically stored. -@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(10)) -def execute_playbook(playbook_file, model_name, router_number, text_before_code, text_after_code, conversation_history): - try: - while True: - with open(playbook_file, "r") as file: - playbook_content = file.read() - - print(f"\n\nGenerated Playbook for Router {router_number} has been Saved To: {playbook_file}\n") - print_blue(text_before_code) - print() - print_colored_code(playbook_content) - print() - print_blue(text_after_code) - print() - - r = run( - playbook=playbook_file, - inventory=os.path.join(current_path, 'hosts.yml'), - private_data_dir=current_path, - quiet=False - ) - print() - return playbook_content - - except Exception as e: - print("Error executing Ansible playbook:", e) - - -# This function validates a user's input to ensure it is of integer value e.g OSPF Area value inputted by a user must -# be an integer -def get_positive_integer_input(prompte): - while True: - try: - value = int(input(prompte)) - if value > 0: - return value - else: - animate_message("Sorry, that is an invalid input! Please enter a positive integer\n") - except ValueError: - animate_message("Sorry, that is an invalid input! Please enter a valid integer\n") - - -# This function saves the router parameters inputted by a user for easy reloading in future execution of the program -def save_configuration(configurations): - config_file = os.path.join(current_path, "router_configurations.pkl") - with open(config_file, "wb") as file: - pickle.dump(configurations, file) - - -# This function is responsible for reloading saved router parameters when prompted during execution of the program -def load_configuration(): - config_file = os.path.join(current_path, "router_configurations.pkl") - if os.path.exists(config_file): - with open(config_file, "rb") as file: - return pickle.load(file) +user_config = load_user_config() + +def welcome_message_feature(): + # Welcome message animation and logic to decide if it should be displayed, i.e display logic message only at + # first-time execution of the program. Do not display it afterwards. + welcome_message = ( + "\n\nWelcome. My name is Bubbln. I was developed to aid a research on the potentials of automating " + "a Network using Large Language Models (Specifically, ChatGPT). This research is being undertaken " + "by Olasupo Okunaiya as part of his Masters degree project on the CMP7200 course in the School of " + "Computing and Digital Technology at Birmingham City University. The project is being supervised " + "by Associate Professor Ron Austin (https://www.linkedin.com/in/ronaustin1]) during the period " + "June 2023 to 28th September, 2023. The research will be written up and submitted for assessment " + "in September, 2023 and may be used for external publication for a further 12 months. Thank you. " + "\n\n- Olasupo Okunaiya (linkedin.com/in/olasupoo)\n\n") + + existing_config_file = os.path.join(current_path, "user_config.pkl") + + # Load or create user configuration + if os.path.exists(existing_config_file): + with open(existing_config_file, "rb") as file: + user_config = pickle.load(file) + show_full_message = user_config.get("show_full_message", False) else: - return None - - -# This function generates an animated blue colored font. -def print_blue(text): - animate_message("\033[94m{}\033[0m".format(text)) - - -# This function ensures that playbooks in the stored playbook files and reports are formatted in accordance to yaml -# rules -def print_colored_code(code): - lexer = get_lexer_by_name("yaml") # Set the appropriate lexer for the code language - formatter = TerminalFormatter() - colored_code = highlight(code, lexer, formatter) - print(colored_code) - - -# This function outputs a summary of the execution time for key tasks -def print_summary(input_router_duration, generate_playbooks_duration, execute_playbooks_duration, summary_duration): - print_blue("\n\n********** Below is a Summary of How Long It Took Us To Setup Your Network **********") - print_blue("\n1. We spent {:.2f} seconds to input the various router parameters.".format(input_router_duration)) - print_blue( - "\n2. The process for generating all the playbooks took {:.2f} seconds.".format(generate_playbooks_duration)) - print_blue("\n3. We spent {:.2f} seconds to execute all the generated playbooks".format(execute_playbooks_duration)) - # print_blue("\n4. And to generate and execute playbook for adjacency checks took us {:.2f} seconds".format( - # check_adjacency_duration)) - print_blue("\n4. In Total, it has taken {:.2f} seconds to automate your network from start to finish".format( - summary_duration)) - - -# This function is executed to run a ping test and track the status whether failed or successful -def execute_ping_playbook(playbook_file, inventory_file): - try: - # Run the Ansible playbook for pinging prefixes - r = run( - playbook=playbook_file, - inventory=inventory_file, - private_data_dir=current_path, - quiet=False - ) - - ping_result_dict = {} - - # Check if the playbook execution was successful and stdout is available - if r.rc == 0 and r.stdout: - ping_result_dict['success'] = True - ping_result_dict['output'] = r.stdout.decode('utf-8') # Convert bytes to string - else: - ping_result_dict['success'] = False - ping_result_dict[ - 'error'] = f"Error executing Ansible playbook for ping prefixes: {r.stderr.decode('utf-8')}" - - return ping_result_dict - except Exception as e: - print("Error executing Ansible playbook for ping prefixes:", e) - return {'success': False, 'error': str(e)} - + user_config = {} + show_full_message = False + + # Display welcome message if required + if not show_full_message: + print_line() + print("\n\t\t----------THIS IS A ONE-TIME WELCOME MESSAGE----------\n") + animate_message(welcome_message) + animate_message( + "\nThis is an experiment to assess the potential of using chatGPT to generate ansible playbooks in network " + "automation\n\n") + + print_line() + user_config["show_full_message"] = True + with open(existing_config_file, "wb") as file: + pickle.dump(user_config, file) # This is the main function of the program responsible for taking user inputs, forming prompts and calling other # functions def main(): + welcome_message_feature() elapsed_time = 0 program_start_time = time.time() # Useful to track program execution time @@ -236,13 +79,102 @@ def main(): user_config = pickle.load(file) else: user_config = {} + openai_key = user_config.get("openai_key") + if not openai_key: + print("\n\n\t\t==========This is a One-time Requirement==========\n\n") + while True: + openai_key = getpass.getpass("Please enter your OpenAI API Key: ") + if validate_openai_key(openai_key): + break + else: + print("Invalid API key! You can obtain a key from your OpenAI account.") + + # Generate or load Fernet key + fernet_key = user_config.get("fernet_key") + if fernet_key: + cipher_suite = Fernet(fernet_key) + else: + fernet_key = Fernet.generate_key() + cipher_suite = Fernet(fernet_key) + user_config["fernet_key"] = fernet_key + + encrypted_openai_key = encrypt_key(openai_key, cipher_suite) + user_config["openai_key"] = encrypted_openai_key + save_user_config(user_config) + else: + # Load Fernet key + fernet_key = user_config.get("fernet_key") + cipher_suite = Fernet(fernet_key) + openai_key = decrypt_key(openai_key, cipher_suite) + + openai.api_key = openai_key + + # Function to read SSH IP addresses from a file + def read_ssh_ips(filename): + with open(filename, 'r') as file: + ssh_ips = [line.strip() for line in file.readlines() if line.strip()] + return ssh_ips + + # Read SSH IP addresses from a file + ssh_file = os.path.join(current_path, 'ssh_ip_addresses.txt') + ssh_ips = read_ssh_ips(ssh_file) + + # Initialize an empty dictionary to store router details + router_details = {} + + # Assign SSH IP addresses to routers + for i in range(len(ssh_ips)): + router_details[f"R{i + 1}"] = {"ansible_host": ssh_ips[i]} + + # Load or create fernet key + fernet_key = user_config.get("fernet_key") + if fernet_key: + cipher_suite = Fernet(fernet_key) + else: + key = Fernet.generate_key() + cipher_suite = Fernet(key) + user_config["fernet_key"] = key + + # Prompt user for username and password + while True: + try: + username = input("Please input the SSH username: ") + password = getpass.getpass("Please input the SSH password: ") + + # Encrypt username and password + + encrypted_username = cipher_suite.encrypt(username.encode()) + encrypted_password = cipher_suite.encrypt(password.encode()) + user_config["ansible_user"] = encrypted_username + user_config["ansible_ssh_pass"] = encrypted_password + + # Update the vars section of the hosts.yml file with encrypted credentials + host_file = os.path.join(current_path, '.hosts.yml') + + with open(host_file, 'w') as hosts_file: + # Write router details + hosts_file.write("routers:\n hosts:\n") + for router, details in router_details.items(): + hosts_file.write(f" {router}:\n") + hosts_file.write(f" ansible_host: {details['ansible_host']}\n") + hosts_file.write(f" vars:\n") + hosts_file.write(f" ansible_connection: network_cli\n") + hosts_file.write(f" ansible_network_os: ios\n") + hosts_file.write(f" ansible_action_warnings: False\n") + hosts_file.write(f" ansible_user: {encrypted_username}\n") + hosts_file.write(f" ansible_ssh_pass: {encrypted_password}\n") + + # Successfully got username and password, break out of loop + break + + except Exception as e: + print("An error occurred while processing your input:", e) + print("Please try again.") - openai_key = "Insert Your OpenAI key here" # You need to register to openai.com to obtain an API key. - # The inventory file is loaded inventory_file = user_config.get("inventory_file") if not inventory_file: - inventory_file = 'hosts.yml' + inventory_file = '.hosts.yml' user_config["inventory_file"] = inventory_file with open(config_file, "wb") as file: @@ -251,270 +183,11 @@ def main(): openai.api_key = openai_key model_name = "gpt-4" - # Logic to get router parameters from user - while True: - animate_message("\n\n\t\t\tLet's Begin Setup of Your Network") - print("\n\t\t\t===================================\n\n") - - input_param_start_time = time.time() - - interface_configurations = load_configuration() - if interface_configurations is None: - interface_configurations = [] - router_count = get_positive_integer_input("How many routers do you want to configure: ") - for i in range(router_count): - print(f"\n\nWe will now take the configuration parameters for Router {i + 1} : \n\n") - router_interface_count = get_positive_integer_input(f"How many interfaces are to be configured?: ") - protocol_count = get_positive_integer_input(f"And how many protocols: ") - interface_configurations.append( - get_router_interface_configuration(protocol_count, router_interface_count)) - - save_configuration(interface_configurations) - else: - - response = input("\nWe found a previous configuration. Would you like to load it? (y/n): ") - # response = 'y' - if response.lower() == "n": - interface_configurations = [] - router_count = get_positive_integer_input("\nHow many routers do you want to configure: ") - for i in range(router_count): - print(f"\n\nWe will now take the configuration parameters for Router {i + 1} : \n\n") - router_interface_count = get_positive_integer_input( - f"How many interfaces are to be configured?: ") - protocol_count = get_positive_integer_input(f"...And how many protocols?: ") - interface_configurations.append( - get_router_interface_configuration(protocol_count, router_interface_count)) - - save_configuration(interface_configurations) - - os.system('clear') - - print("\n\n**********Please review and validate your inputs**********") - - if validate_inputs(interface_configurations): - input_param_end_time = time.time() - input_param_duration = input_param_end_time - input_param_start_time - - # Create a PDF object - pdf = PDF() - pdf.set_auto_page_break(auto=True, margin=15) - - # Add a chapter for router configurations - pdf.add_page() - pdf.chapter_title("Router Configurations") - pdf.chapter_body("Explanation and details about router configurations.") - - generate_playbooks_start_time = time.time() # Start time for generating playbooks - - # Logic to generate prompts and save them. - for i, router_config in enumerate(interface_configurations): - - conversation_history = [] - # Add a subheading for the current router - pdf.set_font('Arial', 'B', 12) - pdf.cell(0, 10, f"Router {i + 1} Configurations", 0, 1, 'L') - pdf.ln(5) - - prompt = ("Requirements: Strictly adhere to the following explicitly stated requirements; Write a " - "simple Ansible playbook with separate tasks for each protocol and interface configurations " - "with the following details;") - prompt += f" hosts: R{i + 1}" - prompt += " Do not worry about the inventory file;" - prompt += " Ensure each Task is named;" - prompt += " Never provide explanations for the generated playbook;" - prompt += " Do not use variables and templates to generate the playbooks;" - prompt += ("Ensure all generated playbooks adhere to yaml's rule of always starting a playbook with " - "`---` and ending the playbook with a new line containing `...`;") - prompt += " Always use ios_config module and ensure unsupported parameters are not generated;;" - prompt += " Use `parents` argument to implement stanzas;" - # prompt += " When configuring interfaces, ensure interface-type and port numbers are combined. e.g - # loopback 90, should be loopback90;" - prompt += ("when configuring interfaces, ensure you generate codes for only provided interfaces and " - "always implement 'No Shutdown' for each interface;") - prompt += ("when configuring routing protocols, ensure you generate codes for only provided protocols " - "and that the protocol is initialized only under the parents argument using the format " - "`router protocol-type xx`. Also, DO NOT configure router id;") - prompt += " set `replace` argument to block. `replace` argument should always be child to `ios_config`;" - - # prompt += " Implement one task to save config when there is modification. Exclude 'commit' and - # 'confirm' arguments" - - redistribute_required = False - - for protocol_config in router_config: - if 'protocol' in protocol_config: - protocol = protocol_config['protocol'] - - prompt += f" Protocol: {protocol}" - - if protocol.lower() == 'ospf': - area = protocol_config['area'] - process_id = protocol_config['process_id'] - num_networks = len(protocol_config['networks']) - prompt += f" OSPF Area: {area}, Process ID: {process_id}, Number of networks to advertise: {num_networks}" - for j, network in enumerate(protocol_config['networks']): - pdf.set_font('Arial', '', 12) - pdf.cell(0, 10, f"network: {network}", 0, 1, 'L') - pdf.ln(5) - prompt += f" network{j + 1}: {network}" - elif protocol.lower() == 'eigrp': - as_number = protocol_config['as_number'] - num_networks = len(protocol_config['networks']) - prompt += f" EIGRP AS Number: {as_number}, Number of networks to advertise: {num_networks}" - for j, network in enumerate(protocol_config['networks']): - pdf.set_font('Arial', '', 12) - pdf.cell(0, 10, f"network: {network}", 0, 1, 'L') - pdf.ln(5) - prompt += f" network{j + 1}: {network}" - # Check if both EIGRP and OSPF are configured - if protocol.lower() == 'eigrp': - for other_protocol_config in router_config: - if 'protocol' in other_protocol_config and other_protocol_config[ - 'protocol'].lower() == 'ospf': - redistribute_required = True - - if redistribute_required: - prompt += (f"; Using dedicated tasks, Please redistribute the routing protocols using " - f"'redistribute ospf {process_id} metric 1000 33 255 1 1500' for redistributing OSPF " - f"into EIGRP and 'redistribute eigrp {as_number} subnets' for redistributing EIGRP " - f"into OSPF;") - prompt += ("The redistribution tasks, should be generated after the routing protocol configuration " - "tasks have been generated;") - - for interface_config in router_config: - if 'interface' in interface_config and 'ip' in interface_config: - interface_name = interface_config['interface'] - ip_address = interface_config['ip'] - pdf.set_font('Arial', '', 12) - pdf.cell(0, 10, f"Interface: {interface_name}, IP: {ip_address}", 0, 1, 'L') - pdf.ln(5) - prompt += f" Interface: {interface_name}, IP: {ip_address}" - - prompt = prompt.rstrip(";") # Remove the trailing semicolon - - # Add a chapter for the prompt used to generate each playbook - pdf.add_page() - pdf.chapter_title(f"Prompt for Router {i + 1}") - pdf.chapter_body(prompt) - - print(f"\n\nPrompt:\n\n") - print_blue(prompt) - text_before_code, playbook, text_after_code, conversation_history = generate_playbook(prompt, - model_name, - conversation_history) - generate_playbooks_duration = time.time() - generate_playbooks_start_time - # Add a chapter for the generated playbook - pdf.add_page() - pdf.chapter_title(f"Generated Playbook for Router {i + 1}") - pdf.chapter_body(playbook) - playbook_file = os.path.join(execution_folder, f"Router_{i + 1}_Playbook.yml") - with open(playbook_file, "w") as file: - file.write(playbook) - start_time = time.time() - execute_playbook(playbook_file, model_name, i + 1, text_before_code, text_after_code, - conversation_history) - elapsed_time = time.time() - start_time - # Save the PDF document - # pdf.output(os.path.join(current_path, "network_configuration_report.pdf"), "F") - pdf.output(os.path.join(execution_folder, "network_configuration_report.pdf"), "F") - - # Print summary - # execute_playbooks_duration = sum([elapsed_time for _ in range(len(interface_configurations))]) - execute_playbooks_duration = elapsed_time - summary_duration = time.time() - program_start_time - print_summary(input_param_duration, generate_playbooks_duration, execute_playbooks_duration, - summary_duration) - experiment_summary_file = f"experiment_summary_{time.strftime('%Y%m%d%H%M%S')}.txt" - with open(experiment_summary_file, "w") as file: - file.write(f"Input Router Duration: {input_param_duration}\n") - file.write(f"Generate Playbooks Duration: {generate_playbooks_duration}\n") - file.write(f"Execute Playbooks Duration: {execute_playbooks_duration}\n") - file.write(f"Total Summary Duration: {summary_duration}\n") - - print() - break - else: - print("\n\nPlease update your inputs:\n") - + #Call function to begin inputting parameters + parameter_input_function(execution_folder, model_name, elapsed_time, program_start_time) if __name__ == "__main__": + main() - # Execute Ansible playbook for pinging specified prefixes on R1 - print() - print("Waiting for the Network to Converge before Running a Ping Test...") - print() - time.sleep(40) - ping_playbook_r1_content = """ - - name: Ping prefixes on R1 - hosts: R1 - gather_facts: false - tasks: - - name: Ping prefixes on R1 - command: ping -c 4 "{{ item }}" - with_items: - - 192.168.10.1 - - 192.168.20.1 - - 192.168.30.1 - - 192.168.40.1 - - 192.168.50.1 - - 192.168.60.1 - - 192.168.70.1 - - 192.168.80.1 - - 172.168.1.17 - - 192.168.2.2 - - 192.168.4.2 - - 192.168.6.2 - """ - - ping_playbook_r1_file = os.path.join(execution_folder, "ping_playbook_R1.yml") - - with open(ping_playbook_r1_file, "w") as file: - file.write(ping_playbook_r1_content) - - # Execute Ansible playbook for pinging prefixes on R1 - ping_inventory_file_r1 = os.path.join(current_path, 'hosts.yml') - ping_result_r1 = execute_ping_playbook(ping_playbook_r1_file, ping_inventory_file_r1) - - # Save the outcome of the pings on R1 as a dictionary - ping_result_r1_file = os.path.join(execution_folder, "ping_result_R1.pkl") - - with open(ping_result_r1_file, "wb") as file: - pickle.dump(ping_result_r1, file) - - # Similarly, repeat the process for R4 - ping_playbook_r4_content = """ - - name: Ping prefixes on R4 - hosts: R4 - gather_facts: false - tasks: - - name: Ping prefixes on R4 - command: ping -c 4 "{{ item }}" - with_items: - - 192.168.10.1 - - 192.168.20.1 - - 192.168.30.1 - - 192.168.40.1 - - 192.168.50.1 - - 192.168.60.1 - - 192.168.70.1 - - 192.168.80.1 - - 172.168.1.17 - - 192.168.2.1 - - 192.168.4.1 - - 192.168.6.1 - """ - - ping_playbook_r4_file = os.path.join(execution_folder, "ping_playbook_R4.yml") - - with open(ping_playbook_r4_file, "w") as file: - file.write(ping_playbook_r4_content) - - # Execute Ansible playbook for pinging prefixes on R4 - ping_inventory_file_r4 = os.path.join(current_path, 'hosts.yml') - ping_result_r4 = execute_ping_playbook(ping_playbook_r4_file, ping_inventory_file_r4) - - # Save the outcome of the pings on R4 as a dictionary - ping_result_r4_file = os.path.join(execution_folder, "ping_result_R4.pkl") - - with open(ping_result_r4_file, "wb") as file: - pickle.dump(ping_result_r4, file) + # Delete the hidden hosts.yml file after the program ends + os.remove(os.path.join(current_path, '.hosts.yml')) \ No newline at end of file diff --git a/chatGPT_prompting.py b/chatGPT_prompting.py new file mode 100644 index 0000000..131c62a --- /dev/null +++ b/chatGPT_prompting.py @@ -0,0 +1,79 @@ +import openai +import time +import threading +import traceback +import openai +from openai.error import AuthenticationError +from playbook_extractor import playbook_extractor_function + + +# Function to validate the provided OpenAI API key +def validate_openai_key(api_key): + # Set the OpenAI API key + openai.api_key = api_key + # Define a test message to check if the API key is correct + messages = [ + {"role": "system", "content": "Test if the API key is correct."}, + ] + success = False + try: + # Test the API key by sending a chat completion request + openai.ChatCompletion.create(model="gpt-4", messages=messages, max_tokens=5) + success = True + except AuthenticationError as e: + # Handle authentication error if the API key is invalid + print("Invalid API key:", e) + except Exception as e: + # Handle other unexpected errors + print("An unexpected error occurred:", e) + return success + + +# Function responsible for prompting ChatGPT to generate playbooks +def chatGPT_prompter(prompt, model_name, conversation_history): + # Define an inner function to make the API call asynchronously + def api_call(): + nonlocal response + try: + # Create messages including the conversation history and user prompt + messages = [ + {"role": "system", "content": "You are a network engineer trying to automate a network."}, + ] + messages.extend(conversation_history) + messages.append({"role": "user", "content": prompt}) + + # Call ChatGPT to generate a response + response = openai.ChatCompletion.create( + model=model_name, + messages=messages, + max_tokens=1500, + ) + except Exception as e: + # Handle errors during playbook generation + print("An error occurred during playbook generation:", e) + traceback.print_exc() + response = None + + response = None + + # Display a waiting message while generating the playbook + print("\n\nPlease wait while a playbook is being generated", end='', flush=True) + + # Start a thread to make the API call asynchronously + animation_thread = threading.Thread(target=api_call) + animation_thread.start() + + # Display an animation while waiting for the response + while animation_thread.is_alive(): + for _ in range(5): + time.sleep(1.0) + print('.', end='', flush=True) + print('\b\b\b \b\b\b', end='', flush=True) # Erase the dots + + # Wait for the animation thread to finish + animation_thread.join() + + # Extract relevant information from the response + text_before_code, code, text_after_code, conversation_history = playbook_extractor_function(response, + conversation_history) + return text_before_code, code, text_after_code, conversation_history diff --git a/hosts.yml b/hosts.yml deleted file mode 100644 index a959f80..0000000 --- a/hosts.yml +++ /dev/null @@ -1,22 +0,0 @@ ---- - -# This is the hosts file. It is in YAML format and contains -# the IP address and credentials of each router on the network -routers: - hosts: - R1: - ansible_host: 172.168.1.17 - R2: - ansible_host: 192.168.2.2 - R3: - ansible_host: 192.168.4.2 - R4: - ansible_host: 192.168.6.2 - - vars: - ansible_connection: network_cli - ansible_network_os: ios - ansible_user: cisco - ansible_ssh_pass: cisco - ansible_action_warnings: False - diff --git a/interface_configuration.py b/interface_configuration.py deleted file mode 100644 index 21d41e1..0000000 --- a/interface_configuration.py +++ /dev/null @@ -1,158 +0,0 @@ -import re -import ipaddress -import readline -import time - - -# This function animates outputs information -def animate_message(message): - for char in message: - print(char, end='', flush=True) - time.sleep(0.05) - # print() - - -def get_router_interface_configuration(protocol_count, interface_count): - router_interface_config = [] - history = [] - - def input_with_history(prompt): - line = input(prompt) - if line: - history.append(line) - return line - - def handle_navigation(current_index): - if current_index < 0: - current_index = 0 - elif current_index >= len(history): - current_index = len(history) - 1 - readline.set_history_length(len(history)) - readline.set_history_item(current_index, history[current_index]) - return history[current_index], current_index - - def navigate_inputs(): - current_index = len(history) - while True: - key = input_with_history("") - if key == "\x1b[A": # Up arrow key - current_index -= 1 - line, current_index = handle_navigation(current_index) - print("\r" + " " * len(line) + "\r" + line) - elif key == "\x1b[B": # Down arrow key - current_index += 1 - line, current_index = handle_navigation(current_index) - print("\r" + " " * len(line) + "\r" + line) - else: - break - - def get_positive_integer_input(prompt): - while True: - try: - value = int(input_with_history(prompt)) - if value > 0: - return value - else: - animate_message("Sorry, that's an invalid input! Please enter a positive integer.\n") - except ValueError: - animate_message("Sorry, that's an invalid input! Please enter a positive integer.\n") - - def validate_cidr_network(cidr_network): - # Validate if the CIDR network format is correct - network_parts = cidr_network.split("/") - if len(network_parts) != 2: - return False - - ip_address = network_parts[0] - subnet_mask = network_parts[1] - if not re.match(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", ip_address): - return False - - try: - ipaddress.IPv4Network(cidr_network, strict=False) - return True - except (ipaddress.AddressValueError, ipaddress.NetmaskValueError, ValueError): - return False - - def convert_cidr_network(cidr_network): - # Convert CIDR network to network address - ip, subnet = cidr_network.split("/") - subnet_mask = (0xFFFFFFFF << (32 - int(subnet))) & 0xFFFFFFFF - subnet_mask = [(subnet_mask >> i) & 0xFF for i in [24, 16, 8, 0]] - ip_parts = ip.split(".") - network_address = ".".join(ip_parts[:4]) + " " + ".".join(str(part) for part in subnet_mask) - return network_address - - def convert_cidr_network_wildcard(cidr_network): - # Convert CIDR network to network address with wildcard mask - ip, subnet = cidr_network.split("/") - subnet_mask = (0xFFFFFFFF << (32 - int(subnet))) & 0xFFFFFFFF - subnet_mask = [(subnet_mask >> i) & 0xFF for i in [24, 16, 8, 0]] - wildcard_mask = [(255 - subnet_mask[i]) for i in range(4)] - ip_parts = ip.split(".") - network_address = ".".join(ip_parts[:4]) + " " + ".".join(str(part) for part in wildcard_mask) - return network_address - - # Input router configurations based on protocol and interface count - for _ in range(protocol_count): - while True: - protocol = input_with_history( - "Please input the network protocol to configure (Choice: OSPF, EIGRP): ").lower() - if protocol.lower() in ['ospf', 'eigrp']: - break - else: - animate_message("Sorry, that's an invalid input! Please enter either 'OSPF' or 'EIGRP'\n") - - if protocol.lower() == 'ospf': - area = get_positive_integer_input("Enter the OSPF Area: ") - process_id = get_positive_integer_input("Enter the OSPF Process ID: ") - num_networks = get_positive_integer_input("Enter the number of networks to advertise: ") - networks = [] - print("Please input the networks to advertise using the Format: x.x.x.x/y ") - for k in range(num_networks): - while True: - network = input_with_history(f"Network {k + 1}: ") - if validate_cidr_network(network): - break - else: - animate_message( - "That's an invalid input! Please enter a valid CIDR network in the format x.x.x.x/y.") - network_address = convert_cidr_network_wildcard(network) - networks.append(network_address) - router_interface_config.append( - {'protocol': protocol, 'area': area, 'process_id': process_id, 'networks': networks}) - elif protocol.lower() == 'eigrp': - as_number = get_positive_integer_input("Enter the AS Number: ") - num_networks = get_positive_integer_input("Enter the number of networks to advertise: ") - networks = [] - print("Please input the networks to advertise using the Format: x.x.x.x/y ") - for k in range(num_networks): - while True: - network = input_with_history(f"Network {k + 1}: ") - if validate_cidr_network(network): - break - else: - animate_message( - "Sorry, that's an invalid input! Please enter a valid CIDR network in the format " - "x.x.x.x/y.\n") - network_address = convert_cidr_network_wildcard(network) - networks.append(network_address) - - router_interface_config.append({'protocol': protocol, 'as_number': as_number, 'networks': networks}) - - for _ in range(interface_count): - # Input interface configurations with IP addresses - interface = input_with_history("Enter the interface name: ").lower() - while True: - ip_network = input_with_history("Enter the IP address and network (Format: x.x.x.x/y): ") - if validate_cidr_network(ip_network): - break - else: - animate_message( - "Sorry, that's an invalid input! Please enter a valid CIDR network in the format x.x.x.x/y.\n") - ip_address = convert_cidr_network(ip_network) - router_interface_config.append({'interface': interface, 'ip': ip_address}) - - navigate_inputs() - - return router_interface_config diff --git a/parameter_input.py b/parameter_input.py new file mode 100644 index 0000000..a7ecff3 --- /dev/null +++ b/parameter_input.py @@ -0,0 +1,244 @@ +import readline +import os +import time +from utility import PDF, validate_cidr_network, convert_cidr_network, convert_cidr_network_wildcard, animate_message, \ + get_positive_integer_input, save_configuration, load_configuration, print_summary +from prompt_generator import prompt_generator_function + +# Function to get router parameters from the user +def parameter_input_function(execution_folder, model_name, elapsed_time, program_start_time): + while True: + animate_message("\n\n\t\t\tLet's Begin Setup of Your Network") # Display welcome message + print("\n\t\t\t===================================\n\n") + input_param_start_time = time.time() # Record start time for input + interface_configurations = load_configuration() # Load existing configurations if available + if interface_configurations is None: # If no configurations found + interface_configurations = [] + router_count = get_positive_integer_input("How many routers do you want to configure: ") # Get number of routers + for i in range(router_count): + print(f"\n\nWe will now take the configuration parameters for Router {i + 1} : \n\n") + router_interface_count = get_positive_integer_input(f"How many interfaces are to be configured?: ") # Get number of interfaces + protocol_count = get_positive_integer_input(f"And how many protocols: ") # Get number of protocols + interface_configurations.append( + get_router_interface_configuration(protocol_count, router_interface_count)) # Get router interface configurations + save_configuration(interface_configurations) # Save configurations + else: # If existing configurations found + response = input("\nWe found a previous configuration. Would you like to load it? (y/n): ") + if response.lower() == "n": + interface_configurations = [] + router_count = get_positive_integer_input("\nHow many routers do you want to configure: ") + for i in range(router_count): + print(f"\n\nWe will now take the configuration parameters for Router {i + 1} : \n\n") + router_interface_count = get_positive_integer_input(f"How many interfaces are to be configured?: ") + protocol_count = get_positive_integer_input(f"...And how many protocols?: ") + interface_configurations.append( + get_router_interface_configuration(protocol_count, router_interface_count)) + save_configuration(interface_configurations) + os.system('clear') # Clear screen + print("\n\n**********Please review and validate your inputs**********") + if validate_inputs(interface_configurations): # Validate user inputs + input_param_end_time = time.time() + input_param_duration = input_param_end_time - input_param_start_time + # Create a PDF object + pdf = PDF() + pdf.set_auto_page_break(auto=True, margin=15) + # Add a chapter for router configurations + pdf.add_page() + pdf.chapter_title("Router Configurations") + pdf.chapter_body("Explanation and details about router configurations.") + generate_playbooks_start_time = time.time() # Start time for generating playbooks + # Call function to generate prompts and save them + generate_playbooks_duration, elapsed_time = prompt_generator_function(pdf, interface_configurations, + execution_folder, model_name, + generate_playbooks_start_time) + # Save the PDF document + pdf.output(os.path.join(execution_folder, "network_configuration_report.pdf"), "F") + + # Print summary + execute_playbooks_duration = elapsed_time + summary_duration = time.time() - program_start_time + print_summary(input_param_duration, generate_playbooks_duration, execute_playbooks_duration, + summary_duration) + experiment_summary_file = f"experiment_summary_{time.strftime('%Y%m%d%H%M%S')}.txt" + with open(experiment_summary_file, "w") as file: + file.write(f"Input Router Duration: {input_param_duration}\n") + file.write(f"Generate Playbooks Duration: {generate_playbooks_duration}\n") + file.write(f"Execute Playbooks Duration: {execute_playbooks_duration}\n") + file.write(f"Total Summary Duration: {summary_duration}\n") + + print() + break + else: + print("\n\nPlease update your inputs:\n") + +# Function to validate user inputs +def validate_inputs(interface_configurations): + for i, router_config in enumerate(interface_configurations): + print(f"Router {i + 1}:") + for protocol_config in router_config: + if 'protocol' in protocol_config: + protocol = protocol_config['protocol'] + print(f"\tProtocol: {protocol}") + if protocol.lower() == 'ospf': + area = protocol_config['area'] + process_id = protocol_config['process_id'] + num_networks = len(protocol_config['networks']) + print(f"\t\tOSPF Area: {area}") + print(f"\t\tOSPF Process ID: {process_id}") + print(f"\t\tNumber of networks to advertise: {num_networks}") + for j, network in enumerate(protocol_config['networks']): + print(f"\t\tNetwork {j + 1}: {network}") + elif protocol.lower() == 'eigrp': + as_number = protocol_config['as_number'] + num_networks = len(protocol_config['networks']) + print(f"\t\tEIGRP AS Number: {as_number}") + print(f"\t\tNumber of networks to advertise: {num_networks}") + for j, network in enumerate(protocol_config['networks']): + print(f"\t\tNetwork {j + 1}: {network}") + for interface_config in router_config: + if 'interface' in interface_config and 'ip' in interface_config: + interface_name = interface_config['interface'] + ip_address = interface_config['ip'] + print(f"\tInterface: {interface_name}") + print(f"\tIP Address: {ip_address}") + print() + + while True: + confirm = input("Do you want to proceed with the above inputs? (y/n): ") + if confirm.lower() == 'y': + return True + elif confirm.lower() == 'n': + while True: + index_to_correct = input( + "\n\nOn which router do you want to make corrections - Enter '1' for Router1 etc: ") + try: + index = int(index_to_correct) - 1 + if 0 <= index < len(interface_configurations): + router_config = interface_configurations[index] + protocol_count = len([config for config in router_config if 'protocol' in config]) + interface_count = len( + [config for config in router_config if 'interface' in config and 'ip' in config]) + print(f"\nCorrecting inputs for Router {index + 1}...\n") + interface_configurations[index] = get_router_interface_configuration(protocol_count, + interface_count) + os.system('clear') + break + else: + print( + f"Invalid input! Please enter a valid router index between 1 and {len(interface_configurations)}.") + except ValueError: + print("Invalid input! Please enter a valid router index number.") + print("\n\nPlease review and validate your corrected inputs:") + return validate_inputs(interface_configurations) + else: + print("Invalid input! Please enter 'y' or 'n'.") + +# Function to get router interface configuration from user +def get_router_interface_configuration(protocol_count, interface_count): + router_interface_config = [] + history = [] + + def input_with_history(prompt): + line = input(prompt) + if line: + history.append(line) + return line + + def handle_navigation(current_index): + if current_index < 0: + current_index = 0 + elif current_index >= len(history): + current_index = len(history) - 1 + readline.set_history_length(len(history)) + readline.set_history_item(current_index, history[current_index]) + return history[current_index], current_index + + def navigate_inputs(): + current_index = len(history) + while True: + key = input_with_history("") + if key == "\x1b[A": # Up arrow key + current_index -= 1 + line, current_index = handle_navigation(current_index) + print("\r" + " " * len(line) + "\r" + line) + elif key == "\x1b[B": # Down arrow key + current_index += 1 + line, current_index = handle_navigation(current_index) + print("\r" + " " * len(line) + "\r" + line) + else: + break + + def get_positive_integer_input(prompt): + while True: + try: + value = int(input_with_history(prompt)) + if value > 0: + return value + else: + animate_message("Sorry, that's an invalid input! Please enter a positive integer.\n") + except ValueError: + animate_message("Sorry, that's an invalid input! Please enter a positive integer.\n") + + # Input router configurations based on protocol and interface count + for _ in range(protocol_count): + while True: + protocol = input_with_history( + "Please input the network protocol to configure (Choice: OSPF, EIGRP): ").lower() + if protocol.lower() in ['ospf', 'eigrp']: + break + else: + animate_message("Sorry, that's an invalid input! Please enter either 'OSPF' or 'EIGRP'\n") + + if protocol.lower() == 'ospf': + area = get_positive_integer_input("Enter the OSPF Area: ") + process_id = get_positive_integer_input("Enter the OSPF Process ID: ") + num_networks = get_positive_integer_input("Enter the number of networks to advertise: ") + networks = [] + print("Please input the networks to advertise using the Format: x.x.x.x/y ") + for k in range(num_networks): + while True: + network = input_with_history(f"Network {k + 1}: ") + if validate_cidr_network(network): + break + else: + animate_message( + "That's an invalid input! Please enter a valid CIDR network in the format x.x.x.x/y.") + network_address = convert_cidr_network_wildcard(network) + networks.append(network_address) + router_interface_config.append( + {'protocol': protocol, 'area': area, 'process_id': process_id, 'networks': networks}) + elif protocol.lower() == 'eigrp': + as_number = get_positive_integer_input("Enter the AS Number: ") + num_networks = get_positive_integer_input("Enter the number of networks to advertise: ") + networks = [] + print("Please input the networks to advertise using the Format: x.x.x.x/y ") + for k in range(num_networks): + while True: + network = input_with_history(f"Network {k + 1}: ") + if validate_cidr_network(network): + break + else: + animate_message( + "Sorry, that's an invalid input! Please enter a valid CIDR network in the format " + "x.x.x.x/y.\n") + network_address = convert_cidr_network_wildcard(network) + networks.append(network_address) + + router_interface_config.append({'protocol': protocol, 'as_number': as_number, 'networks': networks}) + + for _ in range(interface_count): + # Input interface configurations with IP addresses + interface = input_with_history("Enter the interface name: ").lower() + while True: + ip_network = input_with_history("Enter the IP address and network (Format: x.x.x.x/y): ") + if validate_cidr_network(ip_network): + break + else: + animate_message( + "Sorry, that's an invalid input! Please enter a valid CIDR network in the format x.x.x.x/y.\n") + ip_address = convert_cidr_network(ip_network) + router_interface_config.append({'interface': interface, 'ip': ip_address}) + + navigate_inputs() # Enable input navigation + + return router_interface_config diff --git a/pingtest.yml b/pingtest.yml deleted file mode 100644 index 544df5b..0000000 --- a/pingtest.yml +++ /dev/null @@ -1,19 +0,0 @@ -- name: Ping prefixes on R1 - hosts: R1 - gather_facts: false - tasks: - - name: Ping prefixes on R1 - command: ping -c 4 "{{ item }}" - with_items: - - 192.168.10.1 - - 192.168.20.1 - - 192.168.30.1 - - 192.168.40.1 - - 192.168.50.1 - - 192.168.60.1 - - 192.168.70.1 - - 192.168.80.1 - - 172.168.1.17 - - 192.168.2.1 - - 192.168.4.2 - - 192.168.6.2 diff --git a/playbook_executor.py b/playbook_executor.py index e69de29..8c5a2f7 100644 --- a/playbook_executor.py +++ b/playbook_executor.py @@ -0,0 +1,79 @@ +import os +import pickle +from ansible_runner import run +from tenacity import retry, stop_after_attempt, wait_random_exponential +from cryptography.fernet import Fernet +from utility import print_blue, print_colored_code + +# Get the path of the current script or module +current_path = os.path.dirname(os.path.abspath(__file__)) + + +# This is the function to execute the generated playbooks which were dynamically stored. +@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(10)) +def execute_playbook(playbook_file, model_name, router_number, text_before_code, text_after_code, conversation_history): + try: + while True: + with open(playbook_file, "r") as file: + playbook_content = file.read() + + print(f"\n\nGenerated Playbook for Router {router_number} has been Saved To: {playbook_file}\n") + print_blue(text_before_code) + print() + print_colored_code(playbook_content) + print() + print_blue(text_after_code) + print() + + # Dynamically decrypt ansible_user and ansible_ssh_pass before running the playbook + decrypt_ansible_credentials() + + r = run( + playbook=playbook_file, + inventory=os.path.join(current_path, '.hosts.yml'), + private_data_dir=current_path, + quiet=False + ) + print() + return playbook_content + + except Exception as e: + print("Error executing Ansible playbook:", e) + + +# Function to decrypt ansible credentials +def decrypt_ansible_credentials(): + try: + # Read the encrypted credentials from the hosts.yml file + with open(os.path.join(current_path, '.hosts.yml'), 'r') as hosts_file: + encrypted_hosts_content = hosts_file.read() + + # Load user_config from pickle file + with open(os.path.join(current_path, "user_config.pkl"), "rb") as decrypt_source: + user_config = pickle.load(decrypt_source) + + # Load fernet key from user_config + fernet_key = user_config.get("fernet_key") + + if fernet_key: + cipher_suite = Fernet(fernet_key) + + # Decode and decrypt the credentials + decrypted_username = cipher_suite.decrypt(user_config["ansible_user"]).decode('utf-8') + decrypted_password = cipher_suite.decrypt(user_config["ansible_ssh_pass"]).decode('utf-8') + else: + print("Fernet key not found in user_config!") + return + + decrypted_hosts_content = encrypted_hosts_content.replace(f"b'{user_config['ansible_user'].decode()}'", + decrypted_username + ).replace( + f"b'{user_config['ansible_ssh_pass'].decode()}'", decrypted_password) + + # Write the decrypted content back to the hosts.yml file + with open(os.path.join(current_path, '.hosts.yml'), 'w') as hosts_file: + hosts_file.write(decrypted_hosts_content) + + except Exception as e: + print("Error decrypting ansible credentials:", e) + diff --git a/playbook_extractor.py b/playbook_extractor.py index e69de29..f31e3f3 100644 --- a/playbook_extractor.py +++ b/playbook_extractor.py @@ -0,0 +1,34 @@ +import traceback + +# Function to extract playbook from raw response +def playbook_extractor_function(response, conversation_history): + if response is None: # If response is None, playbook generation failed + print("Playbook generation failed.") + return "", "", "", conversation_history + + try: + full_response = response['choices'][0]['message']['content'] # Get full response from API + + # Append assistant's response to conversation history + conversation_history.append({'role': 'assistant', 'content': full_response}) + + # Find start and end index of code block + code_start_index = full_response.find('---') + code_end_index = full_response.find('...', code_start_index) + + if code_start_index != -1 and code_end_index != -1: # If code block found + # Extract text before code block, code block, and text after code block + text_before_code = full_response[:code_start_index] + code = full_response[code_start_index:code_end_index + 3] # Include '---' and '...' in the code + text_after_code = full_response[code_end_index + 3:] + else: # If code block not found + text_before_code = full_response + code = "" + text_after_code = "" + + return text_before_code.strip(), code.strip(), text_after_code.strip(), conversation_history + + except Exception as e: # Handle exceptions + print("An error occurred while processing the generated playbook:", e) + traceback.print_exc() # Print traceback for debugging + return "", "", "", conversation_history diff --git a/playbook_generation.py b/playbook_generation.py deleted file mode 100644 index 9144529..0000000 --- a/playbook_generation.py +++ /dev/null @@ -1,54 +0,0 @@ - -import openai -import time -import threading - -# This function is responsible for prompting chatGPT to generate playbooks -def generate_playbook(prompt, model_name, conversation_history): - def api_call(): - nonlocal response - messages = [ - {"role": "system", "content": "You are a network engineer trying to automate a network."}, - ] - messages.extend(conversation_history) - messages.append({"role": "user", "content": prompt}) - - response = openai.ChatCompletion.create( - model=model_name, - messages=messages, - max_tokens=1500, - ) - - response = None - - print("\n\nPlease wait while a playbook is being generated", end='', flush=True) - animation_thread = threading.Thread(target=api_call) - animation_thread.start() - - while animation_thread.is_alive(): - for _ in range(5): - time.sleep(1.0) - print('.', end='', flush=True) - print('\b\b\b \b\b\b', end='', flush=True) # Erase the dots - - animation_thread.join() - - # Lines 38 to 54 are responsible for extracting the generated playbooks from the raw response from ChatGPT. - - full_response = response['choices'][0]['message']['content'] - - conversation_history.append({'role': 'assistant', 'content': full_response}) - - code_start_index = full_response.find('---') - code_end_index = full_response.find('...', code_start_index) - - if code_start_index != -1 and code_end_index != -1: - text_before_code = full_response[:code_start_index] - code = full_response[code_start_index:code_end_index + 3] # Include '---' and '...' in the code - text_after_code = full_response[code_end_index + 3:] - else: - text_before_code = full_response - code = "" - text_after_code = "" - - return text_before_code.strip(), code.strip(), text_after_code.strip(), conversation_history diff --git a/prompt_composer.py b/prompt_composer.py deleted file mode 100644 index e69de29..0000000 diff --git a/prompt_generator.py b/prompt_generator.py new file mode 100644 index 0000000..e0f3785 --- /dev/null +++ b/prompt_generator.py @@ -0,0 +1,128 @@ +from playbook_executor import execute_playbook +import os +from chatGPT_prompting import chatGPT_prompter +import time + + +# Function to generate prompts and execute playbooks for each router configuration +def prompt_generator_function(pdf, interface_configurations, execution_folder, model_name, + generate_playbooks_start_time): + # Iterate over each router configuration + for i, router_config in enumerate(interface_configurations): + conversation_history = [] # Initialize conversation history for the current router + + # Add a subheading for the current router in the PDF report + pdf.set_font('Arial', 'B', 12) + pdf.cell(0, 10, f"Router {i + 1} Configurations", 0, 1, 'L') + pdf.ln(5) + + # Define the initial prompt for generating the playbook + prompt = ("Requirements: Strictly adhere to the following explicitly stated requirements; Write a " + "simple Ansible playbook with separate tasks for each protocol and interface configurations " + "with the following details;") + + prompt += f" hosts: R{i + 1}" # Add host information to the prompt + + prompt += " Do not worry about the inventory file;" # Additional requirements + # Add more requirements to the prompt + prompt += " Ensure each Task is named;" + prompt += " Never provide explanations for the generated playbook;" + prompt += " Do not use variables and templates to generate the playbooks;" + prompt += ("Ensure all generated playbooks adhere to yaml's rule of always starting a playbook with " + "`---` and ending the playbook with a new line containing `...`;") + prompt += " Always use ios_config module and ensure unsupported parameters are not generated;;" + prompt += " Use `parents` argument to implement stanzas;" + prompt += ("when configuring interfaces, ensure you generate codes for only provided interfaces and " + "always implement 'No Shutdown' for each interface;") + prompt += ("when configuring routing protocols, ensure you generate codes for only provided protocols " + "and that the protocol is initialized only under the parents argument using the format " + "`router protocol-type xx`. Also, DO NOT configure router id;") + prompt += " set `replace` argument to block. `replace` argument should always be child to `ios_config`;" + + redistribute_required = False # Flag to track if redistribution is required + + # Iterate over protocol configurations for the current router + for protocol_config in router_config: + if 'protocol' in protocol_config: + protocol = protocol_config['protocol'] + prompt += f" Protocol: {protocol}" # Add protocol information to the prompt + if protocol.lower() == 'ospf': + # Extract OSPF configuration details + area = protocol_config['area'] + process_id = protocol_config['process_id'] + num_networks = len(protocol_config['networks']) + # Add OSPF configuration details to the prompt + prompt += f" OSPF Area: {area}, Process ID: {process_id}, Number of networks to advertise: {num_networks}" + # Add network information to the PDF report + for j, network in enumerate(protocol_config['networks']): + pdf.set_font('Arial', '', 12) + pdf.cell(0, 10, f"network: {network}", 0, 1, 'L') + pdf.ln(5) + prompt += f" network{j + 1}: {network}" + elif protocol.lower() == 'eigrp': + # Extract EIGRP configuration details + as_number = protocol_config['as_number'] + num_networks = len(protocol_config['networks']) + # Add EIGRP configuration details to the prompt + prompt += f" EIGRP AS Number: {as_number}, Number of networks to advertise: {num_networks}" + # Add network information to the PDF report + for j, network in enumerate(protocol_config['networks']): + pdf.set_font('Arial', '', 12) + pdf.cell(0, 10, f"network: {network}", 0, 1, 'L') + pdf.ln(5) + prompt += f" network{j + 1}: {network}" + + # Check if both EIGRP and OSPF are configured + if protocol.lower() == 'eigrp': + for other_protocol_config in router_config: + if 'protocol' in other_protocol_config and other_protocol_config['protocol'].lower() == 'ospf': + redistribute_required = True + + # Add redistribution tasks to the prompt if required + if redistribute_required: + prompt += (f"; Using dedicated tasks, Please redistribute the routing protocols using " + f"'redistribute ospf {process_id} metric 1000 33 255 1 1500' for redistributing OSPF " + f"into EIGRP and 'redistribute eigrp {as_number} subnets' for redistributing EIGRP " + f"into OSPF;") + prompt += ("The redistribution tasks, should be generated after the routing protocol configuration " + "tasks have been generated;") + + # Add interface configurations to the prompt and PDF report + for interface_config in router_config: + if 'interface' in interface_config and 'ip' in interface_config: + interface_name = interface_config['interface'] + ip_address = interface_config['ip'] + pdf.set_font('Arial', '', 12) + pdf.cell(0, 10, f"Interface: {interface_name}, IP: {ip_address}", 0, 1, 'L') + pdf.ln(5) + prompt += f" Interface: {interface_name}, IP: {ip_address}" + + prompt = prompt.rstrip(";") # Remove the trailing semicolon + + # Add a chapter for the prompt used to generate each playbook in the PDF report + pdf.add_page() + pdf.chapter_title(f"Prompt for Router {i + 1}") + pdf.chapter_body(prompt) + + # Generate playbook using ChatGPT based on the prompt + text_before_code, playbook, text_after_code, conversation_history = chatGPT_prompter(prompt, model_name, + conversation_history) + generate_playbooks_duration = time.time() - generate_playbooks_start_time + + # Add a chapter for the generated playbook in the PDF report + pdf.add_page() + pdf.chapter_title(f"Generated Playbook for Router {i + 1}") + pdf.chapter_body(playbook) + + # Save generated playbook to a file + playbook_file = os.path.join(execution_folder, f"Router_{i + 1}_Playbook.yml") + with open(playbook_file, "w") as file: + file.write(playbook) + + # Execute the generated playbook + start_time = time.time() + execute_playbook(playbook_file, model_name, i + 1, text_before_code, text_after_code, conversation_history) + + elapsed_time = time.time() - start_time + + return generate_playbooks_duration, elapsed_time diff --git a/requirements.txt b/requirements.txt index b893942..d3ed86b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,6 +12,7 @@ backcall==0.2.0 certifi==2023.5.7 cffi==1.15.1 charset-normalizer==3.1.0 +colorama==0.4.6 contourpy==1.1.0 cryptography==40.0.2 cycler==0.11.0 diff --git a/ssh_ip_addresses.txt b/ssh_ip_addresses.txt new file mode 100644 index 0000000..cd85785 --- /dev/null +++ b/ssh_ip_addresses.txt @@ -0,0 +1,5 @@ +172.168.1.17 +192.168.2.2 +192.168.4.2 +192.168.6.2 + diff --git a/utility.py b/utility.py new file mode 100644 index 0000000..53a64d0 --- /dev/null +++ b/utility.py @@ -0,0 +1,150 @@ +import shutil +import pickle +from pygments import highlight +from pygments.lexers import get_lexer_by_name +from pygments.formatters import TerminalFormatter +import re +import ipaddress +import os +from fpdf import FPDF + +# Get the current directory path +current_path = os.path.dirname(os.path.abspath(__file__)) + +# Load or create user configuration +def load_user_config(): + existing_config_file = os.path.join(current_path, "user_config.pkl") + if os.path.exists(existing_config_file): + with open(existing_config_file, "rb") as file: + return pickle.load(file) + else: + return {} + +def save_user_config(config): + existing_config_file = os.path.join(current_path, "user_config.pkl") + with open(existing_config_file, "wb") as file: + pickle.dump(config, file) + +# Encrypt or load OpenAI key +def encrypt_key(key, cipher_suite): + return cipher_suite.encrypt(key.encode()) + +def decrypt_key(encrypted_key, cipher_suite): + return cipher_suite.decrypt(encrypted_key).decode() + +def animate_message(message): + # Animate the message to be printed one character at a time + for char in message: + print(char, end='', flush=True) + +# Function to get the terminal width +def get_terminal_width(): + try: + _, columns = shutil.get_terminal_size() + return columns + except: + return 80 + +# Function to print a line with "=" based on the terminal width +def print_line(): + width = get_terminal_width() + print("=" * width) + +# Function to get a positive integer input from the user +def get_positive_integer_input(prompte): + while True: + try: + value = int(input(prompte)) + if value > 0: + return value + else: + animate_message("Sorry, that is an invalid input! Please enter a positive integer\n") + except ValueError: + animate_message("Sorry, that is an invalid input! Please enter a valid integer\n") + +# Function to save router configurations +def save_configuration(configurations): + config_file = os.path.join(current_path, "router_configurations.pkl") + with open(config_file, "wb") as file: + pickle.dump(configurations, file) + +# Function to load router configurations +def load_configuration(): + config_file = os.path.join(current_path, "router_configurations.pkl") + if os.path.exists(config_file): + with open(config_file, "rb") as file: + return pickle.load(file) + else: + return None + +# Function to print text in blue color +def print_blue(text): + animate_message("\033[94m{}\033[0m".format(text)) + +# Function to format and print colored code +def print_colored_code(code): + lexer = get_lexer_by_name("yaml") # Set the appropriate lexer for the code language + formatter = TerminalFormatter() + colored_code = highlight(code, lexer, formatter) + print(colored_code) + +# Function to print a summary of execution times +def print_summary(input_router_duration, generate_playbooks_duration, execute_playbooks_duration, summary_duration): + print_blue("\n\n********** Below is a Summary of How Long It Took Us To Setup Your Network **********") + print_blue("\n1. We spent {:.2f} seconds to input the various router parameters.".format(input_router_duration)) + print_blue("\n2. The process for generating all the playbooks took {:.2f} seconds.".format(generate_playbooks_duration)) + print_blue("\n3. We spent {:.2f} seconds to execute all the generated playbooks".format(execute_playbooks_duration)) + print_blue("\n4. In Total, it has taken {:.2f} seconds to automate your network from start to finish".format(summary_duration)) + +# Function to validate CIDR network format +def validate_cidr_network(cidr_network): + # Validate if the CIDR network format is correct + network_parts = cidr_network.split("/") + if len(network_parts) != 2: + return False + + ip_address = network_parts[0] + subnet_mask = network_parts[1] + if not re.match(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", ip_address): + return False + + try: + ipaddress.IPv4Network(cidr_network, strict=False) + return True + except (ipaddress.AddressValueError, ipaddress.NetmaskValueError, ValueError): + return False + +# Function to convert CIDR network to network address +def convert_cidr_network(cidr_network): + ip, subnet = cidr_network.split("/") + subnet_mask = (0xFFFFFFFF << (32 - int(subnet))) & 0xFFFFFFFF + subnet_mask = [(subnet_mask >> i) & 0xFF for i in [24, 16, 8, 0]] + ip_parts = ip.split(".") + network_address = ".".join(ip_parts[:4]) + " " + ".".join(str(part) for part in subnet_mask) + return network_address + +# Function to convert CIDR network to network address with wildcard mask +def convert_cidr_network_wildcard(cidr_network): + ip, subnet = cidr_network.split("/") + subnet_mask = (0xFFFFFFFF << (32 - int(subnet))) & 0xFFFFFFFF + subnet_mask = [(subnet_mask >> i) & 0xFF for i in [24, 16, 8, 0]] + wildcard_mask = [(255 - subnet_mask[i]) for i in range(4)] + ip_parts = ip.split(".") + network_address = ".".join(ip_parts[:4]) + " " + ".".join(str(part) for part in wildcard_mask) + return network_address + +# Class to format the PDF report +class PDF(FPDF): + def header(self): + self.set_font('Arial', 'B', 12) + self.cell(0, 10, 'Network Configuration Report', 0, 1, 'C') + + def chapter_title(self, title): + self.set_font('Arial', 'B', 14) + self.cell(0, 10, title, 0, 1, 'L') + self.ln(5) + + def chapter_body(self, content): + self.set_font('Arial', '', 12) + self.multi_cell(0, 10, content) + self.ln(10)