Skip to content

Commit

Permalink
Merge pull request #296 from phenobarbital/dev
Browse files Browse the repository at this point in the history
fix for sending coroutines to background tasks
  • Loading branch information
phenobarbital authored Sep 26, 2024
2 parents 2d757fb + 1cdff65 commit f947562
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
20 changes: 12 additions & 8 deletions navigator/background/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@
SERVICE_NAME: str = 'service_queue'


def coroutine_in_thread(coro: coroutine):
def coroutine_in_thread(coro: coroutine, callback: Optional[coroutine] = None):
"""Run a coroutine in a new thread with its own event loop."""
done_event = threading.Event()

def run():
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
new_loop.run_until_complete(coro)
result = new_loop.run_until_complete(coro)
# if callback exists:
if callback:
new_loop.run_until_complete(callback(result))
new_loop.close()
done_event.set() # Signal that the coroutine has completed
thread = threading.Thread(target=run, daemon=True)
Expand Down Expand Up @@ -84,9 +87,10 @@ async def __call__(self):
# Delay the execution by jitter seconds
await asyncio.sleep(delay)
try:
result = await self.fn(
*self.args, **self.kwargs
)
with ThreadPoolExecutor(max_workers=1) as executor:
coro = self.fn(*self.args, **self.kwargs)
coroutine_in_thread(coro, self._callback_)
return True # end this
except Exception as e:
logging.error(
f"Error executing TaskWrapper {self.fn.__name__}: {e}"
Expand Down Expand Up @@ -269,7 +273,7 @@ async def _execute_taskwrapper(self, task: TaskWrapper):
try:
result = await task()
except Exception as e:
self.logger.execption(
self.logger.exception(
f"Error executing TaskWrapper {task!r}: {e}",
exc_info=True
)
Expand All @@ -290,7 +294,6 @@ async def _execute_coroutine(self, coro: coroutine):
else:
try:
loop = asyncio.get_running_loop()
# with ThreadPoolExecutor(max_workers=1) as executor:
result = await loop.run_in_executor(
self.executor,
asyncio.run,
Expand Down Expand Up @@ -371,6 +374,7 @@ async def process_queue(self):
)
continue
except Exception as e: # Catch all exceptions
print('ERROR > ', e)
self.logger.error(
f"Error executing task {func.__name__}: {e}"
)
Expand All @@ -393,7 +397,7 @@ async def process_queue(self):
Peak Memory Usage: {peak_memory / (1024 ** 2):.2f} MB
""")
except Exception as e:
print('ERROR > ', e)
print('LOG ERROR > ', e)
# Call your task completion callback (if any)
try:
await self._callback(task, result=result)
Expand Down
2 changes: 1 addition & 1 deletion navigator/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
__description__ = (
"Navigator Web Framework based on aiohttp, " "with batteries included."
)
__version__ = "2.10.20"
__version__ = "2.10.21"
__author__ = "Jesus Lara"
__author_email__ = "[email protected]"
__license__ = "BSD"

0 comments on commit f947562

Please sign in to comment.