From 0196ac46b8c0e749d1f6150df6fcf1f6c32857c8 Mon Sep 17 00:00:00 2001 From: Mythic Date: Fri, 19 Jan 2018 02:22:51 +0200 Subject: [PATCH] Multithreading proof of concept CPU usage is up, untested if performance has actually increased --- moviepy/Clip.py | 40 +++++++------ moviepy/iterframes.py | 128 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+), 18 deletions(-) create mode 100644 moviepy/iterframes.py diff --git a/moviepy/Clip.py b/moviepy/Clip.py index 015ecf513..a1386c911 100644 --- a/moviepy/Clip.py +++ b/moviepy/Clip.py @@ -38,9 +38,9 @@ class Clip: this case their duration will be ``None``. """ - + # prefix for all temporary video and audio files. - # You can overwrite it with + # You can overwrite it with # >>> Clip._TEMP_FILES_PREFIX = "temp_" _TEMP_FILES_PREFIX = 'TEMP_MPY_' @@ -54,12 +54,14 @@ def __init__(self): self.memoize = False self.memoized_t = None self.memoize_frame = None + self.generator_cls = None + self.generator_attrs = {} def copy(self): - """ Shallow copy of the clip. - + """ Shallow copy of the clip. + Returns a shallow copy of the clip whose mask and audio will be shallow copies of the clip's mask and audio if they exist. @@ -73,7 +75,7 @@ def copy(self): newclip.audio = copy(self.audio) if hasattr(self, 'mask'): newclip.mask = copy(self.mask) - + return newclip @convert_to_seconds(['t']) @@ -474,25 +476,27 @@ def iter_frames(self, fps=None, with_times = False, progress_bar=False, >>> print ( [frame[0,:,0].max() for frame in myclip.iter_frames()]) """ - - def generator(): - for t in np.arange(0, self.duration, 1.0/fps): - frame = self.get_frame(t) - if (dtype is not None) and (frame.dtype != dtype): - frame = frame.astype(dtype) - if with_times: - yield t, frame - else: - yield frame + # Best ratio seems to be 3 ffmpeg threads per python thread + # This is effectively 3 * (3 ffmpeg threads + 1 py thread) == 12 threads + threads = 0 + from moviepy.iterframes import iterframes + + params = { + "clip": self, + "fps": fps, + "dtype": dtype, + "with_times": with_times, + "threads": threads + } if progress_bar: nframes = int(self.duration*fps)+1 - return tqdm(generator(), total=nframes) + return tqdm(iterframes(**params), total=nframes) - return generator() + return iterframes(**params) def close(self): - """ + """ Release any resources that are in use. """ diff --git a/moviepy/iterframes.py b/moviepy/iterframes.py new file mode 100644 index 000000000..a45eb91fa --- /dev/null +++ b/moviepy/iterframes.py @@ -0,0 +1,128 @@ +from collections import defaultdict +from multiprocessing import Process, Queue +from queue import Empty +import signal +import platform +import numpy +import sys +import os + + +class Worker(object): + + def __init__(self, target, *args): + self.queue_out = Queue() + self.queue_in = Queue() + self.target = target + self.args = args + self.process = Process( + target=self.target, + args=(self.queue_out, self.queue_in,) + self.args + ) + self.process.daemon = True + + def start(self): + try: + return self.process.start() + except BrokenPipeError as e: + print("=" * 30) + print("Ran into a broken pipe error") + print("=" * 30) + print("This can occur if you are calling functions directly from a module outside of any class/function") + print("Make sure you have your script entry point inside a function, for example:") + print("\n".join([ + "", + "def main():", + " # code here", + "", + "if __name__ == '__main__':", + " main()" + ])) + print("=" * 30) + print("Original exception:") + print("=" * 30) + raise + + def get(self): + return self.queue_in.get() + + def put(self, val): + return self.queue_out.put(val) + + def join(self): + return self.process.join() + + +def iterframes_job(recv_queue, send_queue, times, clip_generator_cls, clip_generator_attrs, dtype): + generator = clip_generator_cls(**clip_generator_attrs) + clip = generator.get_clip() + + for current in iterate_frames_at_times(clip, times, dtype): + send_queue.put(current, timeout=10) + + # Avoiding running ahead of the main thread and filling up memory + # Timeout in 10 seconds in case the main thread has been killed + try: + recv_queue.get(timeout=10) + except Empty: + # For some reason sys.exit(), os._exit(), or raising an exception doesn't work + sig = signal.SIGTERM + if platform.system() == "Windows": + sig = signal.CTRL_C_EVENT + os.kill(os.getpid(), sig) + + +def iterate_frames_at_times(clip, times, dtype): + for time in times: + frame = clip.get_frame(time) + if (dtype is not None) and (frame.dtype != dtype): + frame = frame.astype(dtype) + yield time, frame + + +def get_clip_times(clip, fps): + return numpy.arange(0, clip.duration, 1.0 / fps) + + +def iterframes(threads, clip, fps, dtype, with_times): + attrs = { + "clip": clip, + "fps": fps, + "dtype": dtype, + } + if threads < 1: + generator = singlethread_iterframes + else: + generator = multithread_iterframes + attrs["threads"] = threads + + for current in generator(**attrs): + if with_times: + yield current + else: + yield current[1] + + +def singlethread_iterframes(clip, fps, dtype): + for current in iterate_frames_at_times(clip, get_clip_times(clip, fps), dtype): + yield current + + +def multithread_iterframes(threads, clip, fps, dtype): + times = get_clip_times(clip, fps) + jobsets = defaultdict(list) + for index, time in enumerate(times): + jobsets[index % threads].append(time) + + workers = [Worker(iterframes_job, jobsets[i], clip.generator_cls, clip.generator_attrs, dtype) for i in range(threads)] + for worker in workers: + worker.start() + + for index, time in enumerate(times): + current = workers[index % threads].get() + workers[index % threads].put(True) + yield current + sys.stdout.flush() + + for worker in workers: + worker.join()