Source code for cgroup_monitor.v2_manager

import os
import subprocess


[docs]class CGroupManager: def __init__(self, cgroup_name, cgroup_base_path="/sys/fs/cgroup", helper_script=None): ''' Initialize the CGroupManager object. If helper_script is not provided, the script will use the default method of managing cgroups. One CGroupManager object is responsible for managing one cgroup. Hence, the code expects the cgoup_name to be provided while initializing the object. Parameters: - cgroup_name (str): Name of the cgroup. Necessary argument. - cgroup_base_path (str): Base path of the cgroup. Default is /sys/fs/cgroup. - helper_script (str): Path to the helper script to manage cgroups. Default is None. Returns: - None ''' self.cgroup_name = cgroup_name self.cgroup_base_path = cgroup_base_path self.cgroup_path = os.path.join(self.cgroup_base_path, cgroup_name) self.helper_script = None if helper_script is not None: self.helper_script = os.path.join(os.path.dirname(__file__), helper_script) def _run_command(self, runner_cmd, helper_cmd, sudo): ''' Internal function to run given command conditionally. If sudo is True, the command will be run with sudo privileges. Otherwise, the command will be run as is. If helper_script is provided, the command will be run using the helper script. Parameters: - runner_cmd (str): Command to run using subprocess.run. - helper_cmd (list): Command to run using subprocess.run. - sudo (bool): Whether to use sudo to run the command. Returns: - returncode (int): Return whether the command was successful. ''' try: if sudo: runner_cmd = f"sudo sh -c '{runner_cmd}'" helper_cmd.insert(0, "sudo") if self.helper_script is None: proc = subprocess.run(runner_cmd, shell=True, check=True) else: proc = subprocess.run(helper_cmd, check=True) return proc.returncode except Exception as e: raise Exception(f"Failed to run command: {e}")
[docs] def create_cgroup(self): ''' If non-existant, create cgroup and take ownership. Note, this command requires sudo privileges. Parameters: - None Returns: - returncode (int): Return whether the command was successful. ''' # Create the cgroup if not os.path.exists(self.cgroup_path): if self.helper_script is None: proc = subprocess.run(["sudo", "mkdir", self.cgroup_path], check=True) else: proc = subprocess.run(["sudo", self.helper_script, "create", self.cgroup_path], check=True) if proc.returncode != 0: raise Exception(f"Failed to create cgroup with command: {proc.args}") # Take ownership of the cgroup if self.helper_script is None: proc = subprocess.run(["sudo", "chown", "-R", f"{os.getuid()}:{os.getgid()}", self.cgroup_path], check=True) else: proc = subprocess.run(["sudo", self.helper_script, "chown", self.cgroup_path], check=True) if proc.returncode != 0: raise Exception(f"Failed to taking ownership of cgroup with command: {proc.args}") return proc.returncode
[docs] def set_cpu_limit(self, quota, period=100000, sudo=False): ''' Set the CPU limits. Sets the CPU using quota*period. Parameters: - quota (int): CPU quota in number of cores. - period (int): CPU period in microseconds. Default is 100000. - sudo (bool): Whether to use sudo to run the command. Default is False. Returns: - returncode (int): Return whether the command was successful. ''' cpu_max_data = f"{quota * period} {period}" cpu_max_path = os.path.join(self.cgroup_path, "cpu.max") runner_cmd = f"echo \"{cpu_max_data}\" > {cpu_max_path}" helper_cmd = [self.helper_script, "write", cpu_max_path, str(cpu_max_data)] return self._run_command(runner_cmd, helper_cmd, sudo)
[docs] def set_memory_limit(self, limit, sudo=False): ''' Set the memory limit in bytes. Parameters: - limit (int): Memory limit in bytes. - sudo (bool): Whether to use sudo to run the command. Returns: - returncode (int): Return whether the command was successful. ''' memory_max_path = os.path.join(self.cgroup_path, "memory.max") runner_cmd = f"echo \"{limit}\" > {memory_max_path}" helper_cmd = [self.helper_script, "write", memory_max_path, str(limit)] return self._run_command(runner_cmd, helper_cmd, sudo)
[docs] def set_memory_swap_limit(self, limit, sudo=False): ''' Set the memory+swap limit in bytes. Note: This sets maximum amount for the sum of memory and swap usage. Parameters: - limit (int): Memory+swap limit in bytes. - sudo (bool): Whether to use sudo to run the command. Returns: - returncode (int): Return whether the command was successful. ''' memory_swap_max_path = os.path.join(self.cgroup_path, "memory.swap.max") runner_cmd = f"echo \"{limit}\" > {memory_swap_max_path}" helper_cmd = [self.helper_script, "write", memory_swap_max_path, str(limit)] return self._run_command(runner_cmd, helper_cmd, sudo)
[docs] def add_process(self, pid, sudo=False): ''' Add a process to the cgroup. Parameters: - pid (int): Process ID to add to the cgroup. - sudo (bool): Whether to use sudo to run the command. Returns: - returncode (int): Return whether the command was successful. ''' proc_path = os.path.join(self.cgroup_path, "cgroup.procs") runner_cmd = f"echo \"{pid}\" > {proc_path}" helper_cmd = [self.helper_script, "write", proc_path, str(pid)] return self._run_command(runner_cmd, helper_cmd, sudo)
[docs] def delete_cgroup(self, sudo=False): ''' Delete the cgroup. Parameters: - sudo (bool): Whether to use sudo to run the command. Returns: - returncode (int): Return whether the command was successful. ''' runner_cmd = f"rmdir {self.cgroup_path}" helper_cmd = [self.helper_script, "delete", self.cgroup_path] return self._run_command(runner_cmd, helper_cmd, sudo)