1
1
from a2a .client import A2AClient
2
2
from typing import Any
3
3
from uuid import uuid4
4
- from a2a .types import SendTaskResponse , GetTaskResponse , SendTaskSuccessResponse , Task , TaskState
4
+ from a2a .types import (
5
+ SendTaskResponse ,
6
+ GetTaskResponse ,
7
+ SendTaskSuccessResponse ,
8
+ Task ,
9
+ TaskState ,
10
+ )
11
+ import httpx
5
12
6
13
AGENT_URL = 'http://localhost:10000'
7
14
8
- def create_send_task_payload (task_id : str , text : str , session_id : str | None = None ) -> dict [str , Any ]:
15
+
16
+ def create_send_task_payload (
17
+ task_id : str , text : str , session_id : str | None = None
18
+ ) -> dict [str , Any ]:
9
19
"""Helper function to create the payload for sending a task."""
10
20
payload : dict [str , Any ] = {
11
21
'id' : task_id ,
@@ -18,81 +28,106 @@ def create_send_task_payload(task_id: str, text: str, session_id: str | None = N
18
28
payload ['sessionId' ] = session_id
19
29
return payload
20
30
31
+
21
32
def print_json_response (response : Any , description : str ) -> None :
22
33
"""Helper function to print the JSON representation of a response."""
23
- print (f" --- { description } ---" )
34
+ print (f' --- { description } ---' )
24
35
if hasattr (response , 'root' ):
25
- print (f" { response .root .model_dump_json ()} \n " )
36
+ print (f' { response .root .model_dump_json ()} \n ' )
26
37
else :
27
- print (f"{ response .model_dump ()} \n " )
38
+ print (f'{ response .model_dump ()} \n ' )
39
+
28
40
29
41
async def run_single_turn_test (client : A2AClient ) -> str :
30
42
"""Runs a single-turn non-streaming test."""
31
43
task_id : str = uuid4 ().hex
32
- send_payload = create_send_task_payload (task_id , 'how much is 10 USD in CAD?' )
44
+ send_payload = create_send_task_payload (
45
+ task_id , 'how much is 10 USD in CAD?'
46
+ )
33
47
# Send Task
34
- send_response : SendTaskResponse = await client .send_task (payload = send_payload )
35
- print_json_response (send_response , "Single Turn Request Response" )
48
+ send_response : SendTaskResponse = await client .send_task (
49
+ payload = send_payload
50
+ )
51
+ print_json_response (send_response , 'Single Turn Request Response' )
36
52
37
- print (" ---Query Task---" )
53
+ print (' ---Query Task---' )
38
54
# query the task
39
55
task_id_payload = {'id' : task_id }
40
- get_response : GetTaskResponse = await client .get_task (payload = task_id_payload )
41
- print_json_response (get_response , "Query Task Response" )
42
- return task_id # Return task_id in case it's needed, though not used here
56
+ get_response : GetTaskResponse = await client .get_task (
57
+ payload = task_id_payload
58
+ )
59
+ print_json_response (get_response , 'Query Task Response' )
60
+ return task_id # Return task_id in case it's needed, though not used here
43
61
44
62
45
63
async def run_streaming_test (client : A2AClient ) -> None :
46
64
"""Runs a single-turn streaming test."""
47
65
task_id : str = uuid4 ().hex
48
- send_payload = create_send_task_payload (task_id , 'how much is 50 EUR in JPY?' )
66
+ send_payload = create_send_task_payload (
67
+ task_id , 'how much is 50 EUR in JPY?'
68
+ )
49
69
50
- print (" --- Single Turn Streaming Request ---" )
70
+ print (' --- Single Turn Streaming Request ---' )
51
71
stream_response = client .send_task_streaming (payload = send_payload )
52
72
async for chunk in stream_response :
53
- print_json_response (chunk , "Streaming Chunk" )
73
+ print_json_response (chunk , 'Streaming Chunk' )
74
+
54
75
55
76
async def run_multi_turn_test (client : A2AClient ) -> None :
56
77
"""Runs a multi-turn non-streaming test."""
57
- print (" --- Multi-Turn Request ---" )
78
+ print (' --- Multi-Turn Request ---' )
58
79
# --- First Turn ---
59
80
task_id : str = uuid4 ().hex
60
- first_turn_payload = create_send_task_payload (task_id , 'how much is 100 USD?' )
61
- first_turn_response : SendTaskResponse = await client .send_task (payload = first_turn_payload )
62
- print_json_response (first_turn_response , "Multi-Turn: First Turn Response" )
63
-
81
+ first_turn_payload = create_send_task_payload (
82
+ task_id , 'how much is 100 USD?'
83
+ )
84
+ first_turn_response : SendTaskResponse = await client .send_task (
85
+ payload = first_turn_payload
86
+ )
87
+ print_json_response (first_turn_response , 'Multi-Turn: First Turn Response' )
64
88
65
89
session_id : str | None = None
66
90
if isinstance (first_turn_response .root , SendTaskSuccessResponse ):
67
91
task : Task = first_turn_response .root .result
68
- session_id = task .sessionId # Capture session ID
92
+ session_id = task .sessionId # Capture session ID
69
93
70
94
# --- Second Turn (if input required) ---
71
95
if task .status .state == TaskState .input_required and session_id :
72
- print ("--- Multi-Turn: Second Turn (Input Required) ---" )
73
- second_turn_payload = create_send_task_payload (task_id , 'in GBP' , session_id )
74
- second_turn_response = await client .send_task (payload = second_turn_payload )
75
- print_json_response (second_turn_response , "Multi-Turn: Second Turn Response" )
96
+ print ('--- Multi-Turn: Second Turn (Input Required) ---' )
97
+ second_turn_payload = create_send_task_payload (
98
+ task_id , 'in GBP' , session_id
99
+ )
100
+ second_turn_response = await client .send_task (
101
+ payload = second_turn_payload
102
+ )
103
+ print_json_response (
104
+ second_turn_response , 'Multi-Turn: Second Turn Response'
105
+ )
76
106
elif not session_id :
77
- print (" Warning: Could not get session ID from first turn response." )
107
+ print (' Warning: Could not get session ID from first turn response.' )
78
108
else :
79
- print ("First turn completed, no further input required for this test case." )
109
+ print (
110
+ 'First turn completed, no further input required for this test case.'
111
+ )
80
112
81
113
82
114
async def main () -> None :
83
115
"""Main function to run the tests."""
84
- print (f" Connecting to agent at { AGENT_URL } ..." )
116
+ print (f' Connecting to agent at { AGENT_URL } ...' )
85
117
try :
86
- client = await A2AClient .get_client_from_agent_card_url (AGENT_URL )
87
- print ("Connection successful." )
118
+ async with httpx .AsyncClient () as httpx_client :
119
+ client = await A2AClient .get_client_from_agent_card_url (
120
+ httpx_client , AGENT_URL
121
+ )
122
+ print ('Connection successful.' )
88
123
89
- await run_single_turn_test (client )
90
- await run_streaming_test (client )
91
- await run_multi_turn_test (client )
124
+ await run_single_turn_test (client )
125
+ await run_streaming_test (client )
126
+ await run_multi_turn_test (client )
92
127
93
128
except Exception as e :
94
- print (f" An error occurred: { e } " )
95
- print (" Ensure the agent server is running." )
129
+ print (f' An error occurred: { e } ' )
130
+ print (' Ensure the agent server is running.' )
96
131
97
132
98
133
if __name__ == '__main__' :
0 commit comments