# /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins/
"""
Feature flags synchronisation plugin (AV mode only).

In IM360 mode the Go resident-agent handles feature-flag sync. In AV mode
there is no resident-agent, so this plugin takes over.

Periodically POSTs the local file checksum to the API and writes back any
updated flags to ``/var/imunify360/feature_flags.json`` (legacy map
``{flag: true}`` on disk) and ``/var/imunify360/feature_flags`` (plain
names, one per line). The POSTed checksum is over the canonical JSON
**array** of enabled names, matching the correlation sync API—not over the
on-disk map bytes.
"""

import asyncio
import json
import logging
import math
import os
import urllib.error
import urllib.request

from defence360agent.contracts.config import Core
from defence360agent.contracts.plugins import MessageSource
from defence360agent.internals.feature_flags import (
    FLAGS_PATH,
    FLAGS_PLAIN_PATH,
    enabled_flag_names_sorted,
    legacy_feature_flags_map_bytes,
    plain_text_payload_for_enabled_flags,
    serialize_feature_flags_file_payload,
    sync_checksum_hex_from_flags_file,
)
from defence360agent.internals.iaid import (
    IAIDTokenError,
    IndependentAgentIDAPI,
)
from defence360agent.utils import Scope, atomic_rewrite

logger = logging.getLogger(__name__)

_SYNC_URL = "/api/sync/v1/feature-flags"


def _env_int(name: str, default: int, *, minimum: int | None = None) -> int:
    """Read an int env var tolerantly.

    A non-numeric value (empty string, typo, etc.) must NOT raise at import
    time — the plugin lives in the AV agent entry point and a bad env var
    would otherwise kill the whole agent.

    :param name: environment variable to read.
    :param default: value returned when the variable is unset, empty,
        non-numeric, or below *minimum*.
    :param minimum: optional inclusive lower bound; ``None`` disables the
        check (the original behaviour).
    :return: the parsed integer, or *default*.
    """
    raw = os.environ.get(name)
    if not raw:
        return default
    try:
        value = int(raw)
    except ValueError:
        logger.warning(
            "feature-flags: %s=%r is not an int, using default %d",
            name,
            raw,
            default,
        )
        return default
    if minimum is not None and value < minimum:
        logger.warning(
            "feature-flags: %s=%d is below minimum %d, using default %d",
            name,
            value,
            minimum,
            default,
        )
        return default
    return value


# Seconds between syncs when the server does not request a specific delay.
# Must be positive: 0 or a negative value would make _sync_loop a busy loop
# hammering the API.
_SYNC_INTERVAL = _env_int(
    "I360_FEATURE_FLAGS_SYNC_INTERVAL_SEC", 3600, minimum=1
)
_INITIAL_DELAY = 10
_UNREGISTERED_DELAY = 30
_HTTP_TIMEOUT = 30


def _coerce_server_delay(value) -> float:
    """Return *value* as a usable positive delay in seconds, or 0.

    The server-supplied ``delay`` is untrusted: it ends up in
    ``asyncio.sleep`` in ``_sync_loop``, which runs outside that loop's
    exception handler. A non-numeric value would raise there and kill the
    sync task for good; a negative one would spin. Returning 0 makes the
    caller fall back to ``_SYNC_INTERVAL``.
    """
    # bool is an int subclass; reject it so a JSON ``true`` isn't "1 second".
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return 0
    # json.loads accepts NaN/Infinity; neither is a sensible sleep length.
    if isinstance(value, float) and not math.isfinite(value):
        return 0
    if value <= 0:
        return 0
    return value


