fix(a2a): Promote RemoteA2aAgent response to workflow node output #5852
Harineko0 wants to merge 2 commits into
Hi @rohityan, thank you for taking a look. I noticed the
Hi @Harineko0, thank you for your contribution! We appreciate you taking the time to submit this pull request. Please fix the formatting errors by running autoformat.sh.
RemoteA2aAgent inherits BaseAgent._run_impl, which never sets event.output or message_as_output, so NodeRunner leaves ctx.output as None for A2A agent nodes. When a JoinNode aggregates parallel RemoteA2aAgent predecessors, every value in the joined dict comes back as None. Override _run_impl on RemoteA2aAgent to mirror LlmAgent: join the non-thought, non-function-call/response text parts of each yielded event into event.output and set message_as_output=True. Partial, foreign-author, and input-required (mock function call) events are skipped.
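The promotion rule described in this commit can be sketched in isolation. This is a minimal illustration, not the PR's code: `Part`, `Event`, and `promote_response_to_output` are hypothetical stand-ins rather than the ADK types, and joining the parts with an empty separator is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Part:
    """Hypothetical stand-in for a content part; not the ADK type."""
    text: Optional[str] = None
    thought: bool = False
    function_call: Optional[dict] = None
    function_response: Optional[dict] = None


@dataclass
class Event:
    """Hypothetical stand-in for an agent event; not the ADK type."""
    author: str
    parts: list[Part] = field(default_factory=list)
    partial: bool = False
    output: Any = None
    message_as_output: bool = False


def promote_response_to_output(event: Event, agent_name: str) -> bool:
    """Copy the event's plain reply text into event.output.

    Returns True when the event was promoted, False when it was skipped.
    """
    # Skip events that already carry an output, partial (streaming) chunks,
    # and events authored by someone other than this agent.
    if event.output is not None or event.partial or event.author != agent_name:
        return False
    # A function call marks an interrupt (e.g. input-required), not an answer.
    if any(p.function_call is not None for p in event.parts):
        return False
    # Keep only plain text: no thoughts, no function responses.
    texts = [
        p.text
        for p in event.parts
        if p.text and not p.thought and p.function_response is None
    ]
    if not texts:
        return False
    event.output = "".join(texts)  # separator is an assumption
    event.message_as_output = True
    return True
```

With this shape, an event holding a thought part plus two text parts is promoted using only the two text parts, while partial, foreign-author, and function-call events are left untouched.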
The v2 A2A response handler delegates to converters that do not mark streaming working-state text as thought=True, so the prior fix promoted every non-partial text event to event.output. NodeRunner sets ctx.output from the first one and raises "Output already set" on the next, breaking streaming RemoteA2aAgent workflow nodes before they reach the real final answer. Skip events whose A2A task state is submitted, working, input-required, auth-required, or unknown (read from custom_metadata['a2a:response']), and short-circuit further promotion in _run_impl after the first terminal event so trailing artifact updates on a completed task don't trigger the double-set.
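The state gate and the short-circuit from this commit can be sketched as follows. This is a simplified illustration under stated assumptions: events are modelled as plain dicts with a hypothetical `text` key, and `task_state`, `is_promotable`, and `promote_first_terminal` are illustrative names, not functions from the PR. Only the `custom_metadata['a2a:response']` lookup path and the list of non-terminal states come from the commit message.

```python
from typing import Iterable, Iterator, Optional

# Task states the commit message lists as non-terminal.
NON_TERMINAL_STATES = frozenset(
    {"submitted", "working", "input-required", "auth-required", "unknown"}
)


def task_state(custom_metadata: Optional[dict]) -> Optional[str]:
    """Read status.state from the stamped A2A response, or None if absent."""
    response = (custom_metadata or {}).get("a2a:response") or {}
    status = response.get("status") or {}
    return status.get("state")


def is_promotable(custom_metadata: Optional[dict]) -> bool:
    """Plain messages (no status) and terminal task states may be promoted."""
    return task_state(custom_metadata) not in NON_TERMINAL_STATES


def promote_first_terminal(events: Iterable[dict]) -> Iterator[dict]:
    """Promote only the first promotable event; pass the rest through."""
    promoted = False
    for event in events:
        if (
            not promoted
            and event.get("text")
            and is_promotable(event.get("custom_metadata"))
        ):
            event["output"] = event["text"]
            promoted = True  # later terminal events are left untouched
        yield event
```

For a stream of a `working` event, a `completed` event, and a trailing `completed` artifact update, only the middle event receives an `output`, so the output is set exactly once.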
@rohityan Thank you for taking the time. I've rebased the PR onto the latest main branch and confirmed that the
Link to Issue or Description of Change
Problem
When a `RemoteA2aAgent` is used as a static node in a `Workflow` graph that feeds into a `JoinNode`, the joined output contains `None` for every `RemoteA2aAgent` predecessor.

Reproducer (simplified from a real coordinator graph):
Observed `JoinNode` input:

Root cause: `RemoteA2aAgent` inherits the default `BaseAgent._run_impl`, which iterates `run_async` and yields events without ever setting `event.output` or `event.node_info.message_as_output`. As a result, `NodeRunner._track_event_in_context` leaves `ctx.output` as `None`, and `Workflow._handle_completion` never records an entry in `loop_state.node_outputs` for that predecessor. `JoinNode` then sees `None` for it.

`LlmAgent` already solves the equivalent problem by overriding `_run_impl` and promoting the model's text reply to `event.output` (via `process_llm_agent_output` in `_llm_agent_wrapper.py`). `RemoteA2aAgent` had no equivalent hook.

Solution
Add a workflow-only override of `_run_impl` on `RemoteA2aAgent` that mirrors `LlmAgent`'s behavior. For each event yielded by `BaseAgent._run_impl`, a new `_promote_response_to_output` helper joins the text of all parts that are not thoughts, function calls, or function responses, assigns it to `event.output`, and sets `event.node_info.message_as_output = True` (consistent with `LlmAgent`; this prevents `NodeRunner._flush_output_and_deltas` from emitting a duplicate trailing output event).

The helper skips:
- Events where `event.output` is already set.
- Thought text (the `working`/`submitted` task statuses that the legacy `_handle_a2a_response` marks `thought=True`).
- Function-call events (the `input_required`/`auth_required` mock function call inserted by `_create_mock_function_call_for_required_user_input`; those should remain interrupts, not outputs).
- Events in a non-terminal task state (`submitted`, `working`, `input-required`, `auth-required`, `unknown`). The v2 integration path (`_handle_a2a_response_v2`) delegates to converters that do not mark streaming `working` text as `thought=True`, so the thought filter alone is not enough. Without this guard, a `working` text event and the subsequent `completed` text event would each try to set `event.output`, causing `NodeRunner` to raise `ValueError: Output already set` on the second event and abort the run before the real final answer surfaces. The state is read from `event.custom_metadata['a2a:response']['status']['state']`, which `_run_async_impl` already stamps before yielding. Plain `A2AMessage` responses (no status field) and terminal task states (`completed`, `failed`, `canceled`, `rejected`) still promote.

In addition, `_run_impl` short-circuits after the first successful promotion. This protects against the case where a server emits multiple terminal-state events for one run (e.g. a `completed` status update followed by trailing artifact updates on the same already-completed task): only the first terminal event becomes the node's output, and subsequent ones pass through untouched.

Scope is intentionally narrow: only the agent boundary is touched. `to_adk_event.py` and the workflow scheduler are unchanged, since the same workaround (promoting content → output at the agent layer) is what `LlmAgent` does and what keeps the fix local.

Testing Plan
Unit Tests
Added `TestRemoteA2aAgentWorkflowOutput` in `tests/unittests/agents/test_remote_a2a_agent.py` (20 cases).

Manual End-to-End (E2E) Tests
I ran the failing workflow described in the Problem section against a real ADK app: a `Workflow` graph whose `START` fans out into multiple `RemoteA2aAgent` nodes that all feed into a single `JoinNode`. Each remote specialist runs as its own A2A server; the coordinator runs the workflow and forwards the joined dict to a downstream synthesis step.

Before the fix:
After the fix:
Checklist
Additional context
The fix mirrors the pattern `LlmAgent` already uses (`process_llm_agent_output` in `src/google/adk/workflow/_llm_agent_wrapper.py`), keeping output promotion at the agent boundary rather than touching the workflow scheduler or the A2A converters. This minimizes blast radius and avoids regressions in non-workflow usages of `RemoteA2aAgent`, where the `_run_impl` path is not exercised.