Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 105 additions & 7 deletions webappanalyzer/webappanalyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,61 @@


class WebAppAnalyzer:
def __init__(self, update: bool = False, path: pathlib.Path = pathlib.Path("data")):
def __init__(self, update: bool = False, path: pathlib.Path = pathlib.Path("data"), resolve_categories: bool = False):
self._json_path: pathlib.Path = path
self._resolve_categories_enabled: bool = resolve_categories
path.mkdir(parents=True, exist_ok=True)

json_list = list(string.ascii_lowercase)
json_list.append("_")
categories_filename: str = "categories.json"
groups_filename: str = "groups.json"
expected_files: set[str] = {f"{j}.json" for j in json_list}
if self._resolve_categories_enabled:
expected_files.add(categories_filename)
expected_files.add(groups_filename)
existing_files: set[str] = {entry.name for entry in path.iterdir() if entry.is_file()}

if len(list(path.iterdir())) != len(json_list) or update:
if update or not expected_files.issubset(existing_files):
for j in json_list:
with requests.get(f"https://raw.githubusercontent.com/enthec/webappanalyzer/main/src/technologies/{j}.json", stream=True) as r:
with path.joinpath(f"{j}.json").open("wb") as t:
for chunk in r.iter_content(chunk_size=8192):
t.write(chunk)
if self._resolve_categories_enabled:
with requests.get("https://raw.githubusercontent.com/enthec/webappanalyzer/main/src/categories.json", stream=True) as r:
with path.joinpath(categories_filename).open("wb") as t:
for chunk in r.iter_content(chunk_size=8192):
t.write(chunk)
with requests.get("https://raw.githubusercontent.com/enthec/webappanalyzer/main/src/groups.json", stream=True) as r:
with path.joinpath(groups_filename).open("wb") as t:
for chunk in r.iter_content(chunk_size=8192):
t.write(chunk)

self._categories_by_id: dict[int, dict[str, Any]] = {}
categories_path: pathlib.Path = path.joinpath(categories_filename)
if self._resolve_categories_enabled and categories_path.exists():
with categories_path.open("rb") as categories_file:
for category_id, category_data in ijson.kvitems(categories_file, ""):
try:
parsed_category_id: int = int(category_id)
except (TypeError, ValueError):
continue
self._categories_by_id[parsed_category_id] = {
"name": category_data.get("name"),
"groups": self._normalize_int_list(category_data.get("groups", []))
}

self._groups_by_id: dict[int, str] = {}
groups_path: pathlib.Path = path.joinpath(groups_filename)
if self._resolve_categories_enabled and groups_path.exists():
with groups_path.open("rb") as groups_file:
for group_id, group_data in ijson.kvitems(groups_file, ""):
try:
parsed_group_id: int = int(group_id)
except (TypeError, ValueError):
continue
self._groups_by_id[parsed_group_id] = group_data.get("name")

