File: //proc/14315/root/usr/sbin/imunify360-watchdog
#!/opt/imunify360/venv/bin/python3
import json
import logging
import logging.handlers
import os.path
import shutil
import socket
import subprocess
import sys
import time
from typing import Optional
from defence360agent import sentry
# Errors raised while a log record is being handled are ignored instead of
# being reported by the logging module.
logging.raiseExceptions = False

# Service names and the verbs passed to systemctl / service.
IMUNIFY360 = "imunify360"
IMUNIFY360_AGENT = "imunify360-agent"
SERVICE = "service"
RESTART = "restart"
STATUS = "status"
SHOW = "show"

# Timeouts and delays, all expressed in seconds.
CONNECT_TIMEOUT = 10
REQUEST_TIMEOUT = 60
RETRY_DELAY = 10
SUBPROCESS_TIMEOUT = 1800
MIGRATION_TIMEOUT = 4 * 60 * 60  # 4 hours

# Filesystem locations.
SOCKET_PATH = "/var/run/defence360agent/simple_rpc.sock"
LICENSE = "/var/imunify360/license.json"

# Substring looked for in the service status text while migrations run.
AGENT_IN_MIGRATION_STATE = "Applying database migrations"
def run(cmd, *, timeout=SUBPROCESS_TIMEOUT, check=False, **kwargs):
    """Execute *cmd* via subprocess.run, converting a timeout into a result.

    All extra keyword arguments are forwarded to subprocess.run unchanged.
    When the command does not finish within *timeout* seconds, no
    TimeoutExpired escapes; instead a CompletedProcess is returned whose
    returncode is None and whose stdout/stderr are whatever the exception
    carried.
    """
    try:
        completed = subprocess.run(
            cmd, timeout=timeout, check=check, **kwargs
        )
    except subprocess.TimeoutExpired as expired:
        # returncode=None marks "did not finish" for the callers.
        completed = subprocess.CompletedProcess(
            expired.cmd,
            returncode=None,
            stdout=expired.stdout,
            stderr=expired.stderr,
        )
    return completed
