Source code for pyedgeconnect

# MIT License
# (C) Copyright 2021 Hewlett Packard Enterprise Development LP.

import logging
import os
import sys
import traceback
import warnings

import requests
from urllib3.exceptions import InsecureRequestWarning


class HttpCommon:
    """Class to leverage common HTTP functions and handling responses"""

    # BASE HTTP REQUESTS

    def _req_post(
        self,
        url: str,
        data,
        files,
    ) -> requests.Response:
        """Assemble and send Requests request for HTTP POST method

        :param url: Full URL to use in HTTP request
        :type url: str
        :param data: Data to pass in request body
        :type data: str, list, dict
        :return: Requests Response object
        :rtype: requests.Response
        """
        apiSrcStr = self.apiSrcId if ("?" not in url) else self.apiSrcId2
        return self.session.post(
            self.url_prefix + url + apiSrcStr,
            json=data,
            files=files,
            verify=self.verify,
            timeout=self.timeout,
            headers=self.headers,
        )

    def _req_get(
        self,
        url: str,
    ) -> requests.Response:
        """Assemble and send Requests request for HTTP GET method

        :param url: Full URL to use in HTTP request
        :type url: str
        :return: Requests Response object
        :rtype: requests.Response
        """
        apiSrcStr = self.apiSrcId if ("?" not in url) else self.apiSrcId2
        return self.session.get(
            self.url_prefix + url + apiSrcStr,
            verify=self.verify,
            timeout=self.timeout,
            headers=self.headers,
        )

    def _req_delete(
        self,
        url: str,
    ) -> requests.Response:
        """Assemble and send Requests request for HTTP DELETE method

        :param url: Full URL to use in HTTP request
        :type url: str
        :return: Requests Response object
        :rtype: requests.Response
        """
        apiSrcStr = self.apiSrcId if ("?" not in url) else self.apiSrcId2
        return self.session.delete(
            self.url_prefix + url + apiSrcStr,
            verify=self.verify,
            timeout=self.timeout,
            headers=self.headers,
        )

    def _req_put(
        self,
        url: str,
        data,
    ) -> requests.Response:
        """Assemble and send Requests request for HTTP PUT method

        :param url: Full URL to use in HTTP request
        :type url: str
        :param data: Data to pass in request body
        :type data: str, list, dict
        :return: Requests Response object
        :rtype: requests.Response
        """
        apiSrcStr = self.apiSrcId if ("?" not in url) else self.apiSrcId2
        return self.session.put(
            self.url_prefix + url + apiSrcStr,
            json=data,
            verify=self.verify,
            timeout=self.timeout,
            headers=self.headers,
        )

    # HTTP RESPONSE HANDLER

    def _handle_response(
        self,
        api_path: str,
        response: requests.models.Response,
        expected_status: list,
        return_type: str,
    ):
        """Handle response from API call

        :param api_path: API path that was used for API call
            (not including url_prefix)
        :type api_path: str
        :param response: Requests response object to obtain status code
        :type response: requests.Response
        :param expected_status: List of expected HTTP status codes of
            response, e.g. [200] or [200,204]
        :type expected_status: list
        :param return_type: Option for data to return back to original
            function, e.g. "json" "text" "bool" "full_response"
        :type return_type: str
        :return: Requests Response object
        :rtype: requests.Response
        """
        response_method = (
            str(response.request)
            .replace("<PreparedRequest ", "")
            .replace(">", "")
        )
        if response.status_code not in expected_status:
            self.logger.error(
                f"{response_method} {api_path} | Received HTTP "
                f"{response.status_code} | Response text: {response.text}"
            )
            # return formatted data for the source method
            # for JSON data, return a dictionary with the details of
            # the response
            if return_type == "json":
                return {
                    "request": response_method,
                    "api_path": api_path,
                    "status_code": response.status_code,
                    "text": response.text,
                }
            elif return_type == "text":
                return response.text
            elif return_type == "bool":
                return False
            elif return_type == "full_response":
                return response

        # If Orchestrator set with log_success == True, include response
        # text in log messages. Default behavior is to omit response
        # text from log messages for successful API calls.
        if self.log_success:
            self.logger.info(
                f"{response_method} {api_path} | Received HTTP "
                f"{response.status_code} | Response text: {response.text}"
            )
        else:
            # Log successful call, omit response text in case sensitive
            # data in response text
            self.logger.info(
                f"{response_method} {api_path} | Received HTTP "
                f"{response.status_code} "
                "| Response omitted to avoid logging sensitive data"
            )

        # return formatted data for the source method
        if return_type == "json":
            return response.json()
        elif return_type == "text":
            return response.text
        elif return_type == "bool":
            return True
        elif return_type == "full_response":
            return response

    # HTTP REQUESTS CALLED BY METHODS

    def _post(
        self,
        api_path: str,
        data="",
        files={},
        expected_status: list = [200],
        return_type: str = "json",
    ):
        """Setup HTTP POST request and send results to _handle_response
        method. Catches Exceptions and logs to log file

        :param api_path: API path to append to url_prefix
        :type api_path: str
        :param data: Data to pass in request body, defaults to ""
        :type data: str, list, dict, optional
        :param expected_status: List of expected HTTP status codes of
            response, defaults to [200]
        :type expected_status: list, optional
        :param return_type: Filter for data to include in response to
            function call, accepted values are "json" "text" "bool"
            "full_response", defaults to "json"
        :type return_type: str, optional
        :return: Returns False on exceptions, otherwise passes return
            through _handle_response method for processing Requests
            response
        :rtype: bool, _handle_response method
        """
        if return_type not in ["json", "text", "bool", "full_response"]:
            self.logger.error(
                "Called POST {} with unknown return type '{}'".format(
                    api_path, return_type
                )
            )
        try:
            response = self._req_post(api_path, data, files)
            return self._handle_response(
                api_path, response, expected_status, return_type
            )

        except requests.exceptions.ConnectTimeout:
            self.logger.error(
                f"POST {api_path} | Request Timed Out - "
                f"Timeout values (connect/read): {self.timeout}"
            )
        except Exception as ex:
            self.logger.error(
                "Exception {} when calling POST {}. Traceback: {}".format(
                    type(ex), api_path, traceback.format_exc()
                )
            )
            return False

    def _get(
        self,
        api_path: str,
        expected_status: list = [200],
        return_type: str = "json",
    ):
        """Setup HTTP GET request and send results to _handle_response
        method. Catches Exceptions and logs to log file

        :param api_path: API path to append to url_prefix
        :type api_path: str
        :param expected_status: List of expected HTTP status codes of
            response, defaults to [200]
        :type expected_status: list, optional
        :param return_type: Filter for data to include in response to
            function call, accepted values are "json" "text" "bool"
            "full_response", defaults to "json"
        :type return_type: str, optional
        :return: Returns False on exceptions, otherwise passes return
            through _handle_response method for processing Requests
            response
        :rtype: bool, _handle_response method
        """
        if return_type not in ["json", "text", "bool", "full_response"]:
            self.logger.error(
                "Called GET {} with unknown return type '{}'".format(
                    api_path, return_type
                )
            )
        try:
            response = self._req_get(api_path)
            return self._handle_response(
                api_path, response, expected_status, return_type
            )
        except requests.exceptions.ConnectTimeout:
            self.logger.error(
                f"GET {api_path} | Request Timed Out - "
                f"Timeout values (connect/read): {self.timeout}"
            )
        except Exception as ex:
            self.logger.error(
                "Exception {} when calling GET {}. Traceback: {}".format(
                    type(ex), api_path, traceback.format_exc()
                )
            )
            return False

    def _delete(
        self,
        api_path: str,
        expected_status: list = [200],
        return_type: str = "json",
    ):
        """Setup HTTP DELETE request and send results to
        _handle_response method. Catches Exceptions and logs to log file

        :param api_path: API path to append to url_prefix
        :type api_path: str
        :param expected_status: List of expected HTTP status codes of
            response, defaults to [200]
        :type expected_status: list, optional
        :param return_type: Filter for data to include in response to
            function call, accepted values are "json" "text" "bool"
            "full_response", defaults to "json"
        :type return_type: str, optional
        :return: Returns False on exceptions, otherwise passes return
            through _handle_response method for processing Requests
            response
        :rtype: bool, _handle_response method
        """
        if return_type not in ["json", "text", "bool", "full_response"]:
            self.logger.error(
                "Called DELETE {} with unknown return type '{}'".format(
                    api_path, return_type
                )
            )
        try:
            response = self._req_delete(api_path)
            return self._handle_response(
                api_path, response, expected_status, return_type
            )
        except requests.exceptions.ConnectTimeout:
            self.logger.error(
                f"DELETE {api_path} | Request Timed Out - "
                f"Timeout values (connect/read): {self.timeout}"
            )
        except Exception as ex:
            self.logger.error(
                "Exception {} when calling DELETE {}. Traceback: {}".format(
                    type(ex), api_path, traceback.format_exc()
                )
            )
            return False

    def _put(
        self,
        api_path: str,
        data="",
        expected_status: list = [200],
        return_type: str = "json",
    ):
        """Setup HTTP PUT request and send results to _handle_response
        method. Catches Exceptions and logs to log file

        :param api_path: API path to append to url_prefix
        :type api_path: str
        :param data: Data to pass in request body, defaults to ""
        :type data: str, list, dict, optional
        :param expected_status: List of expected HTTP status codes of
            response, defaults to [200]
        :type expected_status: list, optional
        :param return_type: Filter for data to include in response to
            function call, accepted values are "json" "text" "bool"
            "full_response", defaults to "json"
        :type return_type: str, optional
        :return: Returns False on exceptions, otherwise passes return
            through _handle_response method for processing Requests
            response
        :rtype: bool, _handle_response method
        """
        if return_type not in ["json", "text", "bool", "full_response"]:
            self.logger.error(
                "Called PUT {} with unknown return type '{}'".format(
                    api_path, return_type
                )
            )
        try:
            response = self._req_put(api_path, data)
            return self._handle_response(
                api_path, response, expected_status, return_type
            )
        except requests.exceptions.ConnectTimeout:
            self.logger.error(
                f"PUT {api_path} | Request Timed Out - "
                f"Timeout values (connect/read): {self.timeout}"
            )
        except Exception as ex:
            self.logger.error(
                "Exception {} when calling PUT {}. Traceback: {}".format(
                    type(ex), api_path, traceback.format_exc()
                )
            )
            return False


