1
0
Fork 0
forked from ahurac/dotfiles
ahuarc-dotfiles/bin/nyfthon

103 lines
2.6 KiB
Text
Raw Normal View History

#!/usr/bin/env python3
from nftables import Nftables
from os import path
import sys
2023-12-16 22:18:32 +01:00
import json
NFT = Nftables()
NAME = path.basename(__file__)
2023-12-16 22:18:32 +01:00
TABLE_NAME = "inet filter"
def error(message, code):
print("%s: %s" % (NAME, message), file = sys.stderr)
exit(code)
def error_no_arg(arg_name):
error("no %s provided" % (arg_name), 1)
def error_invalid_arg(arg_name, value):
error('invalid %s "%s"' % (arg_name, value), 2)
2023-12-16 22:18:32 +01:00
def usage():
print("""
{name}: dumb Python wrapper for nftables o_O
usage:
{name} list - show currently allowed ports
{name} help - show this :)
{name} allow|deny <protocol> <port number> - add or remove a port from the filtered sets
""".format(name = NAME).strip())
def get_nft_set_for(protocol):
return json.loads(NFT.cmd("list set %s allowed_%s" % (TABLE_NAME, protocol))[1])["nftables"][1]["set"]
def show_ports():
NFT.set_json_output(True)
tcp_ports = get_nft_set_for("tcp")
udp_ports = get_nft_set_for("udp")
if "elem" in tcp_ports:
print("Allowed TCP ports : %s" % (tcp_ports["elem"]))
if "elem" in udp_ports:
print("Allowed UDP ports: %s" % (udp_ports["elem"]))
def allowed_ports(operation, protocol, port):
if operation != "add" and operation != "delete":
raise ValueError("not a valid nftables operation")
if protocol != "tcp" and protocol != "udp":
raise ValueError("not a valid protocol")
if port < 0 or port > 65535:
raise ValueError("not a valid port number")
2023-12-16 22:18:32 +01:00
NFT.cmd("%s element %s allowed_%s { %d }" % (operation, TABLE_NAME, protocol, port))
def manage_ports(action, args):
match action:
case "allow":
operation = "add"
case "deny":
operation = "delete"
case _:
raise ValueError("invalid action")
try:
protocol = args.pop(0)
except IndexError:
error_no_arg("protocol")
try:
port = args.pop(0)
except IndexError:
error_no_arg("port")
try:
port = int(port)
except ValueError:
error_invalid_arg("port", port)
try:
allowed_ports(operation, protocol, port)
except ValueError as e:
error(e, 3)
if __name__ == "__main__":
sys.argv.pop(0)
try:
action = sys.argv.pop(0)
except IndexError:
error_no_arg("action")
match action:
case "help":
usage()
case "allow" | "deny":
manage_ports(action, sys.argv)
2023-12-16 22:18:32 +01:00
case "list":
show_ports()
case _:
error_invalid_arg("action", action)