Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/powerapi/cli/common_cli_parsing_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ def __init__(self):
subparser_openstack_pre_processor = SubgroupConfigParsingManager("openstack")
subparser_openstack_pre_processor.add_argument("p", "puller", help_text="Name of the puller to attach the pre-processor to", is_mandatory=True)
subparser_openstack_pre_processor.add_argument("n", "name", help_text="Name of the pre-processor", default_value='preprocessor_openstack')
subparser_openstack_pre_processor.add_argument('i', "polling-interval", help_text="OpenStack API polling interval (in seconds)", argument_type=float, default_value=10.0)
self.add_subgroup_parser("pre-processor", subparser_openstack_pre_processor)

def parse_argv(self):
Expand Down
3 changes: 2 additions & 1 deletion src/powerapi/cli/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,5 +462,6 @@ def _openstack_pre_processor_factory(processor_config: dict) -> ProcessorActor:
"""
from powerapi.processor.pre.openstack.actor import OpenStackPreProcessorActor
name = processor_config[ACTOR_NAME_KEY]
api_polling_interval = processor_config['polling-interval']
level_logger = logging.DEBUG if processor_config[GENERAL_CONF_VERBOSE_KEY] else logging.INFO
return OpenStackPreProcessorActor(name, level_logger)
return OpenStackPreProcessorActor(name, api_polling_interval, level_logger)
4 changes: 2 additions & 2 deletions src/powerapi/processor/pre/openstack/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

def get_instance_name_from_libvirt_cgroup(target: str) -> str | None:
"""
Returns the instance name of the target.
Extract the instance name from the libvirt cgroup path.
:param target: Cgroup path
:return: Instance name (``instance-XXXXXXXX``)
"""
Expand All @@ -44,7 +44,7 @@ def get_instance_name_from_libvirt_cgroup(target: str) -> str | None:
# For example: /sys/fs/cgroup/machine.slice/machine-qemu\\x2d3\\x2dinstance\\x2d00000003.scope/libvirt/emulator
target = target.encode("utf-8").decode("unicode_escape")

match = LIBVIRT_INSTANCE_NAME_REGEX.match(target)
match = LIBVIRT_INSTANCE_NAME_REGEX.search(target)
if match:
return match.group(1)

Expand Down
16 changes: 12 additions & 4 deletions src/powerapi/processor/pre/openstack/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,56 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
from multiprocessing import Manager

from powerapi.actor import Actor, State
from powerapi.actor.message import StartMessage, PoisonPillMessage
from powerapi.processor.pre.openstack.handlers import StartMessageHandler, PoisonPillMessageHandler, HWPCReportHandler
from powerapi.processor.processor_actor import ProcessorActor
from powerapi.report import HWPCReport
from .metadata_cache_manager import OpenStackMetadataCacheManager
from .monitor_agent import OpenStackMonitorAgent


class OpenStackProcessorState(State):
"""
State of the OpenStack processor actor.
"""

def __init__(self, actor: Actor):
def __init__(self, actor: Actor, polling_interval: float):
"""
Initializes an OpenStack processor state.
:param actor: Actor instance
:param polling_interval: OpenStack API polling interval (in seconds)
"""
super().__init__(actor)

self.metadata_cache_manager = OpenStackMetadataCacheManager()
self.manager = Manager()
self.metadata_cache_manager = OpenStackMetadataCacheManager(self.manager)
self.monitor_agent = OpenStackMonitorAgent(self.metadata_cache_manager, polling_interval)


class OpenStackPreProcessorActor(ProcessorActor):
"""
Pre-Processor Actor that adds OpenStack related metadata to reports.
"""

def __init__(self, name: str, level_logger: int = logging.WARNING):
def __init__(self, name: str, polling_interval: float, level_logger: int = logging.WARNING):
"""
Initializes an OpenStack pre-processor actor.
:param name: Name of the actor
:param polling_interval: OpenStack API polling interval (in seconds)
:param level_logger: Logging level of the actor
"""
super().__init__(name, level_logger, 5000)

self.polling_interval = polling_interval

def setup(self):
"""
Set up the OpenStack pre-processor actor.
"""
self.state = OpenStackProcessorState(self)
self.state = OpenStackProcessorState(self, self.polling_interval)

self.add_handler(StartMessage, StartMessageHandler(self.state))
self.add_handler(PoisonPillMessage, PoisonPillMessageHandler(self.state))
Expand Down
21 changes: 8 additions & 13 deletions src/powerapi/processor/pre/openstack/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from powerapi.processor.handlers import ProcessorReportHandler
from powerapi.report import HWPCReport
from ._utils import get_instance_name_from_libvirt_cgroup
from .metadata_cache_manager import ServerMetadata, MetadataSyncFailed
from .metadata_cache_manager import ServerMetadata


class StartMessageHandler(StartHandler):
Expand All @@ -58,6 +58,11 @@ def teardown(self, soft: bool = False):
"""
Teardown the OpenStack processor.
"""
self.state.monitor_agent.terminate()
self.state.monitor_agent.join()

self.state.manager.shutdown()

for actor in self.state.actor.target_actors:
actor.disconnect()

Expand All @@ -75,21 +80,11 @@ def try_get_server_metadata(self, sensor_name: str, instance_name: str) -> Serve
:param instance_name: Name of the instance to fetch metadata for
:return: Server metadata entry or None if not found
"""
server_metadata = None
try:
server_metadata = self.state.metadata_cache_manager.get_server_metadata(sensor_name, instance_name)
if server_metadata is None:
# Retry once after syncing the metadata cache.
self.state.metadata_cache_manager.sync_metadata_cache_from_api()
server_metadata = self.state.metadata_cache_manager.get_server_metadata(sensor_name, instance_name)
except MetadataSyncFailed as exn:
self.state.logger.warning(exn)