# Aruba Orchestrator
[docs] class Orchestrator(HttpCommon): """Orchestrator setup and imports related methods for making API calls to Orchestrator. Child class of :class:`HttpCommon` """ # noqa RST304 def __init__( self, url: str, api_key: str = "", auth_mode: str = "local", log_file: bool = False, log_console: bool = False, log_success: bool = False, verify_ssl: bool = True, timeout: tuple = (9.15, 12), ): """Setup Orchestrator instance .. warning:: If ``log_file`` and ``log_success`` are set to ``True`` response text from successful API calls will be logged to the local log file. Some responses can include sensitive data that you may not wish to retain in the log files. :param url: IP address or URL to Orchestrator server :type url: str :param api_key: API Key for non-user-based authentication :type api_key: str, optional :param auth_mode: Reference what form of user-based authentication is in use, accepts ``local``, ``radius``, and ``tacacs``, defaults to "local" :type auth_mode: str, optional :param log_file: Enable logging messages to local log file, defaults to False :type log_file: bool, optional :param log_console: Enable logging messages to stdout, defaults to False :type log_console: bool, optional :param log_success: Enable logging response text of successful API calls in log messages. Will print a warning to the user if set to ``True`` while ``log_file`` is also ``True`` to raise awareness that sensitive data may be stored in the log file. :type log_success: bool, optional :param verify_ssl: Set to ``False`` to ignore certificate warnings within requests, defaults to ``True`` :type verify_ssl: bool, optional :param timeout: Timeout values (in seconds) for requests, first value is for initial connection, second value is for read timeouts. Defaults to ``(9.15, 12)``, 9.15 seconds for connection, 12 seconds for server data response. :type timeout: tuple, optional :raises ValueError: If Orchestrator auth_mode specified not in supported_auth_modes """ supported_auth_modes = ["local", "radius", "tacacs"] if auth_mode not in supported_auth_modes: raise ValueError( "Orchestrator auth_mode must be one of %r." % supported_auth_modes ) self.url_prefix = "https://" + url + "/gms/rest" self.timeout = timeout self.session = requests.Session() if api_key != "": self.headers = {"X-Auth-Token": api_key} else: self.headers = {} # for API calls w/ just source as query param self.apiSrcId = "?source=menu_rest_apis_id" # for API calls w/ multiple query params self.apiSrcId2 = "&source=menu_rest_apis_id" # remote authentication modes supported via this helper module self.supportedAuthModes = supported_auth_modes # change authMode to the desired auth mode before # invoking login() function self.authMode = auth_mode self.authenticated = False # disable certificate warning messages / errors self.verify = verify_ssl if verify_ssl is False: requests.packages.urllib3.disable_warnings( category=InsecureRequestWarning ) # Allow logging successful response text/data if log_success # is set to True, warn user if logging to local file is also # enabled self.log_success = log_success if self.log_success and log_file: print( """ NOTE: Logging Response text from successful API calls has been enabled. Some responses can include sensitive data that you may not wish to retain in the log files. """ ) # Setup general log settings for messages and errors self.logger = logging.getLogger(f"orch_{url}") self.formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) self.logger.setLevel(logging.INFO) # Setup logging file if log_file set to True if log_file: local_log_directory = "logging/" if not os.path.exists(local_log_directory): os.makedirs(local_log_directory) self.file_handler = logging.FileHandler( "{}sp_orch.log".format(local_log_directory) ) self.file_handler.setFormatter(self.formatter) self.file_handler.setLevel(logging.INFO) self.logger.addHandler(self.file_handler) # Setup logging to console if log_console set to True if log_console: self.console_handler = logging.StreamHandler(sys.stdout) self.console_handler.setFormatter(self.formatter) self.console_handler.setLevel(logging.DEBUG) self.logger.setLevel(logging.DEBUG) self.logger.addHandler(self.console_handler) # Check if Orchestrator version is 9.3+ if API Key provided if api_key != "": try: orch_info = self.get_orchestrator_server_brief() release = orch_info["release"] major = int(release.split(".")[0]) minor = int(release.split(".")[1]) self.orch_version = major + minor / 10 except Exception as e: print(e) print( """ Attempt to retrieve Orchestrator version failed Defaulting logic to pre-9.3 API endpoints """ ) # Orch Version not found, default to pre-9.3 self.orch_version = 0.0 else: pass # Imported methods from .orch._acls import get_appliance_acls from .orch._action_log import ( cancel_audit_log_task, get_audit_log, get_audit_log_task_status, ) from .orch._active_sessions import get_active_sessions_orchestrator from .orch._admin_distance import get_appliance_admin_distance from .orch._advanced_properties import ( get_orchestrator_advanced_properties, get_orchestrator_advanced_properties_metadata, update_orchestrator_advanced_properties, ) from .orch._aggregate_stats import ( get_aggregate_stats_active_flows, get_aggregate_stats_appliances, get_aggregate_stats_appliances_ne_pk_list, get_aggregate_stats_appliances_single_appliance, get_aggregate_stats_application2_ne_pk_tunnels, get_aggregate_stats_application_ne_pk_tunnels, get_aggregate_stats_applications, get_aggregate_stats_applications_ne_pk_list, get_aggregate_stats_applications_single_appliance, get_aggregate_stats_boost_ne_pk_list, get_aggregate_stats_boost_single_appliance, get_aggregate_stats_dns_ne_pk_list, get_aggregate_stats_dns_ne_pk_tunnels, get_aggregate_stats_dns_single_appliance, get_aggregate_stats_drc, get_aggregate_stats_drc_ne_pk_list, get_aggregate_stats_drc_single_appliance, get_aggregate_stats_dscp, get_aggregate_stats_dscp_ne_pk_list, get_aggregate_stats_dscp_single_appliance, get_aggregate_stats_flows, get_aggregate_stats_flows_ne_pk_list, get_aggregate_stats_flows_single_appliance, get_aggregate_stats_interface, get_aggregate_stats_interface_ne_pk_list, get_aggregate_stats_interface_overlay_transport_ne_pk_list, get_aggregate_stats_interface_overlay_transport_ne_pk_tunnels, get_aggregate_stats_jitter, get_aggregate_stats_jitter_ne_pk_list, get_aggregate_stats_jitter_single_appliance, get_aggregate_stats_mos_ne_pk_list, get_aggregate_stats_mos_ne_pk_tunnels, get_aggregate_stats_mos_single_appliance, get_aggregate_stats_overlay_bandwidth_ne_pk_tunnels, get_aggregate_stats_ports, get_aggregate_stats_ports_ne_pk_list, get_aggregate_stats_ports_ne_pk_tunnels, get_aggregate_stats_ports_single_appliance, get_aggregate_stats_security_policy_ne_pk_list, get_aggregate_stats_security_policy_single_appliance, get_aggregate_stats_shaper_ne_pk_list, get_aggregate_stats_top_talkers, get_aggregate_stats_top_talkers_ne_pk_list, get_aggregate_stats_top_talkers_ne_pk_tunnels, get_aggregate_stats_top_talkers_single_appliance, get_aggregate_stats_top_talkers_split_single_appliance, get_aggregate_stats_traffic_behavior, get_aggregate_stats_traffic_behavior_ne_pk_list, get_aggregate_stats_traffic_behavior_single_appliance, get_aggregate_stats_traffic_class, get_aggregate_stats_traffic_class_ne_pk_list, get_aggregate_stats_traffic_class_single_appliance, get_aggregate_stats_tunnels, get_aggregate_stats_tunnels_ne_pk_list, get_aggregate_stats_tunnels_ne_pk_tunnels, get_aggregate_stats_tunnels_single_appliance, ) from .orch._alarm import ( acknolwedge_alarms_from_appliance, acknowledge_alarms_from_orchestrator, add_note_to_appliance_alarm, clear_alarms_from_appliance, clear_alarms_from_orchestrator, delete_alarm_email_delay, delete_all_customized_alarm_severity, delete_customized_alarm_severity_for_type, delete_supressed_alarms, get_alarm_count_all_appliances, get_alarm_count_from_appliance, get_alarm_count_orchestrator_and_appliances, get_alarm_count_orchestrator_or_appliances, get_alarm_descriptions, get_alarm_email_delay, get_alarm_notification_status, get_alarms_from_appliances, get_alarms_from_orchestrator, get_customized_alarm_severity, get_customized_alarm_severity_for_type, get_supressed_alarms, set_alarm_email_delay, set_alarm_notification_status, set_customized_alarm_severity, set_supressed_alarms, update_alarm_email_delay, update_customized_alarm_severity, ) from .orch._api_key import ( add_api_key, delete_api_key, get_api_key, get_api_keys, update_api_key, ) from .orch._app_system_deploy_info import ( get_appliance_system_deployment_info, get_discovered_appliance_system_deployment_info, ) from .orch._app_system_state_info import get_appliance_system_state_info from .orch._appliance import ( add_and_approve_discovered_appliances, add_discovered_appliances, appliance_delete_api, appliance_get_api, appliance_post_api, change_appliance_credentials, change_appliance_group, default_appliance_stats_config, delete_appliance, delete_appliance_for_rediscovery, delete_denied_appliances, deny_appliance, get_all_approved, get_all_denied_appliances, get_all_discovered, get_appliance_dns_cache_config, get_appliance_info, get_appliance_stats_config, get_appliances, get_appliances_queued_for_deletion, modify_appliance, modify_appliance_stats_config, rediscover_denied_appliance, update_discovered_appliances, ) from .orch._appliance_backup import ( backup_appliance_config, delete_appliance_backup, get_appliance_backup_history, restore_appliance_from_backup, ) from .orch._appliance_crash_history import appliance_crash_history from .orch._appliance_extra_info import ( delete_appliance_extra_info, get_appliance_extra_info, set_appliance_extra_info, ) from .orch._appliance_preconfig import ( apply_preconfig_to_existing, approve_and_apply_preconfig, create_preconfig, delete_preconfig, find_matching_preconfig, get_all_preconfig, get_apply_preconfig_status, get_default_preconfig, get_preconfig, modify_preconfig, validate_preconfig, ) from .orch._appliance_reboot_history import get_appliance_reboot_history from .orch._appliance_resync import appliance_resync from .orch._appliance_upgrade import delete_ecos_image, get_ecos_images from .orch._appliances_software_versions import ( get_appliance_software_version, ) from .orch._application_definition import ( delete_user_defined_app_address_map, delete_user_defined_app_dns_classification, delete_user_defined_app_port_protocol, get_application_modification_times, get_user_defined_app_address_map, get_user_defined_app_dns_classification, get_user_defined_app_groups, get_user_defined_app_port_protocol, post_user_defined_app_address_map, update_user_defined_app_dns_classification, update_user_defined_app_groups, update_user_defined_app_port_protocol, ) from .orch._authentication import get_appliance_auth_information from .orch._avc_mode import get_avc_mode from .orch._banners import get_appliance_login_banners from .orch._bgp import ( get_appliance_bgp_config, get_appliance_bgp_config_all_vrfs, get_appliance_bgp_neighbors, get_appliance_bgp_neighbors_all_vrfs, get_appliance_bgp_state, get_appliance_bgp_state_all_vrfs, ) from .orch._bonded_tunnels_configuration import ( get_bonded_tunnel_details, get_bonded_tunnel_details_for_appliance, get_bonded_tunnel_details_for_appliance_tunnel, get_bonded_tunnels_for_physical_tunnel, get_bonded_tunnels_state, ) from .orch._bridge_interface_state import ( get_appliance_bridge_interface_state, ) from .orch._broadcast_cli import broadcast_cli from .orch._built_in_policies import get_built_in_policies from .orch._custom_appliance_tags import get_custom_appliance_tags from .orch._custom_certs import ( check_custom_certs_appliances_to_portal, check_custom_certs_orchestrator_to_portal, delete_custom_cert, get_custom_certs, get_custom_certs_enabled, set_custom_certs_enabled, update_custom_certs, verify_custom_cert, ) from .orch._db_partition import delete_db_partition, get_db_partition from .orch._debug_files import ( cancel_debug_file_download, delete_debug_file_from_appliance, delete_debug_file_from_orchestrator, generate_appliance_sysdump, get_debug_file_proxy_settings, get_debug_files_from_appliance, set_debug_file_proxy_settings, upload_appliance_debug_files_to_orchestrator, upload_appliance_debug_files_to_support, ) from .orch._deployment import ( get_all_appliance_deployment, get_appliance_deployment, get_single_appliance_deployment, ) from .orch._discovery import ( get_appliance_discovery_emails, set_appliance_discovery_emails, ) from .orch._disks import get_appliance_disk_information from .orch._dns import get_appliance_dns from .orch._dns_proxy import get_dns_proxy from .orch._exception import ( create_tunnel_exceptions, delete_all_tunnel_exceptions, delete_single_tunnel_exception, delete_tunnel_exceptions_list, get_tunnel_exceptions, update_single_tunnel_exception, update_tunnel_exceptions, ) from .orch._flow import ( get_appliance_flow_bandwidth_stats, get_appliance_flow_details, get_appliance_flow_details_verbose, get_appliance_flows, reclassify_flows, reset_flows, ) from .orch._gms_backup import ( add_or_update_orchestrator_backup_config, create_orchestrator_blueprint_template, get_orchestrator_backup_config, test_orchestrator_backup_config, ) from .orch._gms_notification import ( delete_gms_notification, get_gms_notification, update_gms_notification, ) from .orch._gms_registration import ( get_orchestrator_registration_setting, set_orchestrator_registration_setting, ) from .orch._gms_server import ( get_orchestrator_hello, get_orchestrator_server_brief, get_orchestrator_server_info, get_orchestrator_server_os, get_orchestrator_server_ping, get_orchestrator_server_versions, ) from .orch._gms_smtp import ( delete_gms_smtp_settings, delete_unverified_email_addresses, get_gms_smtp_settings, get_unverified_email_addresses, send_verification_email, set_gms_smtp_settings, test_gms_smtp_settings, verify_email_address, ) from .orch._gms_stats_collection import ( get_gms_stats_collection, get_gms_stats_collection_defaults, update_gms_stats_collection, ) from .orch._grnode import ( get_all_appliance_locations, get_appliance_location, update_appliance_location_grnodepk, update_appliance_location_nepk, ) from .orch._group import ( add_gms_group, delete_gms_group, get_gms_group, get_gms_groups, get_root_gms_group, update_gms_group, ) from .orch._ha_groups import get_ha_groups, modify_ha_groups from .orch._health import ( get_health_alarm_summary, get_health_appliance_summary, get_health_jitter, get_health_latency, get_health_loss, get_health_mos, get_health_threshold_config, set_health_threshold_config, ) from .orch._hostname import get_orchestrator_hostname from .orch._idle_time import clear_idle_time, increment_idle_time from .orch._ikeless import ( get_ipsec_udp_key_config, get_ipsec_udp_key_history, get_ipsec_udp_key_status, update_ipsec_udp_key_config, ) from .orch._inbound_shaper import get_appliance_inbound_shaper from .orch._interface_labels import ( get_all_interface_labels, get_interface_labels_by_type, push_interface_labels_to_appliance, update_interface_labels, ) from .orch._interface_state import get_appliance_interface_state from .orch._internal_subnets import ( get_internal_subnets, update_internal_subnets, ) from .orch._ip_allow_list import ( get_ip_allow_list, get_ip_allow_list_drops, update_ip_allow_list, ) from .orch._ip_objects import ( bulk_upload_address_group, bulk_upload_service_group, create_address_group, create_service_group, delete_address_group, delete_service_group, get_address_group, get_all_address_groups, get_all_service_groups, get_service_group, merge_address_groups, merge_service_groups, update_address_group, update_service_group, ) from .orch._license import ( change_appliance_license, delete_appliance_license_token, get_nx_licensed_appliances, get_portal_licensed_appliances, get_portal_licensed_summary, get_vx_licensed_appliances, grant_appliance_base_license, revoke_appliance_base_license, ) from .orch._link_aggregation import get_link_aggregation_data from .orch._link_integrity import ( get_link_integrity_test_result, link_integrity_test, ) from .orch._location import get_location_coordinates_from_address from .orch._logging import get_appliance_syslog_config from .orch._login import login, logout, send_mfa from .orch._loopback import get_loopback_interfaes from .orch._loopback_orch import ( get_deleted_loopback_orchestration_ips, get_loopback_orchestration, get_loopback_orchestration_pool_detail, reclaim_delete_loopback_orchestration_ips, reclaim_single_deleted_loopback_orchestration_ip, set_loopback_orchestration, ) from .orch._maintenance_mode import ( get_maintenance_mode_appliances, update_maintenance_mode_appliances, ) from .orch._mgmt_services import get_mgmt_services from .orch._multicast import ( get_appliance_multicast_config, get_appliance_multicast_enabled, get_appliance_multicast_interfaces, get_appliance_multicast_neighbors, get_appliance_multicast_routes, ) from .orch._nat import ( get_appliance_nat_config, get_appliance_nat_maps, get_appliance_nat_pools, ) from .orch._nat_policy import ( get_nat_policy, get_nat_policy_dynamic, get_nat_policy_inbound_outbound, ) from .orch._net_flow import get_net_flow_configuration from .orch._network_memory import erase_appliance_network_memory from .orch._network_role_and_site import ( get_appliance_network_role_and_site, update_appliance_network_role_and_site, ) from .orch._optimization_policy import get_optimization_policy from .orch._ospf import ( get_appliance_ospf_config, get_appliance_ospf_interfaces_config, get_appliance_ospf_interfaces_state, get_appliance_ospf_neighbors_state, get_appliance_ospf_state, ) from .orch._overlay_association import ( add_appliance_overlay_association, get_all_appliance_overlay_association, get_appliance_overlay_association, remove_appliance_overlay_association, remove_single_appliance_overlay_association, ) from .orch._overlays import ( configure_new_overlay, configure_regionalized_overlay, delete_overlay, get_all_overlays_config, get_all_overlays_config_keyed, get_appliance_overlays_association, get_max_overlays, get_overlay_config, get_overlay_config_for_region, get_overlays_priorities, modify_overlay_config, modify_overlay_config_for_region, modify_regionalized_overlay, set_overlays_priorities, ) from .orch._pause_orchestration import ( get_pause_orchestration, set_pause_orchestration, ) from .orch._peer_priority import get_peer_priority_configuration from .orch._port_forwarding import get_appliance_port_fowarding from .orch._qos_policy import get_qos_policy from .orch._rbac_appliance_access_group import ( delete_appliance_access_group, get_all_appliance_access_groups, get_appliance_access_group, update_appliance_access_group, ) from .orch._rbac_assignment import ( delete_rbac_user_assignment, get_rbac_assignments, get_rbac_user_assignment, update_rbac_assignment, ) from .orch._rbac_role import ( delete_rbac_role, get_all_rbac_roles, get_rbac_role, get_rbac_role_assigned, update_rbac_role, ) from .orch._reachability import ( get_reachability_status_appliance, get_reachability_status_orchestrator, ) from .orch._realtime_stats import get_realtime_stats from .orch._regions import ( create_region, delete_region, get_all_regions, get_region, get_region_appliance_association, get_region_appliance_association_by_nepk, get_region_appliance_association_by_region_id, set_region_appliance_association, update_region_appliance_association, update_region_name, ) from .orch._releases import ( delay_release_notification, dismiss_release_notification, get_releases_for_orchestrator_and_ecos, get_releases_notifications, ) from .orch._rest_api_config import get_rest_api_config, set_rest_api_config from .orch._rest_request_time_stats import ( get_appliance_rest_stats, get_appliance_rest_stats_by_method, ) from .orch._route_policy import get_route_policy from .orch._save_changes import ( save_changes_ne_pk_list, save_changes_single_appliance, ) from .orch._schedule_timezone import ( get_schedule_timezone, set_schedule_timezone, ) from .orch._security_maps import get_appliance_security_maps from .orch._security_settings import ( get_security_settings, set_security_settings, ) from .orch._services import ( get_gms_internet_policy_services, get_gms_third_party_services, update_gms_internet_policy_services, ) from .orch._session import get_orchestrator_sessions from .orch._session_timeout import ( get_orch_session_timeout, set_orch_session_timeout, ) from .orch._shaper import get_appliance_shaper from .orch._shell import get_shell_access_setting, set_shell_access_setting from .orch._snmp import get_appliance_snmp from .orch._sp_portal import ( assign_account_license_ecsp, create_case_with_portal, delete_old_account_key, geo_locate_multiple_ips, geo_locate_single_ip, get_account_key_change_count, get_account_key_change_status, get_account_license_ecsp_status, get_account_license_feature, get_account_license_type, get_all_saas_apps, get_app_definition_data, get_app_definition_total, get_app_groups, get_app_groups_hash, get_appliance_orch_portal_status, get_cloud_portal_broadcast_message, get_compound_classification, get_compound_classification_hash, get_count_of_saas_apps, get_dns_classification, get_dns_classification_hash, get_flow_classification, get_flow_classification_hash, get_ip_protocol_numbers, get_orchestrator_to_cloud_portal_status, get_port_protocol_classification, get_port_protocol_classification_hash, get_portal_registration_config, get_portal_registration_status, get_portal_services_status, get_portal_top_sites, get_saas_classification, get_saas_classification_hash, get_service_id_to_country_mapping, get_service_id_to_service_mapping, get_tcp_udp_port_data, get_traffic_behavior, get_traffic_behavior_hash, get_update_time_for_app_definitions, request_new_account_key, search_app_definition_data, unassign_account_license_ecsp, update_portal_registration_config, update_portal_registration_status, ) from .orch._ssl import get_appliance_ssl_certs from .orch._ssl_substitute_cert import ( get_appliance_ssl_substitute_certs, validate_ssl_substitute_cert, ) from .orch._stats_retention import ( get_all_nonstats_retention, get_all_stats_collection, get_all_stats_retention, get_stats_approximate_disk_space, update_nonstats_retention, update_stats_collection, update_stats_retention, ) from .orch._subnets import ( get_appliance_subnets, get_discovered_appliance_subnets, set_appliance_subnet_sharing_options, ) from .orch._tca import get_appliance_tca, get_appliance_tunnel_tca from .orch._tcpdump import ( tcpdump_run, tcpdump_status_all, tcpdump_status_appliance, ) from .orch._template import ( associate_template_group_to_appliance, create_template_group, delete_template_group, get_all_template_groups, get_appliance_applied_template_goups, get_appliance_template_groups_association, get_appliance_template_history, get_selected_templates_in_template_group, get_template_group, get_template_group_association_all_appliances, get_template_groups_priorities, post_template_group, select_templates_for_template_group, set_template_groups_priorities, ) from .orch._third_party_services import ( add_new_service_orchestration, add_service_orchestration_remote_endpoints, central_add_subscription, central_assign_appliance_to_site, central_delete_subscription, central_get_site_mapping, central_get_subscription, clearpass_add_account, clearpass_delete_account, clearpass_filter_events, clearpass_get_configured_account, clearpass_get_configured_account_details, clearpass_get_configured_accounts, clearpass_get_connectivity, clearpass_get_pause_orchestration_status, clearpass_get_service_endpoint_status, clearpass_get_user_roles_for_ip, clearpass_pause_individual_orchestration, clearpass_post_login_event, clearpass_post_logout_event, clearpass_reset_service_endpoint, clearpass_set_pause_orchestration_status, clearpass_update_account, delete_service_orchestration, delete_service_orchestration_remote_endpoints, get_service_orchestration_all_names_to_ids, get_service_orchestration_all_services, get_service_orchestration_appliance_association, get_service_orchestration_breakout_state, get_service_orchestration_config_entries, get_service_orchestration_ipsla_settings, get_service_orchestration_labels, get_service_orchestration_remote_endpoints, get_service_orchestration_tunnel_identifiers, get_service_orchestration_tunnel_settings, set_service_orchestration_appliance_association, set_service_orchestration_breakout_state, set_service_orchestration_ipsla_settings, set_service_orchestration_labels, set_service_orchestration_remote_endpoints, set_service_orchestration_tunnel_settings, ) from .orch._third_party_tunnels_configuration import ( get_passthrough_tunnel_details, get_passthrough_tunnel_details_for_appliance, get_passthrough_tunnel_details_for_appliance_tunnel, get_passthrough_tunnels_state, ) from .orch._timeseries_stats import ( get_timeseries_stats_appliance_process_state, get_timeseries_stats_appliances, get_timeseries_stats_appliances_ne_pk_list, get_timeseries_stats_appliances_single_appliance, get_timeseries_stats_application, get_timeseries_stats_application_ne_pk_list, get_timeseries_stats_application_single_appliance, get_timeseries_stats_boost_single_appliance, get_timeseries_stats_drc, get_timeseries_stats_drc_ne_pk_list, get_timeseries_stats_drc_single_appliance, get_timeseries_stats_dscp, get_timeseries_stats_dscp_ne_pk_list, get_timeseries_stats_dscp_single_appliance, get_timeseries_stats_flow, get_timeseries_stats_flow_ne_pk_list, get_timeseries_stats_flow_single_appliance, get_timeseries_stats_interface_overlay_single_appliance, get_timeseries_stats_interface_single_appliance, get_timeseries_stats_internal_drops_single_appliance, get_timeseries_stats_jitter_single_appliance, get_timeseries_stats_mos_single_appliance, get_timeseries_stats_orchestrator_memory, get_timeseries_stats_security_policy_single_appliance, get_timeseries_stats_shaper, get_timeseries_stats_shaper_ne_pk_list, get_timeseries_stats_traffic_class, get_timeseries_stats_traffic_class_ne_pk_list, get_timeseries_stats_traffic_class_single_appliance, get_timeseries_stats_tunnel_single_appliance, ) from .orch._tunnels_configuration import ( get_appliance_tunnel_ids, get_batch_appliance_tunnels_config, get_batch_appliance_tunnels_state, get_physical_tunnel_details, get_physical_tunnel_details_for_appliance, get_physical_tunnel_details_for_appliance_tunnel, get_total_tunnel_count, get_tunnel_count_for_appliances, get_tunnel_traceroute, get_tunnels_between_appliances, get_tunnels_between_appliances_config_data, initiate_tunnel_traceroute, ) from .orch._ui_usage_stats import add_ui_usage_count from .orch._upgrade_appliances import ( upgrade_appliances, validate_appliance_upgrade, ) from .orch._user import ( change_user_password, create_or_update_user, delete_user, get_all_users, get_new_two_factor_key, get_user, reset_user_password, user_forgot_password, ) from .orch._user_account import get_appliance_user_accounts from .orch._vrf import ( add_routing_segmentation_segment, delete_routing_segmentation_maps_from_source_segment, delete_routing_segmentation_segment_by_id, get_routing_segmentation_enable_status, get_routing_segmentation_list_of_security_maps, get_routing_segmentation_maps, get_routing_segmentation_maps_from_source_segment, get_routing_segmentation_security_policy, get_routing_segmentation_segment_by_id, get_routing_segmentation_segments, get_routing_segmentation_snat_maps, update_routing_segmentation_enable_status, update_routing_segmentation_maps_from_source_segment, update_routing_segmentation_security_policy, update_routing_segmentation_segment_by_id, update_routing_segmentation_snat_maps, ) from .orch._vrf_dnat_maps import get_dnat_maps from .orch._vrf_snat_maps import get_snat_maps from .orch._vrrp import get_vrrp_interfaces from .orch._vti import get_vti_interfaes from .orch._vxoa_hostname import update_appliance_hostname from .orch._wan_next_hop_health import get_wan_next_hop_health_config from .orch._wccp import ( get_wccp_service_group_settings, get_wccp_state, get_wccp_system_settings, ) from .orch._zones import ( get_zone_next_id, get_zones, get_zones_end_to_end_state, get_zones_vrf_mapping, set_zone_next_id, update_zones, update_zones_end_to_end_state, )
# Aruba Edge Connect
[docs] class EdgeConnect(HttpCommon): """Edge Connect setup and imports related methods for making API calls to Edge Connect appliances. Child class of :class:`HttpCommon` """ # noqa RST304 def __init__( self, url: str, log_file: bool = False, log_console: bool = False, log_success: bool = False, verify_ssl: bool = True, timeout: tuple = (9.15, 12), ): """Setup Edge Connect instance .. warning:: If ``log_file`` and ``log_success`` are set to ``True`` response text from successful API calls will be logged to the local log file. Some responses can include sensitive data that you may not wish to retain in the log files. :param url: IP address or URL to Edge Connect appliance :type url: str :param api_key: API Key for non-user-based authentication :type api_key: str, optional :param auth_mode: Reference what form of user-based authentication is in use, accepts ``local``, ``radius``, and ``tacacs``, defaults to "local" :type auth_mode: str, optional :param log_file: Enable logging messages to local log file, defaults to False :type log_file: bool, optional :param log_console: Enable logging messages to stdout, defaults to False :type log_console: bool, optional :param log_success: Enable logging response text of successful API calls in log messages. Will print a warning to the user if set to ``True`` while ``log_file`` is also ``True`` to raise awareness that sensitive data may be stored in the log file. :type log_success: bool, optional :param verify_ssl: Set to ``False`` to ignore certificate warnings within requests, defaults to ``True`` :param timeout: Timeout values (in seconds) for requests, first value is for initial connection, second value is for read timeouts. Defaults to ``(9.15, 12)``, 9.15 seconds for connection, 12 seconds for server data response. :type timeout: tuple, optional :type verify_ssl: bool, optional """ self.url_prefix = "https://" + url + ":443/rest/json" self.timeout = timeout self.session = requests.Session() self.headers = {} # for API calls w/ just source as query param self.apiSrcId = "?source=menu_rest_apis_id" # for API calls w/ multiple query params self.apiSrcId2 = "&source=menu_rest_apis_id" # disable certificate warning messages / errors self.verify = verify_ssl if verify_ssl is False: requests.packages.urllib3.disable_warnings( category=InsecureRequestWarning ) # Allow logging successful response text/data if log_success # is set to True, warn user if logging to local file is also # enabled self.log_success = log_success if self.log_success and log_file: print( """ NOTE: Logging Response text from successful API calls has been enabled. Some responses can include sensitive data that you may not wish to retain in the log files. """ ) # Setup general log settings for messages and errors self.logger = logging.getLogger(f"ecos_{url}") self.formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) self.logger.setLevel(logging.INFO) # Setup logging file if log_file set to True if log_file: local_log_directory = "logging/" if not os.path.exists(local_log_directory): os.makedirs(local_log_directory) self.file_handler = logging.FileHandler( "{}sp_ecos.log".format(local_log_directory) ) self.file_handler.setFormatter(self.formatter) self.file_handler.setLevel(logging.INFO) self.logger.addHandler(self.file_handler) # Setup logging to console if log_console set to True if log_console: self.console_handler = logging.StreamHandler(sys.stdout) self.console_handler.setFormatter(self.formatter) self.console_handler.setLevel(logging.DEBUG) self.logger.setLevel(logging.DEBUG) self.logger.addHandler(self.console_handler) # Imported methods from .ecos._alarm import ( acknowledge_appliance_alarms, add_note_appliance_alarms, clear_appliance_alarms, delete_appliance_alarms, get_appliance_alarm_descriptions, get_appliance_alarms, ) from .ecos._bonded_tunnel import ( configure_appliance_all_bonded_tunnels, delete_appliance_multiple_bonded_tunnels, delete_appliance_single_bonded_tunnel, get_appliance_all_bonded_tunnel_ids, get_appliance_bonded_tunnel_aliases, get_appliance_bonded_tunnel_live_view_info, get_appliance_bonded_tunnels_config, get_appliance_bonded_tunnels_state, get_appliance_multiple_bonded_tunnels_config, get_appliance_multiple_bonded_tunnels_state, get_appliance_single_bonded_tunnel_config, ) from .ecos._cli import ( perform_appliance_cli_command, perform_appliance_multiple_cli_command, ) from .ecos._cpu import get_appliance_cpu from .ecos._deployment import get_appliance_deployment from .ecos._disk_usage import get_appliance_disk_usage from .ecos._dns import get_appliance_dns_config, set_appliance_dns_config from .ecos._flows import ( get_appliance_flow_bandwidth_stats, get_appliance_flow_details, get_appliance_flow_details_verbose, get_appliance_flows, reclassify_flows, reset_flows, ) from .ecos._gms import assign_orchestrator, get_orchestrator from .ecos._interfaces import get_appliance_interfaces from .ecos._license import is_reboot_required from .ecos._local_subnets import ( add_appliance_locally_configured_routes, appliance_find_preferred_route, delete_appliance_locally_configured_routes, get_appliance_locally_configured_subnets, get_appliance_locally_configured_subnets_single_vrf, get_appliance_routing_peers_info, get_appliance_subnets, get_appliance_subnets_all_vrfs, get_appliance_subnets_single_vrf, update_appliance_all_locally_configured_subnets, update_appliance_all_locally_configured_subnets_single_vrf, ) from .ecos._login import login, logout from .ecos._memory import get_appliance_memory from .ecos._network_interfaces import ( get_appliance_network_interfaces, modify_network_interfaces, ) from .ecos._nexthops import get_appliance_nexthops from .ecos._peers import get_appliance_peers, get_appliance_peers_ec_only from .ecos._ping_trace import ( get_ping_or_traceroute, run_ping_or_traceroute, stop_ping_or_traceroute, ) from .ecos._port_forwarding import ( get_port_forwarding_rules, set_gms_marked_port_forwarding_rules, set_port_forwarding_rules, ) from .ecos._reboot import request_reboot from .ecos._save_changes import save_changes from .ecos._security_maps import ( configure_appliance_security_policies, delete_appliance_security_policy_rule, delete_appliance_security_policy_zone_pair, get_appliance_security_policies, get_appliance_security_policy_map, get_appliance_security_policy_settings, get_appliance_security_policy_settings_by_map_name, get_appliance_security_policy_zone_pair, set_appliance_security_policy_settings, ) from .ecos._sp_portal import register_sp_portal, register_sp_portal_status from .ecos._statistics import ( get_appliance_realtime_stats, get_appliance_stats_minute_file, get_appliance_stats_minute_range, ) from .ecos._system_info import get_appliance_system_info from .ecos._third_party_tunnel import ( configure_appliance_multiple_3rdparty_tunnels, delete_appliance_multiple_3rdparty_tunnels, delete_appliance_single_3rdparty_tunnel, get_appliance_3rdparty_tunnel_aliases, get_appliance_3rdparty_tunnels_config, get_appliance_3rdparty_tunnels_state, get_appliance_all_3rdparty_tunnel_ids, get_appliance_multiple_3rdparty_tunnels_config, get_appliance_multiple_3rdparty_tunnels_state, get_appliance_single_3rdparty_tunnel_config, ) from .ecos._time import get_appliance_time from .ecos._traffic_class import ( get_traffic_class_names, set_traffic_class_names, ) from .ecos._tunnel import ( apply_appliance_tunnel_template, configure_appliance_all_tunnels, configure_appliance_multiple_tunnels, configure_appliance_single_tunnel, delete_appliance_multiple_tunnels, delete_appliance_single_tunnel, get_appliance_all_tunnel_ids, get_appliance_multiple_tunnels_config, get_appliance_multiple_tunnels_state, get_appliance_passthrough_tunnel_source_endpoints, get_appliance_single_tunnel_config, get_appliance_tunnel_aliases, get_appliance_tunnel_source_endpoints, get_appliance_tunnels_config, get_appliance_tunnels_config_and_state, set_appliance_tunnels_ipsec_psk, start_appliance_tunnel_mtu_discovery, )