"""SOAP API client for AlgoSec **FireFlow**.
Examples:
Once initiated, the client is used by calling any of its public functions::
from algosec.api_clients.fire_flow import FireFlowAPIClient
client = FireFlowAPIClient(ip, username, password)
change_request = get_change_request_by_id(change_request_id)
If the API call you were looking for is not yet implemented, you can send authenticated custom API call
to the server using the client's ``session`` property.
Please see specific API Client documentations to find out how.
"""
import logging
import requests
import six.moves.urllib as urllib
from algosec.api_clients.base import SoapAPIClient, APIClient
from algosec.errors import AlgoSecLoginError, AlgoSecAPIError, UnauthorizedUserException
from algosec.helpers import report_soap_failure
from algosec.constants import *
from zeep.exceptions import Fault
logger = logging.getLogger(__name__)
[docs]class FireFlowAPIClient(SoapAPIClient):
"""*FireFlow* SOAP API client.
Used by initiating and calling its public methods or by sending custom calls using the ``client`` property.
Client implementation is strictly based on AlgoSec's official API guide.
Example:
Using the public methods to send an API call::
from algosec.api_clients.fire_flow import FireFlowAPIClient
client = FireFlowAPIClient(ip, username, password)
change_request = client.get_change_request_by_id(change_request_id)
Args:
server_ip (str): IP address of the AlgoSec server.
user (str): Username used to log in to AlgoSec.
password (str): The user's password, similar to the one used to log in to the UI.
verify_ssl (bool): Turn on/off the connection's SSL certificate verification. Defaults to True.
"""
@property
def _wsdl_url_path(self):
return 'https://{}/WebServices/FireFlow.wsdl'.format(self.server_ip)
# might be useful to create another zeep ProxyService as done in afa client.
@property
def _soap_service_location(self):
return 'https://{}/WebServices/WSDispatcher.pl'.format(self.server_ip)
# default ffwsheader to avoid zeep exceptions where header is required.
@property
def _default_ffwsheader(self):
return {"version":"", "opaque":""}
@property
def _users_list_url(self):
return 'https://{}/FireFlow/REST/1.0/search/users?hide_privileged=0&get_extra_info=1'.format(self.server_ip)
def _initiate_client(self):
"""Return a connected zeep client and save the new session id to ``self._session_id``
Raises:
AlgoSecLoginError: If login using the username/password failed.
Returns:
zeep.Client
"""
self.algobot_login_user_defined = False
client = self._get_soap_client(self._wsdl_url_path, location=self._soap_service_location)
with report_soap_failure(AlgoSecLoginError):
authenticate = client.service.authenticate(
FFWSHeader=self._default_ffwsheader,
username=self.user,
password=self.password,
)
try:
if self.algobot_login_user is not None and self.algobot_login_password is not None:
client.service.authenticate(
FFWSHeader=self._default_ffwsheader,
username=self.algobot_login_user,
password=self.algobot_login_password,
)
self.algobot_login_user_defined = True
logger.debug("AlgoBot login user successfully logged in to FireFlow")
except Fault as e:
logger.debug("AlgoBot login user failed to login to FireFlow")
self._session_id = authenticate.sessionId
return client
def _create_soap_traffic_line(self, traffic_line):
"""
Create new FireFlow traffic line based on TrafficLine object.
Args:
traffic_line (algosec.models.ChangeRequestTrafficLine): The traffic line to create.
Returns: Soap traffic line object
"""
factory = self.client.type_factory('ns0')
soap_traffic_line = factory.trafficLine()
soap_traffic_line.action = traffic_line.action.value.api_value
for source in traffic_line.sources:
traffic_address = factory.trafficAddress()
traffic_address.address = source
soap_traffic_line.trafficSource.append(traffic_address)
for dest in traffic_line.destinations:
traffic_address = factory.trafficAddress()
traffic_address.address = dest
soap_traffic_line.trafficDestination.append(traffic_address)
for service in traffic_line.services:
traffic_service = factory.trafficService()
traffic_service.service = service
soap_traffic_line.trafficService.append(traffic_service)
if traffic_line.applications:
for application_name in traffic_line.applications:
traffic_application = factory.trafficApplication()
traffic_application.application = application_name
soap_traffic_line.trafficApplication.append(traffic_application)
return soap_traffic_line
[docs] def create_change_request(
self,
subject,
requestor_name,
email,
traffic_lines,
description="",
template = DEFAULT_TICKET_TEMPLATE,
):
"""Create a new change request.
Args:
subject (str): The ticket subject, will be shown on FireFlow.
requestor_name (str): The ticket creator name, will be shown on FireFlow.
email (str): The email address of the requestor.
traffic_lines (list[algosec.models.ChangeRequestTrafficLine]): List of traffic lines each describing its
sources, destinations and services.
description (str): description for the ticket, will be shown on FireFlow.
template (str): When different than None, this template will be passed on to FireFlow to be used
as the template for the new change requets.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If change request creation failed.
Returns:
str: The URL for the newley create change request on FireFlow
"""
# Create ticket and traffic lines objects
factory = self.client.type_factory('ns0')
ticket = factory.ticket()
ticket.description = description
ticket.requestor = '{} {}'.format(requestor_name, email)
ticket.subject = subject
ticket.template = DEFAULT_TICKET_TEMPLATE if template is None else template
for traffic_line in traffic_lines:
ticket.trafficLines.append(self._create_soap_traffic_line(traffic_line))
logger.debug(self._api_info_string.format(
"Create Change Request",
self._wsdl_url_path + " op_name: createTicket",
ticket,
))
# Actually create the ticket
with report_soap_failure(AlgoSecAPIError):
ticket_added = self.client.service.createTicket(FFWSHeader=self._default_ffwsheader, sessionId=self._session_id, ticket=ticket)
logger.debug("response: {}".format(ticket_added or API_CALL_FAILED_RESPONSE))
ticket_url = ticket_added.ticketDisplayURL
# normalize ticket url hostname that is sometimes incorrect from the FireFlow server (which uses it's own
# internal IP to build this url.
url = list(urllib.parse.urlsplit(ticket_url))
url[1] = self.server_ip
return urllib.parse.urlunsplit(url)
[docs] def get_change_request_by_id(self, change_request_id):
"""Get a change request by its ID.
Useful for checking the status of a change request you opened through the API.
Args:
change_request_id: The ID of the change request to fetch.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If the change request was not found on the server or another
error occurred while fetching the change request.
Returns:
The change request ticket object.
"""
logger.debug(self._api_info_string.format(
"Change Request Status",
self._wsdl_url_path + " op_name: getTicket",
change_request_id,
))
with report_soap_failure(AlgoSecAPIError):
response = self.client.service.getTicket(FFWSHeader=self._default_ffwsheader,
sessionId=self._session_id, ticketId=change_request_id)
if self.user_email and response.ticket.requestorEmail and self.user_email != response.ticket.requestorEmail:
user_email = self.user_email or PLACEHOLDER_EMAIL
ticket_requestor = response.ticket.requestor or ""
ticket_requestor_email = response.ticket.requestorEmail or ""
# get the list of fireflow users.
cookie = {FIREFLOW_COOKIE_NAME: self._session_id}
r = requests.get(self._users_list_url, cookies=cookie, verify=False)
user_lines = r.text.splitlines()
headers = user_lines[0].replace('"', "").split(',')
full_name_index = headers.index('FullName')
email_index = headers.index('Email')
is_privileged_index = headers.index('isPrivileged')
username_index = headers.index('UserName')
logger.debug("Current user email is " + user_email + " and ticket requestor is " + ticket_requestor)
possible_emails = []
# go thorugh all users. (start from index 1 since first line is headers.)
for user_line in user_lines[1:len(user_lines)]:
delimited_line = user_line.replace('"', "").split(',')
if (
len(delimited_line) == len(headers) and
(delimited_line[full_name_index] == ticket_requestor
or delimited_line[email_index] == ticket_requestor
or delimited_line[username_index] == ticket_requestor
or delimited_line[is_privileged_index])
):
possible_emails.append(delimited_line[email_index])
logger.debug("Possible users: " + str(possible_emails))
if user_email not in possible_emails:
# if there is no algobot user defined in configuration file raise Unauthorized exception.
if not self.algobot_login_user_defined:
raise UnauthorizedUserException(PERMISSION_ERROR_MSG,
GET_TICKET_WRONG_REQUESTOR.format(PERMISSION_ERROR_MSG,
ticket_requestor_email,
user_email))
logger.debug("response: {}".format(response or API_CALL_FAILED_RESPONSE))
return response.ticket