Changeset - 421171af6c75
[Not reviewed]
default
0 11 0
Branko Majic (branko) - 8 years ago 2017-12-22 10:33:56
branko@majic.rs
CONNT-25: Updating application and project to use Django 1.11.x:

- Bumped Django version in both the development requirements and setup
script to 1.11.x.
- Bumped Django Crispy Forms to version 1.7.0 in both development
requirements and setup script.
- Updated import of URL-related modules to use the new path (previous
one is deprecated).
- Added explicit on_delete = models.CASCADE option to all foreign key
fields (old implicit behaviour will be deprecated in Django 2.0).
- Fixed the custom change_list.html template used in Django Admin to
render without errors.
11 files changed with 22 insertions and 23 deletions:
0 comments (0 inline, 0 general)
conntrackt/admin.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
#
 
# Copyright (C) 2013 Branko Majic
 
#
 
# This file is part of Django Conntrackt.
 
#
 
# Django Conntrackt is free software: you can redistribute it and/or modify it
 
# under the terms of the GNU General Public License as published by the Free
 
# Software Foundation, either version 3 of the License, or (at your option) any
 
# later version.
 
#
 
# Django Conntrackt is distributed in the hope that it will be useful, but
 
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 
# details.
 
#
 
# You should have received a copy of the GNU General Public License along with
 
# Django Conntrackt.  If not, see <http://www.gnu.org/licenses/>.
 
#
 

	
 

	
 
# -*- coding: utf-8 -*-
 
#
 
# Copyright (C) 2013 Branko Majic
 
#
 
# This file is part of Django Conntrackt.
 
#
 
# Django Conntrackt is free software: you can redistribute it and/or modify it
 
# under the terms of the GNU General Public License as published by the Free
 
# Software Foundation, either version 3 of the License, or (at your option) any
 
# later version.
 
#
 
# Django Conntrackt is distributed in the hope that it will be useful, but
 
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 
# details.
 
#
 
# You should have received a copy of the GNU General Public License along with
 
# Django Conntrackt.  If not, see <http://www.gnu.org/licenses/>.
 
#
 

	
 

	
 
# Django imports.
 
from django.contrib import admin
 
from django.core.urlresolvers import resolve
 
from django.urls import resolve
 

	
 
# Application imports.
 
from .models import Project, Location, Entity, Interface, Communication
 

	
 

	
 
class InterfaceInline(admin.StackedInline):
 
    """
 
    This class implements the inline admin view of the Interface instances. This
 
    is used when adding the entities (since it's easier to specify interface for
 
    an entity right away).
 

	
 
    Properties:
 

	
 
      - model - Model that this admin class refers to.
 
      - extra - Number of interfaces that should be show inline for
 
        adding/editing.
 
    """
 

	
 
    model = Interface
 
    extra = 1
 

	
 

	
 
class CommunicationProjectListFilter(admin.SimpleListFilter):
 
    """
 
    This class implements a project-based filter for the communications list
 
    view. The filter is applied on both the source and destination field of a
 
    communication.
 

	
 
    The filter assumes that the communication belongs to a project by following
 
    the relationships through source and destination field towards interface,
 
    then entity, and then finally entity's project.
 

	
 
    Both source and destination must fullfil the requirement of belonging to the
 
    same project in order for the communication to be part of the resulting
 
    queryset.
 
    """
 

	
 
    # Set-up the filter title and parameter name that will be used for GET
 
    # request.
 
    title = "project"
 
    parameter_name = "project"
 

	
 
    def lookups(self, request, model_admin):
 
        """
 
        Returns a list of tuples that provide possible filter values that can be
 
        applied.
 

	
 
        Arguments:
 

	
 
          request - Request associated with the calling view.
 

	
 
          model_admin - Modem admin that can be used for accessing the model
 
          data.
 

	
 
        Returns:
 

	
 
          List of (project_id, project_object) tuples.
 
        """
 

	
 
        return [(p.id, p) for p in Project.objects.all()]
 

	
 
    def queryset(self, request, queryset):
 
        """
 
        Applies filtering by project ID on the provided communication queryset.
 

	
 
        Arguments:
 

	
 
          request - Request associated with the calling view.
 

	
 
          queryset - Current queryset used for displaying the information in the
 
          view.
 

	
 
        Returns:
 

	
 
          Queryset with applied filtering by object (if any). If no filtering
 
          needs to be done, returns original queryset.
 
        """
 

	
 
        # Apply the project filter on source and destination entity's project
 
        # ID, if it was specified.
 
        if self.value():
 
            return queryset.filter(source__entity__project=self.value(),
 
                                   destination__entity__project=self.value())
 

	
 
        return queryset
 

	
 

	
 
class CommunicationLocationListFilter(admin.SimpleListFilter):
 
    """
 
    This class implements a location-based filter for the communications list
 
    view. The filter is applied on both the source and destination field of a
 
    communication.
 

	
 
    The filter assumes that the communication belongs to a location by following
 
    the relationships through source and destination field towards interface,
 
    then entity, and then finally entity's location.
 

	
 
    Both source and destination must fullfil the requirement of belonging to the
 
    same location in order for the communication to be part of the resulting
 
    queryset.
 
    """
 

	
 
    # Set-up the filter title and parameter name that will be used for GET
 
    # request.
 
    title = "location"
 
    parameter_name = "location"
 

	
 
    def lookups(self, request, model_admin):
 
        """
 
        Returns a list of tuples that provide possible filter values that can be
 
        applied.
 

	
 
        Arguments:
 

	
 
          request - Request associated with the calling view.
 

	
 
          model_admin - Modem admin that can be used for accessing the model
 
          data.
 

	
 
        Returns:
 

	
 
          List of (project_id, project_object) tuples.
 
        """
 

	
 
        return [(p.id, p) for p in Location.objects.all()]
 

	
 
    def queryset(self, request, queryset):
 
        """
 
        Applies filtering by project ID on the provided communication queryset.
 

	
 
        Arguments:
 

	
 
          request - Request associated with the calling view.
 

	
 
          queryset - Current queryset used for displaying the information in the
 
          view.
 

	
 
        Returns:
 

	
 
          Queryset with applied filtering by object (if any). If no filtering
 
          needs to be done, returns original queryset.
 
        """
 

	
 
        # Apply the location filter on source and destination entity's project
 
        # ID, if it was specified.
 
        if self.value():
 
            return queryset.filter(source__entity__location=self.value(),
 
                                   destination__entity__location=self.value())
 

	
 
        return queryset
 

	
 

	
 
class CommunicationAdmin(admin.ModelAdmin):
 
    """
 
    Modifies the default admin class for the Communication class. The
 
    communication class needs to be modified in a number of ways in order to
 
    cater for easier adding of communication links, letting us limit the
 
    interfaces being shown as source/destination to specific project and/or
 
    site.
 
    """
 

	
 
    # Show standard fields of the model, and also include a separate edit link
 
    # so that other fields can be editable.
 
    list_display = ('source', 'destination', 'protocol', 'port', 'edit_link')
 
    # Make the extra edit link the main link for bringing-up admin page for
 
    # editing the communication.
 
    list_display_links = ('edit_link',)
 
    # All of the fields should be editable inline for ease-of-use purposes.
 
    list_editable = ('source', 'destination', 'protocol', 'port')
 
    # Add filters for project/location.
 
    list_filter = (CommunicationProjectListFilter, CommunicationLocationListFilter)
 

	
 
    def formfield_for_foreignkey(self, db_field, request, **kwargs):
 
        """
 
        Overrides the default queryset for the foreign key fields. This lets us
 
        limit the specification of communication to specific project and/or
 
        location. These are in turn passed through the GET parameters.
 

	
 
        Arguments:
 

	
 
          db_field - Name of the model field for which this method is called.
 

	
 
          request - Request associated with the calling view.
 

	
 
          kwargs - Additional keyword arguments
 
        """
 

	
 
        # Resolve the view name based on the request's path.
 
        view_name = resolve(request.path).view_name
 

	
 
        # Only process the source and destination fields that point to
 
        # interfaces.
 
        if db_field.name == "source" or db_field.name == "destination":
 
            # Perform no filtering by default.
 
            interface_filter = {}
 

	
 
            # If project was specified in GET requests, add it as a filter.
 
            if 'project' in request.GET:
 
                interface_filter['entity__project'] = request.GET['project']
 
            # If location was specified in GET request, add it as a filter.
 
            if 'location' in request.GET:
 
                interface_filter['entity__location'] = request.GET['location']
 
            # If there are any filtering options for the show interfaces, apply them.
 
            if interface_filter:
 
                kwargs["queryset"] = Interface.objects.filter(**interface_filter)
 

	
 
        # Call the parent's method so it would do any of its magic.
 
        return super(CommunicationAdmin, self).formfield_for_foreignkey(db_field, request, **kwargs)
 

	
 

	
 
class EntityAdmin(admin.ModelAdmin):
 
    """
 
    This class implements the admin view of the entity instances. It adds some
 
    inline capability that can be edited for the entity, and also adds inline
 
    editing of interfaces related to the entity.
 
    """
 

	
 
    # Show the interfaces inline when editing an entity.
 
    inlines = [InterfaceInline]
 
    # Specify what should be viewed in a list display.
 
    list_display = ('name', 'project', 'location')
 
    # Allow the user to change project and location directly in the list.
 
    list_editable = ('project', 'location')
 
    # Enable filtering based on project and location.
 
    list_filter = ['project', 'location']
 

	
 

	
 
class InterfaceAdmin(admin.ModelAdmin):
 
    """
 
    This class implements the admin view of the interface instances. It allows
 
    editing the IP address and netmask of an interface directly in the listing.
 

	
 
    It also adds some filtering capability based on project and/or location.
 
    """
 

	
 
    # Specify fields that should be visible in the list view.
 
    list_display = ('entity', 'address', 'netmask')
 
    # Allow changing of IP address and netmask directly in the list view.
 
    list_editable = ('address', 'netmask')
 
    # Enable filtering based on project and location.
 
    list_filter = ['entity__project', 'entity__location']
 

	
 

	
 
# Register our admin classes.
 
admin.site.register(Project)
 
admin.site.register(Location)
 
admin.site.register(Entity, EntityAdmin)
 
admin.site.register(Interface, InterfaceAdmin)
 
admin.site.register(Communication, CommunicationAdmin)
conntrackt/models.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
#
 
# Copyright (C) 2013 Branko Majic
 
#
 
# This file is part of Django Conntrackt.
 
#
 
# Django Conntrackt is free software: you can redistribute it and/or modify it
 
# under the terms of the GNU General Public License as published by the Free
 
# Software Foundation, either version 3 of the License, or (at your option) any
 
# later version.
 
#
 
# Django Conntrackt is distributed in the hope that it will be useful, but
 
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 
# details.
 
#
 
# You should have received a copy of the GNU General Public License along with
 
# Django Conntrackt.  If not, see <http://www.gnu.org/licenses/>.
 
#
 

	
 

	
 
# Django imports.
 
from django.contrib.admin.utils import NestedObjects
 
from django.core.exceptions import ValidationError
 
from django.core.urlresolvers import reverse
 
from django.urls import reverse
 
from django.db import models
 
from django.db.models.query_utils import Q
 

	
 
# Application imports.
 
from .utils import list_formatter_callback, get_distinct_colors
 

	
 

	
 
class SearchManager(models.Manager):
 
    """
 
    Custom model manager that implements search for model instances that contain
 
    a specific string (search term) in fields "name" or "description".
 
    """
 

	
 
    def search(self, search_term):
 
        """
 
        Performs a search for model instances that contain the provided search
 
        term in fields "name" or "description". The search is case-insensitive.
 

	
 
        Arguments:
 
          search_term - String to search the name and description for.
 

	
 
        Returns:
 
          Query set with model instances that matched the search.
 
        """
 

	
 
        return self.filter(Q(name__icontains=search_term) | Q(description__icontains=search_term))
 

	
 

	
 
class RelatedCollectorMixin(object):
 
    """
 
    Implements model mixin for easily obtainning related items of a model
 
    instance.
 

	
 
    The mixin can be used for obtaining all model objects that are directly or
 
    indirectly linked to the calling model object (through foreign key
 
    relationships).
 

	
 
    This mixin is very useful in delete views for warning the user about all of
 
    the related items that will be deleted if the calling item is deleted as
 
    well.
 
    """
 

	
 
    def get_dependant_objects(self):
 
        """
 
        Creates a list of model objects that depend (reference), both directly
 
        and indirectly, on calling model object. The calling model object is
 
        included as the first element of the list. This method call can be used
 
        in order to obtain a list of model objects that would get deleted in
 
        case the calling model object gets deleted.
 

	
 
        Returns:
 
          Nested list of model objects that depend (reference) calling model
 
          object.
 
        """
 

	
 
        collector = NestedObjects(using='default')
 

	
 
        collector.collect([self])
 

	
 
        return collector.nested()
 

	
 
    def get_dependant_objects_representation(self):
 
        """
 
        Creates a nested list of object representations that depend (reference),
 
        both directly and indirectly, calling model object. This method call can
 
        be used in order to obtain a list of string representations of model
 
        objects that would get deleted in case the calling model object gets
 
        deleted.
 

	
 
        The resulting nested list can be shown to the user for
 
        warning/notification purposes using the unordered_list template tag.
 

	
 
        Each non-list element will be a string generated using the
 
        conntrackt.utils.list_formatter_callback function.
 

	
 
        Returns:
 
          Nested list of representations of model objects that depend
 
          (reference) calling model object.
 
        """
 

	
 
        collector = NestedObjects(using='default')
 

	
 
        collector.collect([self])
 

	
 
        return collector.nested(list_formatter_callback)
 

	
 

	
 
class Project(RelatedCollectorMixin, models.Model):
 
    """
 
    Implements a model with information about a project. A project has some
 
    basic settings, and mainly serves the purpose of grouping entities for
 
    easier handling and administration.
 

	
 
    Fields:
 

	
 
      name - String denoting the project name.
 
      description - Free-form description of the project.
 
    """
 

	
 
    name = models.CharField(max_length=100, unique=True)
 
    description = models.TextField(blank=True)
 
    objects = SearchManager()
 
    deletion_collect_models = ["Entity", "Interface"]
 

	
 
    class Meta:
 
        permissions = (("view", "Can view information"),)
 

	
 
    def __unicode__(self):
 
        """
 
        Returns:
 
          String representation of a project.
 
        """
 

	
 
        return self.name
 

	
 
    def get_absolute_url(self):
 
        """
 
        Return absolute URL for viewing a single project.
 
        """
 

	
 
        return reverse("project", kwargs={'pk': self.pk})
 

	
 
    def get_project_communications_summary(self):
 
        """
 
        Creates a list of dictionaries where each dictionary provides summary of
 
        a communication. Each dictionary will contain the following keys:
 

	
 
        - source - Source of the communication.
 
        - source_color - Color that is associated with the source of
 
          communication. The color will match with color used in the project
 
          communications diagram.
 
        - destination - Destination of the communication.
 
        - destination_color - Color that is associated with the destination of
 
          communication. The color will match with color used in the project
 
          communications diagram.
 
        - protocol - Protocol used for communication.
 
        - port - Port used for communication.
 

	
 
        Returns:
 
          List of dictionaries, where each dictionary describes a single communication.
 
        """
 

	
 
        # Obtain the list of colors.
 
        colors = get_distinct_colors(self.entity_set.count())
 

	
 
        # Fetch ID's of each end entity.
 
        entity_ids = self.entity_set.values_list("pk", flat=True).order_by("pk")
 

	
 
        # Map the entity ID's to generated colors.
 
        entity_colors = dict(zip(entity_ids, colors))
 

	
 
        # Set-up an empty list where the resulting data will be stored.
 
        communications = []
 

	
 
        # Process each communication, and add the information to result.
 
        for communication in Communication.objects.filter(source__entity__project=self).select_related():
 
            communications.append({"source": communication.source.entity.name,
 
                                   "source_color": entity_colors[communication.source.entity.pk],
 
                                   "destination": communication.destination.entity.name,
 
                                   "destination_color": entity_colors[communication.destination.entity.pk],
 
                                   "protocol": communication.protocol,
 
                                   "port": communication.port})
 

	
 
        # Finally return the result.
 
        return communications
 

	
 

	
 