def service_is_running(systemctl_exec: Optional[str], name: str) -> bool:
    """Return True when the status command for service *name* exits with 0.

    systemctl is used when *systemctl_exec* is given, otherwise the
    ``service`` command.  A timed-out status command (returncode None)
    counts as "not running".
    """
    # The two tools take the verb and the unit name in opposite order.
    cmd = (
        [systemctl_exec, STATUS, name]
        if systemctl_exec
        else [SERVICE, name, STATUS]
    )
    result = run(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return result.returncode == 0
def restart_service(systemctl_exec: Optional[str], name: str) -> None:
    """Restart service *name* with the [systemctl|service] command.

    The outcome of the restart command is not inspected; all of its
    standard streams are attached to /dev/null.
    """
    # The two tools take the verb and the unit name in opposite order.
    cmd = (
        [systemctl_exec, RESTART, name]
        if systemctl_exec
        else [SERVICE, name, RESTART]
    )
    run(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
def restart_imunify360(systemctl_exec: Optional[str]) -> None:
    """Restart the imunify360 service and then the imunify360-agent one."""
    for service_name in (IMUNIFY360, IMUNIFY360_AGENT):
        restart_service(systemctl_exec, service_name)
def setup_logging(level) -> logging.Logger:
    """Build the "imunify360-watchdog" logger that writes to /dev/log.

    The logger gets *level* as its threshold and a syslog handler whose
    records look like ``<logger name>: <message>``.  Sentry is configured
    through defence360agent before the logger is returned.
    """
    watchdog_log = logging.getLogger("imunify360-watchdog")
    watchdog_log.setLevel(level)

    syslog = logging.handlers.SysLogHandler("/dev/log")
    syslog.setFormatter(logging.Formatter("%(name)s: %(message)s"))
    watchdog_log.addHandler(syslog)

    sentry.configure_sentry()
    return watchdog_log
def rpc_request(*args, **kwargs):
    """Send one command to the agent and return the "data" of its reply.

    Positional arguments form the command, keyword arguments its params.
    Raises ValueError carrying the reply's "messages" (or "Unknown error")
    when the reply's "result" is anything other than "success".
    """
    reply = send_to_agent_socket(list(args), kwargs)
    if reply["result"] == "success":
        return reply.get("data")
    raise ValueError(reply.get("messages", "Unknown error"))
def send_to_agent_socket(command: list, params: dict):
    """Exchange one JSON line with the agent over its UNIX socket.

    Connects to SOCKET_PATH (CONNECT_TIMEOUT applies to the connect), sends
    ``{"command": ..., "params": ...}`` as a single newline-terminated JSON
    document and returns the decoded first line of the answer.

    Raises socket.timeout when sending alone used up REQUEST_TIMEOUT, and
    ValueError when the agent closes the connection without answering.
    """
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as conn:
        conn.settimeout(CONNECT_TIMEOUT)
        conn.connect(SOCKET_PATH)

        request = {"command": command, "params": params}
        payload = json.dumps(request) + "\n"

        # Sending and receiving share one REQUEST_TIMEOUT budget.
        deadline = time.monotonic() + REQUEST_TIMEOUT
        conn.settimeout(REQUEST_TIMEOUT)
        conn.sendall(payload.encode())

        time_left = deadline - time.monotonic()
        if time_left <= 0:
            raise socket.timeout()
        conn.settimeout(time_left)

        with conn.makefile(encoding="utf-8") as reader:
            answer = reader.readline()
        if not answer:
            raise ValueError("Empty response from socket")
        return json.loads(answer)
def rpc_request_with_retries(rpc_timeout: int) -> Optional[dict]:
    """Ask the agent for its "health", retrying until *rpc_timeout* expires.

    Every failed attempt is followed by a RETRY_DELAY pause.  Once at least
    *rpc_timeout* seconds have passed since the first attempt, the
    exception of the failing attempt is re-raised to the caller.

    Returns whatever rpc_request("health") returns on the first success.
    """
    # A monotonic clock keeps the retry window correct even when the wall
    # clock is stepped (NTP sync, manual change) while we are waiting.
    started = time.monotonic()
    while True:
        try:
            return rpc_request("health")
        except Exception:
            if time.monotonic() - started >= rpc_timeout:
                raise
            time.sleep(RETRY_DELAY)
def systemctl_executable() -> Optional[str]:
    """Locate systemctl on the default PATH.

    Only os.defpath is searched, not the PATH of the current environment.
    Returns the full path of the executable, or None when it is not found.
    """
    systemctl_path = shutil.which("systemctl", path=os.defpath)
    return systemctl_path
def service_is_migrating(systemctl_exec, name, logger):
    """Tell whether service *name* is still applying database migrations.

    The ``StatusText`` and ``ExecMainStartTimestampMonotonic`` properties of
    *name* are requested with ``systemctl show`` when *systemctl_exec* is
    set, otherwise with ``service <name> show``.

    Returns True while the status text contains AGENT_IN_MIGRATION_STATE
    and the main process was started less than MIGRATION_TIMEOUT seconds
    ago.  Returns False when the service is not migrating, when the
    migration exceeded the timeout (logged as an error), or when the
    command produced nothing usable (timed out, empty output, lines
    without ``=``, no ``StatusText``), so the caller can carry on with its
    regular health check instead of dying on a parse error.
    """
    wanted = ("StatusText", "ExecMainStartTimestampMonotonic")
    if systemctl_exec:
        cmd = [systemctl_exec, SHOW, name]
        for prop in wanted:
            cmd += ["-p", prop]
    else:
        cmd = [SERVICE, name, SHOW]
    cp = run(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    # After a timeout run() passes on whatever the exception captured,
    # which may be None.
    output = (cp.stdout or b"").decode(errors="replace")

    # Keep only the requested "key=value" lines; anything else (usage
    # text, blank lines, other properties) is skipped.
    params = {}
    for line in output.splitlines():
        key, separator, value = line.partition("=")
        if separator and key in wanted:
            params[key] = value

    if AGENT_IN_MIGRATION_STATE not in params.get("StatusText", ""):
        return False

    try:
        # Divided by 1e6 so it is comparable with time.monotonic() seconds.
        started_at = int(params["ExecMainStartTimestampMonotonic"]) / 1e6
    except (KeyError, ValueError):
        # The status text says a migration is running but its age cannot
        # be computed; leave the service alone for this run rather than
        # risk restarting it in the middle of a migration.
        logger.warning(
            "%s is migrating but its start time is unknown", name
        )
        return True

    migration_duration = time.monotonic() - started_at
    logger.info("%s migrating for %d sec", name, migration_duration)
    if migration_duration < MIGRATION_TIMEOUT:
        return True
    logger.error("Migration took too long")
    return False
def main(rpc_timeout, log_level=logging.INFO):
    """Run one watchdog pass over the imunify360 service.

    Nothing is done when the service is not running or is still within
    its migration window.  Otherwise the agent is asked for its health,
    retrying for up to *rpc_timeout* seconds, and both imunify360 services
    are restarted when the request keeps failing or the report does not
    say the agent is healthy.

    *log_level* is the threshold of the syslog logger created for the run.
    """
    logger = setup_logging(log_level)
    systemctl_exec = systemctl_executable()

    if not service_is_running(systemctl_exec, IMUNIFY360):
        logger.info("%s is not running", IMUNIFY360)
        return
    if service_is_migrating(systemctl_exec, IMUNIFY360, logger):
        return

    try:
        response = rpc_request_with_retries(rpc_timeout)
    except Exception:
        logger.exception("Restarting due to RPC failures")
        restart_imunify360(systemctl_exec)
        return

    # A successful reply may carry no "data" at all (None); treat that the
    # same way as a report without a "healthy" flag instead of crashing on
    # response.get().
    health = response if isinstance(response, dict) else {}
    if not health.get("healthy", False):
        logger.error(
            "Restarting due to health report: %s", health.get("why")
        )
        restart_imunify360(systemctl_exec)
    else:
        logger.info("%s is healthy: %s", IMUNIFY360, health.get("why"))
if __name__ == "__main__":
    # The first command-line argument is the RPC timeout handed to main();
    # it must be an integer.
    main(int(sys.argv[1]))