Skip to content
Snippets Groups Projects
Commit f41ea28d authored by Jiří Ježek's avatar Jiří Ježek
Browse files

Merge branch 'fix/flase_positives_unreachable_network_rules' into 'master'

Fix false positives unreachable when it is not allowed by security group network rules

See merge request !25
parents 5bbd08b6 f0698e47
No related branches found
No related tags found
1 merge request!25Fix false positives unreachable when it is not allowed by security group network rules
Pipeline #623464 passed
......@@ -6,9 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [1.6.5] - 2025-05-14
### Fix
- Fix false positives unreachable when it is not allowed by security group network rules
## [1.6.4] - 2025-05-07
### Fix
- Extend connection timeout when trzing tcp ping
- Extend connection timeout when trying tcp ping
## [1.6.3] - 2025-05-05
### Fix
......
......@@ -17,6 +17,7 @@ import ipaddress
import enum
import socket
import logging
import subprocess
from prometheus_client import start_http_server, Gauge, REGISTRY, PROCESS_COLLECTOR, PLATFORM_COLLECTOR
from ping3 import ping
......@@ -74,7 +75,7 @@ class AppMetrics:
if args.exporter_address:
self.exporter_address = ipaddress.ip_address(args.exporter_address)
else:
self.exporter_address = None
self.exporter_address = self.get_container_ip() # Modified from None
if args.ip_status_ipv4_monitor_visibility_exceptions != "":
self.ipv4_monitor_visibility_exceptions = [ipaddress.ip_network(prefix.strip()) for prefix in args.ip_status_ipv4_monitor_visibility_exceptions.split(",")]
else:
......@@ -100,6 +101,86 @@ class AppMetrics:
"Custom info for server ports",
["server_id", "port_id", "ip_address", "network_id", "network_name", "floating_ip"])
def get_container_ip(self):
"""
Get the container's primary IP address
"""
try:
# Get all IP addresses
result = subprocess.run(['hostname', '-I'], capture_output=True, text=True)
if result.returncode == 0:
ips = result.stdout.strip().split()
for ip in ips:
try:
addr = ipaddress.ip_address(ip.strip())
# Return the first non-loopback IPv4 address
if not addr.is_loopback and addr.version == 4:
logging.info(f"Detected container IP: {addr}")
return addr
except ValueError:
continue
except Exception as e:
logging.warning(f"Failed to detect container IP: {e}")
return None
def check_network_compatibility(self, required_networks, protocol=None):
"""
Check if the container's IP is within the required networks for access
Returns tuple (is_compatible, reason)
"""
if not self.exporter_address:
logging.warning("Could not determine container IP address")
return True, "Container IP unknown" # Assume compatible if we can't determine
if len(required_networks) == 0:
logging.info("No allowed networks specified")
return True, "No allowed networks specified"
# Check if container IP is in any of the allowed networks
for network in required_networks:
if self.exporter_address in network:
logging.debug(f"Container IP {self.exporter_address} is within allowed network {network}")
return True, f"Container IP in allowed network {network}"
logging.info(f"Container IP {self.exporter_address} is not in any allowed network: {[str(n) for n in required_networks]}")
return False, f"Container IP {self.exporter_address} not in allowed networks"
def get_enabled_ports_and_networks(self, security_groups):
"""
Returns a tuple of (enabled_ports, allowed_networks) from security groups
Added from openstack-ip-tester-modified.py
"""
enabled_ports = set()
allowed_networks = set()
for i_sg in security_groups:
logging.debug(f"Checking security group: {i_sg.name} (ID: {i_sg.id})")
for rule in i_sg.security_group_rules:
# Collect allowed networks
if rule['direction'] == 'ingress' and rule['remote_ip_prefix']:
try:
network = ipaddress.ip_network(rule['remote_ip_prefix'], strict=False)
allowed_networks.add(network)
except ValueError:
pass
# Skip this rule if the exporter can't reach the VM through it.
if self.sg_rule_transparent(rule) and rule['protocol'] in Constants.TCP_PROTOCOLS:
logging.debug(f"Transparent rule found: {rule}")
for i_port in Constants.PORTS_COMMON:
if i_port in range(rule['port_range_min'], (rule['port_range_max']) + 1):
enabled_ports.add(i_port)
logging.debug(f"Added common port {i_port}")
# We don't want to add enormous number of ports, so the len() limit is used.
if ((rule['port_range_max'] is None or rule['port_range_min'] == rule['port_range_max']) and
len(enabled_ports) <= len(Constants.PORTS_COMMON) + Constants.PORTS_ADDITIONAL_NUM):
enabled_ports.add(rule['port_range_min'])
logging.debug(f"Added specific port {rule['port_range_min']}")
logging.info(f"Enabled ports: {sorted(list(enabled_ports))}")
logging.info(f"Allowed networks: {[str(n) for n in allowed_networks]}")
return enabled_ports, allowed_networks
def run_metrics_loop(self):
"""Metrics fetching loop"""
......@@ -297,6 +378,7 @@ class AppMetrics:
return False
# replaced by function get_enabled_ports_and_networks(self, security_groups)
def get_enabled_ports(self, security_groups):
''' Returns a set of enabled ports throughout all security groups and their rules. '''
enabled_ports = set()
......@@ -350,6 +432,10 @@ class AppMetrics:
if not valid_security_groups:
return IPAddressStatus.HIDDEN.value
ports, allowed_networks = self.get_enabled_ports_and_networks(valid_security_groups)
network_is_compatible, reason = self.check_network_compatibility(allowed_networks)
status = None
if self.protocols_in_any_security_group(valid_security_groups, Constants.ICMP_PROTOCOLS):
status = self.get_ip_accessibility_status_ping(address)
......@@ -360,12 +446,16 @@ class AppMetrics:
# Ping timed out, trying ports
else:
logging.info(str(address) + " | Ping not allowed or None")
ports = self.get_enabled_ports(valid_security_groups)
logging.info(str(address) + " | Enabled ports: " + str(ports))
if not ports:
logging.info(f"Not a single port allowed by security groups")
return IPAddressStatus.HIDDEN.value
status = self.get_ip_accessibility_status_ports(address, ports)
if not network_is_compatible and status == IPAddressStatus.UNREACHABLE.value:
logging.warning(f"Container network incompatible with VM security rules: {reason}")
return IPAddressStatus.HIDDEN.value
return status
def update_sg_and_vm_cache(self):
......@@ -502,7 +592,7 @@ def parse_arguments(script_summary="script"):
help="Timeout for API call, default: %(default)s")
parser.add_argument("--ip-status-ipv6-monitor-visibility", default="::/0, 2001:718:801::/48",
help="IPv6 prefix visible from monitoring, default: %(default)s")
parser.add_argument("--ip-status-ipv4-monitor-visibility", default="147.251.0.0/16, 10.16.0.0/16",
parser.add_argument("--ip-status-ipv4-monitor-visibility", default="147.251.0.0/16, 10.16.0.0/16", # mozna pridat i 100
help="IPv4 prefix visible from monitoring, default: %(default)s")
parser.add_argument("--ip-status-ipv4-monitor-visibility-exceptions", default="",
help="Subnets of --ip-status-ipv4-monitor-visibility considered unreachable, default: %(default)s")
......@@ -545,3 +635,4 @@ def main():
if __name__ == "__main__":
main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment