Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port https_availability bit → nibble #4023

Open
wants to merge 13 commits into
base: feature/nibbles
Choose a base branch
from
15 changes: 0 additions & 15 deletions octopoes/bits/https_availability/bit.py

This file was deleted.

21 changes: 0 additions & 21 deletions octopoes/bits/https_availability/https_availability.py

This file was deleted.

4 changes: 4 additions & 0 deletions octopoes/nibbles/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def __hash__(self):
def _ini(self) -> dict[str, Any]:
return {"id": self.id, "enabled": self.enabled, "checksum": self._checksum}

@property
def triggers(self) -> set[type[OOI]]:
return {sgn.object_type for sgn in self.signature if issubclass(sgn.object_type, OOI)}


def get_nibble_definitions() -> dict[str, NibbleDefinition]:
nibble_definitions = {}
Expand Down
2 changes: 1 addition & 1 deletion octopoes/nibbles/disallowed_csp_hostnames/nibble.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def query(targets: list[Reference | None]) -> str:


NIBBLE = NibbleDefinition(
id="disallowed-csp-hostnames",
id="disallowed_csp_hostnames",
signature=[
NibbleParameter(object_type=HTTPHeaderHostname, parser="[*][?object_type == 'HTTPHeaderHostname'][]"),
NibbleParameter(object_type=Config, parser="[*][?object_type == 'Config'][]", optional=True),
Expand Down
19 changes: 19 additions & 0 deletions octopoes/nibbles/https_availability/https_availability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from collections.abc import Iterator

from octopoes.models import OOI
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.network import IPAddress, IPPort
from octopoes.models.ooi.web import Website


def nibble(ipaddress: IPAddress, ipport80: IPPort, website: Website, port443s: int) -> Iterator[OOI]:
_ = ipaddress
_ = ipport80
if port443s < 1:
ft = KATFindingType(id="KAT-HTTPS-NOT-AVAILABLE")
yield ft
yield Finding(
ooi=website.reference,
finding_type=ft.reference,
description="HTTP port is open, but HTTPS port is not open",
)
66 changes: 66 additions & 0 deletions octopoes/nibbles/https_availability/nibble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from nibbles.definitions import NibbleDefinition, NibbleParameter
from octopoes.models import Reference
from octopoes.models.ooi.network import IPAddress, IPPort
from octopoes.models.ooi.web import Website


def query(targets: list[Reference | None]) -> str:
def pull(statements: list[str]) -> str:
return f"""
{{
:query {{
:find [(pull ?ipaddress [*]) (pull ?ipport80 [*]) (pull ?website [*]) (- (count ?ipport443) 1)]
:where [
{" ".join(statements)}
]
}}
}}
"""

base_query = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I found a bug in this query while trying to modify it:

Scherm­afbeelding 2025-01-15 om 10 25 40

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right that is the whole point, 1 is really 0 as you are counting the nil

"""
[?website :Website/ip_service ?ip_service]
[?ipservice :IPService/ip_port ?ipport80]
[?ipport80 :IPPort/port 80]
[?ipport80 :IPPort/address ?ipaddress]
(or
(and [?ipport443 :IPPort/address ?ipaddress][?ipport443 :IPPort/port 443])
[(identity nil) ?ipport443]
)
"""
]

ref_queries = [
f'[?ipaddress :IPAddress/primary_key "{str(targets[0])}"]',
f'[?ipport80 :IPPort/primary_key "{str(targets[1])}"]',
f'[?website :Website/primary_key "{str(targets[2])}"]',
]

sgn = "".join(str(int(isinstance(target, Reference))) for target in targets)
if sgn == "1000":
return pull(ref_queries[0:1] + base_query)
elif sgn == "0100":
if int(str(targets[1]).split("|")[-1]) == 80:
return pull(ref_queries[1:2] + base_query)
else:
return pull(base_query)
elif sgn == "0010":
return pull(ref_queries[2:3] + base_query)
elif sgn == "1110":
return pull(ref_queries + base_query)
else:
return pull(base_query)


NIBBLE = NibbleDefinition(
id="https_availability",
signature=[
NibbleParameter(
object_type=IPAddress, parser="[*][?object_type == 'IPAddressV6' || object_type == 'IPAddressV4'][]"
),
NibbleParameter(object_type=IPPort, parser="[*][?object_type == 'IPPort'][]"),
NibbleParameter(object_type=Website, parser="[*][?object_type == 'Website'][]"),
NibbleParameter(object_type=int, parser="[*][-1][]"),
],
query=query,
)
13 changes: 6 additions & 7 deletions octopoes/nibbles/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,14 @@ def _run(self, ooi: OOI, valid_time: datetime) -> dict[str, dict[tuple[Any, ...]
nibblet_nibbles = {self.nibbles[nibblet.method] for nibblet in nibblets if nibblet.method in self.nibbles}

for nibble in filter(
lambda x: any(isinstance(ooi, param.object_type) for param in x.signature) and x not in nibblet_nibbles,
lambda x: x.enabled and x not in nibblet_nibbles and any(isinstance(ooi, t) for t in x.triggers),
self.nibbles.values(),
):
if nibble.enabled:
if len(nibble.signature) > 1:
self._write(valid_time)
args = self.ooi_repository.nibble_query(ooi, nibble, valid_time)
results = {tuple(arg): set(flatten([nibble(arg)])) for arg in args}
return_value |= {nibble.id: results}
if len(nibble.signature) > 1:
self._write(valid_time)
args = self.ooi_repository.nibble_query(ooi, nibble, valid_time)
results = {tuple(arg): set(flatten([nibble(arg)])) for arg in args}
return_value |= {nibble.id: results}
self.cache = merge_results(self.cache, {ooi: return_value})
return return_value

Expand Down
76 changes: 76 additions & 0 deletions octopoes/tests/integration/test_https_availability_nibble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os
from datetime import datetime
from unittest.mock import Mock

import pytest
from nibbles.https_availability.nibble import NIBBLE as https_availability

from octopoes.core.service import OctopoesService
from octopoes.models.ooi.dns.zone import Hostname
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.network import IPAddressV4, IPPort, Network, Protocol
from octopoes.models.ooi.service import IPService, Service
from octopoes.models.ooi.web import HostnameHTTPURL, HTTPHeader, HTTPResource, WebScheme, Website

if os.environ.get("CI") != "1":
pytest.skip("Needs XTDB multinode container.", allow_module_level=True)

STATIC_IP = ".".join((4 * "1 ").split())


def test_https_availability_query(xtdb_octopoes_service: OctopoesService, event_manager: Mock, valid_time: datetime):
originalsouth marked this conversation as resolved.
Show resolved Hide resolved
xtdb_octopoes_service.nibbler.nibbles = {https_availability.id: https_availability}

network = Network(name="internet")
xtdb_octopoes_service.ooi_repository.save(network, valid_time)

hostname = Hostname(name="example.com", network=network.reference)
xtdb_octopoes_service.ooi_repository.save(hostname, valid_time)

web_url = HostnameHTTPURL(
network=network.reference, netloc=hostname.reference, port=443, path="/", scheme=WebScheme.HTTP
)
xtdb_octopoes_service.ooi_repository.save(web_url, valid_time)

service = Service(name="https")
xtdb_octopoes_service.ooi_repository.save(service, valid_time)

ip_address = IPAddressV4(address=STATIC_IP, network=network.reference)
xtdb_octopoes_service.ooi_repository.save(ip_address, valid_time)

port = IPPort(port=80, address=ip_address.reference, protocol=Protocol.TCP)
xtdb_octopoes_service.ooi_repository.save(port, valid_time)

ip_service = IPService(ip_port=port.reference, service=service.reference)
xtdb_octopoes_service.ooi_repository.save(ip_service, valid_time)

website = Website(ip_service=ip_service.reference, hostname=hostname.reference)
xtdb_octopoes_service.ooi_repository.save(website, valid_time)

resource = HTTPResource(website=website.reference, web_url=web_url.reference)
xtdb_octopoes_service.ooi_repository.save(resource, valid_time)

header = HTTPHeader(
resource=resource.reference, key="strict-transport-security", value="max-age=21536000; includeSubDomains"
)
xtdb_octopoes_service.ooi_repository.save(header, valid_time)

event_manager.complete_process_events(xtdb_octopoes_service)

assert xtdb_octopoes_service.ooi_repository.list_oois({Finding}, valid_time).count == 1
assert (
xtdb_octopoes_service.ooi_repository.list_oois({Finding}, valid_time).items[0].description
== "HTTP port is open, but HTTPS port is not open"
)
assert xtdb_octopoes_service.ooi_repository.list_oois({KATFindingType}, valid_time).count == 1
originalsouth marked this conversation as resolved.
Show resolved Hide resolved
assert (
xtdb_octopoes_service.ooi_repository.list_oois({KATFindingType}, valid_time).items[0].id
== "KAT-HTTPS-NOT-AVAILABLE"
)

port443 = IPPort(port=443, address=ip_address.reference, protocol=Protocol.TCP)
xtdb_octopoes_service.ooi_repository.save(port443, valid_time)
event_manager.complete_process_events(xtdb_octopoes_service)

assert xtdb_octopoes_service.ooi_repository.list_oois({Finding}, valid_time).count == 0
assert xtdb_octopoes_service.ooi_repository.list_oois({KATFindingType}, valid_time).count == 0
11 changes: 6 additions & 5 deletions octopoes/tests/test_bits.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from bits.https_availability.https_availability import run as run_https_availability
from nibbles.https_availability.https_availability import nibble as run_https_availability
from nibbles.oois_in_headers.oois_in_headers import nibble as run_oois_in_headers

from octopoes.models.ooi.config import Config
from octopoes.models.ooi.findings import Finding
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.network import IPPort
from octopoes.models.ooi.web import URL, HTTPHeader, HTTPHeaderURL, Website

Expand Down Expand Up @@ -42,7 +42,8 @@ def test_url_extracted_by_oois_in_headers_relative_path(http_resource_https):
def test_finding_generated_when_443_not_open_and_80_is_open():
port_80 = IPPort(address="fake", protocol="tcp", port=80)
website = Website(ip_service="fake", hostname="fake")
results = list(run_https_availability(None, [port_80, website], {}))
finding = results[0]
assert isinstance(finding, Finding)
results = list(run_https_availability(None, port_80, website, 0))
finding = [result for result in results if isinstance(result, Finding)][0]
assert finding.description == "HTTP port is open, but HTTPS port is not open"
katfindingtype = [result for result in results if not isinstance(result, Finding)][0]
assert isinstance(katfindingtype, KATFindingType)
Loading