Files
karkas/src/karkas_core/karkas_core/modules_system/loaders/fs_loader/FSLoader.py

92 lines
3.0 KiB
Python

import types
from pathlib import Path
from RestrictedPython import compile_restricted_exec
# from karkas_core.logger import log
from karkas_core.modules_system.loaders.unsafe_fs_loader import UnsafeFSLoader
from karkas_core.modules_system.safe.policy import (
ALLOWED_IMPORTS,
BUILTINS,
RestrictedPythonPolicy,
)
class FSLoader(UnsafeFSLoader):
    """Filesystem loader that runs non-privileged modules under RestrictedPython.

    Module source files found under ``self.path`` are compiled with
    ``compile_restricted_exec`` using ``RestrictedPythonPolicy`` and executed
    with a private copy of ``BUILTINS`` whose ``__import__`` is replaced by
    :meth:`_hook_import`, so every import statement inside a loaded module is
    routed back through this loader.
    """

    def __init__(self, path):
        """Set up the sandbox builtins and read the module's metadata.

        Args:
            path: Location of the module on disk; forwarded unchanged to the
                ``UnsafeFSLoader`` constructor.
        """
        super().__init__(path)
        # Work on a copy so the import hook is bound to this loader only and
        # the shared BUILTINS mapping is left untouched.
        self.builtins = BUILTINS.copy()
        self.builtins["__import__"] = self._hook_import
        # ``info()`` is not defined in this class; presumably inherited from
        # UnsafeFSLoader -- verify there for the exact metadata shape.
        self.module_info = self.info()
        self.allowed_python_dependencies = self._get_allowed_python_dependencies()

    def load(self):
        """Load the module's root (package or file) in the sandbox.

        Returns:
            The ``types.ModuleType`` produced by executing the root source.

        Raises:
            Exception: If the module metadata marks the module as privileged.
        """
        if self.module_info.privileged:
            raise Exception("Only non privileged modules are allowed to be imported")
        # ``module_id`` is stamped onto every module created by the hook, so
        # it has to be set before the first import is resolved.
        self.module_id = self.module_info.id
        return self._hook_import(".")

    def _get_allowed_python_dependencies(self):
        """Build the mapping of importable (non-sandboxed) dependency names.

        Combines the module's declared required and optional Python
        dependencies with the globally whitelisted ``ALLOWED_IMPORTS``; the
        latter are always stored with the value ``"*"`` and override any
        declared entry of the same name.

        Returns:
            dict: Dependency name -> value taken from the metadata (``"*"``
            for entries coming from ``ALLOWED_IMPORTS``).
        """
        allowed = {}
        declared = self.module_info.pythonDependencies
        if declared:
            # NOTE(review): ``required``/``optional`` are passed to
            # ``dict.update``, so they are assumed to be mappings -- confirm
            # against the metadata schema.
            for group in (declared.required, declared.optional):
                if group:
                    allowed.update(group)
        allowed.update(dict.fromkeys(ALLOWED_IMPORTS, "*"))
        return allowed

    def _resolve_module_from_path(self, module_name: str):
        """Map a dotted module name to a source file under ``self.path``.

        ``"."`` (and the empty name) refer to the module root itself. A name
        that resolves to a directory must contain ``__init__.py``; otherwise
        the path with a ``.py`` suffix must be an existing file.

        Args:
            module_name: Dotted module name, or ``"."`` for the root.

        Returns:
            pathlib.Path: Path of the source file to execute.

        Raises:
            FileNotFoundError: The name is a directory without ``__init__.py``.
            ValueError: The name is malformed or no matching file exists.
        """
        candidate = Path(self.path)
        if module_name not in (".", ""):
            parts = module_name.split(".")
            # Every name produced by an ``import`` statement is made of valid
            # identifiers. Anything else (leading dots, empty components, path
            # separators, drive prefixes) would turn into an absolute or
            # foreign path once joined and escape the module directory.
            if not all(part.isidentifier() for part in parts):
                raise ValueError(f"Module not found: {module_name}")
            candidate = candidate.joinpath(*parts)

        if candidate.is_dir():
            init_file_path = candidate / "__init__.py"
            if not init_file_path.exists():
                raise FileNotFoundError(f"File {init_file_path} does not exist.")
            return init_file_path

        source_file = candidate.with_suffix(".py")
        if not source_file.is_file():
            raise ValueError(f"Module not found: {module_name}")
        return source_file

    def _hook_import(self, name: str, *args, **kwargs):
        """Replacement for ``__import__`` used inside sandboxed modules.

        Resolution order:

        1. ``karkas_core.modules_system.public_api`` is imported normally and
           tagged with this loader's module id.
        2. Names matching an allowed dependency (exactly, or as a dotted
           sub-module of one) are imported normally.
        3. Everything else is looked up under ``self.path``, compiled with
           RestrictedPython and executed in a fresh module object.

        Args:
            name: Module name as passed to ``__import__``.
            *args: Remaining positional ``__import__`` arguments (globals,
                locals, fromlist, level); only forwarded to the real
                ``__import__`` in cases 1 and 2.
            **kwargs: Keyword form of the same arguments.

        Returns:
            The imported or newly executed module object.

        Raises:
            SyntaxError: The source violates the restricted policy or does not
                compile.
            FileNotFoundError, ValueError: See ``_resolve_module_from_path``.
        """
        if name == "karkas_core.modules_system.public_api":
            module = __import__(name, *args, **kwargs)
            # NOTE(review): the attribute is written on a process-wide module
            # object, so the most recent importer's id wins. With an empty
            # fromlist ``__import__`` returns the top-level package, which is
            # then the object being stamped -- confirm this is intended.
            module.__karkas_block_id__ = self.module_id
            return module

        if any(
            name == dependency or name.startswith(f"{dependency}.")
            for dependency in self.allowed_python_dependencies
        ):
            return __import__(name, *args, **kwargs)

        # NOTE(review): the relative-import ``level`` is ignored here, the leaf
        # module is returned even for dotted names, and nothing is cached, so
        # each import statement re-executes the file.
        module_file_path = self._resolve_module_from_path(name)
        # Python source is UTF-8 by default; do not depend on the locale.
        with open(module_file_path, "r", encoding="utf-8") as f:
            src = f.read()

        # Pass the real path so tracebacks from sandboxed code point at the
        # file instead of "<string>".
        result = compile_restricted_exec(
            src, str(module_file_path), policy=RestrictedPythonPolicy
        )
        if result.errors:
            # ``result.code`` is None when errors are reported; fail with the
            # actual reasons rather than letting ``exec(None)`` raise an
            # unrelated TypeError.
            details = "\n".join(str(error) for error in result.errors)
            raise SyntaxError(
                f"Restricted compilation of module {name!r} failed:\n{details}"
            )

        module = types.ModuleType(name)
        module.__dict__.update(
            {"__builtins__": self.builtins, "__karkas_block_id__": self.module_id}
        )
        exec(result.code, module.__dict__)  # nosec
        return module