-
Notifications
You must be signed in to change notification settings - Fork 405
java: use message/stream + SSE parsing; keep message/send #359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @Devenvaruv, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request refines the Java client and server implementations to fully support the A2A protocol's streaming capabilities (v0.3.0). It introduces a dedicated "message/stream" method for streaming RPCs, updates the client's Server-Sent Events (SSE) parsing for accurate data extraction, and configures the server to correctly enforce the new streaming method for its "/a2a/stream" endpoint. These changes ensure proper communication for both streaming and non-streaming interactions.
Highlights
- Protocol Alignment: The Java sample now aligns with A2A protocol version v0.3.0, distinguishing between "message/stream" for streaming RPCs and "message/send" for non-streaming RPCs.
- Client Streaming Update: The A2AClient now uses "message/stream" when POSTing to "/a2a/stream" for streaming requests, ensuring the correct method is invoked for server-sent events.
- Improved SSE Parsing: The client's Server-Sent Events (SSE) parsing logic has been enhanced to specifically read "data:" lines and process JSON payloads upon encountering blank-line boundaries, ensuring robust and accurate handling of streaming responses.
- Server Endpoint Enforcement: The server's "/a2a/stream" endpoint now strictly requires the "message/stream" method, preventing incorrect method usage for streaming interactions.
- Bug Fix: This change addresses and fixes issue #358.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command>
or @gemini-code-assist <command>
. Below is a summary of the supported commands.
Feature | Command | Description |
---|---|---|
Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/
folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request updates the Java sample to align with A2A protocol version v0.3.0, primarily by changing the streaming RPC method to message/stream
and implementing proper SSE parsing on the client. The changes look good and correctly implement the protocol update.
I've provided a few suggestions for the client implementation:
- A high-severity bug fix for the SSE parsing logic, where an error in the final message of a stream would be ignored.
- A medium-severity suggestion to improve the streaming implementation to be more efficient and correct by processing the stream line-by-line instead of buffering the entire response.
- A low-severity style suggestion to improve code consistency.
Overall, this is a good update that brings the Java sample up to date.
if (buf.length() > 0) { | ||
SendTaskStreamingResponse msg = objectMapper.readValue(buf.toString(), | ||
SendTaskStreamingResponse.class); | ||
if (msg.result() != null) | ||
listener.onEvent(msg.result()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic for handling the final part of the stream is missing error handling. If the last message from the server is an error, it will be ignored and onComplete()
will be called, which is incorrect. You should add the same error handling logic that is present in the main loop (lines 151-155). Also, as a matter of style, it's good practice to use braces for the if
statement on line 172.
if (buf.length() > 0) {
SendTaskStreamingResponse msg = objectMapper.readValue(buf.toString(),
SendTaskStreamingResponse.class);
if (msg.error() != null) {
Integer code = msg.error().code() != null ? msg.error().code().getValue() : null;
listener.onError(new A2AClientException(msg.error().message(), code));
return;
}
if (msg.result() != null) {
listener.onEvent(msg.result());
}
}
.POST(HttpRequest.BodyPublishers.ofString(requestBody)) | ||
.build(); | ||
|
||
HttpResponse<String> response = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation reads the entire response body into memory using HttpResponse.BodyHandlers.ofString()
before processing. This can be inefficient for large streams and isn't a true streaming implementation. Consider using HttpResponse.BodyHandlers.ofLines()
which provides a Stream<String>
to process the response line-by-line as it arrives, without buffering the whole body in memory.
if (msg.result() != null) | ||
listener.onEvent(msg.result()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{ "method": "message/stream" }
to/a2a/stream
.data:
lines and parses on blank-line boundaries./a2a/stream
requiresmessage/stream
./a2a
withmessage/send
.Protocol alignment (current)
message/stream
message/send
Fixes #358 🦕