What happened?
Apache Beam SDK Version: 2.40.0
SDK Language: Python
Runner: All (DirectRunner, DataflowRunner, PortableRunner etc.)
TL;DR: If a DoFn.process contains both yield <some value> and return <some iterable> statements, the elements passed to return <some iterable> are not emitted.
The Apache Beam Programming Guide, section 4.2.1.2 (Creating a DoFn), states:
Your process method should accept an argument element, which is the input element, and return an iterable with its output values. You can accomplish this by emitting individual elements with yield statements. You can also use a return statement with an iterable, like a list or a generator.
That statement is correct when a DoFn.process emits elements using:
- either yield <some value> only,
- or return <some iterable> only.
If yield and return <some iterable> are combined in the same DoFn.process() definition, the behavior no longer matches the statement made in the documentation.
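For background, plain Python compiles any function containing yield as a generator function, and a return <value> inside it ends iteration and stores the value on the StopIteration exception instead of producing it. The sketch below shows only that language behavior, independent of Beam; the function name process is illustrative, and whether Beam relies on exactly this mechanism is an assumption, although it would be consistent with the dropped elements reported here.

```python
def process(element):
    """Mixes `return <iterable>` and `yield`, like SetDefaultValFn1 below."""
    if not element.get("key"):
        return [{"key": -9999}]  # ends the generator; the list is NOT yielded
    yield element


# Iterating only sees yielded values, so the returned list is dropped.
emitted = list(process({"key": None}))

# The returned list is still reachable, but only via StopIteration.value.
gen = process({"key": None})
try:
    next(gen)
    returned = None
except StopIteration as stop:
    returned = stop.value

print(emitted)   # []
print(returned)  # [{'key': -9999}]
```

Any caller that simply iterates over the result of process never looks at StopIteration.value, so the returned list is lost without an error.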
See this example pipeline:
# dofn_issue.py
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

logger = logging.getLogger(__name__)
logger.setLevel("INFO")


class Pipeline:
    def run(self, args=None):
        parser = argparse.ArgumentParser()
        _, extra_args = parser.parse_known_args(args)
        pipeline_options = PipelineOptions(extra_args, save_main_session=True)
        with beam.Pipeline(options=pipeline_options) as pipeline:
            data_to_process = pipeline | beam.Create(
                [
                    {"key": 1},
                    {"key": 2},
                    {"key": None},
                    {"key": 4},
                ],
            )
            data_to_process | beam.ParDo(SetDefaultValFn1()) | "1" >> beam.ParDo(LogElementsFn(), 1)
            data_to_process | beam.ParDo(SetDefaultValFn2()) | "2" >> beam.ParDo(LogElementsFn(), 2)
            data_to_process | beam.ParDo(SetDefaultValFn3()) | "3" >> beam.ParDo(LogElementsFn(), 3)
            data_to_process | beam.ParDo(SetDefaultValFn4()) | "4" >> beam.ParDo(LogElementsFn(), 4)


# NOT EXPECTED - `return` statement doesn't emit data
class SetDefaultValFn1(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        yield element


# NOT EXPECTED - `yield` statement only emits data
class SetDefaultValFn2(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        return [element]


# EXPECTED
class SetDefaultValFn3(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        return [element]


# EXPECTED
class SetDefaultValFn4(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        yield element


class LogElementsFn(beam.DoFn):
    def process(self, element, where):
        logger.info(f"From {where} found {element}")
        yield element


if __name__ == "__main__":
    pipeline = Pipeline()
    pipeline.run()
Actual Output:
$ python dofn_issue.py \
--runner DirectRunner
INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
# NOTE: SetDefaultValFn1 silently skipped element {'key': -9999}
INFO:__main__:From 1 found {'key': 4}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 1}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 2}
INFO:__main__:From 2 found {'key': -9999}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 4}
INFO:__main__:From 3 found {'key': 1}
INFO:__main__:From 3 found {'key': 2}
INFO:__main__:From 3 found {'key': -9999}
INFO:__main__:From 3 found {'key': 4}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}
PS: Output is manually ordered for ease of interpretation.
Expected Output:
$ python dofn_issue.py \
--runner DirectRunner
INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
INFO:__main__:From 1 found {'key': -9999}
INFO:__main__:From 1 found {'key': 4}
INFO:__main__:From 2 found {'key': 1}
INFO:__main__:From 2 found {'key': 2}
INFO:__main__:From 2 found {'key': -9999}
INFO:__main__:From 2 found {'key': 4}
INFO:__main__:From 3 found {'key': 1}
INFO:__main__:From 3 found {'key': 2}
INFO:__main__:From 3 found {'key': -9999}
INFO:__main__:From 3 found {'key': 4}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}
PS: Output is manually ordered for ease of interpretation.
For comparison, if DoFn.process is meant to behave like a Python generator, here is equivalent code written purely with generators:
# generator_eg.py
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class SetDefaultVal1:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        yield element


class SetDefaultVal2:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        return [element]


class SetDefaultVal3:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        return [element]


class SetDefaultVal4:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        yield element


def run_and_log(Generator, element, where):
    generator = Generator.process(element)
    try:
        processed_element = next(generator)
        logger.info(f"From {where} found {processed_element}")
    except Exception:
        # Swallows StopIteration (the generator returned without yielding)
        # and TypeError (case 3 returns a plain list, which is not an iterator).
        pass


if __name__ == "__main__":
    for element in [
        {"key": 1},
        {"key": 2},
        {"key": None},
        {"key": 4},
    ]:
        run_and_log(SetDefaultVal1, element, 1)
        run_and_log(SetDefaultVal2, element, 2)
        run_and_log(SetDefaultVal3, element, 3)
        run_and_log(SetDefaultVal4, element, 4)
Outputs:
$ python generator_eg.py
INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
INFO:__main__:From 1 found {'key': 4}
INFO:__main__:From 2 found {'key': -9999}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}
# PS: Output is manually ordered for ease of interpretation.
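One way to see why case 3 stands apart is to check which process variants Python treats as generator functions. The sketch below uses the standard-library inspect.isgeneratorfunction; the three functions are condensed stand-ins for the classes above, with hypothetical names, not the original code.

```python
import inspect


def mixed(element):  # like cases 1 and 2: contains both yield and return
    if not element.get("key"):
        return [{"key": -9999}]
    yield element


def return_only(element):  # like case 3: no yield anywhere
    if not element.get("key"):
        return [{"key": -9999}]
    return [element]


def yield_only(element):  # like case 4: yield statements only
    if not element.get("key"):
        yield {"key": -9999}
        return
    yield element


# A single `yield` anywhere in the body makes the whole function a generator.
kinds = {
    f.__name__: inspect.isgeneratorfunction(f)
    for f in (mixed, return_only, yield_only)
}
print(kinds)
# {'mixed': True, 'return_only': False, 'yield_only': True}
```

Because return_only is a regular function, calling it gives back a plain list. In generator_eg.py, next() on that list raises TypeError, which run_and_log swallows, so nothing is printed for case 3.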
Comparing the behavior of DoFn.process with a Python generator for all 4 cases:
Case 4.
- Output is expected for dofn_issue.py & generator_eg.py.
- Output is the same for dofn_issue.py & generator_eg.py.
Case 3.
- Output is expected for dofn_issue.py & generator_eg.py.
- Output is NOT the same for dofn_issue.py & generator_eg.py.
Case 2.
- a) Output is NOT expected for dofn_issue.py. b) Output is expected for generator_eg.py.
- Output is the same for dofn_issue.py & generator_eg.py.
Case 1.
- a) Output is NOT expected for dofn_issue.py. b) Output is expected for generator_eg.py.
- Output is the same for dofn_issue.py & generator_eg.py.
Discrepancy
If dofn_issue.py emits elements for return <some iterable> in case 3 (which generator_eg.py does NOT do), then why does it NOT emit them for the return <some iterable> statements in case 1 and case 2?
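Until the behavior or the documentation changes, one possible way to sidestep the problem is to keep process consistent: if it contains any yield, emit iterables with yield from rather than return. The following is a minimal sketch in plain Python so that it runs without Beam; process_fixed is a hypothetical name, and the assumption is that a DoFn.process written this way would behave like case 4 above.

```python
def process_fixed(element):
    """Generator-only variant of SetDefaultValFn1: no `return <iterable>`."""
    if not element.get("key"):
        # `yield from` emits each item of the iterable, unlike `return`.
        yield from [{"key": -9999}]
        return
    yield element


inputs = [{"key": 1}, {"key": 2}, {"key": None}, {"key": 4}]
outputs = [out for element in inputs for out in process_fixed(element)]
print(outputs)
# [{'key': 1}, {'key': 2}, {'key': -9999}, {'key': 4}]
```

This only avoids the mixed style; it does not answer whether the mixed style should be supported or rejected with an error.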
Issue Priority
Priority: 1
Issue Component
Component: sdk-py-core