Skip to content

feat: Add agent-to-agent communication support (#625)#654

Open
kabir wants to merge 3 commits intoa2aproject:mainfrom
kabir:agent2agenttest
Open

feat: Add agent-to-agent communication support (#625)#654
kabir wants to merge 3 commits intoa2aproject:mainfrom
kabir:agent2agenttest

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Feb 13, 2026

Implements server-side agent-to-agent communication where an AgentExecutor can use a client to connect back to the same server, enabling agents to delegate work to other agents.

Fixes #625 🦕

Implements server-side agent-to-agent communication where an AgentExecutor
can use a client to connect back to the same server, enabling agents to
delegate work to other agents.
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant new capability: server-side agent-to-agent communication. This feature allows an agent, specifically an AgentExecutor, to act as a client and delegate work to other agents running on the same server. This enables more complex agent orchestration patterns where agents can request services from their peers. The changes include updates to various transport handlers to record the active protocol, a new ServerCallContext key for transport identification, and robust testing infrastructure to ensure the delegation works across different communication protocols.

Highlights

  • Agent-to-Agent Communication: Introduced server-side agent-to-agent communication, allowing an AgentExecutor to delegate tasks to other agents on the same server using a client.
  • Transport Protocol Identification: Added a TRANSPORT_KEY to ServerCallContext to store and retrieve the active transport protocol (JSONRPC, HTTP_JSON, GRPC) for incoming requests, enabling transport-aware client creation for delegation.
  • New Test Infrastructure: Implemented a comprehensive test case (testAgentToAgentCommunication) to validate the agent-to-agent delegation mechanism, including a new AgentToAgentClientFactory to dynamically create clients for different transport types.
  • Dependency Update: Added a new test dependency for a2a-java-sdk-client-transport-jsonrpc to support agent-to-agent communication tests.
Changelog
  • reference/jsonrpc/pom.xml
    • Added a2a-java-sdk-client-transport-jsonrpc as a test dependency.
  • reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java
    • Imported ServerCallContext.TRANSPORT_KEY and TransportProtocol.
    • Set the TRANSPORT_KEY in the ServerCallContext state to TransportProtocol.JSONRPC.
  • reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java
    • Imported ServerCallContext.TRANSPORT_KEY and TransportProtocol.
    • Set the TRANSPORT_KEY in the ServerCallContext state to TransportProtocol.HTTP_JSON.
  • server-common/src/main/java/io/a2a/server/ServerCallContext.java
    • Added a new public static final string TRANSPORT_KEY to identify the transport protocol in the state map.
  • tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
    • Added imports for UUID and Collectors.
    • Removed a System.out.println debugging statement.
    • Implemented testAgentToAgentCommunication to verify agent-to-agent delegation and local handling.
    • Added a private helper method extractTextFromTask to process task artifacts.
  • tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
    • Added imports for client-side components, TransportProtocol, and ServerCallContext.TRANSPORT_KEY.
    • Injected AgentCard to enable client creation.
    • Modified the execute method to handle agent-to-agent test messages, distinguishing between delegated and local tasks.
    • Introduced handleAgentToAgentTest, handleDelegation, and handleLocally private methods to manage the delegation logic.
    • Added getServerUrl helper method to determine the correct server URL based on the transport protocol for testing.
  • tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java
    • Added a new file AgentToAgentClientFactory to create A2A clients for different transport protocols dynamically.
    • Defined ClientTransportEnhancer interface and its implementations (GrpcClientEnhancer, JsonRpcClientEnhancer, RestClientEnhancer) to configure transport-specific client builders.
  • transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java
    • Imported ServerCallContext.TRANSPORT_KEY and TransportProtocol.
    • Set the TRANSPORT_KEY in the ServerCallContext state to TransportProtocol.GRPC.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively implements agent-to-agent communication by passing the transport protocol through the ServerCallContext. The new AgentExecutor logic correctly uses this context to create a client and delegate tasks. The addition of a comprehensive test case in AbstractA2AServerTest is great for verifying this new feature. My review includes a few suggestions to improve code maintainability by reducing duplication and clarifying documentation.

@kabir
Copy link
Collaborator Author

kabir commented Feb 13, 2026

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces agent-to-agent communication support by passing the transport protocol via the ServerCallContext. This enables an AgentExecutor to instantiate a client with a matching transport to delegate tasks to other agents on the same server. The implementation correctly spans the JSON-RPC, REST, and gRPC transport layers, and is accompanied by new tests verifying both delegation and local handling scenarios. My feedback focuses on enhancing code maintainability by addressing duplication in the test suite and leveraging modern Java features for more concise and robust code.

Comment on lines +2487 to +2499
BiConsumer<ClientEvent, AgentCard> delegationConsumer = (event, agentCard) -> {
Task task = null;
if (event instanceof TaskEvent taskEvent) {
task = taskEvent.getTask();
} else if (event instanceof TaskUpdateEvent taskUpdateEvent) {
task = taskUpdateEvent.getTask();
}

if (task != null && task.status().state().isFinal()) {
delegationResultRef.set(task);
delegationLatch.countDown();
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for this BiConsumer is duplicated in testAgentToAgentLocalHandling on lines 2560-2572. To improve maintainability and avoid code repetition, consider extracting this logic into a private helper method. This method could accept the AtomicReference<Task> and CountDownLatch as parameters and return the configured BiConsumer.

Comment on lines +178 to +250
Client client = null;
try {
client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol, serverUrl);

agentEmitter.startWork();

// Set up consumer to capture task result
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Task> resultRef = new AtomicReference<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();

BiConsumer<ClientEvent, AgentCard> consumer = (event, agentCard) -> {
Task task = null;
if (event instanceof TaskEvent taskEvent) {
task = taskEvent.getTask();
} else if (event instanceof TaskUpdateEvent taskUpdateEvent) {
task = taskUpdateEvent.getTask();
}

if (task != null && task.status().state().isFinal()) {
resultRef.set(task);
latch.countDown();
}
};

// Delegate to another agent (new task on same server)
// Add a marker so the receiving agent knows to complete the task
Message delegatedMessage = A2A.toUserMessage("#a2a-delegated#" + delegatedContent);
client.sendMessage(delegatedMessage, List.of(consumer), error -> {
errorRef.set(error);
latch.countDown();
});

// Wait for response
if (!latch.await(30, TimeUnit.SECONDS)) {
agentEmitter.fail(new InternalError("Timeout waiting for delegated response"));
return;
}

Task delegatedResult = resultRef.get();

// Check for error only if we didn't get a successful result
// (errors can occur after completion due to stream cleanup)
if (delegatedResult == null && errorRef.get() != null) {
agentEmitter.fail(new InternalError("Delegation failed: " + errorRef.get().getMessage()));
return;
}

if (delegatedResult == null) {
agentEmitter.fail(new InternalError("No result received from delegation"));
return;
}

// Extract artifacts from delegated task and add to current task
// NOTE: We cannot use emitter.addTask(delegatedResult) because it has a different taskId
if (delegatedResult.artifacts() != null && !delegatedResult.artifacts().isEmpty()) {
for (Artifact artifact : delegatedResult.artifacts()) {
agentEmitter.addArtifact(artifact.parts());
}
}

// Complete current task
agentEmitter.complete();
} catch (A2AClientException e) {
agentEmitter.fail(new InternalError("Failed to create client: " + e.getMessage()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
agentEmitter.fail(new InternalError("Interrupted while waiting for response"));
} finally {
if (client != null) {
client.close();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since Client implements AutoCloseable, you can use a try-with-resources statement here. This will make the code more concise and ensure that the client is always closed correctly, even if exceptions occur, by removing the need for a finally block.

                try (Client client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol, serverUrl)) {

                    agentEmitter.startWork();

                    // Set up consumer to capture task result
                    CountDownLatch latch = new CountDownLatch(1);
                    AtomicReference<Task> resultRef = new AtomicReference<>();
                    AtomicReference<Throwable> errorRef = new AtomicReference<>();

                    BiConsumer<ClientEvent, AgentCard> consumer = (event, agentCard) -> {
                        Task task = null;
                        if (event instanceof TaskEvent taskEvent) {
                            task = taskEvent.getTask();
                        } else if (event instanceof TaskUpdateEvent taskUpdateEvent) {
                            task = taskUpdateEvent.getTask();
                        }

                        if (task != null && task.status().state().isFinal()) {
                            resultRef.set(task);
                            latch.countDown();
                        }
                    };

                    // Delegate to another agent (new task on same server)
                    // Add a marker so the receiving agent knows to complete the task
                    Message delegatedMessage = A2A.toUserMessage("#a2a-delegated#" + delegatedContent);
                    client.sendMessage(delegatedMessage, List.of(consumer), error -> {
                        errorRef.set(error);
                        latch.countDown();
                    });

                    // Wait for response
                    if (!latch.await(30, TimeUnit.SECONDS)) {
                        agentEmitter.fail(new InternalError("Timeout waiting for delegated response"));
                        return;
                    }

                    Task delegatedResult = resultRef.get();

                    // Check for error only if we didn't get a successful result
                    // (errors can occur after completion due to stream cleanup)
                    if (delegatedResult == null && errorRef.get() != null) {
                        agentEmitter.fail(new InternalError("Delegation failed: " + errorRef.get().getMessage()));
                        return;
                    }

                    if (delegatedResult == null) {
                        agentEmitter.fail(new InternalError("No result received from delegation"));
                        return;
                    }

                    // Extract artifacts from delegated task and add to current task
                    // NOTE: We cannot use emitter.addTask(delegatedResult) because it has a different taskId
                    if (delegatedResult.artifacts() != null && !delegatedResult.artifacts().isEmpty()) {
                        for (Artifact artifact : delegatedResult.artifacts()) {
                            agentEmitter.addArtifact(artifact.parts());
                        }
                    }

                    // Complete current task
                    agentEmitter.complete();
                } catch (A2AClientException e) {
                    agentEmitter.fail(new InternalError("Failed to create client: " + e.getMessage()));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    agentEmitter.fail(new InternalError("Interrupted while waiting for response"));
                }

Comment on lines +42 to +55
ClientTransportEnhancer enhancer;
switch (transportProtocol) {
case JSONRPC:
enhancer = new JsonRpcClientEnhancer();
break;
case GRPC:
enhancer = new GrpcClientEnhancer();
break;
case HTTP_JSON:
enhancer = new RestClientEnhancer();
break;
default:
throw new IllegalArgumentException("Unsupported transport: " + transportProtocol);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This switch statement can be converted to a switch expression. This will make the code more concise and, since you're switching on an enum, it provides compile-time safety by ensuring all cases are handled.

        ClientTransportEnhancer enhancer = switch (transportProtocol) {
            case JSONRPC -> new JsonRpcClientEnhancer();
            case GRPC -> new GrpcClientEnhancer();
            case HTTP_JSON -> new RestClientEnhancer();
        };

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add tests using client from an AgentExecutor

1 participant