Changeset - 49f356923784
[Not reviewed]
Merge default
0 8 0
Branko Majic (branko) - 11 years ago 2013-07-30 21:04:18
branko@majic.rs
CONNT-10: Merging the implementation.
8 files changed with 434 insertions and 4 deletions:
0 comments (0 inline, 0 general)
.hgignore
Show inline comments
 
@@ -4,3 +4,4 @@ syntax: glob
 
projtest/projtest.db
 
projtest/south
 
docs/_build
 
tmp/
conntrackt/templates/conntrackt/project_detail.html
Show inline comments
 
@@ -37,6 +37,12 @@
 
  </div>
 
  {% endfor %}
 
</div>
 
<div class="row">
 
  <div class="span12">
 
    <h2>Communications diagram</h2>
 
    <img src="{% url "project_diagram" project.id %}" width="100%">
 
  </div>
 
</div>
 
{% endif %}
 
{% endblock %}
 

	
conntrackt/tests/test_utils.py
Show inline comments
 
# Django imports.
 
from django.test import TestCase
 

	
 
# Third-party Python library imports.
 
import palette
 
import pydot
 

	
 
# Application imports.
 
from conntrackt.models import Entity
 
from conntrackt.models import Entity, Project, Communication
 
from conntrackt import utils
 

	
 

	
 
@@ -47,3 +51,204 @@ COMMIT
 
COMMIT
 
