#!/usr/bin/env python3 from nftables import Nftables from os import path import sys import json NFT = Nftables() NAME = path.basename(__file__) TABLE_NAME = "inet filter" def error(message, code): print("%s: %s" % (NAME, message), file = sys.stderr) exit(code) def error_no_arg(arg_name): error("no %s provided" % (arg_name), 1) def error_invalid_arg(arg_name, value): error('invalid %s "%s"' % (arg_name, value), 2) def usage(): print(""" {name}: dumb Python wrapper for nftables o_O usage: {name} list - show currently allowed ports {name} help - show this :) {name} allow|deny - add or remove a port from the filtered sets """.format(name = NAME).strip()) def get_nft_set_for(protocol): return json.loads(NFT.cmd("list set %s allowed_%s" % (TABLE_NAME, protocol))[1])["nftables"][1]["set"] def show_ports(): NFT.set_json_output(True) tcp_ports = get_nft_set_for("tcp") udp_ports = get_nft_set_for("udp") if "elem" in tcp_ports: print("Allowed TCP ports : %s" % (tcp_ports["elem"])) if "elem" in udp_ports: print("Allowed UDP ports: %s" % (udp_ports["elem"])) def allowed_ports(operation, protocol, port): if operation != "add" and operation != "delete": raise ValueError("not a valid nftables operation") if protocol != "tcp" and protocol != "udp": raise ValueError("not a valid protocol") if port < 0 or port > 65535: raise ValueError("not a valid port number") NFT.cmd("%s element %s allowed_%s { %d }" % (operation, TABLE_NAME, protocol, port)) def manage_ports(action, args): match action: case "allow": operation = "add" case "deny": operation = "delete" case _: raise ValueError("invalid action") try: protocol = args.pop(0) except IndexError: error_no_arg("protocol") try: port = args.pop(0) except IndexError: error_no_arg("port") try: port = int(port) except ValueError: error_invalid_arg("port", port) try: allowed_ports(operation, protocol, port) except ValueError as e: error(e, 3) if __name__ == "__main__": sys.argv.pop(0) try: action = sys.argv.pop(0) except IndexError: error_no_arg("action") match action: case "help": usage() case "allow" | "deny": manage_ports(action, sys.argv) case "list": show_ports() case _: error_invalid_arg("action", action)