diff --git a/scripts/eve-parity.py b/scripts/eve-parity.py new file mode 100755 index 0000000000..07b25ebf6f --- /dev/null +++ b/scripts/eve-parity.py @@ -0,0 +1,164 @@ +#! /usr/bin/env python3 +# +# Tool for checking parity between the EVE schema and Suricata +# keywords. +# +# Usage: ./scripts/eve-parity.py [missing|having] +# +# ## unmapped-keywords +# +# Display all known keywords that are not mapped to an EVE field. +# +# ## unmapped-fields +# +# Display all eve fields that do not have a keyword mapping. +# +# ## mapped-fields +# +# Display all EVE fields that have a keyword mapping. + + +import sys +import subprocess +import json +import argparse + + +def main(): + parser = argparse.ArgumentParser(description="EVE Parity Check Tool") + parser.add_argument( + "command", choices=["mapped-fields", "unmapped-keywords", "unmapped-fields"] + ) + args = parser.parse_args() + + keywords = load_known_keywords() + keys = load_schema() + + if args.command == "mapped-fields": + mapped_fields(keywords, keys) + elif args.command == "unmapped-keywords": + unmapped_keywords(keywords, keys) + elif args.command == "unmapped-fields": + unmapped_fields(keywords, keys) + + +def unmapped_keywords(keywords, keys): + """Report known keywords that are not mapped to an EVE field.""" + schema_keywords = set() + for key in keys.keys(): + if "keywords" in keys[key] and keys[key]["keywords"]: + for keyword in keys[key]["keywords"]: + schema_keywords.add(keyword) + unmapped = keywords - schema_keywords + for keyword in sorted(unmapped): + print(keyword) + + +def unmapped_fields(keywords, keys): + with_missing = set() + + for key in keys.keys(): + if "keywords" not in keys[key]: + with_missing.add(key) + + # Print sorted. + for key in sorted(with_missing): + print(key) + + +def mapped_fields(keywords, keys): + for key in keys.keys(): + if "keywords" in keys[key] and keys[key]["keywords"]: + for keyword in keys[key]["keywords"]: + if keyword not in keywords: + errprint("ERROR: Unknown keyword: {}".format(keyword)) + print("{} -> [{}]".format(key, ", ".join(keys[key]["keywords"]))) + + +def load_schema(): + schema = json.load(open("etc/schema.json")) + stack = [(schema, [])] + keys = {} + + while stack: + (current, path) = stack.pop(0) + + for name, props in current["properties"].items(): + if "$ref" in props: + ref = find_ref(schema, props["$ref"]) + if not ref: + raise Exception("$ref not found: {}".format(props["$ref"])) + props = ref + if props["type"] in ["string", "integer", "boolean", "number"]: + # End of the line... + key = ".".join(path + [name]) + keys[key] = props.get("suricata", {}) + elif props["type"] == "object": + # An object can set "suricata.keywords" to false to + # disable descending into it. For examples, "stats". + keywords = props.get("suricata", {}).get("keywords") + if keywords is False: + # print("Skipping object {}, keywords disabled".format(".".join(path + [name]))) + continue + + if "properties" in props: + stack.insert(0, (props, path + [name])) + else: + # May want to warn that this object has no properties. + key = ".".join(path + [name]) + keys[key] = {} + elif props["type"] == "array": + if "items" in props and "type" in props["items"]: + if "properties" in props["items"]: + stack.insert( + 0, + ( + props["items"], + path + ["{}".format(name)], + ), + ) + else: + # May want to warn that this array has no properties. + key = ".".join(path + [name]) + keys[key] = {} + else: + # May want to warn that this array has no items. + key = ".".join(path + [name]) + keys[key] = {} + else: + raise Exception("Unsupported type: {}".format(props["type"])) + + return keys + + +def load_known_keywords(): + keywords = set() + result = subprocess.check_output(["./src/suricata", "--list-keywords=csv"]) + lines = result.decode().split("\n") + # Skip first line, as its a header line. + for line in lines[1:]: + parts = line.split(";") + if parts: + keywords.add(parts[0]) + return keywords + + +def errprint(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) + + +def find_ref(schema: dict, ref: str) -> dict: + parts = ref.split("/") + + root = parts.pop(0) + if root != "#": + raise Exception("Unsupported reference: {}".format(ref)) + + while parts: + schema = schema[parts.pop(0)] + + return schema + + +if __name__ == "__main__": + sys.exit(main())