Location: conntrackt/conntrackt/tests/test_iptables.py
348cf5e17195
CONNT-22: Clarified license a bit. Added release notes.
# Django imports.
from django.test import TestCase
# Application imports.
from conntrackt import iptables
class RuleTest(TestCase):
    """
    Checks the string form of iptables.Rule for the protocols exercised here
    (TCP, UDP, ICMP) and the rejection of an unknown protocol name.

    NOTE(review): every case passes well-formed addresses and ports. Nothing
    in this class shows how Rule treats malformed source, destination or port
    values; confirm against iptables.Rule and cover with negative tests,
    since the rendered text is presumably loaded into a firewall ruleset.
    """

    def test_output_case(self):
        """
        A mixed-case protocol name ("tCp") must be rendered lower-cased.
        """

        generated = iptables.Rule(
            "192.168.1.1/255.255.255.255",
            "192.168.1.2/255.255.255.255",
            "tCp",
            "80",
            description="Web server.",
        )
        expected = "-s 192.168.1.1/255.255.255.255 -d 192.168.1.2/255.255.255.255 -p tcp -m tcp --dport 80 -j ACCEPT"

        # The description is passed in but does not appear in str(rule).
        self.assertEqual(str(generated), expected)

    def test_output_tcp(self):
        """
        A TCP rule renders with the tcp match and a --dport option.
        """

        generated = iptables.Rule(
            "192.168.1.1/255.255.255.255",
            "192.168.1.2/255.255.255.255",
            "TCP",
            "80",
            description="Web server.",
        )
        expected = "-s 192.168.1.1/255.255.255.255 -d 192.168.1.2/255.255.255.255 -p tcp -m tcp --dport 80 -j ACCEPT"

        self.assertEqual(str(generated), expected)

    def test_output_udp(self):
        """
        A UDP rule renders with the udp match and a --dport option.
        """

        generated = iptables.Rule(
            "192.168.1.1/255.255.255.255",
            "192.168.1.2/255.255.255.255",
            "UDP",
            "53",
            description="DNS server.",
        )
        expected = "-s 192.168.1.1/255.255.255.255 -d 192.168.1.2/255.255.255.255 -p udp -m udp --dport 53 -j ACCEPT"

        self.assertEqual(str(generated), expected)

    def test_output_icmp(self):
        """
        An ICMP rule renders the fourth argument as --icmp-type, not --dport.
        """

        generated = iptables.Rule(
            "192.168.1.1/255.255.255.255",
            "192.168.1.2/255.255.255.255",
            "ICMP",
            "8",
            description="Ping.",
        )
        expected = "-s 192.168.1.1/255.255.255.255 -d 192.168.1.2/255.255.255.255 -p icmp -m icmp --icmp-type 8 -j ACCEPT"

        self.assertEqual(str(generated), expected)

    def test_unsupported_protocol(self):
        """
        Constructing a rule with an unknown protocol name raises ValueError.
        """

        # Failing at construction time means an unknown protocol never
        # reaches the rendered rule text.
        with self.assertRaises(ValueError):
            iptables.Rule(
                "192.168.1.1/255.255.255.255",
                "192.168.1.2/255.255.255.255",
                "NONEXIST",
                "8",
                description="Non-existing",
            )
class LoopbackRuleTest(TestCase):
    """
    Checks the string form of iptables.LoopbackRule.
    """

    def test_output(self):
        """
        The loopback rule accepts everything arriving on the "lo" interface.
        """

        rendered = str(iptables.LoopbackRule())

        self.assertEqual(rendered, "-i lo -j ACCEPT")
class RelatedRuleTest(TestCase):
    """
    Checks the string form of iptables.RelatedRule.
    """

    def test_output(self):
        """
        The related rule accepts traffic in the RELATED or ESTABLISHED state.
        """

        rendered = str(iptables.RelatedRule())

        # The expected text pins the "state" match. Current iptables
        # releases document "-m conntrack --ctstate" as its replacement, so
        # this literal must change if the generator is ever modernised.
        self.assertEqual(rendered, "-m state --state RELATED,ESTABLISHED -j ACCEPT")
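class ChainTest(TestCase):
    """
    Checks iptables.Chain: its rendering when empty, rejection of an unknown
    target, rule registration and the full rendered output.

    NOTE(review): every chain here is built with the "ACCEPT" target. These
    tests pin the text layout only; they do not show what a default-deny
    chain looks like.
    """

    def test_output_empty(self):
        """
        Test generation of empty chain.
        """

        chain = iptables.Chain("INPUT", "ACCEPT")

        # An empty chain is just its header line: name, target and the
        # "[0:0]" counters field.
        self.assertEqual(str(chain), ":INPUT ACCEPT [0:0]\n")

    def test_unsupported_target(self):
        """
        Tests that unsupported target will raise an exception.
        """

        # The second Chain argument is the target, so the method name says
        # "target" rather than "protocol".
        self.assertRaises(ValueError, iptables.Chain, "INPUT", "NOTARGET")

    def test_add_rule(self):
        """
        Tests that the rule is being added to the chain properly.
        """

        chain = iptables.Chain("INPUT", "ACCEPT")
        rule = iptables.Rule("192.168.1.1/255.255.255.255", "192.168.1.2/255.255.255.255", "TCP", "22", "SSH")

        chain.add_rule(rule)

        # assertItemsEqual is the Python 2 unittest name (assertCountEqual in
        # Python 3); it compares the elements regardless of order.
        self.assertItemsEqual(chain.rules, [rule])

    def test_output(self):
        """
        Tests that a chain is generated properly.

        The expected text is not in insertion order: the rule with an empty
        description comes first with no comment line, and the others follow
        under "# SSH" and "# Web server" lines.
        """

        chain = iptables.Chain("INPUT", "ACCEPT")
        chain.add_rule(iptables.Rule("192.168.1.1/255.255.255.255", "192.168.1.2/255.255.255.255", "TCP", "80", "Web server"))
        chain.add_rule(iptables.Rule("192.168.1.1/255.255.255.255", "192.168.1.3/255.255.255.255", "TCP", "80", "Web server"))
        chain.add_rule(iptables.Rule("192.168.1.1/255.255.255.255", "192.168.1.2/255.255.255.255", "TCP", "22", "SSH"))
        chain.add_rule(iptables.Rule("192.168.1.1/255.255.255.255", "192.168.1.3/255.255.255.255", "TCP", "22", ""))

        # NOTE(review): descriptions are written into the output as
        # "# <description>" lines. No test covers a description that contains
        # a line break; confirm that Rule/Chain strips or rejects one so that
        # free-text descriptions cannot add lines to the generated ruleset.
        expected_output = """:INPUT ACCEPT [0:0]
-A INPUT -s 192.168.1.1/255.255.255.255 -d 192.168.1.3/255.255.255.255 -p tcp -m tcp --dport 22 -j ACCEPT
# SSH
-A INPUT -s 192.168.1.1/255.255.255.255 -d 192.168.1.2/255.255.255.255 -p tcp -m tcp --dport 22 -j ACCEPT
# Web server
-A INPUT -s 192.168.1.1/255.255.255.255 -d 192.168.1.2/255.255.255.255 -p tcp -m tcp --dport 80 -j ACCEPT
-A INPUT -s 192.168.1.1/255.255.255.255 -d 192.168.1.3/255.255.255.255 -p tcp -m tcp --dport 80 -j ACCEPT
"""

        self.assertEqual(str(chain), expected_output)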
class TableTest(TestCase):
    """
    Checks iptables.Table: rendering with and without chains, and chain
    registration.
    """

    def test_output_empty(self):
        """
        A table without chains renders as its "*<name>" header and COMMIT.
        """

        rendered = str(iptables.Table("filter"))

        self.assertEqual(rendered, "*filter\nCOMMIT\n")

    def test_output(self):
        """
        A table renders its chains between the header and COMMIT, in the
        order in which they were added.
        """

        filter_table = iptables.Table("filter")
        for chain_name in ("INPUT", "OUTPUT", "FORWARD"):
            filter_table.add_chain(iptables.Chain(chain_name, "ACCEPT"))

        # NOTE(review): all three chains use the ACCEPT target, so this pins
        # the layout only, not a restrictive policy.
        expected_output = """*filter
:INPUT ACCEPT [0:0]
:OUTPUT ACCEPT [0:0]
:FORWARD ACCEPT [0:0]
COMMIT
"""

        self.assertEqual(str(filter_table), expected_output)

    def test_add_chain(self):
        """
        A chain passed to add_chain shows up in the table's chains.
        """

        filter_table = iptables.Table("filter")
        input_chain = iptables.Chain("INPUT", "ACCEPT")

        filter_table.add_chain(input_chain)

        # Order-insensitive comparison of the registered chains.
        self.assertItemsEqual(filter_table.chains, [input_chain])