return server_metadata
return self.state.metadata_cache_manager.get_server_metadata(sensor_name, instance_name)

def handle(self, msg: HWPCReport):
"""
Process an HWPCReport to add the Kubernetes metadata.
Process an HWPCReport to add the OpenStack metadata.
:param msg: The HWPCReport to process
"""
instance_name = get_instance_name_from_libvirt_cgroup(msg.target)
Expand Down
41 changes: 14 additions & 27 deletions src/powerapi/processor/pre/openstack/metadata_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,62 +27,49 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from collections.abc import MutableMapping
from dataclasses import dataclass
from multiprocessing.managers import SyncManager

from openstack.connection import Connection
from openstack.exceptions import SDKException


class MetadataSyncFailed(Exception):
"""
Exception raised when the metadata cache sync operation fails.
"""


@dataclass(frozen=True, slots=True)
@dataclass(frozen=True)
class ServerMetadata:
"""
Represents an OpenStack server metadata cache entry.
"""
server_id: str
server_name: str
host: str
instance_name: str
metadata: dict[str, str]


class OpenStackMetadataCacheManager:
"""
OpenStack metadata cache manager.
Use the OpenStack API to fetch details about the servers hosted on the infrastructure.
It requires credentials with sufficient permissions to access server metadata across all projects.
Permission to read Nova Extended Server Attributes (OS-EXT-SRV-ATTR) is **mandatory** in order to map cgroups to servers.
"""

def __init__(self):
self._openstack_api = Connection(app_name='PowerAPI') # Configuration is taken from OS_* environment variables
self._metadata_cache: dict[tuple, ServerMetadata] = {}
def __init__(self, manager: SyncManager):
"""
:param manager: Manager of the shared metadata cache
"""
self._metadata_cache: MutableMapping[tuple[str, str], ServerMetadata] = manager.dict()

def get_server_metadata(self, host: str, instance_name: str) -> ServerMetadata | None:
"""
Get metadata for the server of the specified host from the cache.
:param host: Name of the host (hypervisor) where the server is located
:param instance_name: Name of the instance (libvirt instance name)
:return: Server metadata cache entry
:return: Server metadata cache entry or None if not found
"""
return self._metadata_cache.get((host, instance_name), None)

def sync_metadata_cache_from_api(self) -> None:
def update_server_metadata(self, server_metadata: ServerMetadata) -> None:
"""
Sync the running servers metadata cache from the OpenStack API.
Add or update metadata for a server.
:param server_metadata: Server metadata cache entry
"""
try:
for server in self._openstack_api.compute.servers(details=True, all_projects=True):
cache_entry = ServerMetadata(server.id, server.name, server.host, server.metadata)
self._metadata_cache[(server.host, server.instance_name)] = cache_entry
except SDKException as exn:
raise MetadataSyncFailed('Failed to retrieve servers metadata from OpenStack API') from exn
except ValueError as exn:
raise MetadataSyncFailed('Required server attribute is missing from the OpenStack API response') from exn
self._metadata_cache[(server_metadata.host, server_metadata.instance_name)] = server_metadata