"""
 
        self.assertEqual(generated, expected)
 

	
 

	
 
class GetDistinctColorsTest(TestCase):
 
    """
 
    Tests covering the get_distinct_colors function.
 
    """
 

	
 
    def test_count(self):
 
        """
 
        Tests if correct number of distinct colours are returned.
 
        """
 

	
 
        colors = utils.get_distinct_colors(13)
 

	
 
        self.assertEqual(len(colors), 13)
 

	
 
        colors = utils.get_distinct_colors(123)
 

	
 
        self.assertEqual(len(colors), 123)
 

	
 
    def test_start(self):
 
        """
 
        Tests if the passed start colour is returned as part of generated
 
        colours.
 
        """
 

	
 
        start = palette.Color("#AA3311")
 

	
 
        colors = utils.get_distinct_colors(10, start)
 

	
 
        self.assertEqual(start.hex, colors[0].hex)
 

	
 
    def test_color_distance(self):
 
        """
 
        Tests if the generated colous all have proper distance between
 
        each-other.
 
        """
 

	
 
        colors = utils.get_distinct_colors(13)
 

	
 
        # Set allowed margin of difference to 0.1%
 
        delta = (1 / 13.) * 0.001
 

	
 
        # Calculate diffs between colours.
 
        diffs = [colors[i + 1].hsl["h"] - colors[i].hsl["h"] for i in range(12)]
 

	
 
        # Take first diff as reference point.
 
        reference = diffs[0]
 

	
 
        # Create list that contains True/False for diffs depending on whether
 
        # they're in delta-surrounding of reference point.
 
        equal = [(abs(diff - reference) < delta) for diff in diffs]
 

	
 
        # There should be 12 True values.
 
        self.assertEqual(equal.count(True), 12)
 

	
 
        # Check the difference between first and last colour.
 
        equal = abs(colors[0].hsl["h"] + 1 - colors[12].hsl["h"] - reference) < delta
 
        self.assertEqual(True, equal)
 

	
 

	
 
class GenerateProjectDiagramTest(TestCase):
 
    """
 
    Tests the generate_project_diagram function.
 
    """
 

	
 
    fixtures = ["test-data.json"]
 

	
 
    def test_unique_entity_colors(self):
 
        """
 
        Tests if each node/entity in the graph will have a unique colour.
 
        """
 

	
 
        # Get diagram for project
 
        project = Project.objects.get(pk=1)
 
        diagram = utils.generate_project_diagram(project)
 

	
 
        # Extract all nodes
 
        clusters = diagram.get_subgraphs()
 
        nodes = []
 
        for cluster in clusters:
 
            nodes.extend(cluster.get_nodes())
 

	
 
        # Get the node colours.
 
        colors = [n.get_color() for n in nodes]
 

	
 
        # Verify they're all unique colours.
 
        self.assertEqual(len(colors), len(set(colors)))
 

	
 
    def test_edge_colours(self):
 
        """
 
        Tests if the edge colours match with source node/entity colour.
 
        """
 

	
 
        # Get diagram for project
 
        project = Project.objects.get(pk=1)
 
        diagram = utils.generate_project_diagram(project)
 

	
 
        # Extract all nodes and edges.
 
        clusters = diagram.get_subgraphs()
 
        nodes = {}
 
        for cluster in clusters:
 
            for node in cluster.get_nodes():
 
                nodes[node.get_name()] = node
 
        edges = diagram.get_edges()
 

	
 
        # Validate that edges have same colour as the source nodes.
 
        for edge in edges:
 
            self.assertEqual(nodes[edge.get_source()].get_color(), edge.get_color())
 

	
 
    def test_entities_present(self):
 
        """
 
        Tests if all (and only) specific project entities are in the graph.
 
        """
 

	
 
        # Get diagram for project
 
        project = Project.objects.get(pk=1)
 
        diagram = utils.generate_project_diagram(project)
 

	
 
        # Set-up expected node names.
 
        expected_node_names = ["Test Entity 1", "Test Entity 2", "Test Entity 3", "Test Subnet"]
 

	
 
        # Get all nodes from diagram.
 
        clusters = diagram.get_subgraphs()
 
        nodes = []
 
        for cluster in clusters:
 
            nodes.extend(cluster.get_nodes())
 

	
 
        # Get the node names, strip the quotes from them.
 
        node_names = [n.get_name().replace('"', '') for n in nodes]
 

	
 
        # Validate that the two lists contain same elements.
 
        self.assertEqual(sorted(expected_node_names), sorted(node_names))
 

	
 
    def test_communications_present(self):
 
        """
 
        Tests if all (and only) specific project communications are in the
 
        graph.
 
        """
 

	
 
        # Get diagram for project
 
        project = Project.objects.get(pk=1)
 
        diagram = utils.generate_project_diagram(project)
 

	
 
        # Get all edges from the diagram.
 
        edges = diagram.get_edges()
 

	
 
        # Create list of edge labels.
 
        edge_labels = ["%s -> %s (%s)" % (e.get_source().replace('"', ''),
 
                                           e.get_destination().replace('"', ''),
 
                                           e.get_label().replace('"', '')) for e in edges]
 

	
 
        # Create list of expected edge labels
 
        expected_edge_labels = ['Test Entity 1 -> Test Entity 2 (UDP:123)', 'Test Entity 1 -> Test Entity 3 (UDP:53)',
 
                          'Test Entity 2 -> Test Entity 1 (ICMP:8)', 'Test Entity 2 -> Test Entity 1 (TCP:22)',
 
                          'Test Entity 3 -> Test Entity 1 (TCP:3306)', 'Test Subnet -> Test Entity 1 (TCP:22)']
 

	
 
        self.assertEqual(sorted(expected_edge_labels), sorted(edge_labels))
 

	
 
    def test_locations_present(self):
 
        """
 
        Tests if all (and only) specific project locations are in the graph (as
 
        clusters).
 
        """
 

	
 
        # Get diagram for project.
 
        project = Project.objects.get(pk=1)
 
        diagram = utils.generate_project_diagram(project)
 

	
 
        # Set-up expected cluster names (based on locations).
 
        expected_cluster_names = ["cluster_test_location_1", "cluster_test_location_2"]
 

	
 
        # Get cluster names.
 
        cluster_names = [s.get_name() for s in diagram.get_subgraphs()]
 

	
 
        self.assertEqual(sorted(expected_cluster_names), sorted(cluster_names))
 

	
 
    def test_return_type(self):
 
        """
 
        Tests if a correct object type is returned.
 
        """
 

	
 
        # Get diagram for project.
 
        project = Project.objects.get(pk=1)
 
        diagram = utils.generate_project_diagram(project)
 

	
 
        self.assertEqual(type(diagram), pydot.Dot)
 

	
 
    def test_graph_properties(self):
 
        """
 
        Tests if graph properties have been set-up properly.
 
        """
 

	
 
        # Get diagram for project.
 
        project = Project.objects.get(pk=1)
 
        diagram = utils.generate_project_diagram(project)
 

	
 
        self.assertEqual("digraph", diagram.get_graph_type())
 
        self.assertEqual("transparent", diagram.get_bgcolor())
 
        self.assertEqual("1.5", diagram.get_nodesep())
 
        self.assertEqual([{"shape": "record"}], diagram.get_node_defaults())
conntrackt/tests/test_views.py
Show inline comments
 
@@ -15,7 +15,7 @@ from django.test import TestCase
 
from conntrackt.models import Project, Location, Entity, Interface, Communication
 

	
 
from conntrackt.views import IndexView
 
from conntrackt.views import entity_iptables, project_iptables
 
from conntrackt.views import entity_iptables, project_iptables, project_diagram
 

	
 
from conntrackt.views import ProjectView, ProjectCreateView, ProjectUpdateView, ProjectDeleteView
 
from conntrackt.views import LocationCreateView, LocationUpdateView, LocationDeleteView
 
@@ -1332,3 +1332,53 @@ class CommunicationDeleteViewTest(Permis
 
        response = view(request, pk=1)
 

	
 
        self.assertEqual(response["Location"], reverse("entity", args=(communication.source.entity.pk,)))
 

	
 

	
 
class ProjectDiagramTest(PermissionTestMixin, TestCase):
 

	
 
    fixtures = ['test-data.json']
 

	
 
    view_function = staticmethod(project_diagram)
 
    sufficient_permissions = ("view",)
 
    permission_test_view_kwargs = {"pk": "1"}
 

	
 
    def test_invalid_project(self):
 
        """
 
        Tests if a 404 is returned if no project was found (invalid ID).
 
        """
 

	
 
        # Set-up a request.
 
        request = create_get_request()
 

	
 
        # Get the view.
 
        view = project_diagram
 

	
 
        # Validate the response.
 
        self.assertRaises(Http404, view, request, pk=200)
 

	
 
    def test_content_type(self):
 
        """
 
        Test if correct content type is being returned by the response.
 
        """
 

	
 
        # Get the view.
 
        view = project_diagram
 

	
 
        # Get the response.
 
        response = generate_get_response(view, pk=1)
 

	
 
        self.assertEqual(response['Content-Type'], "image/svg+xml")
 

	
 
    def test_content(self):
 
        """
 
        Tests content produced by the view.
 
        """
 

	
 
        # Get the view.
 
        view = project_diagram
 

	
 
        # Get the response.
 
        response = generate_get_response(view, pk=1)
 

	
 
        self.assertContains(response, '"-//W3C//DTD SVG 1.1//EN"')
 
        self.assertContains(response, "Test Project 1")
conntrackt/urls.py
Show inline comments
 
@@ -3,7 +3,7 @@ from django.conf.urls import patterns, u
 
from django.contrib.auth.views import login, logout
 

	
 
# Application imports.
 
from .views import IndexView, EntityView, entity_iptables, project_iptables
 
from .views import IndexView, EntityView, entity_iptables, project_iptables, project_diagram
 
from .views import ProjectView, ProjectCreateView, ProjectUpdateView, ProjectDeleteView
 
from .views import LocationCreateView, LocationUpdateView, LocationDeleteView
 
from .views import EntityCreateView, EntityUpdateView, EntityDeleteView
 
@@ -64,6 +64,9 @@ urlpatterns = patterns(
 
    # View for rendering zip file with iptables rules for all entities in a project for a specific location.
 
    url(r'^project/(?P<project_id>\d+)/location/(?P<location_id>\d+)/iptables/$', project_iptables, name="project_location_iptables"),
 

	
 
    # View for showing project communications in a diagram.
 
    url(r'^project/(?P<pk>\d+)/diagram/$', project_diagram, name="project_diagram"),
 

	
 
    # Views for logging-in/out the users.
 
    url(r'^login/$', login, {'template_name': 'conntrackt/login.html'}, name="login"),
 
    url(r'^logout/$', logout, name="logout"),
conntrackt/utils.py
Show inline comments
 
# Standard library imports.
 
import re
 
import itertools
 

	
 
# Third-party Python library imports.
 
import palette
 
import pydot
 

	
 
# Django imports.
 
from django.template import Context, loader
 

	
 
# Application imports.
 
import iptables
 
from .models import Communication
 

	
 

	
 
def generate_entity_iptables(entity):
 
@@ -52,3 +58,125 @@ def generate_entity_iptables(entity):
 
    content = "%s%s" % (filter, nat)
 

	
 
    return content
 

	
 

	
 
def get_distinct_colors(count, start=palette.Color("#AE1111")):
 
    """
 
    Generates a number of distinct colours, and returns them as a list. The
 
    colours are generated using the HSL (hue, saturation, lightness) model,
 
    where saturation and lightness is kept the same for all colours, with
 
    differing hue. The hue difference between each subsequent color in the list
 
    is kept the same.
 

	
 
    Arguments:
 

	
 
        count - Total number of colours that should be generated.
 

	
 
        start - First colour that should be taken as a start point. All colours
 
        are generated relative to this colour by increasing the hue. Should be
 
        an instance of palette.Color class. Defaults to RGB colour "#AE1111".
 

	
 
    Return:
 

	
 
        List of distinct palette.Color instances.
 
    """
 

	
 
    # Read the HSL from provided Color.
 
    hue, sat, lum = start.hsl["h"], start.hsl["s"], start.hsl["l"]
 

	
 
    # Calculate the step increase.
 
    step = 1 / float(count)
 

	
 
    # Initiate an empty list that will store the generated colours.
 
    colors = []
 

	
 
    # Generate new colour by increasing the hue as long as we haven't generated
 
    # the requested number of colours.
 
    while len(colors) < count:
 
        colors.append(palette.Color(hsl=(hue, sat, lum)))
 
        hue += step
 

	
 
    return colors
 

	
 

	
 
def generate_project_diagram(project):
 
    """
 
    Generates communication diagram for provided project.
 

	
 
    Arguments:
 

	
 
        project - Project for which the diagram should be generated. Instance of
 
        conntrackt.models.Project class.
 

	
 
    Returns:
 

	
 
        Dot diagram (digraph) representing all of the communications in a
 
        project.
 
    """
 

	
 
    # Set-up the graph object.
 
    graph = pydot.Dot(graph_name=project.name, graph_type="digraph", bgcolor="transparent", nodesep="1.5")
 
    # Set-up defaults for the graph nodes.
 
    graph.set_node_defaults(shape="record")
 

	
 
    # Obtain list of all entities in a project.
 
    entities = project.entity_set.all()
 

	
 
    # Set-up dictinary that will contains clusters of entities belonging to same
 
    # location.
 
    clusters = {}
 

	
 
    # Dictinoary for storing mapping between nodes and colours.
 
    node_colors = {}
 

	
 
    # Get distinct colours, one for each node/entity.
 
    colors = get_distinct_colors(entities.count())
 

	
 
    # Created nodes based on entities, and put them into correct cluster.
 
    for entity in entities:
 

	
 
        # Try to get the existing cluster based on location name.
 
        location = entity.location
 
        cluster_name = location.name.replace(" ", "_").lower()
 
        cluster = clusters.get(cluster_name, None)
 

	
 
        # Set-up a new cluster for location encountered for the first time.
 
        if cluster is None:
 
            cluster = pydot.Cluster(graph_name=cluster_name, label=location.name)
 
            clusters[cluster_name] = cluster
 

	
 
        # Fetch a colour that will be associated with the node/entity.
 
        node_color = colors.pop()
 
        node_colors[entity.id] = node_color.hex
 

	
 
        # Determine whether the node label should be black or white based on brightness of node colour.
 
        node_color_brightness = 1 - (node_color.rgb["r"] * 0.299 + node_color.rgb["g"] * 0.587 + node_color.rgb["b"] * 0.114)
 

	
 
        if node_color_brightness < 0.5:
 
            font_color = "black"
 
        else:
 
            font_color = "white"
 

	
 
        # Finally create the node, and add it to location cluster.
 
        node = pydot.Node(entity.name, style="filled", color=node_color.hex, fontcolor=font_color)
 
        cluster.add_node(node)
 

	
 
    # Add clusters to the graph.
 
    for cluster in clusters.values():
 
        graph.add_subgraph(cluster)
 

	
 
    # Get all project communications.
 
    communications = Communication.objects.filter(source__entity__project=project)
 

	
 
    # Add the edges (lines) representing communications, drawing them with same
 
    # colour as the source node/entity.
 
    for comm in communications:
 
        edge_color = node_colors[comm.source.entity.id]
 

	
 
        label = '"%s:%s"' % (comm.protocol, str(comm.port))
 

	
 
        edge = pydot.Edge(comm.source.entity.name, comm.destination.entity.name, label=label, color=edge_color)
 

	
 
        graph.add_edge(edge)
 

	
 
    return graph
conntrackt/views.py
Show inline comments
 
@@ -16,7 +16,7 @@ from braces.views import MultiplePermiss
 
# Application imports.
 
from .forms import ProjectForm, LocationForm, EntityForm, InterfaceForm, CommunicationForm
 
from .models import Project, Entity, Location, Interface, Communication
 
from .utils import generate_entity_iptables
 
from .utils import generate_entity_iptables, generate_project_diagram
 

	
 

	
 
class IndexView(MultiplePermissionsRequiredMixin, TemplateView):
 
@@ -880,3 +880,38 @@ class CommunicationDeleteView(SetHeadlin
 
        """
 

	
 
        return "Delete communication %s" % self.object
 

	
 

	
 
@permission_required("conntrackt.view", raise_exception=True)
 
def project_diagram(request, pk):
 
    """
 
    Custom view that returns response containing diagram of project
 
    communications.
 

	
 
    The diagram will include coloured entities, with directional lines
 
    connecting the source and destination end entities.
 

	
 
    The output format is SVG.
 

	
 
    Arguments:
 

	
 
        request - Request object.
 

	
 
        pk - Project ID for which the diagram should be generated.
 

	
 
    Returns:
 

	
 
        Response object that contains the project diagram rendered as SVG.
 
    """
 

	
 
    # Fetch the project.
 
    project = get_object_or_404(Project, pk=pk)
 

	
 
    # Generate the diagram.
 
    content = generate_project_diagram(project).create_svg()
 

	
 
    # Set the mime type.
 
    response = HttpResponse(content, mimetype='image/svg+xml')
 

	
 
    # Return the response object.
 
    return response
requirements/base.txt
Show inline comments
 
@@ -2,3 +2,5 @@ django>=1.5
 
South
 
django-braces
 
django-crispy-forms
 
pyparsing==1.5.7
 
pydot
0 comments (0 inline, 0 general)