-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ErrorHandler DLQ API to Python #31856
Changes from 5 commits
917e996
c4be92f
34e28f3
b69f4d8
daf28cd
5141f14
049e4b3
36e5eff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
"""Utilities for gracefully handling errors and excluding bad elements.""" | ||
|
||
import traceback | ||
|
||
from apache_beam import transforms | ||
|
||
|
||
class ErrorHandler: | ||
"""ErrorHandlers are used to skip and otherwise process bad records. | ||
|
||
Error handlers allow one to implement the "dead letter queue" pattern in | ||
a fluent manner, disaggregating the error processing specification from | ||
the main processing chain. | ||
|
||
This is typically used as follows:: | ||
|
||
with error_handling.ErrorHandler(WriteToSomewhere(...)) as error_handler: | ||
result = pcoll | SomeTransform().with_error_handler(error_handler) | ||
|
||
in which case errors encountered by `SomeTransform()`` in processing pcoll | ||
will be written by the PTransform `WriteToSomewhere(...)` and excluded from | ||
`result` rather than failing the pipeline. | ||
|
||
To implement `with_error_handling` on a PTransform, one caches the provided | ||
error handler for use in `expand`. During `expand()` one can invoke | ||
`error_handler.add_error_pcollection(...)` any number of times with | ||
PCollections containing error records to be processed by the given error | ||
handler, or (if applicable) simply invoke `with_error_handling(...)` on any | ||
subtransforms. | ||
|
||
The `with_error_handling` should accept `None` to indicate that error handling | ||
is not enabled (and make implementation-by-forwarding-error-handers easier). | ||
robertwb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
In this case, any non-recoverable errors should fail the pipeline (e.g. | ||
propagate exceptions in `process` methods) rather than silently ignore errors. | ||
""" | ||
def __init__(self, consumer): | ||
self._consumer = consumer | ||
self._creation_traceback = traceback.format_stack()[-2] | ||
self._error_pcolls = [] | ||
self._closed = False | ||
|
||
def __enter__(self): | ||
self._error_pcolls = [] | ||
self._closed = False | ||
return self | ||
|
||
def __exit__(self, *exec_info): | ||
if exec_info[0] is None: | ||
self.close() | ||
|
||
def close(self): | ||
"""Indicates all error-producing operations have reported any errors. | ||
|
||
Invokes the provided error consuming PTransform on any provided error | ||
PCollections. | ||
""" | ||
self._output = ( | ||
tuple(self._error_pcolls) | transforms.Flatten() | self._consumer) | ||
self._closed = True | ||
|
||
def output(self): | ||
"""Returns result of applying the error consumer to the error pcollections. | ||
""" | ||
if not self._closed: | ||
raise RuntimeError( | ||
"Cannot access the output of an error handler " | ||
"until it has been closed.") | ||
return self._output | ||
|
||
def add_error_pcollection(self, pcoll): | ||
"""Called by a class implementing error handling on the error records. | ||
""" | ||
pcoll.pipeline._register_error_handler(self) | ||
self._error_pcolls.append(pcoll) | ||
|
||
def verify_closed(self): | ||
"""Called at end of pipeline construction to ensure errors are not ignored. | ||
""" | ||
if not self._closed: | ||
raise RuntimeError( | ||
"Unclosed error handler initialized at %s" % self._creation_traceback) | ||
|
||
|
||
class _IdentityPTransform(transforms.PTransform): | ||
def expand(self, pcoll): | ||
return pcoll | ||
|
||
|
||
class CollectingErrorHandler(ErrorHandler): | ||
"""An ErrorHandler that simply collects all errors for further processing. | ||
|
||
This ErrorHandler requires the set of errors be retrieved via `output()` | ||
and consumed (or explicitly discarded). | ||
""" | ||
def __init__(self): | ||
super().__init__(_IdentityPTransform()) | ||
self._creation_traceback = traceback.format_stack()[-2] | ||
self._output_accessed = False | ||
|
||
def output(self): | ||
self._output_accessed = True | ||
return super().output() | ||
|
||
def verify_closed(self): | ||
if not self._output_accessed: | ||
raise RuntimeError( | ||
"CollectingErrorHandler requires the output to be retrieved. " | ||
"Initialized at %s" % self._creation_traceback) | ||
return super().verify_closed() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one or more | ||
# contributor license agreements. See the NOTICE file distributed with | ||
# this work for additional information regarding copyright ownership. | ||
# The ASF licenses this file to You under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with | ||
# the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
import logging | ||
import unittest | ||
|
||
import apache_beam as beam | ||
from apache_beam.testing.util import assert_that | ||
from apache_beam.testing.util import equal_to | ||
from apache_beam.transforms import error_handling | ||
|
||
|
||
class PTransformWithErrors(beam.PTransform): | ||
def __init__(self, limit): | ||
self._limit = limit | ||
self._error_handler = None | ||
|
||
def with_error_handler(self, error_handler): | ||
self._error_handler = error_handler | ||
return self | ||
|
||
def expand(self, pcoll): | ||
limit = self._limit | ||
|
||
def process(element): | ||
if len(element) < limit: | ||
return element.title() | ||
else: | ||
return beam.pvalue.TaggedOutput('bad', element) | ||
|
||
def raise_on_everything(element): | ||
raise ValueError(element) | ||
|
||
good, bad = pcoll | beam.Map(process).with_outputs('bad', main='good') | ||
if self._error_handler: | ||
self._error_handler.add_error_pcollection(bad) | ||
else: | ||
# Will throw an exception if there are any bad elements. | ||
_ = bad | beam.Map(raise_on_everything) | ||
return good | ||
|
||
|
||
def exception_throwing_map(x, limit): | ||
if len(x) > limit: | ||
raise ValueError(x) | ||
else: | ||
return x.title() | ||
|
||
|
||
class ErrorHandlingTest(unittest.TestCase): | ||
def test_error_handling(self): | ||
with beam.Pipeline() as p: | ||
pcoll = p | beam.Create(['a', 'bb', 'cccc']) | ||
with error_handling.ErrorHandler( | ||
beam.Map(lambda x: "error: %s" % x)) as error_handler: | ||
result = pcoll | PTransformWithErrors(3).with_error_handler( | ||
error_handler) | ||
error_pcoll = error_handler.output() | ||
|
||
assert_that(result, equal_to(['A', 'Bb']), label='CheckGood') | ||
assert_that(error_pcoll, equal_to(['error: cccc']), label='CheckBad') | ||
|
||
def test_error_handling_pardo(self): | ||
with beam.Pipeline() as p: | ||
pcoll = p | beam.Create(['a', 'bb', 'cccc']) | ||
with error_handling.ErrorHandler( | ||
beam.Map(lambda x: "error: %s" % x[0])) as error_handler: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we access here the first element in the array ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
result = pcoll | beam.Map( | ||
exception_throwing_map, limit=3).with_error_handler(error_handler) | ||
error_pcoll = error_handler.output() | ||
|
||
assert_that(result, equal_to(['A', 'Bb']), label='CheckGood') | ||
assert_that(error_pcoll, equal_to(['error: cccc']), label='CheckBad') | ||
|
||
def test_error_on_unclosed_error_handler(self): | ||
with self.assertRaisesRegex(RuntimeError, r'.*Unclosed error handler.*'): | ||
with beam.Pipeline() as p: | ||
pcoll = p | beam.Create(['a', 'bb', 'cccc']) | ||
# Use this outside of a context to allow it to remain unclosed. | ||
error_handler = error_handling.ErrorHandler(beam.Map(lambda x: x)) | ||
_ = pcoll | PTransformWithErrors(3).with_error_handler(error_handler) | ||
|
||
def test_collecting_error_handler(self): | ||
with beam.Pipeline() as p: | ||
pcoll = p | beam.Create(['a', 'bb', 'cccc']) | ||
with error_handling.CollectingErrorHandler() as error_handler: | ||
result = pcoll | beam.Map( | ||
exception_throwing_map, limit=3).with_error_handler(error_handler) | ||
error_pcoll = error_handler.output() | beam.Map(lambda x: x[0]) | ||
|
||
assert_that(result, equal_to(['A', 'Bb']), label='CheckGood') | ||
assert_that(error_pcoll, equal_to(['cccc']), label='CheckBad') | ||
|
||
def test_error_on_collecting_error_handler_without_output_retrieval(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we also add a unit test with the CollectingErrorHandler that is closed but not consumed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is closed (due to the context) but not consumed. |
||
with self.assertRaisesRegex( | ||
RuntimeError, | ||
r'.*CollectingErrorHandler requires the output to be retrieved.*'): | ||
with beam.Pipeline() as p: | ||
pcoll = p | beam.Create(['a', 'bb', 'cccc']) | ||
with error_handling.CollectingErrorHandler() as error_handler: | ||
_ = pcoll | beam.Map( | ||
exception_throwing_map, | ||
limit=3).with_error_handler(error_handler) | ||
|
||
|
||
if __name__ == '__main__': | ||
logging.getLogger().setLevel(logging.INFO) | ||
unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we also have a unit test that would show how the
**exception_handling_kwargs
can be used?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call. Done.