Skip to content

Commit

Permalink
Use non-blocking streams instead of read timeouts (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
priitlatt authored Jun 2, 2020
1 parent 75a3999 commit 93c1e28
Showing 1 changed file with 14 additions and 18 deletions.
32 changes: 14 additions & 18 deletions src/codemagic/cli/cli_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

from __future__ import annotations

import fcntl
import os
import shlex
import subprocess
import sys
import threading
import time
import queue
from typing import IO
from typing import Optional
from typing import Sequence
Expand Down Expand Up @@ -55,24 +55,19 @@ def _log_exec_completed(self):
file_logger.debug('STDERR: %s', self.stderr)
self.logger.debug(f'Completed "{self.safe_form}" with returncode {self.returncode} in {duration}')

def _handle_stream(self, input_stream: IO, output_stream: IO, buffer_size: Optional[int] = None) -> str:
result: queue.Queue[bytes] = queue.Queue()

def read_result():
if buffer_size:
result.put(input_stream.read(buffer_size))
else:
result.put(input_stream.read())

stream_reader = threading.Thread(target=read_result)
stream_reader.start()
stream_reader.join(1) # Under normal circumstances reading the stream should never take full second
def _ensure_process_streams_are_non_blocking(self):
for stream in (self._process.stdout, self._process.stderr):
stream_descriptor = stream.fileno()
current_stream_flags = fcntl.fcntl(stream_descriptor, fcntl.F_GETFL)
fcntl.fcntl(stream_descriptor, fcntl.F_SETFL, current_stream_flags | os.O_NONBLOCK)

try:
chunk = result.get(block=False).decode()
except queue.Empty:
chunk = ''
def _handle_stream(self, input_stream: IO, output_stream: IO, buffer_size: Optional[int] = None) -> str:
if buffer_size:
bytes_chunk = input_stream.read(buffer_size)
else:
bytes_chunk = input_stream.read()

chunk = '' if bytes_chunk is None else bytes_chunk.decode()
if self._print_streams:
output_stream.write(chunk)
return chunk
Expand All @@ -93,6 +88,7 @@ def execute(self,
try:
if not self._dry_run:
self._process = subprocess.Popen(self._command_args, stdout=stdout, stderr=stderr)
self._ensure_process_streams_are_non_blocking()
while self._process.poll() is None:
self._handle_streams(self._buffer_size)
time.sleep(0.1)
Expand Down

0 comments on commit 93c1e28

Please sign in to comment.