/
var
/
opt
/
nydus
/
ops
/
customer_local_ops
/
control_panel
/
up file
home
"""cPanel control panel ops for Linux operating systems."""
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple
import functools
import logging
import os
import re
import socket
import json

from shortuuid import ShortUUID

from customer_local_ops import OpType, ResourceType
from customer_local_ops.operating_system.linux import (
    AlmaLinux8, AlmaLinux9, AlmaLinux10, Linux, CentOS, CentOS6, CentOS7, Debian, Debian8, Ubuntu1604)
from customer_local_ops.control_panel.cpanel import CPanelException, OSCPanel
from customer_local_ops.util import random_password
from customer_local_ops.util.execute import runCommand, run_uapi_command
from customer_local_ops.util.helpers import replace_line, edit_file_lines, create_file, replace_file_lines_multiple
from customer_local_ops.util.retry import Retry, RETRY

LOG = logging.getLogger(__name__)
SHORT_UUID_DEFAULT_LENGTH = 30


class LinuxCPanel(Linux, OSCPanel):
    """
    CPanel Customer Local Ops for the Linux OS.
    All function names should contain 'cpanel' so as not to override the OS ops
    """
    # Services that cPanel must not restart while it is being installed.
    AUTO_RESTART_EXCLUDE_SERVICES = ['nydus-ex', 'nydus-ex-api']
    AUTO_RESTART_EXCLUSION_FILE = '/etc/cpanel/local/ignore_outdated_services'
    # ^ as of cPanel 11.76 / Jan 13 2019
    # While this file exists, change_hostname_cpanel keeps returning a Retry.
    HOSTNAME_CHANGE_LOCK_FILE = '/var/cpanel/.application-locks/UpdateHostname'
    op_type = OpType.CONTROL_PANEL_OPERATING_SYSTEM

    # this is a bit different. If the user picks cpanel, we don't want to do the regular os_op configureMTA
    # (exim conflicts with sendmail)
    def configure_mta_cpanel(self, payload: Dict[str, Any], *args: Any) -> Any:
        """Configures the mail transfer agent for cPanel

        Sets ``defaultmailaction=fail`` in /var/cpanel/cpanel.config, optionally writes
        /etc/exim.conf.local with a smart-host router, then rebuilds the exim config.

        :param payload: A dict containing input data; the optional ``relay_address`` key is the smart host
                        inserted into the exim route list
        :returns: (False, result dict) if editing the config files fails, otherwise the result of running
                  /scripts/buildeximconf
        """
        LOG.info("LinuxCPanel.configure_mta_cpanel start")
        op_name = 'configure_mta_cpanel'
        try:
            set_tgt = functools.partial(replace_line,
                                        match='defaultmailaction',
                                        replace='defaultmailaction=fail\n',
                                        firstword=False)
            edit_file_lines('/var/cpanel/cpanel.config', set_tgt)

            relay = payload.get('relay_address')
            if relay is not None:
                exim_conf_path = '/etc/exim.conf.local'
                LOG.info("LinuxCPanel.configure_mta_cpanel writing %s", exim_conf_path)
                # NOTE(review): confirm the blank-line layout of this template against the
                # /etc/exim.conf.local format expected by /scripts/buildeximconf.
                create_file(exim_conf_path, """
@AUTH@

@BEGINACL@

@CONFIG@

@DIRECTOREND@

@DIRECTORMIDDLE@

@DIRECTORSTART@

@ENDACL@

@RETRYEND@

@RETRYSTART@

@REWRITE@

@ROUTEREND@

@ROUTERSTART@
send_to_smart_host:
  driver = manualroute
  route_list = !+local_domains %s
  transport = remote_smtp

@TRANSPORTEND@

@TRANSPORTMIDDLE@

@TRANSPORTSTART@

""" % relay)
        except Exception as ex:  # pylint: disable=broad-except
            LOG.error('cp_os_op LinuxCPanel configure_mta_cpanel result(fail): %s', str(ex))
            return False, self.build_result_dict('', str(ex), op_name)

        LOG.info("LinuxCPanel.configure_mta_cpanel running buildeximconf")
        return self._run_uapi_command(['/scripts/buildeximconf'], 'build exim config', op_name)

    def _run_uapi_command(self, cmd_list: List[str], description: str, op_name: str) -> Tuple[bool, Dict[str, Any]]:
        """Runs a local cPanel command

        :param cmd_list: A list of commands to run
        :param description: A description of the command(s) to be run
        :param op_name: The name of the op calling this function
        :returns: (success flag, result dict built from the command's stdout/stderr)
        """
        exit_code, outs, errs = run_uapi_command(cmd_list, description, op_name)
        succeeded = exit_code == 0
        if succeeded:
            # cpanel's uapi returns 0 even when it errors. For example, if you try to add the same site name twice.
            LOG.debug("cp_op_result: success! %s", errs)
            command_line = ' '.join(cmd_list)
            if 'uapi --user=' in command_line and 'errors: ~' not in outs:
                # The errors key is always returned. A '~' value indicates no errors occurred.
                succeeded = False
            elif 'whmapi1 ' in command_line and 'result: 1' not in outs:
                # If result is 1, that denotes success. 0 denotes failure.
                succeeded = False
        return succeeded, self.build_result_dict(outs, errs, op_name)

    def change_hostname_cpanel(self, hostname: str, *_: Any,
                               intermediate_result: Optional[Dict[str, Any]] = None) -> Any:
        """Changes the server hostname via a command for the local operating system

        :param hostname: the new server hostname
        :param intermediate_result: Nydus intermediate result for storing the set-hostname result while waiting
                                    for the cPanel lock file to clear
        :returns: the result of the set-hostname cPanel call, in Nydus/CLO result format, or a Retry carrying
                  that result while the hostname-change lock file still exists
        """
        LOG.info("LinuxCPanel.change_hostname_cpanel start")
        if intermediate_result is None:
            # First pass: actually run set_hostname. Later passes only wait for the lock file.
            result = self._run_uapi_command(
                ['/usr/local/cpanel/bin/set_hostname', hostname], 'set cpanel host', 'change_hostname_cpanel')
            if not result[0]:
                return result
            intermediate_result = {'set_hostname_result': result}

        # Wait for lock file to go
        if os.path.exists(self.HOSTNAME_CHANGE_LOCK_FILE):
            LOG.info('cPanel hostname change lock file exists (%s), will try again later.',
                     self.HOSTNAME_CHANGE_LOCK_FILE)
            return Retry(intermediate_result=intermediate_result)

        return intermediate_result['set_hostname_result']

    def get_public_ip_cpanel(self, *args: Any) -> Any:
        """Gets the cPanel public IP for this server

        :returns: the raw output of the showip.cgi request on success, otherwise (False, result dict)
        """
        op_name = 'get_public_ip_cpanel'
        command = ['wget', '-q', '-O', '-', 'http://cpanel.net/showip.cgi']
        exit_code, outs, errs = runCommand(command, 'Get cPanel outbound IP')
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)
        return outs

    def mark_internal_addresses_cpanel(self, private_addrs: List[str], *args: Any) -> Tuple[bool, Dict]:
        """Marks the server IPs as reserved

        :param private_addrs: A list of IPs to mark
        :returns: (success flag, result dict); failure is reported when either file cannot be written
        """
        op_name = 'mark_internal_addresses_cpanel'
        try:
            create_file('/etc/reservedips', '\n'.join(private_addrs))
            create_file('/etc/reservedipreasons', 'Internal datacenter-local addresses not publicly accessible.')
        except OSError as ex:
            return False, self.build_result_dict('', str(ex), op_name)
        return True, self.build_result_dict('Marking internal addresses succeeded', '', op_name)

    def cpanel_enable(self, cpanel_public_ip: str, *args: Any) -> Any:
        """Enables cPanel functionality for this server

        :param cpanel_public_ip: The cPanel public IP for the server
        :returns: (success flag, result dict); only an OSError is reported as failure, the exit codes of the
                  individual commands are logged but not checked
        """
        op_name = 'cpanel_enable'
        LOG.debug("cpanel_public_ip- %s", cpanel_public_ip)
        try:
            # update /etc/wwwacct.conf ADDR
            edit_file_lines('/etc/wwwacct.conf',
                            functools.partial(replace_line,
                                              match='ADDR',
                                              replace='ADDR %s\n' % cpanel_public_ip,
                                              firstword=True))

            # update /var/cpanel/mainip
            try:
                os.remove('/var/cpanel/mainip')
            except OSError:
                # Best effort: the file is (re)created right below.
                pass
            create_file('/var/cpanel/mainip', '%s' % cpanel_public_ip)

            create_file('/var/cpanel/activate/2012-07.v01.EULACPWHM', '')
            create_file('/etc/.whostmgrft', '')

            exit_code, outs, errs = runCommand(['sed', '-i', 's/rpmup_allow_kernel=0/rpmup_allow_kernel=1/g',
                                                '/var/cpanel/cpanel.config'],
                                               'enable automatic kernel updates')
            LOG.debug("Automatic kernel updates result: %s: %s -- %s", exit_code, errs, outs)

            exit_code, outs, errs = runCommand(['sed', '-i',
                                                's/allow_deprecated_accesshash=0/allow_deprecated_accesshash=1/g',
                                                '/var/cpanel/cpanel.config'],
                                               'enable accesshash')
            LOG.debug("Enable accesshash result: %s: %s -- %s", exit_code, errs, outs)

            exit_code, outs, errs = runCommand(['service', 'cpanel', 'restart'], 'restart cpanel')
            LOG.debug("restart cpanel result: %s: %s -- %s", exit_code, errs, outs)

            for feature in ('appconfig', 'email_archiving', 'email_autodiscovery', 'log_archiving',
                            'query_apache_for_nobody_senders', 'server_usage_analytics',
                            'servers_usage_analytics', 'smtp_restrictions', 'trust_x_php_script'):
                # Take one timestamp per file so MODIFIED and TIMESTAMP can never disagree.
                stamp = datetime.now().ctime()
                create_file(os.path.join('/var/cpanel/activate/features/', feature), """USER=root
MODIFIED=%s
TIMESTAMP=%s
INTERFACE=GUI
""" % (stamp, stamp))

            # The following section is an attempt to reduce queueprocd errors.
            exit_code, outs, errs = runCommand(['/scripts/restartsrv_queueprocd'], 'restart queueprocd')
            LOG.debug("Restart queueprocd result: %s: %s -- %s", exit_code, errs, outs)
        except OSError as ex:
            return False, self.build_result_dict('', str(ex), op_name)
        return True, self.build_result_dict('CPanel enabled successfully', '', op_name)

    def cpanel_activate(self, vm_resource: str, *args: Any,
                        intermediate_result: Optional[Dict[str, Any]] = None) -> Any:
        """Activates cPanel license for this server.

        If another process is running cpkeyclt, this one will wait and retry to ensure a successful licensing.

        :param vm_resource: The resource name for the third-party hosting provider
        :param intermediate_result: an intermediate result
        :returns: RETRY while another license check is running, (False, result dict) if cpkeyclt fails,
                  otherwise None (the Installatron repair commands are logged but not checked)
        """
        op_name = self.OP_CPANEL_ACTIVATE
        exit_code, outs, errs = runCommand(['/usr/local/cpanel/cpkeyclt'], 'activate license')
        if 'A License check appears to already be running' in outs:
            return RETRY
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)

        ops_map = self.CPANEL_OPS_RESOURCE_ATTRIBUTE_MAP[ResourceType(vm_resource)][op_name]
        if ops_map[self.RUN_INSTALLATRON_REPAIR]:
            # We need to re-initialize installatron after getting new key (Only on VMs)
            repair_steps = (
                (['rm', '-fr', '/usr/local/installatron/lib', '/usr/local/installatron/etc/php.ini'],
                 'rm installatron php.ini',
                 "rm installatron php.ini results: %s: %s -- %s"),
                (['curl', '-O', 'https://data.installatron.com/installatron-plugin.sh'],
                 'curl installatron-plugin',
                 "curl installatron-plugin result: %s: %s -- %s"),
                (['chmod', '+x', 'installatron-plugin.sh'],
                 'chmod plugin',
                 "chmod plugin result: %s: %s -- %s"),
                (['./installatron-plugin.sh', '-f', '--quick'],
                 'rebuild plugin',
                 "rebuild plugin result: %s: %s -- %s"),
            )
            for command, description, log_format in repair_steps:
                exit_code, outs, errs = runCommand(command, description)
                LOG.debug(log_format, exit_code, errs, outs)
        return None

    def _cleanup_pyinstaller_temp_dirs(self) -> None:
        """Clean up stale PyInstaller extraction directories

        PyInstaller extracts files to /opt/nydus/tmp/_MEI* directories at runtime. These should be cleaned up
        automatically, but sometimes orphaned directories remain and cause "already exists but should not"
        errors on AlmaLinux 8. This method removes stale _MEI directories older than 1 hour.

        Failures are logged as warnings and never raised.
        """
        nydus_tmp_dir = '/opt/nydus/tmp'
        max_age_seconds = 3600  # 1 hour
        if not os.path.exists(nydus_tmp_dir):
            return

        try:
            current_time = datetime.now().timestamp()
            for entry in os.listdir(nydus_tmp_dir):
                if not entry.startswith('_MEI'):
                    continue
                dir_path = os.path.join(nydus_tmp_dir, entry)
                if not os.path.isdir(dir_path):
                    continue
                # Only directories older than the cut-off are considered stale
                if current_time - os.path.getmtime(dir_path) <= max_age_seconds:
                    continue
                LOG.debug("Cleaning up stale PyInstaller directory: %s", dir_path)
                exit_code, _, errs = runCommand(['rm', '-rf', dir_path], f'cleanup pyinstaller temp {entry}')
                if exit_code != 0:
                    LOG.warning("Failed to cleanup %s: %s", dir_path, errs)
        except OSError as ex:
            LOG.warning("Error during PyInstaller temp cleanup: %s", str(ex))

    def set_mysql_password_cpanel(self, *args: Any, intermediate_result: Optional[Dict[str, Any]] = None) -> Any:
        """Generates and sets a random mysql password for cPanel

        :param intermediate_result: Intermediate result for retry state tracking
        :returns: RETRY when MySQL does not appear to be ready yet, (False, result dict) when a command
                  fails, otherwise None
        """
        op_name = 'set_mysql_password_cpanel'

        # Clean up stale PyInstaller directories that can cause extraction errors
        # on AlmaLinux 8 (fixes "already exists but should not" errors)
        self._cleanup_pyinstaller_temp_dirs()

        password = random_password()
        exit_code, outs, errs = runCommand(
            ['/scripts/mysqlpasswd', 'root', password], 'mysql password', omitString=password)

        # Check for MySQL connection errors that indicate service not ready
        if exit_code != 0:
            mysql_not_ready_errors = [
                'CR_CONNECTION_ERROR',
                "Can't connect to local server",
                "Can't connect to MySQL server",
                'mysql.sock',
                'Connection refused'
            ]
            if any(error in errs for error in mysql_not_ready_errors):
                LOG.warning("MySQL not ready yet, will retry: %s", errs)
                return RETRY  # Trigger retry via decorator in parent class
            return False, self.build_result_dict(outs, errs, op_name)

        exit_code, outs, errs = runCommand(['/scripts/mysqlconnectioncheck'], 'restart mysql')
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)
        return None

    def enable_secure_tmp_cpanel(self, *args: Any) -> Any:
        """Re-secures the /tmp directory

        Removes the /var/cpanel/version/securetmp_disabled marker file.

        :returns: (False, result dict) if the removal fails, otherwise None
        """
        op_name = 'enable_secure_tmp_cpanel'
        exit_code, outs, errs = runCommand(['rm', '-f', '/var/cpanel/version/securetmp_disabled'],
                                           're-secure tmp dir')
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)
        return None

    def cpanel_prep(self, *args: Any) -> Any:
        """Pre-installs and prepares cPanel on the server

        :returns: (False, result dict) if any preparation step raises CPanelException, otherwise the
                  result dict for a completed prep
        """
        op_name = 'cpanel_prep'
        try:
            # Clean up stale PyInstaller directories before starting prep
            self._cleanup_pyinstaller_temp_dirs()

            self._check_hostname_cpanel()
            self._yum_update_cpanel()
            self._download_cpanel()
            self._exclude_nydus_from_auto_restart_cpanel()
            self._install_cpanel()
            self._install_installatron_cpanel()
            self._config_cpanel()
            self._disable_secure_tmp_cpanel()
        except CPanelException as ex:
            return False, self.build_result_dict(ex.outs, ex.errs, op_name)
        return self.build_result_dict('cpanel_prep complete', '', op_name)

    def _check_hostname_cpanel(self) -> bool:
        """Checks that the server's fully qualified hostname is valid for CPanel

        `hostname` -> "PT-Test.secureserver.net" but `hostname -f` -> "localhost".

        /etc/hostname:
            pt-test.secureserver.net
        /etc/hosts:
            127.0.0.1 localhost PT-Test.secureserver.net
            # cloud-controlled; do not change
            10.192.28.50 pt-test.secureserver.net pt-test

        CHANGE: removed the PT-Test on the 127.0.0.1 line.
        `hostname` -> "PT-Test.secureserver.net"
        `hostname -f` -> "pt-test.secureserver.net"

        :raises CPanelException: If the hostname cannot be resolved or has fewer than three dot-separated parts
        """
        try:
            hostname = socket.gethostbyaddr(socket.gethostname())[0]
        except OSError as ex:
            # socket.herror / socket.gaierror are OSError subclasses; convert them so that
            # cpanel_prep reports a failed result instead of letting the lookup error escape.
            raise CPanelException('', 'Unable to resolve the server hostname: {}'.format(ex)) from ex
        if len(hostname.split('.')) < 3:
            raise CPanelException(
                '', 'cPanel installation requires a fully-qualified hostname ({} is not enough)'.format(hostname))
        return True

    def _yum_update_cpanel(self) -> bool:
        """Performs a yum update on the server

        :raises CPanelException: If the command fails
        """
        exit_code, outs, errs = self._yum_update()
        if exit_code != 0:
            raise CPanelException(outs, errs)
        return True

    def _download_cpanel(self) -> bool:
        """Downloads cPanel to the server

        :raises CPanelException: If the command fails
        """
        exit_code, outs, errs = runCommand(
            ['wget', 'https://securedownloads.cpanel.net/latest', '-O', '/root/cpinstall'], 'download cpinstall')
        if exit_code != 0:
            raise CPanelException(outs, errs)
        return True

    def _exclude_nydus_from_auto_restart_cpanel(self) -> bool:  # pylint:disable=invalid-name
        """Exclude Nydus services from cPanel's auto restarts.

        cPanel restarts services with outdated dependencies during install. It does this very generally, and
        Nydus services are included and stopped in the middle of install, causing terminal failure. This method
        adds Nydus services to the exclusion list so they are not restarted.

        See also, on a cPanel system:

        - /usr/local/cpanel/scripts/find_outdated_services
        - /usr/local/cpanel/Cpanel/ProcessCheck/Outdated.pm

        :raises CPanelException: If the exclusion file or its directory cannot be created
        """
        content = '%s\n' % '\n'.join(self.AUTO_RESTART_EXCLUDE_SERVICES)
        try:
            os.makedirs(os.path.dirname(self.AUTO_RESTART_EXCLUSION_FILE), exist_ok=True)
            create_file(self.AUTO_RESTART_EXCLUSION_FILE, content)
        except OSError as ex:
            # Same conversion as _config_cpanel, so cpanel_prep can report the failure.
            raise CPanelException('', str(ex)) from ex
        return True

    def _install_cpanel(self) -> bool:
        """Installs cPanel on the server

        :raises CPanelException: If the command fails
        """
        exit_code, outs, errs = runCommand(['bash', '/root/cpinstall'], 'install cpanel')
        if exit_code != 0:
            raise CPanelException(outs, errs)
        return True

    def _install_installatron_cpanel(self) -> bool:
        """Installs Installatron plugin on the server

        :raises CPanelException: If the command fails
        """
        exit_code, outs, errs = runCommand(
            ['rpm', '-U', '-h', '-v', 'http://data.installatron.com/installatron-plugin-cpanel-latest.noarch.rpm'],
            'install installatron')
        if exit_code != 0:
            raise CPanelException(outs, errs)
        return True

    def _config_cpanel(self) -> bool:
        """Adds initial cPanel configuration

        Rewrites the NS, NS2, ETHDEV, CONTACTEMAIL and ADDR lines of /etc/wwwacct.conf and removes
        /var/cpanel/mainip if it exists.

        :raises CPanelException: If there is a problem running any commands
        """
        try:
            replace_dict = {'NS2': 'NS2 ns2.secureserver.net\n',
                            'NS': 'NS ns1.secureserver.net\n',
                            'ETHDEV': 'ETHDEV eth0\n',
                            'CONTACTEMAIL': 'CONTACTEMAIL root@cpaneltmp.secureserver.net\n',
                            '^ADDR .*': 'ADDR\n'}
            replace_file_lines_multiple('/etc/wwwacct.conf', replace_dict)

            mainipfile = '/var/cpanel/mainip'
            if os.path.exists(mainipfile):
                os.unlink(mainipfile)
        except OSError as ex:
            raise CPanelException('', str(ex)) from ex
        return True

    def _disable_secure_tmp_cpanel(self) -> bool:
        """Disables noexec on the /tmp directory

        :raises CPanelException: If the command fails
        """
        exit_code, outs, errs = runCommand(['touch', '/var/cpanel/version/securetmp_disabled'],
                                           'disable noexec /tmp')
        if exit_code != 0:
            raise CPanelException(outs, errs)
        return True

    def hulk_whitelist_cpanel(self, from_ip_addr: str, *args: Any) -> Tuple[bool, Dict[str, Any]]:
        """Allow cPanel access from customer IP

        :param from_ip_addr: The IP address from which the customer will access the cPanel instance
        :returns: (success flag, result dict) for the cphulkdwhitelist command
        """
        op_name = 'hulk_whitelist_cpanel'
        exit_code, outs, errs = runCommand(['/scripts/cphulkdwhitelist', from_ip_addr], 'cphulk whitelist')
        return exit_code == 0, self.build_result_dict(outs, errs, op_name)

    def get_hash_cpanel(self, *args: Any) -> Any:
        """Get the cPanel access hash, generating it first if it is missing or empty

        :returns: the contents of /root/.accesshash with newlines removed, or (False, result dict) if the
                  hash could not be generated
        """
        op_name = 'get_hash_cpanel'
        path = '/root/.accesshash'

        # An empty hash file is useless; drop it so that a fresh one is generated below.
        if os.path.exists(path) and os.path.getsize(path) == 0:
            os.unlink(path)

        if not os.path.exists(path):
            exit_code, outs, errs = runCommand(['/usr/local/cpanel/whostmgr/bin/whostmgr', 'setrhash'],
                                               'generate hash')
            if exit_code != 0:
                return False, self.build_result_dict(outs, errs, op_name)

        with open(path, 'r', encoding='utf-8') as hashf:
            cphash = hashf.read()
        return ''.join(cphash.split('\n'))

    LETSENCRYPT_TOS_URL_PATTERN = re.compile(
        r'^https://letsencrypt\.org/documents/LE-SA-v\d+\.\d+-[A-Za-z]+-\d{1,2}-\d{4}\.pdf$'
    )
    LETSENCRYPT_DEFAULT_TOS_URL = 'https://letsencrypt.org/documents/LE-SA-v1.6-August-18-2025.pdf'

    def accept_autossl_terms_cpanel(self, *args: Any, tos_url: Optional[str] = None) -> Tuple[bool, Dict[str, Any]]:
        """Accepts the Let's Encrypt Terms of Service and re-registers the AutoSSL provider.

        Runs whmapi1 reset_autossl_provider to re-register the Let's Encrypt provider with the accepted
        Terms of Service. This resolves cases where Let's Encrypt updated their Terms of Service and
        certificates cannot be renewed until the new ToS is accepted and the provider is re-registered.

        :param tos_url: The Let's Encrypt Terms of Service URL. If not provided, uses the default URL.
        :returns: (success flag, result dict); success requires exit code 0 and ``metadata.result == 1``
                  in the whmapi1 JSON output
        """
        op_name = 'accept_autossl_terms_cpanel'
        if tos_url is None:
            tos_url = self.LETSENCRYPT_DEFAULT_TOS_URL

        # Validate the ToS URL matches the expected Let's Encrypt format
        if not self.LETSENCRYPT_TOS_URL_PATTERN.match(tos_url):
            errs = 'Invalid Let\'s Encrypt ToS URL: {}'.format(tos_url)
            LOG.error("LinuxCPanel.accept_autossl_terms_cpanel %s", errs)
            return False, self.build_result_dict('', errs, op_name)

        LOG.info("LinuxCPanel.accept_autossl_terms_cpanel re-registering Let's Encrypt provider")
        exit_code, outs, errs = runCommand(
            ['/usr/local/cpanel/bin/whmapi1', '--output=jsonpretty',
             'reset_autossl_provider', 'provider=LetsEncrypt',
             'x_terms_of_service_accepted={}'.format(tos_url)],
            'reset autossl provider with accepted tos')
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)

        # Verify the whmapi1 call succeeded by parsing the JSON output
        try:
            parsed_outs = json.loads(outs)
        except (json.JSONDecodeError, TypeError):
            LOG.error("LinuxCPanel.accept_autossl_terms_cpanel failed to parse whmapi1 JSON output: %s", outs)
            return False, self.build_result_dict(outs, errs, op_name)

        # A JSON root that is not an object cannot carry the metadata; treat it as failure.
        metadata = parsed_outs.get('metadata', {}) if isinstance(parsed_outs, dict) else None
        succeeded = isinstance(metadata, dict) and metadata.get('result') == 1
        return succeeded, self.build_result_dict(outs, errs, op_name)

    def renew_ssl_certs_cpanel(self, *args: Any) -> Tuple[bool, Dict[str, Any]]:
        """Triggers SSL certificate validation and renewal using cPanel's built-in AutoSSL mechanism.

        Executes /usr/local/cpanel/bin/checkallsslcerts which causes cPanel to check, renew, and
        reinstall SSL certificates as applicable.

        :returns: (success flag, result dict) for the checkallsslcerts command
        """
        op_name = 'renew_ssl_certs_cpanel'
        LOG.info("LinuxCPanel.renew_ssl_certs_cpanel running checkallsslcerts")
        exit_code, outs, errs = runCommand(
            ['/usr/local/cpanel/bin/checkallsslcerts'], 'check and renew ssl certs')
        return exit_code == 0, self.build_result_dict(outs, errs, op_name)

    def get_api_token_cpanel(self, *args: Any) -> Any:
        """Get cPanel API Token.

        Revoke any existing nydus-generated tokens before generating a new token.

        :returns: the new token, encrypted and decoded as UTF-8 text, or (False, result dict) when a
                  whmapi1 call fails or returns output that cannot be interpreted
        """
        op_name = 'get_api_token_cpanel'
        whmapi = ['/usr/local/cpanel/bin/whmapi1', '--output=jsonpretty']
        token_prefix = 'dashboard_generated_'
        token_suffix = ShortUUID().random(length=SHORT_UUID_DEFAULT_LENGTH)
        token_name = token_prefix + token_suffix

        # Retrieve and revoke any existing tokens matching the token prefix
        exit_code, outs, errs = runCommand(whmapi + ['api_token_list'], 'get api tokens')
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)

        try:
            tokens = json.loads(outs)['data']['tokens']
        except (ValueError, KeyError, TypeError):
            tokens = None
        if not isinstance(tokens, dict):
            # Malformed output: fail explicitly rather than crash or skip the revocation step.
            LOG.error("LinuxCPanel.get_api_token_cpanel failed to parse api_token_list output: %s", outs)
            return False, self.build_result_dict(outs, 'Invalid token list json returned from cpanel', op_name)

        for old_token_name in tokens:
            if not old_token_name.startswith(token_prefix):
                continue
            # Revoke token (best effort: a failed revocation is logged, not fatal)
            revoke_code, _, revoke_errs = runCommand(
                whmapi + ['api_token_revoke', 'token_name={old_token_name}'.format(old_token_name=old_token_name)],
                'revoke token')
            if revoke_code != 0:
                LOG.warning("Failed to revoke cPanel API token %s: %s", old_token_name, revoke_errs)

        exit_code, outs, errs = runCommand(
            whmapi + ['api_token_create', "token_name={token_name}".format(token_name=token_name)],
            'generate api token')
        if exit_code != 0:
            return False, self.build_result_dict(outs, errs, op_name)

        try:
            token_data = json.loads(outs).get('data')
        except (ValueError, TypeError, AttributeError):
            token_data = None
        if isinstance(token_data, dict):
            token = token_data.get('token')
            if token is not None:
                return self.encrypt(token).decode('utf-8')
        # The command output is deliberately left out of this result: it may contain the token.
        return False, self.build_result_dict('', 'Invalid token json returned from cpanel', op_name)


