Skip to content

Commit

Permalink
Multithreading proof of concept
Browse files Browse the repository at this point in the history
CPU usage is up, untested if performance has actually increased
  • Loading branch information
MythicManiac committed Jan 20, 2018
1 parent a90d4ab commit 9461417
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 19 deletions.
43 changes: 24 additions & 19 deletions moviepy/Clip.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ class Clip:
end:
When the clip is included in a composition, time of the
composition at which the clip starts playing (in seconds).
composition at which the clip stops playing (in seconds).
duration:
Duration of the clip (in seconds). Some clips are infinite, in
this case their duration will be ``None``.
"""

# prefix for all temporary video and audio files.
# You can overwrite it with
# You can overwrite it with
# >>> Clip._TEMP_FILES_PREFIX = "temp_"

_TEMP_FILES_PREFIX = 'TEMP_MPY_'
Expand All @@ -54,12 +54,14 @@ def __init__(self):
self.memoize = False
self.memoized_t = None
self.memoize_frame = None
self.generator_cls = None
self.generator_attrs = {}



def copy(self):
""" Shallow copy of the clip.
""" Shallow copy of the clip.
Returns a shallow copy of the clip whose mask and audio will
be shallow copies of the clip's mask and audio if they exist.
Expand All @@ -73,7 +75,7 @@ def copy(self):
newclip.audio = copy(self.audio)
if hasattr(self, 'mask'):
newclip.mask = copy(self.mask)

return newclip

@convert_to_seconds(['t'])
Expand All @@ -94,6 +96,7 @@ def get_frame(self, t):
else:
return self.make_frame(t)


def fl(self, fun, apply_to=None, keep_duration=True):
""" General processing of a clip.
Expand Down Expand Up @@ -474,25 +477,27 @@ def iter_frames(self, fps=None, with_times = False, progress_bar=False,
>>> print ( [frame[0,:,0].max()
for frame in myclip.iter_frames()])
"""

def generator():
for t in np.arange(0, self.duration, 1.0/fps):
frame = self.get_frame(t)
if (dtype is not None) and (frame.dtype != dtype):
frame = frame.astype(dtype)
if with_times:
yield t, frame
else:
yield frame
# Best ratio seems to be 3 ffmpeg threads per python thread
# This is effectively 3 * (3 ffmpeg threads + 1 py thread) == 12 threads
threads = 0
from moviepy.iterframes import iterframes

params = {
"clip": self,
"fps": fps,
"dtype": dtype,
"with_times": with_times,
"threads": threads
}

if progress_bar:
nframes = int(self.duration*fps)+1
return tqdm(generator(), total=nframes)
return tqdm(iterframes(**params), total=nframes)

return generator()
return iterframes(**params)

def close(self):
"""
"""
Release any resources that are in use.
"""

Expand Down
128 changes: 128 additions & 0 deletions moviepy/iterframes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from collections import defaultdict
from multiprocessing import Process, Queue
from queue import Empty
import signal
import platform
import numpy
import sys
import os


class Worker(object):

def __init__(self, target, *args):
self.queue_out = Queue()
self.queue_in = Queue()
self.target = target
self.args = args
self.process = Process(
target=self.target,
args=(self.queue_out, self.queue_in,) + self.args
)
self.process.daemon = True

def start(self):
try:
return self.process.start()
except BrokenPipeError as e:
print("=" * 30)
print("Ran into a broken pipe error")
print("=" * 30)
print("This can occur if you are calling functions directly from a module outside of any class/function")
print("Make sure you have your script entry point inside a function, for example:")
print("\n".join([
"",
"def main():",
" # code here",
"",
"if __name__ == '__main__':",
" main()"
]))
print("=" * 30)
print("Original exception:")
print("=" * 30)
raise

def get(self):
return self.queue_in.get()

def put(self, val):
return self.queue_out.put(val)

def join(self):
return self.process.join()


def iterframes_job(recv_queue, send_queue, times, clip_generator_cls, clip_generator_attrs, dtype):
generator = clip_generator_cls(**clip_generator_attrs)
clip = generator.get_clip()

for current in iterate_frames_at_times(clip, times, dtype):
send_queue.put(current, timeout=10)

# Avoiding running ahead of the main thread and filling up memory
# Timeout in 10 seconds in case the main thread has been killed
try:
recv_queue.get(timeout=10)
except Empty:
# For some reason sys.exit(), os._exit(), or raising an exception doesn't work
sig = signal.SIGTERM
if platform.system() == "Windows":
sig = signal.CTRL_C_EVENT
os.kill(os.getpid(), sig)


def iterate_frames_at_times(clip, times, dtype):
for time in times:
frame = clip.get_frame(time)
if (dtype is not None) and (frame.dtype != dtype):
frame = frame.astype(dtype)
yield time, frame


def get_clip_times(clip, fps):
return numpy.arange(0, clip.duration, 1.0 / fps)


def iterframes(threads, clip, fps, dtype, with_times):
attrs = {
"clip": clip,
"fps": fps,
"dtype": dtype,
}
if threads < 1:
generator = singlethread_iterframes
else:
generator = multithread_iterframes
attrs["threads"] = threads

for current in generator(**attrs):
if with_times:
yield current
else:
yield current[1]


def singlethread_iterframes(clip, fps, dtype):
for current in iterate_frames_at_times(clip, get_clip_times(clip, fps), dtype):
yield current


def multithread_iterframes(threads, clip, fps, dtype):
times = get_clip_times(clip, fps)
jobsets = defaultdict(list)
for index, time in enumerate(times):
jobsets[index % threads].append(time)

workers = [Worker(iterframes_job, jobsets[i], clip.generator_cls, clip.generator_attrs, dtype) for i in range(threads)]
for worker in workers:
worker.start()

for index, time in enumerate(times):
current = workers[index % threads].get()
workers[index % threads].put(True)
yield current
sys.stdout.flush()

for worker in workers:
worker.join()

0 comments on commit 9461417

Please sign in to comment.