HEX
Server: Apache
System: Linux nc-ph-0707-26.instaforreviews.com 3.10.0-1160.119.1.el7.tuxcare.els13.x86_64 #1 SMP Fri Nov 22 06:29:45 UTC 2024 x86_64
User: qirgxuo4hkuv (1004)
PHP: 8.3.27
Disabled: NONE
Upload Files
File: //usr/libexec/imunify360/cpanel_fileman_hook
#!/opt/imunify360/venv/bin/python3
"""This script is a cPanel hook script for several Filemanager related events.

Based on:

* https://documentation.cpanel.net/display/DD/Guide+to+Standardized+Hooks
* https://documentation.cpanel.net/display/DD/Guide+to+Standardized+Hooks+-+Hook+Action+Code
"""  # noqa: E501

import logging
import logging.handlers
import json
import signal
import sys
from tempfile import NamedTemporaryFile
from typing import Callable, List, TextIO

from defence360agent import sentry
from im360 import aibolit_job

logger = logging.getLogger("cpanel_fileman_hook")

# _PATH_HOOK contains the location of this hook on the server
_PATH_HOOK = "/usr/libexec/imunify360/cpanel_fileman_hook"

DESCRIBE_DATA = [
    {
        "blocking": 1,
        "escalateprivs": 0,
        "category": "Cpanel",
        "event": "UAPI::Fileman::upload_files",
        "stage": "pre",
        "hook": _PATH_HOOK + " --upload",
        "exectype": "script",
    },
    {
        "blocking": 1,
        "escalateprivs": 0,
        "category": "Cpanel",
        "event": "UAPI::Fileman::save_file_content",
        "stage": "pre",
        "hook": _PATH_HOOK + " --save",
        "exectype": "script",
    },
    {
        "blocking": 1,
        "escalateprivs": 0,
        "category": "Cpanel",
        "event": "Api2::Fileman::savefile",
        "stage": "pre",
        "hook": _PATH_HOOK + " --save",
        "exectype": "script",
    },
]


class Context:
    def __init__(
        self,
        stdin: TextIO,
        stdout: TextIO,
        stderr: TextIO,
        args: List[str],
        checker: Callable[[str], bool],
    ):
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.args = args
        self.checker = checker


def status_text(
    allowed: bool, method=None, filename=None, folder=None, user=None
) -> str:
    if not allowed:
        logger.info(
            "0 BAILOUT malware detected when %s '%s' in %s for user %s",
            method,
            filename,
            folder,
            user,
        )
    return "1" if allowed else "0 BAILOUT malware detected"


def status_code(allowed: bool) -> int:
    return 0


def describe_action(ctx: Context) -> int:
    ctx.stdout.write(json.dumps(DESCRIBE_DATA))
    return 0


def check_upload(ctx: Context) -> int:
    logger.info("upload action")
    suffix = "-key"
    path = ""
    filename = ""
    allowed = True
    data = json.loads(ctx.stdin.read())["data"]
    for k, v in data["args"].items():
        if k.endswith(suffix):
            path = data["args"][k[: -len(suffix)]]
            # "file-eicar.com-key":"file-0"
            #  [len("file-"):-len(suffix)] -> eicar.com
            filename = k[len("file-"):-len(suffix)]
            break
    if path != "":
        allowed = ctx.checker(path)
    ctx.stdout.write(
        status_text(
            allowed,
            "upload",
            filename,
            data["args"].get("dir"),
            data["user"]
        )
    )
    return status_code(allowed)


def check_save(ctx: Context) -> int:
    logger.info("save action")
    allowed = True
    data = json.loads(ctx.stdin.read())["data"]
    content = data["args"].get("content")
    if content:
        with NamedTemporaryFile(mode="w") as ntf:
            ntf.write(content)
            ntf.flush()
            allowed = ctx.checker(ntf.name)
    ctx.stdout.write(
        status_text(
            allowed,
            "save",
            data["args"].get("filename") or data["args"].get("file"),
            data["args"].get("dir") or data["args"].get("path"),
            data["user"],
        )
    )
    return status_code(allowed)


KNOWN_ACTIONS = {
    "describe": describe_action,
    "upload": check_upload,
    "save": check_save,
}


def aibolit_checker(file_to_scan: str) -> bool:
    # FOLLOWING IS MOSTLY COPIED FROM modsec_scan_real.py

    resident_dir_path = aibolit_job.RESIDENT_DIR
    # to include the import time, we could read the start time of the
    # process https://gist.github.com/westhood/1073585
    remaining_time = aibolit_job.create_remaining_time_func(
        aibolit_job.UPLOAD_TIMEOUT
    )

    # signals we'll be waiting for from aibolit
    sigset = {signal.SIGUSR1, signal.SIGUSR2}
    # block the signal in all threads
    signal.pthread_sigmask(signal.SIG_BLOCK, sigset)

    # submit the uploaded file for scanning
    # create PID.upload_job in the resident dir
    aibolit_job.create_upload_job(
        files=[file_to_scan],
        resident_dir_path=resident_dir_path,
        timeout=remaining_time(),
    )
    logger.info("file %s is sent for scanning", file_to_scan)

    # notify aibolit about the new job
    aibolit_job.notify_aibolit_start_it_if_necessary(timeout=remaining_time())

    # wait for response
    while True:
        # use sigtimedwait() instead of signal() to get the uid

        # note: ignore a possible race on retry inside sigtimedwait() on
        # receiving a signal (see sigtimedwait()'s Python docs)
        si = signal.sigtimedwait(sigset, remaining_time())
        if si is None:
            # timed out
            logger.warning("timed out while scanning %s", file_to_scan)
            return True
        if si.si_uid == 0:  # the signal is from root
            if si.si_signo == signal.SIGUSR1:
                return False
            elif si.si_signo == signal.SIGUSR2:
                return True
            else:
                assert 0, "shouldn't happen"  # pragma: no cover


def setup_logging() -> None:
    global logger
    logger.setLevel(logging.DEBUG)
    handler = logging.handlers.SysLogHandler("/dev/log")
    formatter = logging.Formatter("%(name)s[%(process)d]: %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    sentry.configure_sentry()


def do_main(ctx: Context) -> int:
    if len(ctx.args) < 2:
        print("No command is given.", file=ctx.stderr)
        return 1
    if not ctx.args[1].startswith("--"):
        print("Wrong argument:", ctx.args[1], file=ctx.stderr)
        return 1
    action = ctx.args[1][2:]
    if action not in KNOWN_ACTIONS:
        print("Unknown action:", action, file=ctx.stderr)
        return 1
    return KNOWN_ACTIONS[action](ctx)


def main(ctx: Context) -> int:
    try:
        return do_main(ctx)
    except Exception as e:
        print("1 Exception:", e, file=ctx.stderr)
        logger.exception("internal error: %s", e)
        return 1


if __name__ == "__main__":
    setup_logging()
    ctx = Context(
        sys.stdin,
        sys.stdout,
        sys.stderr,
        sys.argv,
        aibolit_checker,
    )
    code = main(ctx)
    ctx.stdout.flush()
    ctx.stderr.flush()
    logger.info("exiting with code %s", code)
    sys.exit(code)