This commit is contained in:
hhh
2025-01-04 11:54:01 +02:00
commit c5ce21f5fa
11 changed files with 199 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
from . import modules
__all__ = [modules]

View File

@@ -0,0 +1,37 @@
import asyncio
from asyncio import subprocess
import psutil
from typing import Callable
def kill(proc_pid):
try:
process = psutil.Process(proc_pid)
for proc in process.children(recursive=True):
proc.kill()
process.kill()
except psutil.NoSuchProcess:
return
async def run_process(
command: str, env: dict,
try_to_fix_flush: bool = True
) -> subprocess.Process:
process = await subprocess.create_subprocess_shell(
cmd=command,
env=env | {"PYTHONUNBUFFERED": "1"} if try_to_fix_flush else env,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return process
async def stream_handler(stream: asyncio.streams.StreamReader, real_handler: Callable):
while True:
line = (await stream.readline()).decode().strip()
if not line:
return
await real_handler(line)

View File

@@ -0,0 +1,2 @@
async def log_handler(line: str):
print(line)

View File

@@ -0,0 +1,2 @@
from .interfaces import RunControllerHandlersInterface
from .types import CommandRunner

View File

@@ -0,0 +1,23 @@
from neko_run_controller_std.neko.types import LogsHandler
from typing import Type
import dataclasses
from neko_run_controller_std.neko.interfaces import RunControllerHandlersInterface
@dataclasses.dataclass
class __HiddenStorage:
logs_handlers: list[Type[LogsHandler]] = dataclasses.field(default_factory=list)
_storage = __HiddenStorage()
class RRunControllerHandlersInterface(RunControllerHandlersInterface):
@staticmethod
def add_logs_handler(handler: Type[LogsHandler]):
_storage.logs_handlers.append(handler)
RunControllerHandlersInterface = RRunControllerHandlersInterface
__all__ = [RunControllerHandlersInterface]

View File

@@ -0,0 +1,61 @@
from neko_run_controller_std.neko.types import CommandRunner, LogsHandler
from ..modules.controller import run_process, stream_handler, kill
import os
import asyncio
from .interfaces import _storage
import warnings
class RCommandRunner(CommandRunner):
def __init__(self, name: str, command: str, env: dict = None, chdir: str = None):
self.name = name
self.command = command
self.env = env
self.chdir = chdir
self.pid = None
if not env:
self.env = dict(os.environ)
async def __start(self):
__start_dir = os.getcwd()
if self.chdir:
os.chdir(self.chdir)
self.process = await run_process(
command=self.command,
env=self.env,
)
self.__log_tasks = list()
for handler_builder in _storage.logs_handlers:
handler = handler_builder(runner=self)
self.__log_tasks.append(
asyncio.create_task(stream_handler(self.process.stdout, handler.stdout))
)
self.__log_tasks.append(
asyncio.create_task(stream_handler(self.process.stderr, handler.stderr))
)
self.pid = self.process.pid
async def wait(self):
await self.process.wait()
async def start(self):
await self.__start()
async def kill(self):
if not self.pid:
warnings.warn('Process is not started', RuntimeWarning)
return
[task.cancel() for task in self.__log_tasks]
kill(self.pid)
await self.process.wait()
CommandRunner = RCommandRunner
__all__ = [CommandRunner, LogsHandler]