"""REST API client for AlgoSec **BusinessFlow**."""
import logging
import requests
from requests import status_codes
from six.moves.urllib.parse import quote_plus
from algosec.api_clients.base import RESTAPIClient, APIClient
from algosec.errors import AlgoSecLoginError, AlgoSecAPIError, EmptyFlowSearch, UnauthorizedUserException
from algosec.helpers import mount_adapter_on_session, is_ip_or_subnet, IPHelper
from algosec.models import NetworkObjectSearchTypes, NetworkObjectType
from algosec.constants import API_CALL_FAILED_RESPONSE, APP_UNAUTHORIZED, PERMISSION_ERROR_MSG, \
LOGIN_FAILED_IMPERSONATION_MSG, LOGIN_FAILED_IMPERSONATION_DETAILS
logger = logging.getLogger(__name__)
[docs]class BusinessFlowAPIClient(RESTAPIClient):
"""*BusinessFlow* RESTful API client.
Used by initiating and calling its public methods or by sending custom calls using the ``session`` property.
Client implementation is strictly based on AlgoSec's official API guide.
To ease the usability for custom API calls, a bunch of base urls were added as properties to this class
(see example below).
Examples:
Using the public methods to send an API call::
from algosec.api_clients.business_flow import BusinessFlowAPIClient
client = BusinessFlowAPIClient(ip, username, password)
application_revision_id = client.get_application_revision_id_by_name("ApplicationName")
Sending a custom API Call::
from algosec.api_clients.business_flow import BusinessFlowAPIClient
client = BusinessFlowAPIClient(ip, username, password)
response = client.session.get(
"{}/name/{}".format(client.applications_base_url, application_name)
)
Args:
server_ip (str): IP address of the AlgoSec server.
user (str): Username used to log in to AlgoSec.
password (str): The user's password, similar to the one used to log in to the UI.
verify_ssl (bool): Turn on/off the connection's SSL certificate verification. Defaults to True.
"""
ABF_APPLICATION_DASHBOARD_URL = "/#application/{}/dashboard"
ASSOCIATED_APPLICATIONS_UI_QUERY = (
"/#applications/query?q=%7B%22addresses%22%3A%5B%7B%22"
"address%22%3A%22{}%22%7D%5D%2C%22devices%22%3A%5B%5D%7D"
)
def _initiate_session(self):
"""Return an authenticated session to the AlgoSec server.
Raises:
AlgoSecLoginError: If login using the username/password failed.
Returns:
requests.session.Session: An authenticated session with the server.
"""
# if afa_sess_id is None, raise UnauthorizedUserException.
if self.afa_sess_id is None:
raise UnauthorizedUserException(LOGIN_FAILED_IMPERSONATION_MSG,
LOGIN_FAILED_IMPERSONATION_DETAILS.format(self.user_email))
session = requests.session()
mount_adapter_on_session(session, self._session_adapter)
url = "{}/rest/v1/login".format(self.business_flow_base_url, self.server_ip)
get_user_name_url = "https://{}/afa/api/v1/session/{}"
logger.debug("logging in to AlgoSec servers: {}".format(url))
session.verify = self.verify_ssl
# initiate PHPSESSID in cookies
cookies = {"PHPSESSID": self.afa_sess_id}
user_name = ''
try:
# if impersonation succeeded try logging as the impersonated user.
if APIClient._impersonation_success:
user_name_response = session.get(get_user_name_url.format(self.server_ip, self.afa_sess_id),
cookies=cookies)
user_name = user_name_response.json().get("user")
login_data = {"afaSessionID": self.afa_sess_id, "afaSessionToken": self.afa_sess_id,
"afaSessionPHP": self.afa_sess_id, "username": user_name}
response = session.post(url, params=login_data)
# impersonation failed but afa_session isn't none so log in as AlgoBot Login User.
elif self.algobot_login_user is not None and self.algobot_login_password is not None:
response = session.get(url, auth=(self.algobot_login_user, self.algobot_login_password))
except Exception:
raise AlgoSecLoginError(
"Unable to login into AlgoSec server at {} with session id {} and username {}."
.format(url, self.afa_sess_id, user_name)
)
if response.status_code == status_codes.codes.OK:
return session
else:
raise AlgoSecLoginError(
"Unable to login into AlgoSec server at {}. HTTP Code: {}".format(
url, response.status_code
)
)
@property
def business_flow_base_url(self):
"""str: Return the base url for BusinessFlow."""
return "https://{}/BusinessFlow".format(self.server_ip)
@property
def api_base_url(self):
"""str: Return the base url for all API calls."""
return "{}/rest/v1".format(self.business_flow_base_url, self.server_ip)
@property
def applications_base_url(self):
"""str: Return the base url for all application related API calls."""
return "{}/applications".format(self.api_base_url)
@property
def network_objects_base_url(self):
"""str: Return the base url for all objects related API calls."""
return "{}/network_objects".format(self.api_base_url)
@property
def network_services_base_url(self):
"""str: Return the base url for all services related API calls."""
return "{}/network_services".format(self.api_base_url)
[docs] def get_network_service_by_name(self, service_name):
"""Get a network service object by its name.
Args:
service_name (str): The name of the service.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If no such network service could be found by name.
Returns:
dict: NetworkObject as defined on the API Guide.
"""
response = self.session.get(
"{}/service_name/{}".format(
self.network_services_base_url, quote_plus(service_name)
)
)
self._check_api_response(response)
return response.json()
[docs] def create_network_service(self, service_name, content, custom_fields=None):
"""Create a network service.
Args:
service_name (str): The service object's service_name
content (list[(str,int)]): List of (port, proto) pairs defining the services
custom_fields: The custom fields to include for the object.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If network service creation failed.
Returns:
dict: The created NetworkService object as defined in the API Guide.
"""
custom_fields = [] if custom_fields is None else custom_fields
content = [{"protocol": service[0], "port": service[1]} for service in content]
response = self.session.post(
"{}/new".format(self.network_services_base_url),
json=dict(name=service_name, content=content, custom_fields=custom_fields),
)
self._check_api_response(response)
return response.json()
[docs] def get_application_by_name(self, app_name):
"""Return the latest revision of an application by its name.
Args:
app_name (str): The application name to look for.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If no application matching the given name was found.
Returns:
dict: Json of the latest application revision.
"""
app_by_name_url = "{}/name/{}".format(self.applications_base_url, app_name)
logger.debug(self._api_info_string.format(
"Application Name",
app_by_name_url,
app_name,
))
response = self.session.get(
app_by_name_url
)
logger.debug("{}:\n{}".format(response, response.json()) or API_CALL_FAILED_RESPONSE)
try:
self._check_api_response(response)
except AlgoSecAPIError as e:
if response.status_code == 401:
raise UnauthorizedUserException(PERMISSION_ERROR_MSG, APP_UNAUTHORIZED.format(
PERMISSION_ERROR_MSG, self.user_email, response.status_code))
raise e
return response.json()
[docs] def get_application_revision_id_by_name(self, app_name):
"""Return the latest revision id of an application by its name.
Args:
app_name (str): The application name to look for.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If no application matching the given name was found.
Returns:
int: The latest application revision ID.
"""
return self.get_application_by_name(app_name)["revisionID"]
[docs] def search_network_objects(self, ip_or_subnet, search_type):
"""Return network objects related to a given IP or subnet.
Args:
ip_or_subnet (str): The IP address or hostname of the object, or a subnet. (e.g: 192.1.1.1, 192.168.0.0/16)
search_type (algosec.models.NetworkObjectSearchTypes): The enum for search type to perform.
Could be one of :
* *INTERSECT* - Search objects which their definition intersect with the given IP or subnet.
* *CONTAINED* - Search for objects which the given IP or subnet is contained in.
* *CONTAINING* - Search for objects contained within the given IP or subnet.
* *EXACT* - Search the object which is defined exactly by (and only by) the given IP or subnet.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If an error occurred during the object search.
Returns:
list[dict]: List of network objects matching the given obj and search type.
Each of the objects is a NetworkObject as defined in the API Guide.
"""
response = self.session.get(
"{}/find".format(self.network_objects_base_url),
params=dict(address=ip_or_subnet, type=search_type.value),
)
self._check_api_response(response)
# TODO: This check is being performed as currently the ABF api return weird response when no objects found
# TODO: Should be removed once the API is fixed to return an empty list when no object are found
if not isinstance(response.json(), list):
logger.warning(
"search_network_objects: unsupported api response. Return empty result. (reponse: {})".format(
response.json()
)
)
return []
return response.json()
[docs] def get_network_object_by_name(self, object_name):
"""Return a network object by its name.
Args:
object_name (str): The object name to be searched.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If no network object matching the given name could be found.
Returns:
dict: The NetworkObject object matching the name lookup.
"""
response = self.session.get(
"{}/name/{}".format(self.network_objects_base_url, object_name)
)
self._check_api_response(response)
result = response.json()
if isinstance(result, dict):
return result
elif isinstance(result, list) and len(result) == 1:
# TODO: Currently there is a bug in the API that returns a list of one object instead of the object itself
return result[0]
else:
raise AlgoSecAPIError(
"Unable to get one network object by name. Server response was: {}".format(
result
)
)
[docs] def create_network_object(self, type, content, name):
"""Create a new network object.
Args:
type (algosec.models.NetworkObjectType): The network object type
content (str|list): Define the newly created network object. Content depend upon the selected type:
- :class:`~algosec.models.NetworkObjectType.HOST`: Content is the IP address of the object.
- :class:`~algosec.models.NetworkObjectType.RANGE`: Content is IP range or CIDR.
- :class:`~algosec.models.NetworkObjectType.GROUP`: Content is a list of *ExistingNetworkObject* or
*NewNetworkObject* objects as defined in the API Guide.
- :class:`~algosec.models.NetworkObjectType.ABSTRACT`: Content is None or an empty string.
name (str): Name of the new network object
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If the network object creation failed.
Returns:
dict: The newly created ExistingNetworkObject object.
"""
response = self.session.post(
"{}/new".format(self.network_objects_base_url),
json=dict(type=type.value, name=name, content=content),
)
self._check_api_response(response)
return response.json()
[docs] def create_missing_network_objects(self, all_network_objects):
"""Create network objects if they are not already defined on the server.
Args:
all_network_objects (collections.Iterable[str]): List of the network objects to create if
missing from the server.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If the one of the network objects creation failed.
Returns:
list[dict]: List of the created network objects.
Note:
If one of the given objects is not a valid IP address or subnet string, the object won't be created.
"""
# Calculate which network objects we need to create before creating the flow
objects_missing_from_algosec = []
for obj in all_network_objects:
if not is_ip_or_subnet(obj):
continue
search_objects = self.search_network_objects(
obj, NetworkObjectSearchTypes.EXACT
)
if search_objects:
# EXACT object search is by content, not by name.
# Therefore, we make check if the exact object name was found
# Even if the object exists under a different name, we want to make sure it is
# marked for re-creation here.
object_names = [
search_object.get("name") for search_object in search_objects
]
if obj not in object_names:
objects_missing_from_algosec.append(obj)
else:
# No object was found, mark for creation
objects_missing_from_algosec.append(obj)
return [
self.create_network_object(NetworkObjectType.HOST, obj, obj)
for obj in objects_missing_from_algosec
]
[docs] def get_flow_by_name(self, app_revision_id, flow_name):
"""Return application flow by its name
Args:
app_revision_id (int|str): The application revision ID to fetch the flow from.
flow_name (str): The name of the flow to fetch.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If fetching the full list of flows for the application
revision failed
:class:`~algosec.errors.EmptyFlowSearch`: If no flow matching that name could be found
Returns:
dict: Flow object as defined in the API Guide.
"""
for flow in self.get_application_flows(app_revision_id):
if flow["name"] == flow_name:
return flow
raise EmptyFlowSearch("Unable to locate flow ID by name: {}".format(flow_name))
[docs] def delete_flow_by_id(self, app_revision_id, flow_id):
"""Delete an application flow given its id.
Args:
app_revision_id (int|str): The revision ID of the application to delete the flow from.
flow_id (int|str): The ID of the flow to delete.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If the flow deletion failed.
Returns:
None
"""
response = self.session.delete(
"{}/{}/flows/{}".format(
self.applications_base_url, app_revision_id, flow_id
)
)
self._check_api_response(response)
[docs] def delete_flow_by_name(self, app_revision_id, flow_name):
"""Delete an application flow given its name.
Args:
app_revision_id (int|str): The revision ID of the application to delete the flow from.
flow_name (str): The name of the flow to delete.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If the flow deletion failed.
:class:`~algosec.errors.EmptyFlowSearch`: If no flow matching that name could be found.
Returns:
None
"""
flow_id = self.get_flow_by_name(app_revision_id, flow_name)["flowID"]
self.delete_flow_by_id(app_revision_id, flow_id)
[docs] def get_application_flows(self, app_revision_id):
"""Return all flows of the application revision.
Note:
Only flows with ``flowType`` of ``APPLICATION_FLOW`` are returned.
The rest of the flows (e.g shared flows) are filtered out.
Args:
app_revision_id (str|int): The ID of the application revision to fetch the flows for
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If application flows list could not be fetched.
Returns:
list[dict]: List of Flow objects as defined in the API Guide.
"""
response = self.session.get(
"{}/{}/flows".format(self.applications_base_url, app_revision_id)
)
self._check_api_response(response)
return [
flow for flow in response.json() if flow["flowType"] == "APPLICATION_FLOW"
]
[docs] def get_flow_connectivity(self, app_revision_id, flow_id):
"""Return a flow connectivity object for a flow given its ID.
Args:
app_revision_id (int|str): The ID of the application revision to lookup the flow in.
flow_id (int|str): The ID of the flow to fetch ``FlowConnectivity`` for.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If error occurred while fetching the flow connectivity object.
Returns:
dict: FlowConnectivity object as defined in the API Guide.
"""
response = self.session.post(
"{}/{}/flows/{}/check_connectivity".format(
self.applications_base_url, app_revision_id, flow_id
)
)
self._check_api_response(response)
return response.json()
[docs] def create_application_flow(self, app_revision_id, requested_flow):
"""Create an application flow.
Creates network services that were defined in the flow but
are not currently exist on ABF.
Args:
app_revision_id (int): The application revision id as defined on ABF to create this flow on
requested_flow(algosec.models.RequestedFlow): The flow to be created
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If application flow creation failed.
Returns:
dict: An Application object as defined in the API Guide.
"""
all_network_objects = set(requested_flow.destinations + requested_flow.sources)
self.create_missing_network_objects(all_network_objects)
response = self.session.post(
"{}/{}/flows/new".format(self.applications_base_url, app_revision_id),
# We send a list since the API is looking for a list on NewFlows
json=[requested_flow.get_json_flow_definition()],
)
self._check_api_response(response)
return response.json()[0]
[docs] def apply_application_draft(self, app_revision_id):
"""Apply an application draft and automatically create a FireFlow change request.
Args:
app_revision_id (int|str): The revision ID of the application to apply the draft for.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If error occurred while trying to apply the application draft.
Returns:
requests.models.Response: The API call response.
"""
response = self.session.post(
"{}/{}/apply".format(self.applications_base_url, app_revision_id)
)
self._check_api_response(response)
[docs] def get_abf_application_dashboard_url(self, application_revision_id):
"""
Return URL for the application dashboard.
This is the applications's dashboard on AlgoSec BusinessFlow and it can be viewed in the browser.
Args:
application_revision_id: The application revision ID to return the dashboard URL for.
Returns:
str: URL for the application dashboard on the AlgoSec BusinessFlow. An Example would look like that:
https://10.0.0.12/BusinessFlow/#application/293/dashboard
"""
return self.business_flow_base_url + self.ABF_APPLICATION_DASHBOARD_URL.format(
application_revision_id
)
[docs] def get_associated_applications_ui_query(self, queried_ip_address):
"""
Return URL that can be used in the browser to view the associated applications query.
Args:
queried_ip_address: The IP address we wish to find associated applications for.
Returns:
str: URL for ssociated applications query that can be viewed in the browser.
"""
return (
self.business_flow_base_url
+ self.ASSOCIATED_APPLICATIONS_UI_QUERY.format(queried_ip_address)
)
[docs] @staticmethod
def is_application_critical(application_json):
"""
Return True if the application's json has the critical label set.
Args:
application_json: The application Json as returned from AlgoSec BusinessFlow APIs.
Returns:
bool: True if the application is marked as a critical application
"""
return any(
label["name"] == "Critical" for label in application_json.get("labels", [])
)
[docs] def get_associated_applications(self, network_item):
"""
Return all applications containing network objects related to IP addresses.
Args:
network_item (str): The network address or network object to search associated applications for
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If error occurred while trying to fetch associated applications.
Returns:
list: List of dictionaries each representing an associated application.
"""
app_status_url = "{}/find/applications?address={}".format(
self.network_objects_base_url, network_item
)
if IPHelper.is_network_address(network_item):
logger.debug(self._api_info_string.format(
"Associated Applications",
app_status_url,
network_item
))
response = self.session.get(
app_status_url
)
logger.debug("{}:\n{}".format(response,
response.json()) or API_CALL_FAILED_RESPONSE)
else:
network_object_by_name_url = "{}/name/{}".format(
self.network_objects_base_url, network_item
)
logger.debug(self._api_info_string.format(
"Associated Applications",
network_object_by_name_url,
network_item
))
network_object_by_name_response = self.session.get(network_object_by_name_url)
logger.debug("{}:\n{}".format(network_object_by_name_response, network_object_by_name_response.json()) or API_CALL_FAILED_RESPONSE)
self._check_api_response(network_object_by_name_response)
nw_id = network_object_by_name_response.json()[0]['revisionID']
applications_by_id = "{}/{}/applications".format(self.network_objects_base_url, nw_id)
logger.debug(self._api_info_string.format(
"Associated Applications",
app_status_url,
nw_id
))
response = self.session.get(applications_by_id)
logger.debug("{}:\n{}".format(response, response.json()) or API_CALL_FAILED_RESPONSE)
self._check_api_response(response)
return response.json()