class Location(RelatedCollectorMixin, models.Model):
 
    """
 
    Implements a model with information about location. Locations can further be
 
    assigned to entities, letting the user group different servers and equipment
 
    based on location.
 

	
 
    Locations are not tied to specific project, and they do not have to be
 
    actual physical locations. Such generic locations are therefore reusable
 
    accross multiple projects.
 

	
 
    For example, locations can be:
 

	
 
      - Main site
 
      - Backup site
 
      - Disaster recovery site
 
      - Belgrade
 
      - Stockholm
 

	
 
    Fields:
 

	
 
      name - String denoting the location name.
 
      description - Free-form description of a location.
 
    """
 

	
 
    name = models.CharField(max_length=100, unique=True)
 
    description = models.TextField(blank=True)
 

	
 
    def __unicode__(self):
 
        """
 
        Returns:
 
          String representation of a location.
 
        """
 

	
 
        return self.name
 

	
 

	
 
class Entity(RelatedCollectorMixin, models.Model):
 
    """
 
    Models an entity in a project. An entity can be a server, router, or any
 
    other piece of networking equipment that has its own IP address.
 

	
 
    Entities can also be used for representing subnets etc. This is useful when
 
    the communication restrictions need to be applied across a subnet.
 

	
 
    Entities are tied to specific projects and locations.
 

	
 
    Fields:
 

	
 
      name - String denoting the entity name.
 
      description - Free-form description of an entity.
 
      project - Foreign key pointing to the project to which the entity
 
      belongs.
 
      location - Foreign key pointing to the location at which the entity is
 
      located.
 
    """
 

	
 
    name = models.CharField(max_length=100)
 
    description = models.TextField(blank=True)
 
    project = models.ForeignKey(Project)
 
    location = models.ForeignKey(Location)
 
    project = models.ForeignKey(Project, on_delete = models.CASCADE)
 
    location = models.ForeignKey(Location, on_delete = models.CASCADE)
 
    objects = SearchManager()
 

	
 
    class Meta:
 
        # Fix the plural form used by Django.
 
        verbose_name_plural = 'entities'
 
        # Enforce uniqueness of entity name in a project.
 
        unique_together = ("name", "project")
 

	
 
    def __unicode__(self):
 
        """
 
        Returns:
 
          String representation of an entity. This identifier contains name of
 
          entity, its project name, and location name.
 
        """
 

	
 
        return "%s (%s - %s)" % (self.name, self.project, self.location)
 

	
 
    def incoming_communications(self):
 
        """
 
        Returns:
 
          List of incoming communications for an entity.
 
        """
 

	
 
        communications = []
 

	
 
        for interface in self.interface_set.all():
 
            for communication in interface.destination_set.all():
 
                communications.append(communication)
 

	
 
        return communications
 

	
 
    def outgoing_communications(self):
 
        """
 
        Returns:
 
          List of outgoing communications for an entity.
 
        """
 

	
 
        communications = []
 

	
 
        for interface in self.interface_set.all():
 
            for communication in interface.source_set.all():
 
                communications.append(communication)
 

	
 
        return communications
 

	
 
    def get_absolute_url(self):
 
        """
 
        Return absolute URL for viewing a single entity.
 
        """
 

	
 
        return reverse("entity", kwargs={'pk': self.pk})
 

	
 
    def clean(self):
 
        """
 
        Performs additional validation checks on the submitted data. It will
 
        verify the following:
 

	
 
          - That entity is not linked to any other entity in case of project
 
            change.
 
        """
 

	
 
        # Perform the check if entity is being updated.
 
        if self.pk:
 
            # Fetch the old data from database.
 
            # @TODO: Is it better to do copying during __init__ instead?
 
            old_object = Entity.objects.get(pk=1)
 

	
 
            # Make sure that entity has no communications in current project if
 
            # moving it around.
 
            if self.project != old_object.project and (self.incoming_communications() or self.outgoing_communications()):
 
                raise ValidationError("The entity cannot be moved to different project as long as it has valid communications with entities in current project.")
 

	
 

	
 
class Interface(RelatedCollectorMixin, models.Model):
 
    """
 
    Models a representation of an interface on an entity. It can be used for
 
    representing the subnets as well.
 

	
 
    Each interface is coupled with a specific Entity.
 

	
 
    Fields:
 
      name - String denoting the interface name. For example 'eth0', 'eth1'
 
      etc.
 
      description - Free-form description of an interface.
 
      entity - Foreign key pointing to the entity to which the interface
 
      belongs.
 
      address - IP address of an interface. It's possible to store network
 
      address in it as well.
 
      netmask - Netmask of the interface. By default this is /32
 
      (255.255.255.255), but in case of subnet entities this can be used for
 
      denoting the network netmask.
 
    """
 

	
 
    name = models.CharField(max_length=100, default='eth0')
 
    description = models.TextField(blank=True, default='Main network interface.')
 
    entity = models.ForeignKey(Entity)
 
    entity = models.ForeignKey(Entity, on_delete = models.CASCADE)
 
    address = models.GenericIPAddressField()
 
    netmask = models.GenericIPAddressField(default='255.255.255.255')
 

	
 
    class Meta:
 
        # Enforce uniqueness of interface name in an entity. Enforce uniqueness
 
        # of IP address in a subnet for an entity.
 
        unique_together = (("name", "entity"),
 
                           ("entity", "address", "netmask"),)
 

	
 
    def __unicode__(self):
 
        """
 
        Returns:
 
          String representation of an interface. In case of single IP this will
 
          simply be the interface name and IP address. In case of subnet it will
 
          include the netmask as well.
 
        """
 

	
 
        if self.netmask == '255.255.255.255':
 
            return '%s (%s)' % (self.entity.name, self.address)
 
        else:
 
            return '%s (%s/%s)' % (self.entity.name, self.address, self.netmask)
 

	
 

	
 
class Communication(RelatedCollectorMixin, models.Model):
 
    """
 
    Models a representation of allowed network communication. This lets the user
 
    display the possible network connections that should be allowed. Information
 
    from the communication instances is also used for generating the iptables
 
    rules for the entities.
 

	
 
    Communication instances allow the user to specify one of the three possible
 
    protocols and related information:
 

	
 
      - TCP, along with the TCP port.
 
      - UDP, along with the UDP port.
 
      - ICMP, along with the ICMP type.
 

	
 
    Allowed communication is always represented as combination of source
 
    interface, destination interface, protocol, and port/ICMP type.
 

	
 
    Fields:
 
      source - Foreign key to the source (originating) interface. The
 
      communication is expected to come _from_ the source.
 
      destination - Foreign key to the destination interface. The destination
 
      interface is expected to be able to accept incoming connections
 
      (i.e. entity's servers are listening on those).
 
      protocol - Textual field denoting the protocol that is used for
 
      communication. This can be 'TCP', 'UDP', or 'ICMP'.
 
      port - Port number used by the protocol. In case of ICMP, this is an ICMP
 
      type (in numeric form).
 
      description - Free-form text that can be used to describe the
 
      communication. This is also used when generating the iptables rules for
 
      documenting the rules.
 
    """
 

	
 
    PROTOCOL_CHOICES = (
 
        ('TCP', 'TCP'),
 
        ('UDP', 'UDP'),
 
        ('ICMP', 'ICMP'),
 
        )
 

	
 
    source = models.ForeignKey(Interface, related_name='source_set')
 
    destination = models.ForeignKey(Interface, related_name='destination_set')
 
    source = models.ForeignKey(Interface, related_name='source_set', on_delete = models.CASCADE)
 
    destination = models.ForeignKey(Interface, related_name='destination_set', on_delete = models.CASCADE)
 
    protocol = models.CharField(max_length=10, choices=PROTOCOL_CHOICES)
 
    port = models.IntegerField(default=0)
 
    description = models.TextField(blank=True)
 

	
 
    class Meta:
 
        # Enforce uniqueness of communication.
 
        unique_together = ("source", "destination", "protocol", "port")
 

	
 
    def __unicode__(self):
 
        """
 
        Returns:
 
          String representation of an interface. This involves showing the
 
          source and destination _entity_ name, protocol, and port.
 
        """
 

	
 
        return "%s -> %s (%s:%s)" % (self.source.entity.name, self.destination.entity.name, self.protocol, self.port)
 

	
 
    def clean(self):
 
        """
 
        Performs additional validation checks on the submitted data. It will
 
        verify the following:
 

	
 
          - That source and destination interface belongs to distinct entities.
 
          - That the specified protocol is supported.
 
        """
 

	
 
        if self.source.entity == self.destination.entity:
 
            raise ValidationError('Source and destination entities are identical.')
 

	
 
        if self.source.entity.project != self.destination.entity.project:
 
            raise ValidationError('Source and destination entities do not belong to the same project')
 

	
 
        if (self.protocol.upper(), self.protocol.upper()) not in self.PROTOCOL_CHOICES:
 
            raise ValidationError('%s is not a supported protocol.' % self.protocol)
 

	
 
    def edit_link(self):
 
        """
 
        This method is used for providing an additional 'Edit' link in the admin
 
        site for the communication instances (for the display_list).
 

	
 
        This provides ability to let all of the other fields of a communication
 
        instance to be editable.
 
        """
 

	
 
        return "Edit"
 

	
 
    def source_representation(self):
 
        """
 
        Produces string representation of communication that includes only the
 
        source interface information.
 

	
 
        The method is useful where the destination context is well known.
 

	
 
        Returns:
 
            Communication representation that includes only information about
 
            the source interface.
 
        """
 

	
 
        return "%s - %s: %d" % (self.source, self.protocol, self.port)
 

	
 
    def destination_representation(self):
 
        """
 
        Produces string representation of communication that includes only the
 
        destination interface information.
 

	
 
        The method is useful where the source context is well known.
 

	
 
        Returns:
 
            Communication representation that includes only information about
 
            the destination interface.
 
        """
 

	
 
        return "%s - %s: %d" % (self.destination, self.protocol, self.port)
conntrackt/templates/admin/conntrackt/communication/change_list.html
Show inline comments
 
{% extends "admin/change_list.html" %}
 
{% load i18n admin_static admin_list %}
 
{% load url from future %}
 
{% load admin_urls %}
 

	
 
