Source code for rtorrent_rpc._jsonrpc

"""json rpc implement for rtorrent


!! This is not a general propose json-rpc client.
"""

import json
import threading
from typing import Any

from rtorrent_rpc._transport import Transport

try:
    import orjson
except ImportError:
    orjson = None  # type: ignore[assignment]

if orjson is None:

    def _decode_json(o: bytes) -> Any:
        return json.loads(o)

    def _encode_json(o: Any) -> bytes:
        return json.dumps(o).encode()

else:

    def _decode_json(o: bytes) -> Any:
        return orjson.loads(o)

    def _encode_json(o: Any) -> bytes:
        return orjson.dumps(o)


__all__ = ["JSONRpc", "JSONRpcError"]


class JSONRpcError(Exception):
    code: int
    message: str
    data: Any
    id: int

    def __init__(self, code: int, message: str, data: Any, id: int):
        if data:
            super().__init__(code, message, data)
        else:
            super().__init__(code, message)
        self.code = code
        self.message = message
        self.data = data
        self.id = id


[docs] class JSONRpc: """ this class is exposed as ``RTorrent(...).jsonrpc``, you do not need to construct it by yourself. """ _id: int _lock: threading.Lock _transport: Transport __slots__ = ("_id", "_lock", "_transport")
[docs] def __init__(self, transport: Transport): self._transport = transport self._id = 0 self._lock = threading.Lock()
[docs] def call(self, method: str, params: Any = None) -> Any: """send a json-rpc call""" with self._lock: id = self._id # unlikely to have 100w concurrent request... self._id = (id + 1) % 1000000 req = _encode_json( {"jsonrpc": "2.0", "id": id, "method": method, "params": params} ) res = self._transport.request(req, "application/json") data = _decode_json(res) assert data["id"] == id, "response.id doesn't match request.id" if "error" in data: raise JSONRpcError( data["error"]["code"], data["error"]["message"], data["error"].get("data"), data["id"], ) return data["result"]