-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement multithreading #698
Closed
Closed
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
from collections import defaultdict | ||
from multiprocessing import Process, Queue | ||
|
||
# Queue has been renamed to queue | ||
try: | ||
from queue import Empty | ||
except ImportError: | ||
from Queue import Empty | ||
|
||
import signal | ||
import platform | ||
import numpy | ||
import sys | ||
import os | ||
|
||
|
||
class Worker(object): | ||
|
||
def __init__(self, target, *args): | ||
self.queue_out = Queue() | ||
self.queue_in = Queue() | ||
self.target = target | ||
self.args = args | ||
self.process = Process( | ||
target=self.target, | ||
args=(self.queue_out, self.queue_in,) + self.args | ||
) | ||
self.process.daemon = True | ||
|
||
def start(self): | ||
try: | ||
return self.process.start() | ||
except BrokenPipeError as e: | ||
print("=" * 30) | ||
print("Ran into a broken pipe error") | ||
print("=" * 30) | ||
print("This can occur if you are calling functions " + | ||
"directly from a module outside of any class/function") | ||
print("Make sure you have your script entry point inside " + | ||
"a function, for example:") | ||
print("\n".join([ | ||
"", | ||
"def main():", | ||
" # code here", | ||
"", | ||
"if __name__ == '__main__':", | ||
" main()" | ||
])) | ||
print("=" * 30) | ||
print("Original exception:") | ||
print("=" * 30) | ||
raise | ||
|
||
def get(self): | ||
return self.queue_in.get() | ||
|
||
def put(self, val): | ||
return self.queue_out.put(val) | ||
|
||
def join(self): | ||
return self.process.join() | ||
|
||
|
||
def iterframes_job(recv_queue, send_queue, times, | ||
generator, generator_args, dtype): | ||
clip = generator(**generator_args) | ||
|
||
for current in iterate_frames_at_times(clip, times, dtype): | ||
send_queue.put(current, timeout=10) | ||
|
||
# Avoiding running ahead of the main thread and filling up memory | ||
# Timeout in 10 seconds in case the main thread has been killed | ||
try: | ||
recv_queue.get(timeout=10) | ||
except Empty: | ||
# For some reason sys.exit(), os._exit() doesn't work | ||
sig = signal.SIGTERM | ||
if platform.system() == "Windows": | ||
sig = signal.CTRL_C_EVENT | ||
os.kill(os.getpid(), sig) | ||
|
||
|
||
def iterate_frames_at_times(clip, times, dtype): | ||
for time in times: | ||
frame = clip.get_frame(time) | ||
if (dtype is not None) and (frame.dtype != dtype): | ||
frame = frame.astype(dtype) | ||
yield time, frame | ||
|
||
|
||
def get_clip_times(clip, fps): | ||
return numpy.arange(0, clip.duration, 1.0 / fps) | ||
|
||
|
||
def iterframes(threads, clip, fps, dtype, with_times): | ||
attrs = { | ||
"clip": clip, | ||
"fps": fps, | ||
"dtype": dtype, | ||
} | ||
if threads < 1: | ||
generator = singlethread_iterframes | ||
else: | ||
generator = multithread_iterframes | ||
attrs["threads"] = threads | ||
|
||
for current in generator(**attrs): | ||
if with_times: | ||
yield current | ||
else: | ||
yield current[1] | ||
|
||
|
||
def singlethread_iterframes(clip, fps, dtype): | ||
times = get_clip_times(clip, fps) | ||
for current in iterate_frames_at_times(clip, times, dtype): | ||
yield current | ||
|
||
|
||
def multithread_iterframes(threads, clip, fps, dtype): | ||
times = get_clip_times(clip, fps) | ||
jobsets = defaultdict(list) | ||
for index, time in enumerate(times): | ||
jobsets[index % threads].append(time) | ||
|
||
workers = [ | ||
Worker( | ||
iterframes_job, | ||
jobsets[i], | ||
clip.generator, | ||
clip.generator_args, | ||
dtype | ||
) for i in range(threads)] | ||
|
||
for worker in workers: | ||
worker.start() | ||
|
||
for index, time in enumerate(times): | ||
current = workers[index % threads].get() | ||
workers[index % threads].put(True) | ||
yield current | ||
sys.stdout.flush() | ||
|
||
for worker in workers: | ||
worker.join() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
|
||
|
||
def multithread_write_videofile( | ||
filename, clip_generator, clip_generator_args={}, | ||
ffmpeg_threads=6, moviepy_threads=2, **kwargs): | ||
clip = clip_generator(**clip_generator_args) | ||
clip.generator = clip_generator | ||
clip.generator_args = clip_generator_args | ||
|
||
kwargs.update({ | ||
"filename": filename, | ||
"threads": ffmpeg_threads, | ||
"moviepy_threads": moviepy_threads | ||
}) | ||
clip.write_videofile(**kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
import sys | ||
from os import path | ||
from moviepy.video.io.VideoFileClip import VideoFileClip | ||
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip | ||
from moviepy.video.VideoClip import ColorClip | ||
from moviepy.multithreading import multithread_write_videofile | ||
|
||
|
||
sys.path.append("tests") | ||
|
||
|
||
def _get_final_clip(): | ||
clip1 = VideoFileClip("media/big_buck_bunny_432_433.webm") | ||
clip2 = ColorClip((640, 480), color=(255, 0, 0)).set_duration(1) | ||
final = CompositeVideoClip([clip1, clip2]) | ||
final.fps = 24 | ||
return final | ||
|
||
|
||
def test_multithread_rendering(): | ||
from test_helper import TMP_DIR | ||
multithread_write_videofile( | ||
path.join(TMP_DIR, "test-multithread-rendering.mp4"), | ||
_get_final_clip) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks like a code smell to me. It's very ugly and hard to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do more or less agree, what would you suggest as an alternative?
Additionally, I never did figure out the actual cause of this broken pipe error, but I assume it has to do with how Python handles multiple processes and passing data between them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd use dedent with a unique multiline text