{% block object-tools-items %}
 
            <li>
 
              {# Add the GET parameters from the admin's filter to 'Add entity' button #}
 
              {# so we can perform some filtering on source/destination interfaces. #}
 
              <a href="{% url cl.opts|admin_urlname:'add' %}?{% if is_popup %}_popup=1{% endif %}{% if 'location' in cl.params %}&amp;location={{ cl.params.location }}{% endif %}{% if 'project' in cl.params %}&amp;project={{ cl.params.project }}{% endif %}" class="addlink">
 
                {% blocktrans with cl.opts.verbose_name as name %}Add {{ name }}{% endblocktrans %}
 
              </a>
 
            </li>
 
{% endblock %}
 

	
conntrackt/templatetags/conntrackt_tags.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
#
 
# Copyright (C) 2013 Branko Majic
 
#
 
# This file is part of Django Conntrackt.
 
#
 
# Django Conntrackt is free software: you can redistribute it and/or modify it
 
# under the terms of the GNU General Public License as published by the Free
 
# Software Foundation, either version 3 of the License, or (at your option) any
 
# later version.
 
#
 
# Django Conntrackt is distributed in the hope that it will be useful, but
 
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 
# details.
 
#
 
# You should have received a copy of the GNU General Public License along with
 
# Django Conntrackt.  If not, see <http://www.gnu.org/licenses/>.
 
#
 

	
 

	
 
# Django imports.
 
from django import template
 
from django.core import urlresolvers
 
from django import urls
 
from django.utils.html import format_html
 

	
 

	
 
# Get an instance of Django's template library.
 
register = template.Library()
 

	
 

	
 
@register.simple_tag(takes_context=False)
 
def html_link(text, view, *args, **kwargs):
 
    """
 
    A small wrapper for showing HTML links.
 

	
 
    Positional arguments:
 

	
 
        text - Text that should be used as a link.
 

	
 
        view - View name for which the URL should be shown
 

	
 
        args - Additional positional arguments that will be passed to resolver
 
        for creating the URL.
 

	
 
    Keyword arguments:
 

	
 
        id - Identifier for the <a> HTML element.
 

	
 
        class - Class(es) for the <a> HTML element.
 

	
 
        title - Title for the HTML <a> element.
 

	
 
        get - Additional GET parameter that should be appended to the URL.
 

	
 
    """
 

	
 
    # Verify the passed-in keyword arguments first.
 
    for key in kwargs.keys():
 
        if key not in ("get", "class", "title", "id"):
 
            raise template.TemplateSyntaxError("Unknown argument for 'html_link' tag: %r" % key)
 

	
 
    # Generate the URL by using the supplied view name and arguments that should
 
    # be passed to the view.
 
    url = urlresolvers.reverse(view, args=args)
 
    url = urls.reverse(view, args=args)
 

	
 
    # Set-up the base pattern (url, parameters, text).
 
    if 'get' in kwargs:
 
        pattern = '<a href="{url}?{get}"'
 
    else:
 
        pattern = '<a href="{url}"'
 

	
 
    if 'class' in kwargs:
 
        pattern += ' class="{class}"'
 

	
 
    if 'title' in kwargs:
 
        pattern += ' title="{title}"'
 

	
 
    if 'id' in kwargs:
 
        pattern += ' id="{id}"'
 

	
 
    pattern += '>{text}</a>'
 

	
 
    # Render the tag.
 
    return format_html(pattern, url=url, text=text, **kwargs)
 

	
 

	
 
@register.simple_tag(takes_context=True)
 
def active_link(context, url_name, return_value='active', **kwargs):
 
    """
 
    This template tag can be used to check if the provided URL matches against
 
    the path from the request or not.
 

	
 
    Arguments:
 

	
 
      context - Context of the current view being called.
 

	
 
      url_name - Name of the URL that's being checked against current path from
 
      request.
 
    """
 

	
 
    matches = current_url_equals(context, url_name, **kwargs)
 

	
 
    return return_value if matches else ''
 

	
 

	
 
def current_url_equals(context, url_name, **kwargs):
 
    """
 
    Helper function for checking if the specified URL corresponds to the current
 
    request path in the context.
 

	
 
    Arguments:
 

	
 
      - context - Context of the view being rendered.
 

	
 
      - url_name - Name of the URL against which the context request path is
 
      being checked.
 
    """
 

	
 
    # Assume that we have not been able to resolve the request path to an URL.
 
    resolved = False
 
    try:
 
        # Use the request path, and resolve it to a URL name.
 
        resolved = urlresolvers.resolve(context.get('request').path)
 
    except urlresolvers.Resolver404:
 
        resolved = urls.resolve(context.get('request').path)
 
    except urls.Resolver404:
 
        # This means we haven't been able to resolve the path from request.
 
        pass
 

	
 
    # If the request was resolved and URL names match, verify that the kwargs
 
    # match as well.
 
    matches = resolved and resolved.url_name == url_name
 
    if matches and kwargs:
 
        for key in kwargs:
 
            kwarg = kwargs.get(key)
 
            resolved_kwarg = resolved.kwargs.get(key)
 
            if not resolved_kwarg or kwarg != resolved_kwarg:
 
                return False
 

	
 
    return matches
conntrackt/tests/test_tags.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
#
 
# Copyright (C) 2017 Branko Majic
 
#
 
# This file is part of Django Conntrackt.
 
#
 
# Django Conntrackt is free software: you can redistribute it and/or modify it
 
# under the terms of the GNU General Public License as published by the Free
 
# Software Foundation, either version 3 of the License, or (at your option) any
 
# later version.
 
#
 
# Django Conntrackt is distributed in the hope that it will be useful, but
 
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 
# details.
 
#
 
# You should have received a copy of the GNU General Public License along with
 
# Django Conntrackt.  If not, see <http://www.gnu.org/licenses/>.
 
#
 

	
 

	
 
# Standard library imports.
 
import json
 
from StringIO import StringIO
 
from zipfile import ZipFile, ZIP_DEFLATED
 

	
 
# Python third-party library imports.
 
import mock
 

	
 
# Django imports.
 
from django.template import Context, Template, TemplateSyntaxError
 
from django.test import TestCase
 

	
 
# Application imports
 
from conntrackt.templatetags.conntrackt_tags import html_link, active_link, current_url_equals
 

	
 

	
 
@mock.patch('conntrackt.templatetags.conntrackt_tags.urlresolvers.reverse')
 
@mock.patch('conntrackt.templatetags.conntrackt_tags.urls.reverse')
 
class HtmlLinkTest(TestCase):
 

	
 
    def test_url_reverse_called_with_passed_in_args(self, mock_reverse):
 
        """
 
        Tests if URL reversing is performed using the correct set of
 
        passed-in arguments.
 
        """
 

	
 
        html_link("My link", "my_view", 'arg1', 'arg2', 'arg3')
 

	
 
        mock_reverse.assert_called_once_with("my_view", args=('arg1', 'arg2', 'arg3'))
 

	
 
    def test_url_reverse_called_without_args_if_they_are_not_passed_in(self, mock_reverse):
 
        """
 
        Tests if URL reverse is performed without using any positional
 
        arguments if they are not specified.
 
        """
 

	
 
        kwargs = {
 
            "id": "myid",
 
            "class": "myclass",
 
            "title": "mytitle",
 
        }
 

	
 

	
 
        html_link("My link", "my_view", **kwargs)
 

	
 
        mock_reverse.assert_called_once_with("my_view", args=())
 

	
 
    def test_html_id_applied_to_output_element(self, mock_reverse):
 
        """
 
        Tests if id attribute is filled-in correctly in the HTML tag.
 
        """
 

	
 
        # Mock a view we want to reverse.
 
        mock_reverse.return_value = "/my/url"
 
        kwargs = {
 
            'id': "my_id",
 
        }
 

	
 
        link = html_link("My link", "my_view", **kwargs)
 

	
 
        self.assertIn('id="my_id"', link)
 

	
 
    def test_html_class_applied_to_output_element(self, mock_reverse):
 
        """
 
        Tests if class attribute is filled-in correctly in the HTML tag.
 
        """
 

	
 
        # Mock a view we want to reverse.
 
        mock_reverse.return_value = "/my/url"
 
        kwargs = {
 
            'class': "class1,class2,class3",
 
        }
 

	
 
        link = html_link("My link", "my_view", **kwargs)
 

	
 
        self.assertIn('class="class1,class2,class3"', link)
 

	
 
    def test_html_title_applied_to_output_element(self, mock_reverse):
 
        """
 
        Tests if title attribute is filled-in correctly in the HTML tag.
 
        """
 

	
 
        # Mock a view we want to reverse.
 
        mock_reverse.return_value = "/my/url"
 
        kwargs = {
 
            'title': "My title",
 
        }
 

	
 
        link = html_link("My link", "my_view", **kwargs)
 

	
 
        self.assertIn('title="My title"', link)
 

	
 
    def test_get_parameter_applied_to_output_element_link(self, mock_reverse):
 
        """
 
        Tests if generated URL contains the passed-in get argument.
 
        """
 

	
 
        # Mock a view we want to reverse.
 
        mock_reverse.return_value = "/my/url"
 
        kwargs = {
 
            'get': "MyGetParameter",
 
        }
 

	
 
        link = html_link("My link", "my_view", **kwargs)
 

	
 
        self.assertIn('href="/my/url?MyGetParameter"', link)
 

	
 
    def test_rendered_output_format(self, mock_reverse):
 
        """
 
        Tests if the rendered output format is correct.
 
        """
 

	
 
        mock_reverse.return_value = "/my/url"
 

	
 
        link = html_link(
 
            "My link",
 
            "my_view",
 
            **{
 
                "id": "my_id",
 
                "class": "my_class",
 
                "title": "my_title",
 
                "get": "MyGetParameter=20",
 
            }
 
        )
 

	
 
        self.assertEqual(link, '<a href="/my/url?MyGetParameter=20" class="my_class" title="my_title" id="my_id">My link</a>')
 

	
 
    def test_invalid_passed_in_keyword_argument_raises_exception(self, mock_reverse):
 
        """
 
        Tests if passing-in a non-supported keyword argument raises an
 
        exception (if parameter validation works correctly).
 
        """
 

	
 
        with self.assertRaises(TemplateSyntaxError):
 
            html_link("My link", "my_view", invalid_keyword="some-value")
 

	
 
    def test_rendered_output_not_escaped(self, mock_reverse):
 
        """
 
        Tests if rendered output is not double-escaped by Django.
 
        """
 

	
 
        mock_reverse.return_value = "/my/url"
 

	
 
        template = Template('{% load conntrackt_tags %}{% html_link "My link" "my_view" class="my_class" title="my_title" id="my_id" get="MyGetParameter=20" %}')
 
        context = Context()
 
        rendered_output = template.render(context)
 

	
 
        self.assertEqual(rendered_output, '<a href="/my/url?MyGetParameter=20" class="my_class" title="my_title" id="my_id">My link</a>')
 

	
 
    def test_html_escapes_passed_in_values(self, mock_reverse):
 
        """
 
        Tests if values passed-in as keyword arguments are escaped within
 
        resulting output.
 
        """
 

	
 
        mock_reverse.return_value = "/my/url"
 

	
 
        link = html_link(
 
            "My </a> link",
 
            "my_view",
 
            **{
 
                "id": "my</a>_id",
 
                "class": "my</a>_class",
 
                "title": "my</a>_title",
 
                "get": "MyGetParameter=</a>",
 
            }
 
        )
 

	
 
        self.assertEqual(link, '<a href="/my/url?MyGetParameter=&lt;/a&gt;" class="my&lt;/a&gt;_class" title="my&lt;/a&gt;_title" id="my&lt;/a&gt;_id">My &lt;/a&gt; link</a>')
conntrackt/tests/test_views.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
#
 
# Copyright (C) 2013 Branko Majic
 
#
 
# This file is part of Django Conntrackt.
 
#
 
# Django Conntrackt is free software: you can redistribute it and/or modify it
 
# under the terms of the GNU General Public License as published by the Free
 
# Software Foundation, either version 3 of the License, or (at your option) any
 
# later version.
 
#
 
# Django Conntrackt is distributed in the hope that it will be useful, but
 
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 
# details.
 
#
 
# You should have received a copy of the GNU General Public License along with
 
# Django Conntrackt.  If not, see <http://www.gnu.org/licenses/>.
 
#
 

	
 

	
 
# Standard library imports.
 
import json
 
from StringIO import StringIO
 
from zipfile import ZipFile, ZIP_DEFLATED
 

	
 
# Python third-party library imports.
 
import mock
 

	
 
# Django imports.
 
from django.core.exceptions import ValidationError
 
from django.core.urlresolvers import reverse
 
from django.urls import reverse
 
from django.http import Http404
 
from django.test import RequestFactory
 
from django.test import TestCase
 
from django.utils.http import urlquote
 

	
 
# Application imports
 
from conntrackt.models import Project, Location, Entity, Interface, Communication
 

	
 
from conntrackt.views import IndexView, SearchView, APISearchView
 
from conntrackt.views import entity_iptables, project_iptables, project_diagram
 

	
 
from conntrackt.views import ProjectView, ProjectCreateView, ProjectUpdateView, ProjectDeleteView
 
from conntrackt.views import LocationCreateView, LocationUpdateView, LocationDeleteView
 
from conntrackt.views import EntityView, EntityCreateView, EntityUpdateView, EntityDeleteView
 
from conntrackt.views import InterfaceCreateView, InterfaceUpdateView, InterfaceDeleteView
 
from conntrackt.views import CommunicationCreateView, CommunicationUpdateView, CommunicationDeleteView
 

	
 
# Test imports.
 
from .forms import FormWithWidgetCSSClassFormMixin, FormWithPlaceholderFormMixin
 
from .helpers import PermissionTestMixin, RenderTestMixin, create_get_request, generate_get_response, FakeMessages
 
from .views import RedirectToNextMixinView
 
from .factories import setup_test_data
 

	
 

	
 
class IndexViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    sufficient_permissions = ("view",)
 
    view_class = IndexView
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context_no_projects(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called and
 
        no projects are available.
 
        """
 

	
 
        Project.objects.all().delete()
 

	
 
        # Get the view.
 
        view = IndexView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view)
 

	
 
        # Validate the response.
 
        self.assertQuerysetEqual(response.context_data["projects"], [])
 

	
 
    def test_context_no_locations(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called and
 
        no locations are available.
 
        """
 

	
 
        Location.objects.all().delete()
 

	
 
        # Get the view.
 
        view = IndexView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view)
 

	
 
        # Validate the response.
 
        self.assertQuerysetEqual(response.context_data["locations"], [])
 

	
 
    def test_context_projects(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called and
 
        there's multiple projects available.
 
        """
 

	
 
        # Get the view.
 
        view = IndexView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view)
 

	
 
        self.assertQuerysetEqual(response.context_data["projects"], ["<Project: Test Project 1>", "<Project: Test Project 2>"])
 

	
 
    def test_locations_available(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called and
 
        there's multiple locationsg available.
 
        """
 

	
 
        # Get the view.
 
        view = IndexView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view)
 

	
 
        # Validate the response.
 
        self.assertQuerysetEqual(response.context_data["locations"], ["<Location: Test Location 1>", "<Location: Test Location 2>"])
 

	
 

	
 
class ProjectViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    sufficient_permissions = ("view",)
 
    permission_test_view_kwargs = {"pk": "1"}
 
    render_test_view_kwargs = {"pk": "1"}
 
    view_class = ProjectView
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called.
 
        """
 

	
 
        # Get the view.
 
        view = ProjectView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, pk=1)
 

	
 
        # Fetch context data from response.
 
        location, entities = response.context_data["location_entities"][0]
 

	
 
        # Set-up expected context data values.
 
        expected_entities = ["<Entity: Test Entity 1 (Test Project 1 - Test Location 1)>",
 
                             "<Entity: Test Entity 2 (Test Project 1 - Test Location 1)>"]
 

	
 
        # Validate context data.
 
        self.assertEqual(location.name, "Test Location 1")
 
        self.assertQuerysetEqual(entities, expected_entities, ordered=False)
 

	
 
        # Fetch context data from response.
 
        location, entities = response.context_data["location_entities"][1]
 

	
 
        # Set-up expected context data values.
 
        expected_entities = ["<Entity: Test Entity 3 (Test Project 1 - Test Location 2)>",
 
                             "<Entity: Test Subnet 4 (Test Project 1 - Test Location 2)>"]
 

	
 
        # Validate context data.
 
        self.assertEqual(location.name, "Test Location 2")
 
        self.assertQuerysetEqual(entities, expected_entities, ordered=False)
 

	
 
        # Validate context data.
 
        self.assertEqual(str(response.context_data["project"]), "Test Project 1")
 

	
 
        # Validate context data is present.
 
        self.assertIn("communications", response.context_data.keys())
 

	
 

	
 
class EntityViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = EntityView
 
    sufficient_permissions = ("view",)
 
    permission_test_view_kwargs = {"pk": "1"}
 
    render_test_view_kwargs = {"pk": "1"}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Tests if the form comes pre-populated with proper content.
 
        """
 

	
 
        # Get the view.
 
        view = EntityView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, pk=1)
 

	
 
        # Set-up expected context data.
 
        expected_entity = Entity.objects.get(pk=1)
 

	
 
        expected_incoming_communications = ["<Communication: Test Entity 2 -> Test Entity 1 (TCP:22)>",
 
                                            "<Communication: Test Entity 2 -> Test Entity 1 (ICMP:8)>",
 
                                            "<Communication: Test Entity 3 -> Test Entity 1 (TCP:3306)>",
 
                                            "<Communication: Test Subnet 4 -> Test Entity 1 (TCP:22)>"]
 

	
 
        expected_outgoing_communications = ["<Communication: Test Entity 1 -> Test Entity 3 (UDP:53)>",
 
                                            "<Communication: Test Entity 1 -> Test Entity 2 (UDP:123)>"]
 

	
 
        expected_interfaces = ["<Interface: Test Entity 1 (192.168.1.1)>"]
 

	
 
        # Validate the response.
 
        self.assertQuerysetEqual(response.context_data["interfaces"], expected_interfaces)
 
        self.assertQuerysetEqual(response.context_data["incoming_communications"], expected_incoming_communications)
 
        self.assertQuerysetEqual(response.context_data["outgoing_communications"], expected_outgoing_communications)
 
        self.assertEqual(response.context_data["entity"], expected_entity)
 
        self.assertTrue("entity_iptables" in response.context_data)
 

	
 

	
 
class EntityIptablesTest(PermissionTestMixin, TestCase):
 

	
 
    view_function = staticmethod(entity_iptables)
 
    sufficient_permissions = ("view",)
 
    permission_test_view_kwargs = {"pk": "1"}
 
    render_test_view_kwargs = {"pk": "1"}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_invalid_entity(self):
 
        """
 
        Tests if a 404 is returned if no entity was found (invalid ID).
 
        """
 

	
 
        # Set-up a request.
 
        request = create_get_request()
 

	
 
        # Get the view.
 
        view = entity_iptables
 

	
 
        # Validate the response.
 
        self.assertRaises(Http404, view, request, pk=200)
 

	
 
    def test_content_type(self):
 
        """
 
        Test if correct content type is being returned by the response.
 
        """
 

	
 
        # Get the view.
 
        view = entity_iptables
 

	
 
        # Get the response.
 
        response = generate_get_response(view, pk=1)
 

	
 
        self.assertEqual(response['Content-Type'], "text/plain")
 

	
 
    def test_content_disposition(self):
 
        """
 
        Test if the correct content disposition has been set.
 
        """
 

	
 
        # Get the view.
 
        view = entity_iptables
 

	
 
        # Get the response.
 
        response = generate_get_response(view, pk=1)
 

	
 
        self.assertEqual(response['Content-Disposition'], "attachment; filename=test_entity_1-iptables.conf")
 

	
 
    def test_content(self):
 
        """
 
        Tests content produced by the view.
 
        """
 

	
 
        # Get the view.
 
        view = entity_iptables
 

	
 
        # Get the response.
 
        response = generate_get_response(view, pk=1)
 

	
 
        self.assertContains(response, ":INPUT")
 
        self.assertContains(response, ":OUTPUT")
 
        self.assertContains(response, ":FORWARD")
 

	
 

	
 
class ProjectIptablesTest(PermissionTestMixin, TestCase):
 

	
 
    view_function = staticmethod(project_iptables)
 
    sufficient_permissions = ("view",)
 
    permission_test_view_kwargs = {"project_id": 1}
 
    render_test_view_kwargs = {"project_id": 1}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_invalid_project(self):
 
        """
 
        Tests if a 404 is returned if no project was found (invalid ID).
 
        """
 

	
 
        # Set-up a request.
 
        request = create_get_request()
 

	
 
        # Get the view.
 
        view = project_iptables
 

	
 
        # Request iptables for whole project.
 
        self.assertRaises(Http404, view, request, 200)
 
        # Request iptables for project location
 
        self.assertRaises(Http404, view, request, 200, 1)
 

	
 
    def test_invalid_location(self):
 
        """
 
        Tests if a 404 is returned if no location was found (invalid ID).
 
        """
 

	
 
        # Set-up a request.
 
        request = create_get_request()
 

	
 
        # Get the view.
 
        view = project_iptables
 

	
 
        # Request iptables for project location
 
        self.assertRaises(Http404, view, request, 1, 200)
 

	
 
    def test_content_type(self):
 
        """
 
        Test if correct content type is being returned by the response.
 
        """
 

	
 
        # Get the view.
 
        view = project_iptables
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, 1)
 

	
 
        # Validate the response.
 
        self.assertEqual(response['Content-Type'], "application/zip")
 

	
 
    def test_content_disposition(self):
 
        """
 
        Test if the correct content disposition has been set.
 
        """
 

	
 
        # Get the view.
 
        view = project_iptables
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, 1)
 
        self.assertEqual(response['Content-Disposition'], 'attachment; filename="test_project_1-iptables.zip"')
 

	
 
        response = generate_get_response(view, None, 1, 1)
 
        self.assertEqual(response['Content-Disposition'], 'attachment; filename="test_project_1-test_location_1-iptables.zip"')
 

	
 
    def test_content_project(self):
 
        """
 
        Verifies that the content is properly generated when the view is called
 
        for an entire project.
 
        """
 

	
 
        # Get the view.
 
        view = project_iptables
 

	
 
        # Get the response.
 
        response = generate_get_response(project_iptables, None, 1)
 

	
 
        buff = StringIO(response.content)
 

	
 
        zipped_iptables = ZipFile(buff, "r", ZIP_DEFLATED)
 
        expected_zip_files = ["test_entity_1-iptables.conf",
 
                              "test_entity_2-iptables.conf",
 
                              "test_entity_3-iptables.conf",
 
                              "test_subnet_4-iptables.conf"]
 

	
 
        self.assertEqual(len(zipped_iptables.namelist()), 4)
 
        self.assertEqual(zipped_iptables.namelist(), expected_zip_files)
 

	
 
        for filename in expected_zip_files:
 
            iptables_file = zipped_iptables.read(filename)
 
            self.assertIn(":INPUT", iptables_file)
 
            self.assertIn(":OUTPUT", iptables_file)
 
            self.assertIn(":FORWARD", iptables_file)
 

	
 
        zipped_iptables.close()
 

	
 
    def test_content_location(self):
 
        """
 
        Verifies that the content is properly generated when the view is called
 
        for an entire project.
 
        """
 

	
 
        # Get the view.
 
        view = project_iptables
 

	
 
        # Get the response.
 
        response = generate_get_response(project_iptables, None, 1, 1)
 

	
 
        buff = StringIO(response.content)
 

	
 
        zipped_iptables = ZipFile(buff, "r", ZIP_DEFLATED)
 
        expected_zip_files = ["test_entity_1-iptables.conf",
 
                              "test_entity_2-iptables.conf"]
 

	
 
        self.assertEqual(len(zipped_iptables.namelist()), 2)
 
        self.assertEqual(zipped_iptables.namelist(), expected_zip_files)
 

	
 
        for filename in expected_zip_files:
 
            iptables_file = zipped_iptables.read(filename)
 
            self.assertIn(":INPUT", iptables_file)
 
            self.assertIn(":OUTPUT", iptables_file)
 
            self.assertIn(":FORWARD", iptables_file)
 

	
 
        zipped_iptables.close()
 

	
 

	
 
class ProjectCreateViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = ProjectCreateView
 
    sufficient_permissions = ("add_project",)
 

	
 

	
 
class ProjectUpdateViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = ProjectUpdateView
 
    sufficient_permissions = ("change_project",)
 
    permission_test_view_kwargs = {"pk": 1}
 
    render_test_view_kwargs = {"pk": 1}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called for
 
        specific project.
 
        """
 

	
 
        # Get the view.
 
        view = ProjectUpdateView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, pk=1)
 

	
 
        self.assertEqual(response.context_data["project"].name, "Test Project 1")
 
        self.assertEqual(response.context_data["headline"], "Update project Test Project 1")
 

	
 

	
 
class ProjectDeleteViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = ProjectDeleteView
 
    sufficient_permissions = ("delete_project",)
 
    permission_test_view_kwargs = {"pk": "1"}
 
    render_test_view_kwargs = {"pk": "1"}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called for
 
        specific project.
 
        """
 

	
 
        # Get the expected project.
 
        project = Project.objects.get(pk=1)
 

	
 
        # Get the view.
 
        view = ProjectDeleteView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, pk=1)
 

	
 
        self.assertEqual(response.context_data["project"], project)
 
        self.assertEqual(response.context_data["headline"], "Delete project Test Project 1")
 

	
 
    def test_message(self):
 
        """
 
        Tests if the message gets added when the project is deleted.
 
        """
 

	
 
        # Get the view.
 
        view = ProjectDeleteView.as_view()
 

	
 
        # Generate the request.
 
        request = RequestFactory().post("/fake-path/")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 
        request._messages = FakeMessages()
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertIn("Project Test Project 1 has been removed.", request._messages.messages)
 

	
 

	
 
class LocationCreateViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = LocationCreateView
 
    sufficient_permissions = ("add_location",)
 

	
 

	
 
class LocationUpdateViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = LocationUpdateView
 
    sufficient_permissions = ("change_location",)
 
    permission_test_view_kwargs = {"pk": 1}
 
    render_test_view_kwargs = {"pk": 1}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called for
 
        specific location.
 
        """
 

	
 
        # Get the view.
 
        view = LocationUpdateView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, pk=1)
 

	
 
        self.assertEqual(response.context_data["location"].name, "Test Location 1")
 
        self.assertEqual(response.context_data["headline"], "Update location Test Location 1")
 

	
 

	
 
class LocationDeleteViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = LocationDeleteView
 
    sufficient_permissions = ("delete_location",)
 
    permission_test_view_kwargs = {"pk": "1"}
 
    render_test_view_kwargs = {"pk": "1"}    
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called for
 
        specific location.
 
        """
 

	
 
        # Get the expected location.
 
        location = Location.objects.get(pk=1)
 

	
 
        # Get the view.
 
        view = LocationDeleteView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, pk=1)
 

	
 
        self.assertEqual(response.context_data["location"], location)
 
        self.assertEqual(response.context_data["headline"], "Delete location Test Location 1")
 

	
 
    def test_message(self):
 
        """
 
        Tests if the message gets added when the location is deleted.
 
        """
 

	
 
        # Get the view.
 
        view = LocationDeleteView.as_view()
 

	
 
        # Generate the request.
 
        request = RequestFactory().post("/fake-path/")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        request._messages = FakeMessages()
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertIn("Location Test Location 1 has been removed.", request._messages.messages)
 

	
 

	
 
class EntityCreateViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = EntityCreateView
 
    sufficient_permissions = ("add_entity",)
 

	
 
    def setUp(self):
 
        """
 
        Sets-up some data necessary for testing.
 
        """
 

	
 
        # Set-up some data for testing.
 
        Project.objects.create(name="Test Project 1", description="This is test project 1.")
 
        Project.objects.create(name="Test Project 2", description="This is test project 2.")
 
        Location.objects.create(name="Test Location 1", description="This is test location 1.")
 
        Location.objects.create(name="Test Location 2", description="This is test location 2.")
 

	
 
    def test_form_project_limit(self):
 
        """
 
        Tests if the queryset is properly limitted to specific project if GET
 
        parameters is passed.
 
        """
 

	
 
        # Set-up the view.
 
        view = EntityCreateView()
 
        view.request = RequestFactory().get("/fake-path?project=1")
 
        view.object = None
 

	
 
        # Get the form.
 
        form = view.get_form(view.get_form_class())
 

	
 
        self.assertQuerysetEqual(form.fields["project"].queryset, ["<Project: Test Project 1>"])
 

	
 
    def test_form_location_limit(self):
 
        """
 
        Tests if the queryset is properly limitted to specific location if GET
 
        parameters is passed.
 
        """
 

	
 
        # Set-up the view.
 
        view = EntityCreateView()
 
        view.request = RequestFactory().get("/fake-path?location=1")
 
        view.object = None
 

	
 
        # Get the form.
 
        form = view.get_form(view.get_form_class())
 

	
 
        self.assertQuerysetEqual(form.fields["location"].queryset, ["<Location: Test Location 1>"])
 

	
 
    def test_initial_project(self):
 
        """
 
        Tests if the choice field for project is defaulted to project passed as
 
        part of GET parameters.
 
        """
 

	
 
        view = EntityCreateView()
 
        view.request = RequestFactory().get("/fake-path?project=1")
 
        view.object = None
 

	
 
        initial = view.get_initial()
 

	
 
        self.assertDictContainsSubset({"project": "1"}, initial)
 

	
 
    def test_initial_location(self):
 
        """
 
        Tests if the choice field for location is defaulted to location passed
 
        as part of GET parameters.
 
        """
 

	
 
        view = EntityCreateView()
 
        view.request = RequestFactory().get("/fake-path?location=1")
 
        view.object = None
 

	
 
        initial = view.get_initial()
 

	
 
        self.assertDictContainsSubset({"location": "1"}, initial)
 

	
 

	
 
class EntityDeleteViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = EntityDeleteView
 
    sufficient_permissions = ("delete_entity",)
 
    permission_test_view_kwargs = {"pk": 1}
 
    render_test_view_kwargs = {"pk": 1}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called for
 
        specific entity.
 
        """
 

	
 
        # Get the expected entity.
 
        entity = Entity.objects.get(pk=1)
 

	
 
        # Get the view.
 
        view = EntityDeleteView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, pk=1)
 

	
 
        self.assertEqual(response.context_data["entity"], entity)
 
        self.assertEqual(response.context_data["headline"], "Delete entity Test Entity 1")
 

	
 
    def test_message(self):
 
        """
 
        Tests if the message gets added when the entity is deleted.
 
        """
 

	
 
        # Get the view.
 
        view = EntityDeleteView.as_view()
 

	
 
        # Generate the request.
 
        request = RequestFactory().post("/fake-path/")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        request._messages = FakeMessages()
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertIn("Entity Test Entity 1 has been removed.", request._messages.messages)
 

	
 
    def test_success_url(self):
 
        """
 
        Validate that the success URL is set properly after delete.
 
        """
 

	
 
        # Get the view.
 
        view = EntityDeleteView.as_view()
 

	
 
        # Generate the request
 
        request = RequestFactory().post("/fake-path/")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 
        request._messages = FakeMessages()
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertEqual(response["Location"], reverse("project", args=(1,)))
 

	
 

	
 
class EntityUpdateViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = EntityUpdateView
 
    sufficient_permissions = ("change_entity",)
 
    permission_test_view_kwargs = {"pk": 1}
 
    render_test_view_kwargs = {"pk": 1}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called for
 
        specific entity.
 
        """
 

	
 
        # Get the view.
 
        view = EntityUpdateView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, pk=1)
 

	
 
        self.assertEqual(response.context_data["entity"].name, "Test Entity 1")
 
        self.assertEqual(response.context_data["headline"], "Update entity Test Entity 1")
 

	
 

	
 
class InterfaceCreateViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = InterfaceCreateView
 
    sufficient_permissions = ("add_interface",)
 

	
 
    def setUp(self):
 
        """
 
        Sets-up some data necessary for testing.
 
        """
 

	
 
        # Set-up some data for testing.
 
        project = Project.objects.create(name="Test Project", description="This is test project.")
 
        location = Location.objects.create(name="Test Location", description="This is test location.")
 
        Entity.objects.create(name="Test Entity 1", description="This is test entity 1.", project=project, location=location)
 
        Entity.objects.create(name="Test Entity 2", description="This is test entity 2.", project=project, location=location)
 

	
 
    def test_form_entity_limit(self):
 
        """
 
        Tests if the queryset is properly limitted to specific entity if GET
 
        parameter is passed.
 
        """
 

	
 
        # Set-up the view.
 
        view = InterfaceCreateView()
 
        view.request = RequestFactory().get("/fake-path?entity=1")
 
        view.object = None
 

	
 
        # Get the form.
 
        form = view.get_form(view.get_form_class())
 

	
 
        self.assertQuerysetEqual(form.fields["entity"].queryset, ["<Entity: Test Entity 1 (Test Project - Test Location)>"])
 

	
 
    def test_initial_project(self):
 
        """
 
        Tests if the choice field for entity is defaulted to entity passed as
 
        part of GET parameters.
 
        """
 

	
 
        view = InterfaceCreateView()
 
        view.request = RequestFactory().get("/fake-path?entity=1")
 
        view.object = None
 

	
 
        initial = view.get_initial()
 

	
 
        self.assertDictContainsSubset({"entity": "1"}, initial)
 

	
 
    def test_success_url(self):
 
        """
 
        Validate that the success URL is set properly after interface is
 
        created.
 
        """
 

	
 
        # Get the view.
 
        view = InterfaceCreateView.as_view()
 

	
 
        # Generate the request.
 
        post_data = {"name": "eth0", "description": "Main interface.",
 
                     "entity": "1", "address": "192.168.1.1",
 
                     "netmask": "255.255.255.255"}
 
        request = RequestFactory().post("/fake-path/", data=post_data)
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertEqual(response["Location"], reverse("entity", args=(1,)))
 
        self.assertEqual(response.status_code, 302)
 

	
 

	
 
class InterfaceUpdateViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = InterfaceUpdateView
 
    sufficient_permissions = ("change_interface",)
 
    permission_test_view_kwargs = {"pk": 1}
 
    render_test_view_kwargs = {"pk": 1}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called for
 
        specific entity.
 
        """
 

	
 
        # Get the view.
 
        view = InterfaceUpdateView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, pk=1)
 

	
 
        # Set-up expected interface.
 
        interface = Interface.objects.get(pk=1)
 

	
 
        self.assertEqual(response.context_data["interface"], interface)
 
        self.assertEqual(response.context_data["headline"], "Update interface eth0")
 

	
 
    def test_form_entity_limit(self):
 
        """
 
        Tests if the queryset is properly limitted to specific project's
 
        entities.
 
        """
 

	
 
        # Set-up the view.
 
        view = InterfaceUpdateView()
 
        view.request = RequestFactory().get("/fake-path/1")
 
        view.object = Interface.objects.get(pk=1)
 

	
 
        # Get the form.
 
        form = view.get_form(view.get_form_class())
 

	
 
        expected_entities = ["<Entity: Test Entity 1 (Test Project 1 - Test Location 1)>",
 
                             "<Entity: Test Entity 2 (Test Project 1 - Test Location 1)>",
 
                             "<Entity: Test Entity 3 (Test Project 1 - Test Location 2)>",
 
                             "<Entity: Test Subnet 4 (Test Project 1 - Test Location 2)>"]
 

	
 
        self.assertQuerysetEqual(form.fields["entity"].queryset, expected_entities, ordered=False)
 

	
 
    def test_success_url(self):
 
        """
 
        Validate that the success URL is set properly after update.
 
        """
 

	
 
        # Get the view.
 
        view = InterfaceUpdateView.as_view()
 

	
 
        # Get the interface object.
 
        interface = Interface.objects.get(pk=1)
 

	
 
        # Generate the request.
 
        post_data = {"name": interface.name, "description": interface.name,
 
                     "entity": "1", "address": "192.168.1.1",
 
                     "netmask": "255.255.255.255"}
 
        request = RequestFactory().post("/fake-path/", data=post_data)
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertEqual(response["Location"], reverse("entity", args=(1,)))
 
        self.assertEqual(response.status_code, 302)
 

	
 

	
 
class InterfaceDeleteViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = InterfaceDeleteView
 
    sufficient_permissions = ("delete_interface",)
 
    permission_test_view_kwargs = {"pk": 1}
 
    render_test_view_kwargs = {"pk": 1}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called for
 
        specific interface.
 
        """
 

	
 
        # Get the expected entity.
 
        interface = Interface.objects.get(pk=1)
 

	
 
        # Get the view.
 
        view = InterfaceDeleteView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, pk=1)
 

	
 
        self.assertEqual(response.context_data["interface"], interface)
 
        self.assertEqual(response.context_data["headline"], "Delete interface eth0")
 

	
 
    def test_message(self):
 
        """
 
        Tests if the message gets added when the interface is deleted.
 
        """
 

	
 
        # Get the view.
 
        view = InterfaceDeleteView.as_view()
 

	
 
        # Generate the request.
 
        request = RequestFactory().post("/fake-path/")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        request._messages = FakeMessages()
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertIn("Interface eth0 has been removed.", request._messages.messages)
 

	
 
    def test_success_url(self):
 
        """
 
        Validate that the success URL is set properly after delete.
 
        """
 

	
 
        # Get the view.
 
        view = InterfaceDeleteView.as_view()
 

	
 
        # Generate the request
 
        request = RequestFactory().post("/fake-path/")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 
        request._messages = FakeMessages()
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertEqual(response["Location"], reverse("entity", args=(1,)))
 

	
 

	
 
class CommunicationCreateViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = CommunicationCreateView
 
    sufficient_permissions = ("add_communication",)
 

	
 
    def setUp(self):
 
        """
 
        Sets-up some data necessary for testing.
 
        """
 

	
 
        # Set-up some data for testing.
 
        project1 = Project.objects.create(name="Test Project 1", description="This is test project 1.")
 
        project2 = Project.objects.create(name="Test Project 2", description="This is test project 2.")
 
        location = Location.objects.create(name="Test Location", description="This is test location.")
 
        entity1 = Entity.objects.create(name="Test Entity 1", description="This is test entity 1.", project=project1, location=location)
 
        entity2 = Entity.objects.create(name="Test Entity 2", description="This is test entity 2.", project=project1, location=location)
 
        entity3 = Entity.objects.create(name="Test Entity 3", description="This is test entity 3.", project=project2, location=location)
 
        Interface.objects.create(name="eth0", description="Main interface", entity=entity1, address="192.168.1.1", netmask="255.255.255.255")
 
        Interface.objects.create(name="eth0", description="Main interface", entity=entity2, address="192.168.1.2", netmask="255.255.255.255")
 
        Interface.objects.create(name="eth0", description="Main interface", entity=entity3, address="192.168.1.3", netmask="255.255.255.255")
 

	
 
    def test_interface_limit_from_entity(self):
 
        """
 
        Tests if the queryset is properly limitted if GET parameter is passed.
 
        """
 

	
 
        # Set-up the view.
 
        view = CommunicationCreateView()
 
        view.request = RequestFactory().get("/fake-path?from_entity=1")
 
        view.object = None
 

	
 
        # Get the form.
 
        form = view.get_form(view.get_form_class())
 

	
 
        # Set-up expected interfaces.
 
        expected_interfaces = ["<Interface: Test Entity 1 (192.168.1.1)>",
 
                               "<Interface: Test Entity 2 (192.168.1.2)>"]
 

	
 
        self.assertQuerysetEqual(form.fields["source"].queryset, expected_interfaces, ordered = False)
 
        self.assertQuerysetEqual(form.fields["destination"].queryset, expected_interfaces, ordered = False)
 

	
 
    def test_interface_limit_to_entity(self):
 
        """
 
        Tests if the queryset is properly limitted if GET parameter is passed.
 
        """
 

	
 
        # Set-up the view.
 
        view = CommunicationCreateView()
 
        view.request = RequestFactory().get("/fake-path?to_entity=1")
 
        view.object = None
 

	
 
        # Get the form.
 
        form = view.get_form(view.get_form_class())
 

	
 
        # Set-up expected interfaces.
 
        expected_interfaces = ["<Interface: Test Entity 1 (192.168.1.1)>",
 
                               "<Interface: Test Entity 2 (192.168.1.2)>"]
 

	
 
        self.assertQuerysetEqual(form.fields["source"].queryset, expected_interfaces, ordered=False)
 
        self.assertQuerysetEqual(form.fields["destination"].queryset, expected_interfaces, ordered=False)
 

	
 
    def test_interface_limit_project(self):
 
        """
 
        Tests if the queryset is properly limitted if GET parameter is passed.
 
        """
 

	
 
        # Set-up the view.
 
        view = CommunicationCreateView()
 
        view.request = RequestFactory().get("/fake-path?project=1")
 
        view.object = None
 

	
 
        # Get the form.
 
        form = view.get_form(view.get_form_class())
 

	
 
        # Set-up expected interfaces.
 
        expected_interfaces = ["<Interface: Test Entity 1 (192.168.1.1)>",
 
                               "<Interface: Test Entity 2 (192.168.1.2)>"]
 

	
 
        self.assertQuerysetEqual(form.fields["source"].queryset, expected_interfaces, ordered=False)
 
        self.assertQuerysetEqual(form.fields["destination"].queryset, expected_interfaces, ordered=False)
 

	
 
    def test_initial_from_entity(self):
 
        """
 
        Tests if the choice field for interface is defaulted to first interface
 
        of entity passed as part of GET parameters.
 
        """
 

	
 
        # Set-up the view.
 
        view = CommunicationCreateView()
 
        view.request = RequestFactory().get("/fake-path?from_entity=1")
 
        view.object = None
 

	
 
        # Get the expected interface ID.
 
        interface = Entity.objects.get(pk=1).interface_set.all()[0]
 

	
 
        # Fetch the initial values.
 
        initial = view.get_initial()
 

	
 
        self.assertDictContainsSubset({"source": interface.pk}, initial)
 

	
 
    def test_initial_to_entity(self):
 
        """
 
        Tests if the choice field for interface is defaulted to first interface
 
        of entity passed as part of GET parameters.
 
        """
 

	
 
        # Set-up the view.
 
        view = CommunicationCreateView()
 
        view.request = RequestFactory().get("/fake-path?to_entity=1")
 
        view.object = None
 

	
 
        # Get the expected interface ID.
 
        interface = Entity.objects.get(pk=1).interface_set.all()[0]
 

	
 
        # Fetch the initial value.
 
        initial = view.get_initial()
 

	
 
        self.assertDictContainsSubset({"destination": interface.pk}, initial)
 

	
 
    def test_initial_invalid_from_entity(self):
 
        """
 
        Tests if the choice fields for source and destination interfaces are not
 
        defaulted in case invalid entity ID is passed as GET parameter.
 
        """
 

	
 
        # Set-up the view.
 
        view = CommunicationCreateView()
 
        view.request = RequestFactory().get("/fake-path?from_entity=10")
 
        view.object = None
 

	
 
        # Get the initial values.
 
        initial = view.get_initial()
 

	
 
        self.assertEqual(len(initial), 0)
 

	
 
    def test_initial_invalid_to_entity(self):
 
        """
 
        Tests if the choice fields for source and destination interfaces are not
 
        defaulted in case invalid entity ID is passed as GET parameter.
 
        """
 

	
 
        # Set-up the view.
 
        view = CommunicationCreateView()
 
        view.request = RequestFactory().get("/fake-path?to_entity=10")
 
        view.object = None
 

	
 
        # Get the initial values.
 
        initial = view.get_initial()
 

	
 
        self.assertEqual(len(initial), 0)
 

	
 
    def test_initial_invalid_project(self):
 
        """
 
        Tests if the choice fields for source and destination interfaces are not
 
        defaulted in case invalid project ID is passed as GET parameter.
 
        """
 

	
 
        # Set-up the view.
 
        view = CommunicationCreateView()
 
        view.request = RequestFactory().get("/fake-path?project=10")
 
        view.object = None
 

	
 
        # Get the initial values.
 
        initial = view.get_initial()
 

	
 
        self.assertEqual(len(initial), 0)
 

	
 
    def test_success_url_next(self):
 
        """
 
        Validate that the success URL is set properly after communication is
 
        created if "next" GET parameter is provided.
 
        """
 

	
 
        # Get the view.
 
        view = CommunicationCreateView.as_view()
 

	
 
        # Generate the request.
 
        source = Interface.objects.get(pk=1)
 
        destination = Interface.objects.get(pk=2)
 
        post_data = {"source": source.pk,
 
                     "destination": destination.pk,
 
                     "protocol": "TCP",
 
                     "port": "22",
 
                     "description": "SSH."}
 
        request = RequestFactory().post("/fake-path?next=/next-page", data=post_data)
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        # Get the response.
 
        response = view(request)
 

	
 
        self.assertEqual(response["Location"], "/next-page")
 
        self.assertEqual(response.status_code, 302)
 

	
 
    def test_success_url_no_next(self):
 
        """
 
        Validate that the success URL is set properly after communication is
 
        created if no "next" GET parameter is provided.
 
        """
 

	
 
        # Get the view.
 
        view = CommunicationCreateView.as_view()
 

	
 
        # Generate the request.
 
        source = Interface.objects.get(pk=1)
 
        destination = Interface.objects.get(pk=2)
 
        post_data = {"source": source.pk,
 
                     "destination": destination.pk,
 
                     "protocol": "TCP",
 
                     "port": "22",
 
                     "description": "SSH."}
 
        request = RequestFactory().post("/fake-path", data=post_data)
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        # Get the response.
 
        response = view(request)
 

	
 
        self.assertEqual(response["Location"], reverse("project", args=(1,)))
 
        self.assertEqual(response.status_code, 302)
 

	
 

	
 
class CommunicationUpdateViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = CommunicationUpdateView
 
    sufficient_permissions = ("change_communication",)
 
    permission_test_view_kwargs = {"pk": 1}
 
    render_test_view_kwargs = {"pk": 1}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Verifies that the context is properly set-up.
 
        """
 

	
 
        # Get the view.
 
        view = CommunicationUpdateView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, pk=1)
 

	
 
        # Set-up expected interface.
 
        communication = Communication.objects.get(pk=1)
 

	
 
        self.assertEqual(response.context_data["communication"], communication)
 
        self.assertEqual(response.context_data["headline"], "Update communication Test Entity 2 -> Test Entity 1 (TCP:22)")
 

	
 
    def test_form_interface_limit(self):
 
        """
 
        Tests if the queryset is properly limitted to specific project's
 
        entity interfaces.
 
        """
 

	
 
        # Set-up the view.
 
        view = CommunicationUpdateView()
 
        view.request = RequestFactory().get("/fake-path/1")
 
        view.object = Communication.objects.get(pk=1)
 

	
 
        # Get the form.
 
        form = view.get_form(view.get_form_class())
 

	
 
        expected_interfaces = ["<Interface: Test Entity 1 (192.168.1.1)>",
 
                               "<Interface: Test Entity 2 (192.168.1.2)>",
 
                               "<Interface: Test Entity 3 (192.168.1.3)>",
 
                               "<Interface: Test Subnet 4 (10.10.4.0/255.255.255.0)>"]
 

	
 
        self.assertQuerysetEqual(form.fields["source"].queryset, expected_interfaces, ordered=False)
 
        self.assertQuerysetEqual(form.fields["destination"].queryset, expected_interfaces, ordered=False)
 

	
 
    def test_success_url_next(self):
 
        """
 
        Validate that the success URL is set properly after update if GET
 
        parameter is passed.
 
        """
 

	
 
        # Get the view.
 
        view = CommunicationUpdateView.as_view()
 

	
 
        # Get the communication object.
 
        communication = Communication.objects.get(pk=1)
 

	
 
        # Generate the request.
 
        post_data = {"source": communication.source.pk,
 
                     "destination": communication.destination.pk,
 
                     "protocol": communication.protocol,
 
                     "port": communication.port}
 
        request = RequestFactory().post("/fake-path?next=/next-page", data=post_data)
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertEqual(response["Location"], "/next-page")
 
        self.assertEqual(response.status_code, 302)
 

	
 
    def test_success_url_no_next(self):
 
        """
 
        Validate that the success URL is set properly after communication is
 
        created if no "next" GET parameter is provided.
 
        """
 

	
 
        # Get the view.
 
        view = CommunicationUpdateView.as_view()
 

	
 
        # Get the communication object.
 
        communication = Communication.objects.get(pk=1)
 

	
 
        # Generate the request.
 
        post_data = {"source": communication.source.pk,
 
                     "destination": communication.destination.pk,
 
                     "protocol": communication.protocol,
 
                     "port": communication.port}
 
        request = RequestFactory().post("/fake-path/", data=post_data)
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertEqual(response["Location"], reverse("project", args=(communication.source.entity.project.id,)))
 
        self.assertEqual(response.status_code, 302)
 

	
 

	
 
class CommunicationDeleteViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    view_class = CommunicationDeleteView
 
    sufficient_permissions = ("delete_communication",)
 
    permission_test_view_kwargs = {"pk": 1}
 
    render_test_view_kwargs = {"pk": 1}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_context(self):
 
        """
 
        Verifies that the context is properly set-up when the view is called for
 
        specific communication.
 
        """
 

	
 
        # Get the expected entity.
 
        communication = Communication.objects.get(pk=1)
 

	
 
        # Get the view.
 
        view = CommunicationDeleteView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, None, pk=1)
 

	
 
        self.assertEqual(response.context_data["communication"], communication)
 
        self.assertEqual(response.context_data["headline"], "Delete communication Test Entity 2 -> Test Entity 1 (TCP:22)")
 

	
 
    def test_message(self):
 
        """
 
        Tests if the message gets added when the communication is deleted.
 
        """
 

	
 
        # Get the view.
 
        view = CommunicationDeleteView.as_view()
 

	
 
        # Generate the request.
 
        request = RequestFactory().post("/fake-path/")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        request._messages = FakeMessages()
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertIn("Communication Test Entity 2 -> Test Entity 1 (TCP:22) has been removed.", request._messages.messages)
 

	
 
    def test_success_url_from_entity(self):
 
        """
 
        Validate that the success URL is set properly after delete.
 
        """
 

	
 
        # Get the view.
 
        view = CommunicationDeleteView.as_view()
 

	
 
        # Generate the request
 
        request = RequestFactory().post("/fake-path?from_entity=1")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 
        request._messages = FakeMessages()
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertEqual(response["Location"], reverse("entity", args=(1,)))
 

	
 
    def test_success_url_to_entity(self):
 
        """
 
        Validate that the success URL is set properly after delete.
 
        """
 

	
 
        # Get the view.
 
        view = CommunicationDeleteView.as_view()
 

	
 
        # Generate the request
 
        request = RequestFactory().post("/fake-path?to_entity=1")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 
        request._messages = FakeMessages()
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertEqual(response["Location"], reverse("entity", args=(1,)))
 

	
 
    def test_success_url_no_entity(self):
 
        """
 
        Validate that the success URL is set properly after delete.
 
        """
 

	
 
        # Get the view.
 
        view = CommunicationDeleteView.as_view()
 

	
 
        # Get the communication object.
 
        communication = Communication.objects.get(pk=1)
 

	
 
        # Generate the request
 
        request = RequestFactory().post("/fake-path")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 
        request._messages = FakeMessages()
 

	
 
        # Get the response.
 
        response = view(request, pk=1)
 

	
 
        self.assertEqual(response["Location"], reverse("entity", args=(communication.source.entity.pk,)))
 

	
 

	
 
class ProjectDiagramTest(PermissionTestMixin, TestCase):
 

	
 
    view_function = staticmethod(project_diagram)
 
    sufficient_permissions = ("view",)
 
    permission_test_view_kwargs = {"pk": "1"}
 
    render_test_view_kwargs = {"pk": "1"}
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_invalid_project(self):
 
        """
 
        Tests if a 404 is returned if no project was found (invalid ID).
 
        """
 

	
 
        # Set-up a request.
 
        request = create_get_request()
 

	
 
        # Get the view.
 
        view = project_diagram
 

	
 
        # Validate the response.
 
        self.assertRaises(Http404, view, request, pk=200)
 

	
 
    def test_content_type(self):
 
        """
 
        Test if correct content type is being returned by the response.
 
        """
 

	
 
        # Get the view.
 
        view = project_diagram
 

	
 
        # Get the response.
 
        response = generate_get_response(view, pk=1)
 

	
 
        self.assertEqual(response['Content-Type'], "image/svg+xml")
 

	
 
    def test_content(self):
 
        """
 
        Tests content produced by the view.
 
        """
 

	
 
        # Get the view.
 
        view = project_diagram
 

	
 
        # Get the response.
 
        response = generate_get_response(view, pk=1)
 

	
 
        self.assertContains(response, '"-//W3C//DTD SVG 1.1//EN"')
 
        self.assertContains(response, "Test Project 1")
 

	
 

	
 
class RedirectToNextMixinTest(TestCase):
 

	
 
    def test_request_with_next(self):
 
        """
 
        Test if the get_success_url returns correct URL if "next" is present in
 
        request's GET parameters.
 
        """
 

	
 
        # Generate the request.
 
        request = RequestFactory().post("/fake-path?next=/next")
 

	
 
        # Initialise the pseudo-view.
 
        view = RedirectToNextMixinView(request)
 

	
 
        self.assertEqual("/next", view.get_success_url())
 

	
 
    def test_request_without_next(self):
 
        """
 
        Test if the get_success_url returns correct URL if "next" is not present
 
        in request's GET parameters.
 
        """
 

	
 
        # Generate the request.
 
        request = RequestFactory().post("/fake-path")
 

	
 
        # Initialise the pseudo-view.
 
        view = RedirectToNextMixinView(request)
 

	
 
        self.assertEqual("/STATIC", view.get_success_url())
 

	
 
    def test_request_custom_parameter_name(self):
 
        """
 
        Test if the mixin honours the custom parameter name.
 
        """
 

	
 
        # Generate the request.
 
        request = RequestFactory().post("/fake-path?custom=/next")
 

	
 
        # Initialise the pseudo-view.
 
        view = RedirectToNextMixinView(request)
 
        view.next_parameter = "custom"
 

	
 
        self.assertEqual("/next", view.get_success_url())
 

	
 

	
 
class SearchViewTest(RenderTestMixin, PermissionTestMixin, TestCase):
 

	
 
    sufficient_permissions = ("view",)
 
    view_class = SearchView
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_empty_query_error_message(self):
 
        """
 
        Verifies that an error is reported to the user in case an empty query is
 
        submitted.
 
        """
 

	
 
        # Get the view.
 
        view = SearchView.as_view()
 

	
 
        # Generate the request
 
        request = RequestFactory().get("/fake-path?q=")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 
        request._messages = FakeMessages()
 

	
 
        # Get the response.
 
        response = view(request)
 

	
 
        self.assertIn("Search query is not allowed to be empty.", request._messages.messages)
 

	
 
    def test_strip_search_term(self):
 
        """
 
        Verifies that the search term is stripped when search is performed.
 
        """
 

	
 
        # Get the view.
 
        view = SearchView.as_view()
 

	
 
        # Set-up a request.
 
        search_term = " \t \t something with lots of tabs \t  \t"
 
        stripped_search_term = "something with lots of tabs"
 
        request = RequestFactory().get("/fake-path?q=%s" % urlquote(search_term))
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        # Get the response.
 
        response = view(request)
 

	
 
        # Validate the response.
 
        self.assertEqual(stripped_search_term, response.context_data["search_term"])
 

	
 
    def test_no_query_context(self):
 
        """
 
        Tests that context is not set if no query was sent.
 
        """
 

	
 
        # Get the view.
 
        view = SearchView.as_view()
 

	
 
        # Set-up a request.
 
        response = generate_get_response(view)
 

	
 
        self.assertNotIn("entities", response.context_data)
 
        self.assertNotIn("projects", response.context_data)
 
        self.assertNotIn("search_term", response.context_data)
 
        # Only the "view" context variable should be present.
 
        self.assertEqual(1, len(response.context_data))
 

	
 

	
 
class APISearchViewTest(PermissionTestMixin, TestCase):
 

	
 
    sufficient_permissions = ("view",)
 
    view_class = APISearchView
 

	
 
    def setUp(self):
 
        """
 
        Set-up some test data.
 
        """
 

	
 
        setup_test_data()
 

	
 
    def test_limit_negative(self):
 
        """
 
        Test if an exception is raised in case a negative limit is requested.
 
        """
 

	
 
        # Get the view.
 
        view = APISearchView.as_view()
 

	
 
        # Generate the request.
 
        request = RequestFactory().get("/fake-path?limit=-1")
 
        request.user = mock.Mock()
 
        request._dont_enforce_csrf_checks = True
 

	
 
        # Validate the response.
 
        self.assertRaisesRegexp(ValidationError, "Limit may not be a negative value.", view, request, search_term="test")
 

	
 
    def test_empty_query(self):
 
        """
 
        Test that the response is empty if empty query was sent.
 
        """
 

	
 
        # Get the view.
 
        view = APISearchView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, search_term="")
 

	
 
        self.assertEqual(response['Content-Type'], "application/json")
 
        self.assertEqual(response.content, "[]")
 

	
 
    def test_strip_search_term(self):
 
        """
 
        Verifies that the search term is stripped when search is performed.
 
        """
 

	
 
        # Get the view.
 
        view = APISearchView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, search_term="Test Entity 1")
 

	
 
        # Validate the response.
 
        expected_content = """[{"project": "Test Project 1", "url": "/conntrackt/entity/1/", "type": "entity", "name": "Test Entity 1"}]"""
 
        self.assertEqual(response.content, expected_content)
 

	
 
    def test_no_items(self):
 
        """
 
        Test the response if no items are found.
 
        """
 

	
 
        # Get the view.
 
        view = APISearchView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, search_term="string that does not exist")
 

	
 
        self.assertEqual(response['Content-Type'], "application/json")
 
        self.assertEqual(response.content, "[]")
 

	
 
    def test_entity_found(self):
 
        """
 
        Test the response if a single entity is found.
 
        """
 

	
 
        # Get the view.
 
        view = APISearchView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, search_term="Test Entity 1")
 

	
 
        expected_content = """[{"project": "Test Project 1", "url": "/conntrackt/entity/1/", "type": "entity", "name": "Test Entity 1"}]"""
 

	
 
        self.assertEqual(response['Content-Type'], "application/json")
 
        self.assertEqual(response.content, expected_content)
 

	
 
    def test_project_found(self):
 
        """
 
        Test the response if a single project is found.
 
        """
 

	
 
        # Get the view.
 
        view = APISearchView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, search_term="Test Project 1")
 

	
 
        expected_content = """[{"project": "Test Project 1", "url": "/conntrackt/project/1/", "type": "project", "name": "Test Project 1"}]"""
 

	
 
        self.assertEqual(response['Content-Type'], "application/json")
 
        self.assertEqual(response.content, expected_content)
 

	
 
    def test_multiple_items_found(self):
 
        """
 
        Test the response if multiple items are found.
 
        """
 

	
 
        # Get the view.
 
        view = APISearchView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, search_term="Test")
 

	
 
        # Verify that the JSON reply is valid.
 
        try:
 
            items = json.loads(response.content)
 
        except ValueError:
 
            self.fail("Parsing of resulting JSON has failed")
 

	
 
        # Verify that a list of items was returned.
 
        self.assertTrue(isinstance(items, list))
 

	
 
        # Verify each item.
 
        for item in items:
 
            # Every item must be a dictionary.
 
            self.assertTrue(isinstance(item, dict))
 
            keys = item.keys()
 
            # Verify that 4 specific keys are present in dictionary (project,
 
            #  url, name, type).
 
            self.assertEqual(len(keys), 4)
 
            self.assertIn("project", keys)
 
            self.assertIn("name", keys)
 
            self.assertIn("url", keys)
 
            self.assertIn("type", keys)
 
            # Verify the type associated with item.
 
            self.assertIn(item["type"], ["project", "entity"])
 

	
 
    def test_content_type(self):
 
        """
 
        Test if correct content type is being returned by the response.
 
        """
 

	
 
        # Get the view.
 
        view = APISearchView.as_view()
 

	
 
        # Get the response.
 
        response = generate_get_response(view, search_term="test")
 

	
 
        self.assertEqual(response['Content-Type'], "application/json")
conntrackt/views.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
#
 
# Copyright (C) 2013 Branko Majic
 
#
 
# This file is part of Django Conntrackt.
 
#
 
# Django Conntrackt is free software: you can redistribute it and/or modify it
 
# under the terms of the GNU General Public License as published by the Free
 
# Software Foundation, either version 3 of the License, or (at your option) any
 
# later version.
 
#
 
# Django Conntrackt is distributed in the hope that it will be useful, but
 
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 
# details.
 
#
 
# You should have received a copy of the GNU General Public License along with
 
# Django Conntrackt.  If not, see <http://www.gnu.org/licenses/>.
 
#
 

	
 

	
 
# Standard library imports.
 
from StringIO import StringIO
 
from zipfile import ZipFile, ZIP_DEFLATED
 
import json
 

	
 
# Django imports.
 
from django.contrib.auth.decorators import permission_required
 
from django.contrib import messages
 
from django.core.exceptions import ValidationError
 
from django.core.urlresolvers import reverse, reverse_lazy
 
from django.db.models import Q
 
from django.db.models.deletion import Collector
 
from django.http import HttpResponse
 
from django.shortcuts import render_to_response, get_object_or_404
 
from django.urls import reverse, reverse_lazy
 
from django.views.generic import TemplateView, DetailView, CreateView, UpdateView, DeleteView, View
 

	
 
# Third-party application imports.
 
from braces.views import MultiplePermissionsRequiredMixin, SetHeadlineMixin
 

	
 
# Application imports.
 
from .forms import ProjectForm, LocationForm, EntityForm, InterfaceForm, CommunicationForm
 
from .models import Project, Entity, Location, Interface, Communication
 
from .utils import generate_entity_iptables, generate_project_diagram
 

	
 

	
 
class RedirectToNextMixin(object):
 
    """
 
    View mixin that can be used for redirecting the user to URL defined through
 
    a GET parameter. The mixin is usable with Create/Update/Delete views that
 
    utilise the get_success_url() call.
 

	
 
    The mixin accepts the following class options:
 

	
 
        next_parameter - Name of the GET parameter that contains the redirect
 
        URL. Defaults to "next".
 
    """
 

	
 
    next_parameter = "next"
 

	
 
    def get_success_url(self):
 
        """
 
        Returns the success URL to which the user will be redirected.
 
        """
 

	
 
        return self.request.GET.get(self.next_parameter, super(RedirectToNextMixin, self).get_success_url())
 

	
 

	
 
class RelatedItemsMixin(object):
 
    """
 
    View mixin that adds related items of a referenced model object to context
 
    data in form of a nested list, including the reference model object as the
 
    first element of a list.
 

	
 
    The context data will be passed using the "related_items" key.
 

	
 
    This data can be used in the template by passing it through the
 
    unordered_list template tag.
 

	
 
    The reference object is accessed via "object" property of calling view
 
    (i.e. self.object).
 

	
 
    For more details on implementation, see:
 

	
 
    RelatedCollectorMixin.get_dependant_objects_representation method
 
    """
 

	
 
    def get_context_data(self, **kwargs):
 
        """
 
        Adds the related items of a reference model object to context data.
 

	
 
        Returns:
 
          Context data.
 
        """
 

	
 
        # Set the context using the parent class.
 
        context = super(RelatedItemsMixin, self).get_context_data(**kwargs)
 

	
 
        # Add to context the nested list of string representations of related
 
        # items.
 
        context["related_items"] = self.object.get_dependant_objects_representation()
 

	
 
        return context
 

	
 

	
 
class IndexView(MultiplePermissionsRequiredMixin, TemplateView):
 
    """
 
    Custom view used for rendering the index page.
 
    """
 

	
 
    template_name = 'conntrackt/index.html'
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.view",),
 
        }
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get_context_data(self, **kwargs):
 
        """
 
        Returns the context data that should be used for rendering of the
 
        template.
 

	
 
        Adds the 'projects' context object containing all of the projects
 
        available sorted aplhabetically by name.
 
        """
 

	
 
        # Set the context using the parent class.
 
        context = super(IndexView, self).get_context_data(**kwargs)
 

	
 
        # Store information about all projcts in context.
 
        context['projects'] = Project.objects.all().order_by('name')
 

	
 
        # Store information about all locations in context.
 
        context['locations'] = Location.objects.all().order_by('name')
 

	
 
        return context
 

	
 

	
 
class ProjectView(MultiplePermissionsRequiredMixin, DetailView):
 
    """
 
    Custom view for presenting the project information.
 
    """
 

	
 
    model = Project
 

	
 
    permissions = {
 
        "all": ("conntrackt.view",),
 
        }
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get_context_data(self, **kwargs):
 
        """
 
        Returns the context data that should be used for rendering of the
 
        template.
 

	
 
        Adds the 'location_entities' context object that contains tuples of the
 
        form '(location, entities)', where location is an instance of a
 
        Location, and entities is a query set of entities that belong to that
 
        particular location, and to related project.
 
        """
 

	
 
        # Set the context using the parent class.
 
        context = super(ProjectView, self).get_context_data(**kwargs)
 

	
 
        # Set-up an array that will contaion (location, entities) tuples.
 
        location_entities = []
 

	
 
        # Add the (location, entities) tuple for each location that has entities
 
        # belonging to the related project.
 
        for location in Location.objects.filter(entity__project=self.object).distinct().order_by("name"):
 
            entities = Entity.objects.filter(project=self.object, location=location)
 
            location_entities.append((location, entities))
 

	
 
        # Add the (location, entities) tuples to context.
 
        context['location_entities'] = location_entities
 

	
 
        # Add all project communications to context.
 
        context['communications'] = self.object.get_project_communications_summary()
 

	
 
        # Finally return the context.
 
        return context
 

	
 

	
 
class EntityView(MultiplePermissionsRequiredMixin, DetailView):
 
    """
 
    Custom view for presenting entity information.
 
    """
 

	
 
    # Optimise the query to fetch the related data from reverse relationships.
 
    queryset = Entity.objects.all()
 
    queryset = queryset.prefetch_related('interface_set__destination_set__source__entity')
 
    queryset = queryset.prefetch_related('interface_set__destination_set__destination__entity')
 
    queryset = queryset.prefetch_related('interface_set__source_set__source__entity')
 
    queryset = queryset.prefetch_related('interface_set__source_set__destination__entity')
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.view",),
 
        }
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get_context_data(self, **kwargs):
 
        """
 
        Returns the context data that should be used for rendering of the
 
        template.
 

	
 
        Adds the 'entity_iptables' context object that contains full iptables
 
        rules generated for the entity.
 
        """
 

	
 
        # Call the parent class method.
 
        context = super(EntityView, self).get_context_data(**kwargs)
 

	
 
        # Add the rendered iptables rules to the context.
 
        context['entity_iptables'] = generate_entity_iptables(self.object)
 

	
 
        # Add the incoming and outgoing commmunication to the context.
 
        context["incoming_communications"] = self.object.incoming_communications()
 
        context["outgoing_communications"] = self.object.outgoing_communications()
 

	
 
        # Add the interfaces to the context.
 
        context["interfaces"] = self.object.interface_set.all().order_by("name")
 

	
 
        # Add project/location to the context.
 
        context["project"] = self.object.project
 
        context["location"] = self.object.location
 

	
 
        return context
 

	
 

	
 
@permission_required("conntrackt.view", raise_exception=True)
 
def entity_iptables(request, pk):
 
    """
 
    Custom view that returns response containing iptables rules generated for an
 
    entity.
 

	
 
    Makes sure to set the Content-Disposition of a response in order to
 
    signal the browser it should start download of this view's response
 
    immediately. Also sets the suggested filename for it.
 

	
 
    Arguments:
 

	
 
        pk - Primary key of the Entity object for which the rules should be
 
        generated.
 

	
 
    Returns:
 

	
 
        Response object that contains the iptables rules for specified entity.
 
    """
 

	
 
    # Fetch the entity, and construct the response with iptables rules as
 
    # content.
 
    entity = get_object_or_404(Entity, pk=pk)
 
    content = generate_entity_iptables(entity)
 
    response = HttpResponse(content, content_type='text/plain')
 

	
 
    # Add the Content-Disposition information for the browser, telling the
 
    # browser to download the file with suggested filename.
 
    response['Content-Disposition'] = "attachment; filename=%s-iptables.conf" % entity.name.lower().replace(" ", "_")
 

	
 
    return response
 

	
 

	
 
@permission_required("conntrackt.view", raise_exception=True)
 
def project_iptables(request, project_id, location_id=None):
 
    """
 
    Custom view for obtaining iptables for all entities of a project or project
 
    location in a single ZIP file.
 

	
 
    Arguments:
 

	
 
        request - Request object.
 

	
 
        project_id - Unique ID of the project for whose entities the iptables
 
        rules should be generated.
 

	
 
        location_id - Optional unique ID of the project location for whose
 
        entities the iptables rules should be generated. Default is None, which
 
        means generate rules for _all_ entities in a project.
 

	
 
    Returns:
 

	
 
        Response object that contains the ZIP file and Content-Disposition
 
        information.
 
    """
 

	
 
    # Fetch the project.
 
    project = get_object_or_404(Project, pk=project_id)
 

	
 
    # Set-up a string IO object to which we'll write the ZIP file (in-memory).
 
    buff = StringIO()
 

	
 
    # Create a new ZIP file in-memory.
 
    zipped_iptables = ZipFile(buff, "w", ZIP_DEFLATED)
 

	
 
    # Create the response object, setting the mime type so browser could offer
 
    # to open the file with program as well.
 
    response = HttpResponse(content_type='application/zip')
 

	
 
    # If specific location was specified, get the entities that are part of that
 
    # project location only, otherwise fetch all of the project's entities. Also
 
    # set-up the filename that will be suggested to the browser.
 
    if location_id:
 
        location = get_object_or_404(Location, pk=location_id)
 
        entities = project.entity_set.filter(location=location)
 
        filename = '%s-%s-iptables.zip' % (project.name.lower().replace(" ", "_"), location.name.lower().replace(" ", "_"))
 
    else:
 
        entities = project.entity_set.all()
 
        filename = '%s-iptables.zip' % (project.name.lower().replace(" ", "_"))
 

	
 
    # Render iptables rules for each entity, placing them in the ZIP archive.
 
    for entity in entities:
 
        entity_iptables = generate_entity_iptables(entity)
 
        zipped_iptables.writestr("%s-iptables.conf" % entity.name.lower().replace(" ", "_"), entity_iptables)
 

	
 
    # Close the archive, and flush the buffer.
 
    zipped_iptables.close()
 
    buff.flush()
 

	
 
    # Write the contents of our buffer (ZIP archive) to response content, and
 
    # close the IO string.
 
    response.write(buff.getvalue())
 
    buff.close()
 

	
 
    # Set the Content-Disposition so the browser would know it should download
 
    # the archive, and suggest the filename.
 
    response['Content-Disposition'] = 'attachment; filename="%s"' % filename
 

	
 
    # Finally return the response object.
 
    return response
 

	
 

	
 
class ProjectCreateView(RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, CreateView):
 
    """
 
    View for creating a new project.
 
    """
 

	
 
    model = Project
 
    form_class = ProjectForm
 
    headline = "Add new project"
 
    template_name = "conntrackt/create_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.add_project",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 

	
 
class ProjectUpdateView(RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, UpdateView):
 
    """
 
    View for modifying an existing project.
 
    """
 

	
 
    model = Project
 
    form_class = ProjectForm
 
    template_name = "conntrackt/update_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.change_project",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get_headline(self):
 
        """
 
        Set headline based on project name.
 
        """
 

	
 
        return "Update project %s" % self.object.name
 

	
 

	
 
class ProjectDeleteView(RelatedItemsMixin, RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, DeleteView):
 
    """
 
    View for deleting a project.
 
    """
 

	
 
    model = Project
 
    template_name = "conntrackt/delete_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.delete_project",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    success_url = reverse_lazy("index")
 

	
 
    def post(self, *args, **kwargs):
 
        """
 
        Add a success message that will be displayed to the user to confirm the
 
        project deletion.
 
        """
 

	
 
        messages.success(self.request, "Project %s has been removed." % self.get_object().name, extra_tags="alert alert-success")
 

	
 
        return super(ProjectDeleteView, self).post(*args, **kwargs)
 

	
 
    def get_headline(self):
 
        """
 
        Set headline based on project name.
 
        """
 

	
 
        return "Delete project %s" % self.object.name
 

	
 

	
 
class LocationCreateView(RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, CreateView):
 
    """
 
    View for creating a new location.
 
    """
 

	
 
    model = Location
 
    form_class = LocationForm
 
    headline = "Add new location"
 
    template_name = "conntrackt/create_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.add_location",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    success_url = reverse_lazy("index")
 

	
 

	
 
class LocationUpdateView(RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, UpdateView):
 
    """
 
    View for modifying an existing location.
 
    """
 

	
 
    model = Location
 
    form_class = LocationForm
 
    template_name = "conntrackt/update_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.change_location",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    success_url = reverse_lazy("index")
 

	
 
    def get_headline(self):
 
        """
 
        Set headline based on location name.
 
        """
 

	
 
        return "Update location %s" % self.object.name
 

	
 

	
 
class LocationDeleteView(RelatedItemsMixin, RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, DeleteView):
 
    """
 
    View for deleting a location.
 
    """
 

	
 
    model = Location
 
    template_name = "conntrackt/delete_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.delete_location",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    success_url = reverse_lazy("index")
 

	
 
    def post(self, *args, **kwargs):
 
        """
 
        Add a success message that will be displayed to the user to confirm the
 
        location deletion.
 
        """
 

	
 
        messages.success(self.request, "Location %s has been removed." % self.get_object().name, extra_tags="alert alert-success")
 

	
 
        return super(LocationDeleteView, self).post(*args, **kwargs)
 

	
 
    def get_headline(self):
 
        """
 
        Set headline based on location name.
 
        """
 

	
 
        return "Delete location %s" % self.object.name
 

	
 

	
 
class EntityCreateView(RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, CreateView):
 
    """
 
    View for creating a new entity.
 
    """
 

	
 
    model = Entity
 
    form_class = EntityForm
 
    headline = "Add new entity"
 
    template_name = "conntrackt/create_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.add_entity",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get_form(self, form_class=EntityForm):
 
        """
 
        Returns an instance of form that can be used by the view.
 

	
 
        The method will limit the project or location select inputs if request
 
        contained this information.
 
        """
 

	
 
        form = super(EntityCreateView, self).get_form(form_class)
 

	
 
        # Limit the project selection if required.
 
        project_id = self.request.GET.get("project", None)
 
        if project_id:
 
            form.fields["project"].queryset = Project.objects.filter(pk=project_id)
 
            form.fields["project"].empty_label = None
 

	
 
        # Limit the location selection if required.
 
        location_id = self.request.GET.get("location", None)
 
        if location_id:
 
            form.fields["location"].queryset = Location.objects.filter(pk=location_id)
 
            form.fields["location"].empty_label = None
 

	
 
        return form
 

	
 
    def get_initial(self):
 
        """
 
        Returns initial values that should be pre-selected (if they were
 
        specified through a GET parameter).
 
        """
 

	
 
        initial = super(EntityCreateView, self).get_initial()
 

	
 
        initial["project"] = self.request.GET.get("project", None)
 
        initial["location"] = self.request.GET.get("location", None)
 

	
 
        return initial
 

	
 

	
 
class EntityUpdateView(RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, UpdateView):
 
    """
 
    View for updating an existing entity.
 
    """
 

	
 
    model = Entity
 
    form_class = EntityForm
 
    template_name = "conntrackt/update_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.change_entity",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get_headline(self):
 
        """
 
        Set headline based on entity name.
 
        """
 

	
 
        return "Update entity %s" % self.object.name
 

	
 

	
 
class EntityDeleteView(RelatedItemsMixin, RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, DeleteView):
 
    """
 
    View for deleting an entity.
 
    """
 

	
 
    model = Entity
 
    template_name = "conntrackt/delete_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.delete_entity",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def post(self, *args, **kwargs):
 
        """
 
        Add a success message that will be displayed to the user to confirm the
 
        entity deletion.
 
        """
 

	
 
        messages.success(self.request, "Entity %s has been removed." % self.get_object().name, extra_tags="alert alert-success")
 

	
 
        return super(EntityDeleteView, self).post(*args, **kwargs)
 

	
 
    def delete(self, *args, **kwargs):
 
        """
 
        Deletes the object. This method is overridden in order to obtain the
 
        project ID for success URL.
 

	
 
        @TODO: Fix this once Django 1.6 comes out with fix from ticket 19044.
 
        """
 

	
 
        self.success_url = reverse("project", args=(self.get_object().project.id,))
 

	
 
        return super(EntityDeleteView, self).delete(*args, **kwargs)
 

	
 
    def get_headline(self):
 
        """
 
        Set headline based on entity name.
 
        """
 

	
 
        return "Delete entity %s" % self.object.name
 

	
 

	
 
class InterfaceCreateView(RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, CreateView):
 
    """
 
    View for creating a new interface.
 
    """
 

	
 
    model = Interface
 
    form_class = InterfaceForm
 
    headline = "Add new interface"
 
    template_name = "conntrackt/create_form.html"
 

	
 
    # Required permissions
 
    permissions = {
 
        "all": ("conntrackt.add_interface",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get_form(self, form_class=InterfaceForm):
 
        """
 
        Returns an instance of form that can be used by the view.
 

	
 
        The method will limit the entity select input if request contained this
 
        information.
 
        """
 

	
 
        form = super(InterfaceCreateView, self).get_form(form_class)
 

	
 
        # Limit the entity selection if required.
 
        entity_id = self.request.GET.get("entity", None)
 
        if entity_id:
 
            form.fields["entity"].queryset = Entity.objects.filter(pk=entity_id)
 
            form.fields["entity"].empty_label = False
 

	
 
        return form
 

	
 
    def get_initial(self):
 
        """
 
        Returns initial values that should be pre-selected (if they were
 
        specified through a GET parameter).
 
        """
 

	
 
        initial = super(InterfaceCreateView, self).get_initial()
 

	
 
        initial["entity"] = self.request.GET.get("entity", None)
 

	
 
        return initial
 

	
 
    def get_success_url(self):
 
        """
 
        Returns the URL to which the user should be redirected after an
 
        interface has been created.
 

	
 
        The URL in this case will be set to entity's details page.
 
        """
 

	
 
        return reverse("entity", args=(self.object.entity.pk,))
 

	
 

	
 
class InterfaceUpdateView(RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, UpdateView):
 
    """
 
    View for updating an existing interface.
 
    """
 

	
 
    model = Interface
 
    form_class = InterfaceForm
 
    template_name = "conntrackt/update_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.change_interface",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get_form(self, form_class=InterfaceForm):
 
        """
 
        Returns an instance of form that can be used by the view.
 

	
 
        The method will limit the entities that can be selected for the
 
        interface to the ones that belong to the same project as the currently
 
        set entity.
 
        """
 

	
 
        form = super(InterfaceUpdateView, self).get_form(form_class)
 

	
 
        # Limit the entities to same project.
 
        form.fields["entity"].queryset = Entity.objects.filter(project=self.object.entity.project)
 

	
 
        return form
 

	
 
    def get_success_url(self):
 
        """
 
        Returns the URL to which the user should be redirected after an
 
        interface has been updated.
 

	
 
        The URL in this case will be set to entity's details page.
 
        """
 

	
 
        return reverse("entity", args=(self.object.entity.pk,))
 

	
 
    def get_headline(self):
 
        """
 
        Set headline based on interface name.
 
        """
 

	
 
        return "Update interface %s" % self.object.name
 

	
 

	
 
class InterfaceDeleteView(RelatedItemsMixin, RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, DeleteView):
 
    """
 
    View for deleting an interface.
 
    """
 

	
 
    model = Interface
 
    template_name = "conntrackt/delete_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.delete_interface",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def post(self, *args, **kwargs):
 
        """
 
        Add a success message that will be displayed to the user to confirm the
 
        interface deletion.
 
        """
 

	
 
        messages.success(self.request, "Interface %s has been removed." % self.get_object().name, extra_tags="alert alert-success")
 

	
 
        return super(InterfaceDeleteView, self).post(*args, **kwargs)
 

	
 
    def delete(self, *args, **kwargs):
 
        """
 
        Deletes the object. This method is overridden in order to obtain the
 
        entity ID for success URL.
 

	
 
        @TODO: Fix this once Django 1.6 comes out with fix from ticket 19044.
 
        """
 

	
 
        self.success_url = reverse("entity", args=(self.get_object().entity.id,))
 

	
 
        return super(InterfaceDeleteView, self).delete(*args, **kwargs)
 

	
 
    def get_headline(self):
 
        """
 
        Set headline based on interface name.
 
        """
 

	
 
        return "Delete interface %s" % self.object.name
 

	
 

	
 
class CommunicationCreateView(RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, CreateView):
 
    """
 
    View for creating a new communication.
 
    """
 

	
 
    model = Communication
 
    form_class = CommunicationForm
 
    headline = "Add new communication"
 
    template_name = "conntrackt/create_form.html"
 

	
 
    # Required permissions
 
    permissions = {
 
        "all": ("conntrackt.add_communication",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get_form(self, form_class=CommunicationForm):
 
        """
 
        Returns an instance of form that can be used by the view.
 

	
 
        The method will limit the source and destination interface selection to
 
        interfaces belonging to the same project as provided entity ID (if any
 
        was provided).
 
        """
 

	
 
        form = super(CommunicationCreateView, self).get_form(form_class)
 

	
 
        # Limit the interface selection based on provided source entity,
 
        # destination entity, or project.
 
        entity_id = self.request.GET.get("from_entity", None) or self.request.GET.get("to_entity", None)
 
        project_id = self.request.GET.get("project", None)
 

	
 
        if project_id:
 
            form.fields["source"].queryset = Interface.objects.filter(entity__project=project_id)
 
            form.fields["destination"].queryset = Interface.objects.filter(entity__project=project_id)
 
        elif entity_id:
 
            entity = Entity.objects.get(pk=1)
 
            form.fields["source"].queryset = Interface.objects.filter(entity__project=entity.project)
 
            form.fields["destination"].queryset = Interface.objects.filter(entity__project=entity.project)
 

	
 
        return form
 

	
 
    def get_initial(self):
 
        """
 
        Returns initial values that should be pre-selected (if they were
 
        specified through a GET parameter).
 
        """
 

	
 
        initial = super(CommunicationCreateView, self).get_initial()
 

	
 
        # If source or destination entity were specified in request, fetch the
 
        # first interface from them and use it as initial source and destination.
 
        from_entity = self.request.GET.get("from_entity", None)
 
        to_entity = self.request.GET.get("to_entity", None)
 

	
 
        if from_entity:
 
            try:
 
                interface = Interface.objects.filter(entity=from_entity)[0]
 
                initial["source"] = interface.id
 
            except IndexError:
 
                pass
 

	
 
        if to_entity:
 
            try:
 
                interface = Interface.objects.filter(entity=to_entity)[0]
 
                initial["destination"] = interface.id
 
            except IndexError:
 
                pass
 

	
 
        return initial
 

	
 
    def get_success_url(self):
 
        """
 
        Returns the URL to which the user should be redirected after a
 
        communication has been created.
 

	
 
        The URL will either point to value provided via GET parameter "next", or
 
        to project page to which the communication belongs.
 
        """
 

	
 
        # We must set the success URL to something first.
 
        self.success_url = reverse("project", args=(self.object.source.entity.project.pk,))
 

	
 
        # This will override the URL if parameter "next" was provided (from
 
        # RedirectToNextMixin).
 
        success_url = super(CommunicationCreateView, self).get_success_url()
 

	
 
        return success_url
 

	
 

	
 
class CommunicationUpdateView(RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, UpdateView):
 
    """
 
    View for updating an existing communication.
 
    """
 

	
 
    model = Communication
 
    form_class = CommunicationForm
 
    template_name = "conntrackt/update_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.change_communication",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get_form(self, form_class=CommunicationForm):
 
        """
 
        Returns an instance of form that can be used by the view.
 

	
 
        The method will limit the source and destination interfaces that can be
 
        selected for the communication. Both will be limited to interfaces
 
        coming from entities that belong to the same project as current
 
        communication's source interface.
 
        """
 

	
 
        form = super(CommunicationUpdateView, self).get_form(form_class)
 

	
 
        project = self.object.source.entity.project
 

	
 
        form.fields["source"].queryset = Interface.objects.filter(entity__project=project)
 
        form.fields["destination"].queryset = Interface.objects.filter(entity__project=project)
 

	
 
        return form
 

	
 
    def get_success_url(self):
 
        """
 
        Returns the URL to which the user should be redirected after a
 
        communication has been created.
 

	
 
        The URL will either point to value provided via GET parameter "next", or
 
        to project page to which the communication belongs.
 
        """
 

	
 
        return self.request.GET.get("next", reverse("project", args=(self.object.source.entity.project.pk,)))
 

	
 
    def get_headline(self):
 
        """
 
        Set headline based on communication.
 
        """
 

	
 
        return "Update communication %s" % self.object
 

	
 

	
 
class CommunicationDeleteView(RelatedItemsMixin, RedirectToNextMixin, SetHeadlineMixin, MultiplePermissionsRequiredMixin, DeleteView):
 
    """
 
    View for deleting an communication.
 
    """
 

	
 
    model = Communication
 
    template_name = "conntrackt/delete_form.html"
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.delete_communication",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def post(self, *args, **kwargs):
 
        """
 
        Add a success message that will be displayed to the user to confirm the
 
        communication deletion.
 
        """
 

	
 
        messages.success(self.request, "Communication %s has been removed." % self.get_object(), extra_tags="alert alert-success")
 

	
 
        return super(CommunicationDeleteView, self).post(*args, **kwargs)
 

	
 
    def delete(self, *args, **kwargs):
 
        """
 
        Deletes the object. This method is overridden in order to obtain the
 
        entity ID for success URL.
 

	
 
        @TODO: Fix this once Django 1.6 comes out with fix from ticket 19044.
 
        """
 

	
 
        entity_id = self.request.GET.get("from_entity", None) or self.request.GET.get("to_entity", None) or self.get_object().source.entity.pk
 

	
 
        self.success_url = reverse("entity", args=(entity_id,))
 

	
 
        return super(CommunicationDeleteView, self).delete(*args, **kwargs)
 

	
 
    def get_headline(self):
 
        """
 
        Set headline based on communication.
 
        """
 

	
 
        return "Delete communication %s" % self.object
 

	
 

	
 
@permission_required("conntrackt.view", raise_exception=True)
 
def project_diagram(request, pk):
 
    """
 
    Custom view that returns response containing diagram of project
 
    communications.
 

	
 
    The diagram will include coloured entities, with directional lines
 
    connecting the source and destination end entities.
 

	
 
    The output format is SVG.
 

	
 
    Arguments:
 

	
 
        request - Request object.
 

	
 
        pk - Project ID for which the diagram should be generated.
 

	
 
    Returns:
 

	
 
        Response object that contains the project diagram rendered as SVG.
 
    """
 

	
 
    # Fetch the project.
 
    project = get_object_or_404(Project, pk=pk)
 

	
 
    # Generate the diagram.
 
    content = generate_project_diagram(project).create_svg()
 

	
 
    # Set the mime type.
 
    response = HttpResponse(content, content_type='image/svg+xml')
 

	
 
    # Return the response object.
 
    return response
 

	
 

	
 
class SearchView(MultiplePermissionsRequiredMixin, TemplateView):
 
    """
 
    Custom view used for rendering the search (results) page.
 
    """
 

	
 
    template_name = 'conntrackt/search.html'
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.view",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get_context_data(self, **kwargs):
 
        """
 
        Returns the context data that should be used for rendering of the
 
        template.
 

	
 
        Adds context objects:
 
          - 'entities', which is a list of entities that had the search term in
 
            their name or description.
 
          - 'projects', which is a list of entities that had the search term in
 
            their name or description.
 
          - 'search_term', which is a string of previous query that brought the
 
            user to page (if any). The term will be stripped from leading and
 
            trailing spaces/tabs.
 
        """
 

	
 
        # Set the context using the parent aclass.
 
        context = super(SearchView, self).get_context_data(**kwargs)
 

	
 
        # Retrieve the search term, and strip it if it was provided.
 
        search_term = self.request.GET.get("q", None)
 
        if search_term:
 
            search_term = search_term.strip()
 

	
 
        # Do not allow empty searches.
 
        if search_term == "":
 
            messages.error(self.request, "Search query is not allowed to be empty.", extra_tags="alert alert-error")
 
        # Set-up the context objects if search was sent. Otherwise empty search
 
        # page will be shown.
 
        elif search_term is not None:
 
            context['search_term'] = search_term
 
            context['entities'] = Entity.objects.search(search_term)
 
            context['projects'] = Project.objects.search(search_term)
 

	
 
        return context
 

	
 

	
 
class APISearchView(MultiplePermissionsRequiredMixin, View):
 
    """
 
    API view implementing search for entities and projects that match the
 
    provided search term.
 

	
 
    The output generated by the view uses JSON. The result will include a list
 
    of matched items, where each item is a dictionary with the following keys:
 

	
 
      - name (name of the matched item)
 
      - project (project to which the item belongs)
 
      - type (type of the matched item)
 
      - url (URL towards the matched item)
 
    """
 

	
 
    # Required permissions.
 
    permissions = {
 
        "all": ("conntrackt.view",),
 
        }
 

	
 
    # Raise authorisation denied exception for unmet permissions.
 
    raise_exception = True
 

	
 
    def get(self, request, search_term=""):
 
        """
 
        Implements response handling for a GET request.
 
        """
 

	
 
        # Retrieve the search term, and strip it if it was provided.
 
        if search_term:
 
            search_term = search_term.strip()
 

	
 
        # Set-up a list that will contain found items.
 
        items = []
 

	
 
        # Fetch the maximum number of items that should be returned.
 
        limit = int(request.GET.get("limit", 0))
 
        if limit < 0:
 
            raise ValidationError("Limit may not be a negative value.")
 

	
 
        # Don't perform search with empty search term.
 
        if search_term != "":
 

	
 
            # Run the search on entities and projects.
 
            entities = Entity.objects.search(search_term).select_related("project")
 
            projects = Project.objects.search(search_term)
 

	
 
            # If maximum number of items was provided, narrow-down the results.
 
            if limit > 0:
 
                entities = entities[:limit]
 
                projects = projects[:limit]
 

	
 
            # Add found entities.
 
            for entity in entities:
 
                items.append({"name": entity.name,
 
                              "project": entity.project.name,
 
                              "type": "entity",
 
                              "url": entity.get_absolute_url()})
 

	
 
            # Add found projects.
 
            for project in projects:
 
                items.append({"name": project.name,
 
                              "project": project.name,
 
                              "type": "project",
 
                              "url": project.get_absolute_url()})
 

	
 
        # Generate the JSON response.
 
        content = json.dumps(items)
 
        response = HttpResponse(content, content_type="application/json")
 

	
 
        # Return the response.
 
        return response
requirements/base.in
Show inline comments
 
#
 
# Copyright (C) 2017 Branko Majic
 
#
 
# This file is part of Django Conntrackt.
 
#
 
# Django Conntrackt is free software: you can redistribute it and/or modify it
 
# under the terms of the GNU General Public License as published by the Free
 
# Software Foundation, either version 3 of the License, or (at your option) any
 
# later version.
 
#
 
# Django Conntrackt is distributed in the hope that it will be useful, but
 
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 
# details.
 
#
 
# You should have received a copy of the GNU General Public License along with
 
# Django Conntrackt.  If not, see <http://www.gnu.org/licenses/>.
 
#
 

	
 
# Convenience mixins for Django (for authentication etc).
 
django-braces~=1.12.0
 

	
 
# Convenience tools for controlling rendering of forms while embracing DRY.
 
django-crispy-forms~=1.6.0
 
django-crispy-forms~=1.7.0
 

	
 
# Web framework used by application.
 
django~=1.10.0
 
django~=1.11.0
 

	
 
# Library for programatic calculation of colours (contrasts,
 
# inversions etc).
 
palette~=0.2.0
 

	
 
# Interaface towards Graphviz for chart generation.
 
pydot~=1.2.0
requirements/development.txt
Show inline comments
 
#
 
# This file is autogenerated by pip-compile
 
# To update, run:
 
#
 
#    pip-compile --output-file requirements/development.txt requirements/development.in
 
#
 
alabaster==0.7.10         # via sphinx
 
babel==2.5.1              # via sphinx
 
certifi==2017.11.5        # via requests
 
chardet==3.0.4            # via requests
 
coverage==4.4.2
 
django-braces==1.12.0
 
django-crispy-forms==1.6.1
 
django==1.10.8
 
django-crispy-forms==1.7.0
 
django==1.11.8
 
docutils==0.14            # via sphinx
 
factory-boy==2.1.2
 
funcsigs==1.0.2           # via mock
 
idna==2.6                 # via requests
 
imagesize==0.7.1          # via sphinx
 
jinja2==2.10              # via sphinx
 
markupsafe==1.0           # via jinja2
 
mock==1.3.0
 
palette==0.2
 
pbr==3.1.1                # via mock
 
pydot==1.2.3
 
pygments==2.2.0           # via sphinx
 
pyparsing==2.2.0          # via pydot
 
pytz==2017.3
 
requests==2.18.4          # via sphinx
 
six==1.11.0               # via mock, sphinx
 
snowballstemmer==1.2.1    # via sphinx
 
sphinx==1.6.5
 
sphinxcontrib-websupport==1.0.1  # via sphinx
 
typing==3.6.2             # via sphinx
 
urllib3==1.22             # via requests
requirements/test.txt
Show inline comments
 
#
 
# This file is autogenerated by pip-compile
 
# To update, run:
 
#
 
#    pip-compile --output-file requirements/test.txt requirements/test.in
 
#
 
alabaster==0.7.10         # via sphinx
 
babel==2.5.1              # via sphinx
 
certifi==2017.11.5        # via requests
 
chardet==3.0.4            # via requests
 
coverage==4.4.2
 
django-braces==1.12.0
 
django-crispy-forms==1.6.1
 
django==1.10.8
 
django-crispy-forms==1.7.0
 
django==1.11.8
 
docutils==0.14            # via sphinx
 
factory-boy==2.1.2
 
funcsigs==1.0.2           # via mock
 
idna==2.6                 # via requests
 
imagesize==0.7.1          # via sphinx
 
jinja2==2.10              # via sphinx
 
markupsafe==1.0           # via jinja2
 
mock==1.3.0
 
palette==0.2
 
pbr==3.1.1                # via mock
 
pydot==1.2.3
 
pygments==2.2.0           # via sphinx
 
pyparsing==2.2.0          # via pydot
 
pytz==2017.3
 
requests==2.18.4          # via sphinx
 
six==1.11.0               # via mock, sphinx
 
snowballstemmer==1.2.1    # via sphinx
 
sphinx==1.6.5
 
sphinxcontrib-websupport==1.0.1  # via sphinx
 
typing==3.6.2             # via sphinx
 
urllib3==1.22             # via requests
setup.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
#
 
# Copyright (C) 2013 Branko Majic
 
#
 
# This file is part of Django Conntrackt.
 
#
 
# Django Conntrackt is free software: you can redistribute it and/or modify it
 
# under the terms of the GNU General Public License as published by the Free
 
# Software Foundation, either version 3 of the License, or (at your option) any
 
# later version.
 
#
 
# Django Conntrackt is distributed in the hope that it will be useful, but
 
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 
# details.
 
#
 
# You should have received a copy of the GNU General Public License along with
 
# Django Conntrackt.  If not, see <http://www.gnu.org/licenses/>.
 
#
 

	
 

	
 
import os
 
from setuptools import setup, find_packages
 

	
 
README = open(os.path.join(os.path.dirname(__file__), 'README.rst')).read()
 
INSTALL_REQUIREMENTS = [
 
    "django-braces~=1.12.0",
 
    "django-crispy-forms~=1.6.0",
 
    "django~=1.10.0",
 
    "django-crispy-forms~=1.7.0",
 
    "django~=1.11.0",
 
    "palette~=0.2.0",
 
    "pydot~=1.2.0",
 
]
 

	
 
# allow setup.py to be run from any path
 
os.chdir(os.path.normpath(os.path.join(os.path.abspath(__file__), os.pardir)))
 

	
 
setup(
 
    name='django-conntrackt',
 
    version='dev',
 
    packages=find_packages(exclude=["testproject", "testproject.*"]),
 
    include_package_data=True,
 
    license='GPLv3+',
 
    description='A simple application for tracking connection requirements between different entities in a network.',
 
    long_description=README,
 
    url='http://projects.majic.rs/conntrackt',
 
    author='Branko Majic',
 
    author_email='branko@majic.rs',
 
    install_requires=INSTALL_REQUIREMENTS,
 
    classifiers=[
 
        'Development Status :: 4 - Beta',
 
        'Environment :: Web Environment',
 
        'Framework :: Django',
 
        'Intended Audience :: System Administrators',
 
        'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)',
 
        'Operating System :: OS Independent',
 
        'Programming Language :: Python :: 2.7',
 
        'Topic :: Internet :: WWW/HTTP',
 
        'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
 
        'Topic :: System :: Networking :: Firewalls',
 
    ],
 
)
0 comments (0 inline, 0 general)