class CentOSCPanel(CentOS, LinuxCPanel):
    """cPanel ops combined with the CentOS operating system ops."""


class CentOS6CPanel(CentOS6, CentOSCPanel):  # pylint: disable=too-many-ancestors
    """cPanel ops for CentOS 6; instantiation raises NotImplementedError."""

    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        raise NotImplementedError


class CentOS7CPanel(CentOS7, CentOSCPanel):  # pylint: disable=too-many-ancestors
    """cPanel ops combined with the CentOS 7 operating system ops."""


class AlmaLinux8CPanel(AlmaLinux8, CentOSCPanel):  # pylint: disable=too-many-ancestors
    """cPanel ops combined with the AlmaLinux 8 operating system ops."""


class AlmaLinux9CPanel(AlmaLinux9, CentOSCPanel):  # pylint: disable=too-many-ancestors
    """cPanel ops combined with the AlmaLinux 9 operating system ops."""


class AlmaLinux10CPanel(AlmaLinux10, CentOSCPanel):  # pylint: disable=too-many-ancestors
    """cPanel ops combined with the AlmaLinux 10 operating system ops."""


class DebianCPanel(Debian, LinuxCPanel):
    """cPanel ops for Debian; instantiation raises NotImplementedError."""

    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        raise NotImplementedError


class Debian8CPanel(Debian8, DebianCPanel):  # pylint: disable=too-many-ancestors
    """cPanel ops for Debian 8; inherits DebianCPanel's NotImplementedError constructor."""


class Ubuntu1604CPanel(Ubuntu1604, DebianCPanel):  # pylint: disable=too-many-ancestors
    """cPanel ops for Ubuntu 16.04; inherits DebianCPanel's NotImplementedError constructor."""