75 lines
1.7 KiB
Text
75 lines
1.7 KiB
Text
|
#!/usr/bin/env python3
|
||
|
from nftables import Nftables
|
||
|
from os import path
|
||
|
import sys
|
||
|
|
||
|
NFT = Nftables()
|
||
|
NAME = path.basename(__file__)
|
||
|
|
||
|
def error(message, code):
|
||
|
print("%s: %s" % (NAME, message), file = sys.stderr)
|
||
|
exit(code)
|
||
|
|
||
|
def error_no_arg(arg_name):
|
||
|
error("no %s provided" % (arg_name), 1)
|
||
|
|
||
|
def error_invalid_arg(arg_name, value):
|
||
|
error('invalid %s "%s"' % (arg_name, value), 2)
|
||
|
|
||
|
def allowed_ports(operation, protocol, port):
|
||
|
if operation != "add" and operation != "delete":
|
||
|
raise ValueError("not a valid nftables operation")
|
||
|
|
||
|
if protocol != "tcp" and protocol != "udp":
|
||
|
raise ValueError("not a valid protocol")
|
||
|
|
||
|
if port < 0 or port > 65535:
|
||
|
raise ValueError("not a valid port number")
|
||
|
|
||
|
NFT.cmd("%s element inet filter allowed_%s { %d }" % (operation, protocol, port))
|
||
|
|
||
|
def manage_ports(action, args):
|
||
|
match action:
|
||
|
case "allow":
|
||
|
operation = "add"
|
||
|
case "deny":
|
||
|
operation = "delete"
|
||
|
case _:
|
||
|
raise ValueError("invalid action")
|
||
|
|
||
|
try:
|
||
|
protocol = args.pop(0)
|
||
|
except IndexError:
|
||
|
error_no_arg("protocol")
|
||
|
|
||
|
try:
|
||
|
port = args.pop(0)
|
||
|
except IndexError:
|
||
|
error_no_arg("port")
|
||
|
|
||
|
try:
|
||
|
port = int(port)
|
||
|
except ValueError:
|
||
|
error_invalid_arg("port", port)
|
||
|
|
||
|
try:
|
||
|
allowed_ports(operation, protocol, port)
|
||
|
except ValueError as e:
|
||
|
error(e, 3)
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
sys.argv.pop(0)
|
||
|
|
||
|
try:
|
||
|
action = sys.argv.pop(0)
|
||
|
except IndexError:
|
||
|
error_no_arg("action")
|
||
|
|
||
|
match action:
|
||
|
case "help":
|
||
|
usage()
|
||
|
case "allow" | "deny":
|
||
|
manage_ports(action, sys.argv)
|
||
|
case _:
|
||
|
error_invalid_arg("action", action)
|