def clear_metadata_cache(self) -> None:
"""
Expand Down
127 changes: 127 additions & 0 deletions src/powerapi/processor/pre/openstack/monitor_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright (c) 2026, Inria
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
import sys
from multiprocessing import Process
from signal import signal, SIGINT, SIGTERM
from time import sleep

from openstack.compute.v2.server import Server
from openstack.connection import Connection
from openstack.exceptions import SDKException

from .metadata_cache_manager import OpenStackMetadataCacheManager, ServerMetadata


class OpenStackMonitorAgent(Process):
    """
    Background monitoring agent that updates the shared metadata cache from the OpenStack API.
    It requires credentials with sufficient permissions to access server metadata across all projects.
    Permission to read Nova Extended Server Attributes (OS-EXT-SRV-ATTR) is **mandatory** in order to map cgroups to servers.
    """

    def __init__(self, cache_manager: OpenStackMetadataCacheManager, poll_interval: float, level_logger: int = logging.WARNING):
        """
        Initializes the OpenStack monitor agent.
        :param cache_manager: Metadata cache manager
        :param poll_interval: Interval in seconds between OpenStack API synchronizations
        :param level_logger: Logger level
        """
        super().__init__(name='openstack-processor-monitor-agent')

        self.logger = logging.getLogger(self.name)
        self.logger.setLevel(level_logger)

        # Loggers are looked up by name and therefore shared between instances:
        # attach the formatted handler only once to avoid duplicated log lines.
        if not self.logger.handlers:
            formatter = logging.Formatter('%(asctime)s || %(levelname)s || %(process)d %(processName)s || %(message)s')
            handler = logging.StreamHandler()
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

        self.metadata_cache_manager = cache_manager
        self.poll_interval = poll_interval
        self.stop_monitoring = False

    @staticmethod
    def _setup_openstack_api_client() -> Connection:
        """
        Setup OpenStack API client.
        Configuration is taken from OS_* environment variables.
        :return: OpenStack API client
        """
        return Connection(app_name='PowerAPI')

    def _setup_signal_handlers(self):
        """
        Setup signal handlers for the current Process.
        SIGTERM and SIGINT both flag the monitoring loop to stop and exit the process.
        """
        def stop_monitor(_, __):
            self.stop_monitoring = True
            sys.exit(0)

        signal(SIGTERM, stop_monitor)
        signal(SIGINT, stop_monitor)

    def run(self):
        """
        Main code executed by the OpenStack monitor agent.
        Clears the cache once, then repeatedly fetches the servers metadata and stores
        every entry in the cache, sleeping ``poll_interval`` seconds between iterations.
        """
        self._setup_signal_handlers()
        openstack_api = self._setup_openstack_api_client()

        # Prevents orphaned entries that no longer exist in the OpenStack API.
        self.metadata_cache_manager.clear_metadata_cache()

        while not self.stop_monitoring:
            for server in self.fetch_servers_metadata(openstack_api):
                self.metadata_cache_manager.update_server_metadata(server)

            sleep(self.poll_interval)

    @staticmethod
    def build_metadata_cache_entry_from_server(server: Server) -> ServerMetadata:
        """
        Build and return a metadata cache entry from an OpenStack server object.
        :param server: OpenStack server object
        :return: Server metadata cache entry
        """
        return ServerMetadata(server.id, server.name, server.host, server.instance_name, server.metadata)

    def fetch_servers_metadata(self, openstack_api: Connection) -> list[ServerMetadata]:
        """
        Fetch servers metadata from the OpenStack API.
        Failures are logged through the agent logger and result in an empty list.
        :param openstack_api: OpenStack API client
        :return: List of servers metadata (empty if the API call or the conversion failed)
        """
        try:
            return [
                self.build_metadata_cache_entry_from_server(server)
                for server in openstack_api.compute.servers(details=True, all_projects=True)
            ]
        except SDKException as exn:
            # Use the agent logger (not the root logger) so the configured level and format apply.
            self.logger.warning('Failed to retrieve server metadata from OpenStack API: %s', exn.message)
        except (AttributeError, ValueError) as exn:
            self.logger.error('Required server attribute is missing from the OpenStack API response: %s', exn)

        return []
3 changes: 2 additions & 1 deletion tests/unit/cli/test_generator_openstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def openstack_config():
'pre-processor': {
'pytest-openstack-preprocessor': {
'type': 'openstack',
'puller': 'pytest-json-puller'
'puller': 'pytest-json-puller',
'polling-interval': 10.0
}
}
}
Expand Down
27 changes: 27 additions & 0 deletions tests/unit/processor/pre/openstack/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) 2026, Inria
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Loading