class FeatureFlagsSync(MessageSource):
    """Message source that periodically syncs feature flags with the API."""

    SCOPE = Scope.AV

    # Assigned in create_source(); defaulting to None lets shutdown() run
    # safely even if the source was never started.
    _task = None

    async def create_source(self, loop, sink):
        """Remember *loop*/*sink* and schedule the background sync task."""
        self._loop = loop
        self._sink = sink
        self._task = loop.create_task(self._sync_loop())

    async def shutdown(self):
        """Cancel the background sync task (if any) and wait for it to end."""
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    def _local_checksum(self) -> str:
        """Checksum of the on-disk flags file as sent to the sync API."""
        return sync_checksum_hex_from_flags_file(FLAGS_PATH)

    async def _sync_loop(self):
        """Run sync rounds forever, sleeping between them.

        Waits ``_INITIAL_DELAY`` first. While the agent is unregistered it
        re-checks every ``_UNREGISTERED_DELAY`` seconds; otherwise it sleeps
        for the server-requested delay, or ``_SYNC_INTERVAL`` when none was
        given or the round failed.
        """
        await asyncio.sleep(_INITIAL_DELAY)
        while True:
            delay = _SYNC_INTERVAL
            try:
                if not IndependentAgentIDAPI.is_registered():
                    delay = _UNREGISTERED_DELAY
                else:
                    delay = await self._do_sync() or _SYNC_INTERVAL
            except asyncio.CancelledError:
                raise
            except Exception:
                logger.warning("feature flags sync failed", exc_info=True)
            await asyncio.sleep(delay)

    async def _do_sync(self) -> float:
        """Perform one checksum POST and apply any flags the server returns.

        :return: the server-requested delay in seconds before the next sync,
            or 0 (caller falls back to ``_SYNC_INTERVAL``) when the server
            gave no usable delay or the round failed in a handled way.
        """
        try:
            token = await IndependentAgentIDAPI.get_token()
        except IAIDTokenError:
            logger.warning("no IAID token, skipping feature flags sync")
            return 0

        loop = asyncio.get_running_loop()
        # File I/O and the blocking urllib call run in the default executor
        # so they don't stall the event loop.
        checksum = await loop.run_in_executor(None, self._local_checksum)
        payload = json.dumps({"checksum": checksum}).encode()
        url = Core.API_BASE_URL.rstrip("/") + _SYNC_URL
        req = urllib.request.Request(
            url,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "X-Auth": token,
            },
            method="POST",
        )

        try:
            resp_body = await loop.run_in_executor(
                None, self._blocking_request, req
            )
        except urllib.error.HTTPError as e:
            # Non-5xx (404/403/4xx) is usually a server-side routing or
            # auth state, not an agent bug — keep it a one-line WARNING.
            # 5xx means the server actually misbehaved; keep the traceback.
            if 500 <= e.code < 600:
                logger.error(
                    "feature flags sync HTTP %s on %s: %s",
                    e.code,
                    url,
                    e.reason,
                    exc_info=e,
                )
            else:
                logger.warning(
                    "feature flags sync HTTP %s on %s: %s",
                    e.code,
                    url,
                    e.reason,
                )
            return 0
        except urllib.error.URLError as e:
            # DNS, connection refused, TLS, timeout — transient network
            # conditions, not bugs. One-line WARNING so logs stay readable.
            logger.warning(
                "feature flags sync connection failed on %s: %s",
                url,
                e.reason,
            )
            return 0
        except Exception:
            logger.error(
                "feature flags sync request failed on %s",
                url,
                exc_info=True,
            )
            return 0

        try:
            result = json.loads(resp_body)
        except ValueError:
            # Covers JSONDecodeError and UnicodeDecodeError (non-UTF body);
            # both are ValueError subclasses.
            logger.error("failed to parse feature flags response")
            return 0
        if not isinstance(result, dict):
            logger.error(
                "unexpected feature flags response type %s",
                type(result).__name__,
            )
            return 0

        server_delay = _coerce_server_delay(result.get("delay", 0))
        if result.get("changed") is False:
            logger.debug("feature flags unchanged, skipping write")
            return server_delay

        flags = result.get("flags")
        if flags is not None:
            await loop.run_in_executor(None, self._write_flags, flags)
        return server_delay

    @staticmethod
    def _blocking_request(req: urllib.request.Request) -> bytes:
        """Send *req* synchronously and return the raw response body.

        Blocking; meant to be run via ``run_in_executor``.
        """
        with urllib.request.urlopen(req, timeout=_HTTP_TIMEOUT) as resp:
            return resp.read()

    @staticmethod
    def _write_flags(flags) -> None:
        """Persist flags: map on disk; checksum for next sync uses canonical array.

        A list of names is converted to the legacy ``{flag: true}`` map;
        other shapes go through ``serialize_feature_flags_file_payload``.
        An unsupported shape (``TypeError``) is logged and nothing is
        written. Both the JSON map file and the plain-text names file are
        rewritten; ``OSError`` during writing is logged, not raised.
        """
        try:
            if isinstance(flags, list):
                data = legacy_feature_flags_map_bytes(flags)
            else:
                data = serialize_feature_flags_file_payload(flags)
        except TypeError:
            logger.warning(
                "feature flags sync: unexpected flags type %r, skipping write",
                type(flags).__name__,
            )
            return

        n_active = len(enabled_flag_names_sorted(flags))
        try:
            os.makedirs(os.path.dirname(FLAGS_PATH), exist_ok=True)
            # Atomic write-to-temp + rename so a crash mid-write can't
            # leave the flags file truncated/corrupt — otherwise readers
            # would fall back to defaults until the next sync.
            atomic_rewrite(FLAGS_PATH, data, backup=False)
            plain = plain_text_payload_for_enabled_flags(flags)
            atomic_rewrite(FLAGS_PLAIN_PATH, plain, backup=False)
            logger.info("feature flags synced: %d flags active", n_active)
        except OSError:
            logger.error("failed to write flags file", exc_info=True)