✘✘ GRAYBYTE WORDPRESS FILE MANAGER ✘✘

​🇳​​🇦​​🇲​​🇪♯➤ server303.web-hosting.com ​🇻​♯➤ 4.18.0-553.54.1.lve.el8.x86_64 #1 SMP 🇾​♯➤ 2025

𝗛𝗢𝗠𝗘 𝗜𝗗 ♯➤ 199.188.205.31 ♯➤ 𝗔𝗗𝗠𝗜𝗡 𝗜𝗗 216.73.217.38
𝗢𝗣𝗧𝗜𝗢𝗡𝗦 ♯ CRL ♯➤ 𝗢𝗞 ┃ WGT ♯➤ 𝗢𝗞 ┃ SDO ♯➤ 𝗢𝗙𝗙 ┃ PKEX ♯➤ 𝗢𝗙𝗙
𝗗𝗘𝗔𝗖𝗧𝗜𝗩𝗔𝗧𝗘𝗗 ♯➤ 𝗔𝗟𝗟 𝗪𝗢𝗥𝗞𝗜𝗡𝗚....

𝗛𝗢𝗠𝗘
𝗖𝗨𝗥𝗥𝗘𝗡𝗧 𝗙𝗜𝗟𝗘 : /opt/cloudlinux/venv/lib/python3.11/site-packages/hc_json_rpc_client//client.py
import getpass
import json
import os
import socket
import time
from typing import Union

from .exceptions import InvalidArgumentError, JsonRpcClientError, MethodNotFoundError
from .payload import Params, ParamsType, Payload

# Unix socket path Client falls back to when the effective user id is 0.
DEFAULT_ROOT_SOCKET_PATH = "/var/run/hc_mds/root_mds.socket"
# Unix socket path Client falls back to for every other user.
DEFAULT_USER_SOCKET_PATH = "/var/run/hc_mds/user_mds.socket"
# Token file template; Client fills "{}" with getpass.getuser() for non-root
# callers that did not pass a token explicitly.
DEFAULT_USER_TOKEN_FILE_FMT = (
    "/var/cpanel/userdata/{}/hc_mds.token"  # nosec bandit B105
)
# Passed as the ``jsonrpc`` value of every Payload built by Client.
RPC_VERSION = "2.0"


