feat: Add agent-to-agent communication support (#625)#654
feat: Add agent-to-agent communication support (#625)#654kabir wants to merge 3 commits intoa2aproject:mainfrom
Conversation
Implements server-side agent-to-agent communication where an AgentExecutor can use a client to connect back to the same server, enabling agents to delegate work to other agents.
Summary of ChangesHello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant new capability: server-side agent-to-agent communication. This feature allows an agent, specifically an Highlights
Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request effectively implements agent-to-agent communication by passing the transport protocol through the ServerCallContext. The new AgentExecutor logic correctly uses this context to create a client and delegate tasks. The addition of a comprehensive test case in AbstractA2AServerTest is great for verifying this new feature. My review includes a few suggestions to improve code maintainability by reducing duplication and clarifying documentation.
tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
Outdated
Show resolved
Hide resolved
tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
Show resolved
Hide resolved
tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java
Show resolved
Hide resolved
tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java
Outdated
Show resolved
Hide resolved
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces agent-to-agent communication support by passing the transport protocol via the ServerCallContext. This enables an AgentExecutor to instantiate a client with a matching transport to delegate tasks to other agents on the same server. The implementation correctly spans the JSON-RPC, REST, and gRPC transport layers, and is accompanied by new tests verifying both delegation and local handling scenarios. My feedback focuses on enhancing code maintainability by addressing duplication in the test suite and leveraging modern Java features for more concise and robust code.
| BiConsumer<ClientEvent, AgentCard> delegationConsumer = (event, agentCard) -> { | ||
| Task task = null; | ||
| if (event instanceof TaskEvent taskEvent) { | ||
| task = taskEvent.getTask(); | ||
| } else if (event instanceof TaskUpdateEvent taskUpdateEvent) { | ||
| task = taskUpdateEvent.getTask(); | ||
| } | ||
|
|
||
| if (task != null && task.status().state().isFinal()) { | ||
| delegationResultRef.set(task); | ||
| delegationLatch.countDown(); | ||
| } | ||
| }; |
There was a problem hiding this comment.
The logic for this BiConsumer is duplicated in testAgentToAgentLocalHandling on lines 2560-2572. To improve maintainability and avoid code repetition, consider extracting this logic into a private helper method. This method could accept the AtomicReference<Task> and CountDownLatch as parameters and return the configured BiConsumer.
| Client client = null; | ||
| try { | ||
| client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol, serverUrl); | ||
|
|
||
| agentEmitter.startWork(); | ||
|
|
||
| // Set up consumer to capture task result | ||
| CountDownLatch latch = new CountDownLatch(1); | ||
| AtomicReference<Task> resultRef = new AtomicReference<>(); | ||
| AtomicReference<Throwable> errorRef = new AtomicReference<>(); | ||
|
|
||
| BiConsumer<ClientEvent, AgentCard> consumer = (event, agentCard) -> { | ||
| Task task = null; | ||
| if (event instanceof TaskEvent taskEvent) { | ||
| task = taskEvent.getTask(); | ||
| } else if (event instanceof TaskUpdateEvent taskUpdateEvent) { | ||
| task = taskUpdateEvent.getTask(); | ||
| } | ||
|
|
||
| if (task != null && task.status().state().isFinal()) { | ||
| resultRef.set(task); | ||
| latch.countDown(); | ||
| } | ||
| }; | ||
|
|
||
| // Delegate to another agent (new task on same server) | ||
| // Add a marker so the receiving agent knows to complete the task | ||
| Message delegatedMessage = A2A.toUserMessage("#a2a-delegated#" + delegatedContent); | ||
| client.sendMessage(delegatedMessage, List.of(consumer), error -> { | ||
| errorRef.set(error); | ||
| latch.countDown(); | ||
| }); | ||
|
|
||
| // Wait for response | ||
| if (!latch.await(30, TimeUnit.SECONDS)) { | ||
| agentEmitter.fail(new InternalError("Timeout waiting for delegated response")); | ||
| return; | ||
| } | ||
|
|
||
| Task delegatedResult = resultRef.get(); | ||
|
|
||
| // Check for error only if we didn't get a successful result | ||
| // (errors can occur after completion due to stream cleanup) | ||
| if (delegatedResult == null && errorRef.get() != null) { | ||
| agentEmitter.fail(new InternalError("Delegation failed: " + errorRef.get().getMessage())); | ||
| return; | ||
| } | ||
|
|
||
| if (delegatedResult == null) { | ||
| agentEmitter.fail(new InternalError("No result received from delegation")); | ||
| return; | ||
| } | ||
|
|
||
| // Extract artifacts from delegated task and add to current task | ||
| // NOTE: We cannot use emitter.addTask(delegatedResult) because it has a different taskId | ||
| if (delegatedResult.artifacts() != null && !delegatedResult.artifacts().isEmpty()) { | ||
| for (Artifact artifact : delegatedResult.artifacts()) { | ||
| agentEmitter.addArtifact(artifact.parts()); | ||
| } | ||
| } | ||
|
|
||
| // Complete current task | ||
| agentEmitter.complete(); | ||
| } catch (A2AClientException e) { | ||
| agentEmitter.fail(new InternalError("Failed to create client: " + e.getMessage())); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| agentEmitter.fail(new InternalError("Interrupted while waiting for response")); | ||
| } finally { | ||
| if (client != null) { | ||
| client.close(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Since Client implements AutoCloseable, you can use a try-with-resources statement here. This will make the code more concise and ensure that the client is always closed correctly, even if exceptions occur, by removing the need for a finally block.
try (Client client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol, serverUrl)) {
agentEmitter.startWork();
// Set up consumer to capture task result
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Task> resultRef = new AtomicReference<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();
BiConsumer<ClientEvent, AgentCard> consumer = (event, agentCard) -> {
Task task = null;
if (event instanceof TaskEvent taskEvent) {
task = taskEvent.getTask();
} else if (event instanceof TaskUpdateEvent taskUpdateEvent) {
task = taskUpdateEvent.getTask();
}
if (task != null && task.status().state().isFinal()) {
resultRef.set(task);
latch.countDown();
}
};
// Delegate to another agent (new task on same server)
// Add a marker so the receiving agent knows to complete the task
Message delegatedMessage = A2A.toUserMessage("#a2a-delegated#" + delegatedContent);
client.sendMessage(delegatedMessage, List.of(consumer), error -> {
errorRef.set(error);
latch.countDown();
});
// Wait for response
if (!latch.await(30, TimeUnit.SECONDS)) {
agentEmitter.fail(new InternalError("Timeout waiting for delegated response"));
return;
}
Task delegatedResult = resultRef.get();
// Check for error only if we didn't get a successful result
// (errors can occur after completion due to stream cleanup)
if (delegatedResult == null && errorRef.get() != null) {
agentEmitter.fail(new InternalError("Delegation failed: " + errorRef.get().getMessage()));
return;
}
if (delegatedResult == null) {
agentEmitter.fail(new InternalError("No result received from delegation"));
return;
}
// Extract artifacts from delegated task and add to current task
// NOTE: We cannot use emitter.addTask(delegatedResult) because it has a different taskId
if (delegatedResult.artifacts() != null && !delegatedResult.artifacts().isEmpty()) {
for (Artifact artifact : delegatedResult.artifacts()) {
agentEmitter.addArtifact(artifact.parts());
}
}
// Complete current task
agentEmitter.complete();
} catch (A2AClientException e) {
agentEmitter.fail(new InternalError("Failed to create client: " + e.getMessage()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
agentEmitter.fail(new InternalError("Interrupted while waiting for response"));
}| ClientTransportEnhancer enhancer; | ||
| switch (transportProtocol) { | ||
| case JSONRPC: | ||
| enhancer = new JsonRpcClientEnhancer(); | ||
| break; | ||
| case GRPC: | ||
| enhancer = new GrpcClientEnhancer(); | ||
| break; | ||
| case HTTP_JSON: | ||
| enhancer = new RestClientEnhancer(); | ||
| break; | ||
| default: | ||
| throw new IllegalArgumentException("Unsupported transport: " + transportProtocol); | ||
| } |
There was a problem hiding this comment.
This switch statement can be converted to a switch expression. This will make the code more concise and, since you're switching on an enum, it provides compile-time safety by ensuring all cases are handled.
ClientTransportEnhancer enhancer = switch (transportProtocol) {
case JSONRPC -> new JsonRpcClientEnhancer();
case GRPC -> new GrpcClientEnhancer();
case HTTP_JSON -> new RestClientEnhancer();
};
Implements server-side agent-to-agent communication where an AgentExecutor can use a client to connect back to the same server, enabling agents to delegate work to other agents.
Fixes #625 🦕