self.version_regexp = re.compile(r"^(?:(?P<prefix>.*)?\\(?P<group>\d+)(?:\?(?P<first>.*)?:(?P<second>.*)?)?|(?P<fixed>[a-zA-Z0-9.]+)?)$")
cpe_regex: str = r"""cpe:2\.3:[aho\*\-](:(((\?*|\*?)([a-zA-Z0-9\-\._]|(\\[\\\*\?!"#$$%&'\(\)\+,/:;<=>@\[\]\^`\{\|}~]))+(\?*|\*?))|[\*\-])){5}(:(([a-zA-Z]{2,3}(-([a-zA-Z]{2}|[0-9]{3}))?)|[\*\-]))(:(((\?*|\*?)([a-zA-Z0-9\-\._]|(\\[\\\*\?!"#$$%&'\(\)\+,/:;<=>@\[\]\^`\{\|}~]))+(\?*|\*?))|[\*\-])){4}"""
Expand All @@ -36,14 +78,17 @@ def analyze(self, webpage: WebPage):
detectors: dict[str, list] = self._prepare_detectors(content)
detection_result: dict[str, Any] = self.detect(detectors, webpage)
if detection_result.get("match"):
detected.append({
detection: dict[str, Any] = {
"tech": technology,
"confidence": min(detection_result.get('confidence'), 100),
"cpe": content.get('cpe'),
"implies": detectors.get('implies'),
"requires": [impl.lower() for impl in content.get("requires", [])],
"versions": detection_result.get("versions")
})
}
if self._resolve_categories_enabled:
detection["cats"] = self._normalize_int_list(content.get("cats", []))
detected.append(detection)
resync: bool = True
while resync:
resync: bool = False
Expand All @@ -70,14 +115,17 @@ def analyze(self, webpage: WebPage):
for technology, content in ijson.kvitems(tech_file, ""):
if technology.lower() == new_tech.lower():
resync: bool = True
detected.append({
detection: dict[str, Any] = {
"tech": technology,
"confidence": 100,
"cpe": content.get("cpe"),
"implies": [impl.lower() for impl in content.get("implies", [])],
"requires": [impl.lower() for impl in content.get("requires", [])],
"versions": []
})
}
if self._resolve_categories_enabled:
detection["cats"] = self._normalize_int_list(content.get("cats", []))
detected.append(detection)
to_add.clear()

tech_names: set[str] = {tech.get("tech").lower() for tech in detected}
Expand All @@ -97,6 +145,13 @@ def analyze(self, webpage: WebPage):
d.pop("confidence")
d.pop("versions")
d.pop("requires")
if self._resolve_categories_enabled:
category_data: dict[str, list[Any]] = self._resolve_categories(d.get("cats", []))
d["category_ids"] = category_data.get("category_ids", [])
d["categories"] = category_data.get("categories", [])
d["group_ids"] = category_data.get("group_ids", [])
d["groups"] = category_data.get("groups", [])
d.pop("cats", None)
if d.get("cpe") and d["version"]:
d["cpe"] = ":".join(d["cpe"].split(":")[:5]+[d["version"]]+d["cpe"].split(":")[6:])
if not self._is_cpe_valid(d.get("cpe")):
Expand Down Expand Up @@ -259,6 +314,49 @@ def _format_version(self, current_match: re.Match, version: str) -> Optional[str
final_version: Optional[str] = None
return final_version

@staticmethod
def _normalize_int_list(values: Any) -> list[int]:
if not isinstance(values, list):
return []
normalized_values: list[int] = []
for value in values:
try:
parsed_value: int = int(value)
except (TypeError, ValueError):
continue
normalized_values.append(parsed_value)
return normalized_values

def _resolve_categories(self, category_ids: list[int]) -> dict[str, list[Any]]:
normalized_ids: list[int] = []
for category_id in category_ids:
if category_id not in normalized_ids:
normalized_ids.append(category_id)

categories: list[str] = []
group_ids: list[int] = []
for category_id in normalized_ids:
category_meta: dict[str, Any] = self._categories_by_id.get(category_id, {})
category_name: Optional[str] = category_meta.get("name")
if category_name:
categories.append(category_name)
for group_id in category_meta.get("groups", []):
if group_id not in group_ids:
group_ids.append(group_id)

groups: list[str] = []
for group_id in group_ids:
group_name: Optional[str] = self._groups_by_id.get(group_id)
if group_name:
groups.append(group_name)

return {
"category_ids": normalized_ids,
"categories": categories,
"group_ids": group_ids,
"groups": groups,
}

def _prepare_detectors(self, tech_content: dict):
clean: dict[str, list] = {}
clean["headers"] = self._process_object(tech_content.get("headers", {}))
Expand Down Expand Up @@ -386,4 +484,4 @@ def __ge__(self, other):
def __ne__(self, other):
return mycmp(self.obj, other.obj) != 0

return CmpToKey
return CmpToKey