"""SOAP API client for AlgoSec **FireFlow**.
Examples:
Once initiated, the client is used by calling any of its public functions::
from algosec.api_clients.fire_flow import FireFlowAPIClient
client = FireFlowAPIClient(ip, username, password)
change_request = get_change_request_by_id(change_request_id)
If the API call you were looking for is not yet implemented, you can send authenticated custom API call
to the server using the client's ``session`` property.
Please see specific API Client documentations to find out how.
"""
import logging
import six.moves.urllib as urllib
from algosec.api_clients.base import SoapAPIClient
from algosec.errors import AlgoSecLoginError, AlgoSecAPIError
from algosec.helpers import report_soap_failure
logger = logging.getLogger(__name__)
[docs]class FireFlowAPIClient(SoapAPIClient):
"""*FireFlow* SOAP API client.
Used by initiating and calling its public methods or by sending custom calls using the ``client`` property.
Client implementation is strictly based on AlgoSec's official API guide.
Example:
Using the public methods to send an API call::
from algosec.api_clients.fire_flow import FireFlowAPIClient
client = FireFlowAPIClient(ip, username, password)
change_request = client.get_change_request_by_id(change_request_id)
Args:
server_ip (str): IP address of the AlgoSec server.
user (str): Username used to log in to AlgoSec.
password (str): The user's password, similar to the one used to log in to the UI.
verify_ssl (bool): Turn on/off the connection's SSL certificate verification. Defaults to True.
"""
@property
def _wsdl_url_path(self):
return 'https://{}/WebServices/FireFlow.wsdl'.format(self.server_ip)
@property
def _soap_service_location(self):
return 'https://{}/WebServices/WSDispatcher.pl'.format(self.server_ip)
def _initiate_client(self):
"""Return a connected suds client and save the new session id to ``self._session_id``
Raises:
AlgoSecLoginError: If login using the username/password failed.
Returns:
suds.client.Client
"""
client = self._get_soap_client(self._wsdl_url_path, location=self._soap_service_location)
with report_soap_failure(AlgoSecLoginError):
authenticate = client.service.authenticate(
username=self.user,
password=self.password,
)
self._session_id = authenticate.sessionId
return client
def _create_soap_traffic_line(self, traffic_line):
"""
Create new FireFlow traffic line based on TrafficLine object.
Args:
traffic_line (algosec.models.ChangeRequestTrafficLine): The traffic line to create.
Returns: Soap traffic line object
"""
soap_traffic_line = self.client.factory.create('trafficLine')
soap_traffic_line.action = traffic_line.action.value.api_value
for source in traffic_line.sources:
traffic_address = self.client.factory.create('trafficAddress')
traffic_address.address = source
soap_traffic_line.trafficSource.append(traffic_address)
for dest in traffic_line.destinations:
traffic_address = self.client.factory.create('trafficAddress')
traffic_address.address = dest
soap_traffic_line.trafficDestination.append(traffic_address)
for service in traffic_line.services:
traffic_service = self.client.factory.create('trafficService')
traffic_service.service = service
soap_traffic_line.trafficService.append(traffic_service)
if traffic_line.applications:
for application_name in traffic_line.applications:
traffic_application = self.client.factory.create('trafficApplication')
traffic_application.application = application_name
soap_traffic_line.trafficApplication.append(traffic_application)
return soap_traffic_line
[docs] def create_change_request(
self,
subject,
requestor_name,
email,
traffic_lines,
description="",
template=None,
):
"""Create a new change request.
Args:
subject (str): The ticket subject, will be shown on FireFlow.
requestor_name (str): The ticket creator name, will be shown on FireFlow.
email (str): The email address of the requestor.
traffic_lines (list[algosec.models.ChangeRequestTrafficLine]): List of traffic lines each describing its
sources, destinations and services.
description (str): description for the ticket, will be shown on FireFlow.
template (str): When different than None, this template will be passed on to FireFlow to be used
as the template for the new change requets.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If change request creation failed.
Returns:
str: The URL for the newley create change request on FireFlow
"""
# Create ticket and traffic lines objects
ticket = self.client.factory.create('ticket')
ticket.description = description
ticket.requestor = '{} {}'.format(requestor_name, email)
ticket.subject = subject
if template is not None:
ticket.template = template
for traffic_line in traffic_lines:
ticket.trafficLines.append(self._create_soap_traffic_line(traffic_line))
# Actually create the ticket
with report_soap_failure(AlgoSecAPIError):
ticket_added = self.client.service.createTicket(sessionId=self._session_id, ticket=ticket)
ticket_url = ticket_added.ticketDisplayURL
# normalize ticket url hostname that is sometimes incorrect from the FireFlow server (which uses it's own
# internal IP to build this url.
url = list(urllib.parse.urlsplit(ticket_url))
url[1] = self.server_ip
return urllib.parse.urlunsplit(url)
[docs] def get_change_request_by_id(self, change_request_id):
"""Get a change request by its ID.
Useful for checking the status of a change request you opened through the API.
Args:
change_request_id: The ID of the change request to fetch.
Raises:
:class:`~algosec.errors.AlgoSecAPIError`: If the change request was not found on the server or another
error occurred while fetching the change request.
Returns:
The change request ticket object.
"""
with report_soap_failure(AlgoSecAPIError):
response = self.client.service.getTicket(sessionId=self._session_id, ticketId=change_request_id)
return response.ticket