class Client:
    """JSON-RPC client that exchanges requests over a Unix stream socket.

    The socket path and token may be given explicitly; otherwise they are
    resolved lazily on first use (see ``socket_path`` and ``token``).  The
    connection is opened on the first request and kept for reuse until
    ``close()`` is called.  The client can be used as a context manager so
    the socket is released deterministically::

        with Client() as client:
            client.send({...})
    """

    # Number of bytes requested per recv() call while assembling a response.
    _RECV_CHUNK_SIZE = 4096

    def __init__(
        self, socket_path: Union[str, None] = None, token: Union[str, None] = None
    ):
        """Store the optional overrides; no I/O happens here.

        Args:
            socket_path: Unix socket to connect to.  When falsy, the default
                for the current user is chosen on first access.
            token: Token attached to every request.  When falsy and the
                process is not root, it is read from the user's token file
                on first access.
        """
        self._socket_path = socket_path
        self._token = token
        self._transport = None

    def __enter__(self):
        """Return the client itself; the connection is still opened lazily."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection when leaving the ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close the underlying socket, if one is open.

        Safe to call repeatedly.  A later request opens a new connection.
        """
        conn, self._transport = self._transport, None
        if conn is not None:
            conn.close()

    @classmethod
    def is_root(cls) -> bool:
        """Return True when the effective user id of the process is 0."""
        return os.geteuid() == 0

    @property
    def token(self):
        """Token attached to outgoing requests.

        For a non-root process without an explicit token, the value is read
        once from ``DEFAULT_USER_TOKEN_FILE_FMT`` (formatted with
        ``getpass.getuser()``), stripped, and cached.  For root the stored
        value is returned unchanged, which may be ``None``.

        Raises:
            OSError: If the token file cannot be opened.
        """
        if not self._token and not self.is_root():
            with open(
                DEFAULT_USER_TOKEN_FILE_FMT.format(getpass.getuser()),
                "r",
                encoding="utf-8",
            ) as f:
                self._token = f.read().strip()
        return self._token

    @classmethod
    def _get_socket_path(cls) -> str:
        """Return the default socket path for root or for a regular user."""
        if cls.is_root():
            return DEFAULT_ROOT_SOCKET_PATH
        return DEFAULT_USER_SOCKET_PATH

    @property
    def socket_path(self) -> str:
        """Socket path in use; resolved to the default on first access."""
        if not self._socket_path:
            self._socket_path = self._get_socket_path()
        return self._socket_path

    def _prepare_payload(
        self, method: str, params: "Union[ParamsType, Params]"
    ) -> dict:
        """Build the request dictionary for ``method``.

        ``params`` is converted with ``Params.from_dict`` unless it already
        is a ``Params`` instance, in which case a deep copy is taken so that
        setting the token does not modify the caller's object.
        """
        if not isinstance(params, Params):
            _params = Params.from_dict(params)
        else:
            _params = params.model_copy(deep=True)

        _params.metadata.token = self.token  # type: ignore
        return Payload(
            method=method,
            params=_params,  # type: ignore
            jsonrpc=RPC_VERSION,
            # NOTE: whole-second timestamp, so requests built within the same
            # second carry the same id.
            id=int(time.time()),
        ).model_dump()

    @staticmethod
    def _encode_json(payload: dict) -> bytes:
        """Serialize ``payload`` to UTF-8 encoded JSON bytes."""
        return json.dumps(payload).encode("utf-8")

    @staticmethod
    def _decode_json(payload: bytes) -> dict:
        """Parse UTF-8 encoded JSON bytes.

        Raises:
            ValueError: If the bytes are not valid UTF-8 or not valid JSON
                (``UnicodeDecodeError`` / ``json.JSONDecodeError``).
        """
        return json.loads(payload.decode("utf-8"))

    @property
    def transport(self):
        """Connected ``AF_UNIX`` stream socket, created on first access.

        The socket is cached only after ``connect()`` succeeds; on failure it
        is closed and the error propagates, so the next access retries
        instead of reusing a dead, unconnected socket.

        Raises:
            OSError: If connecting to ``socket_path`` fails.
        """
        if self._transport is None:
            conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                conn.connect(self.socket_path)
            except Exception:
                conn.close()
                raise
            self._transport = conn
        return self._transport

    def _execute(self, payload: dict) -> dict:
        """Send ``payload`` and return the decoded response.

        A stream socket may deliver a response in several pieces, so data is
        accumulated until the buffer parses as JSON or the peer closes the
        connection.  The response is expected to be a single JSON document.

        Raises:
            json.JSONDecodeError: If the peer closes the connection before a
                complete JSON document was received (including an empty
                response).
            OSError: On socket errors.
        """
        conn = self.transport
        conn.sendall(self._encode_json(payload))

        received = bytearray()
        while True:
            chunk = conn.recv(self._RECV_CHUNK_SIZE)
            if not chunk:
                # Peer closed the connection: drop the dead socket so a later
                # call reconnects, then let the decoder report what is wrong
                # with the (empty or truncated) data.
                self.close()
                return self._decode_json(bytes(received))
            received += chunk
            try:
                return self._decode_json(bytes(received))
            except ValueError:
                # Incomplete JSON, or a multi-byte character split across
                # chunks; keep reading.
                continue

    @staticmethod
    def _validate_response(response: dict) -> None:
        """Raise the matching exception when ``response`` carries an error.

        Raises:
            MethodNotFoundError: Error code -32601.
            InvalidArgumentError: Error code -32602.
            JsonRpcClientError: Any other error code.
        """
        if error := response.get("error", None):
            if error["code"] == -32601:
                raise MethodNotFoundError(error["message"])
            if error["code"] == -32602:
                raise InvalidArgumentError(error["message"])
            raise JsonRpcClientError(error["message"])

    def send(self, params: "Union[ParamsType, Params]", method: str = "event") -> dict:
        """Call ``method`` with ``params`` and return the validated response.

        Raises:
            MethodNotFoundError, InvalidArgumentError, JsonRpcClientError:
                When the response contains an ``error`` member.
        """
        response = self._execute(self._prepare_payload(method, params))
        self._validate_response(response)
        return response


Current_dir [ 𝗡𝗢𝗧 𝗪𝗥𝗜𝗧𝗘𝗔𝗕𝗟𝗘 ] Document_root [ 𝗪𝗥𝗜𝗧𝗘𝗔𝗕𝗟𝗘 ]


[ Back ]
𝗡𝗔𝗠𝗘
𝗦𝗜𝗭𝗘
𝗟𝗔𝗦𝗧 𝗧𝗢𝗨𝗖𝗛
𝗨𝗦𝗘𝗥
𝗦𝗧𝗔𝗧𝗨𝗦
𝗙𝗨𝗡𝗖𝗧𝗜𝗢𝗡𝗦
..
--
25 Jun 2026 8.32 AM
root / root
0755
__pycache__
--
25 Jun 2026 8.32 AM
root / root
0755
__init__.py
0.156 KB
25 Jun 2026 8.32 AM
root / root
0644
client.py
3.146 KB
25 Jun 2026 8.32 AM
root / root
0644
exceptions.py
0.159 KB
25 Jun 2026 8.32 AM
root / root
0644
payload.py
1.288 KB
25 Jun 2026 8.32 AM
root / root
0644

✘✘ GRAYBYTE WORDPRESS FILE MANAGER @ 2026 CONTACT ME ✘✘
Static GIF Static GIF