A2A 프로토콜 서버/클라이언트 구현 가이드

개요

A2A(Agent-to-Agent) 프로토콜은 에이전트 간 상호작용을 위해 설계된 JSON-RPC 기반 통신 프로토콜입니다. 이 문서는 A2A 프로토콜을 준수하는 서버 및 클라이언트 컴포넌트를 개발하기 위한 실용적인 지침을 제공합니다.

프로토콜 기초

A2A 프로토콜은 JSON-RPC 2.0 사양을 기반으로 하며, 에이전트 간 통신을 위한 메서드 집합을 정의합니다.

메시지 구조

모든 A2A 메시지는 JSON-RPC 2.0 형식을 따르며, 기본 구조는 다음과 같습니다:

interface RpcMessage {
  jsonrpc?: "2.0";
  id?: number | string | null;
  method?: string;
  params?: unknown;
  result?: unknown;
  error?: RpcError;
}

핵심 메서드

프로토콜은 다음과 같은 주요 메서드를 지원합니다:

  • tasks/send : 에이전트에 작업 메시지 전송
  • tasks/get : 작업 상태 조회
  • tasks/cancel : 실행 중인 작업 취소
  • tasks/pushNotification/set : 작업 푸시 알림 설정
  • tasks/pushNotification/get : 푸시 알림 설정 조회
  • tasks/resubscribe : 작업 업데이트 재구독
  • tasks/sendSubscribe : 작업 메시지 전송 및 업데이트 구독

작업 상태

작업은 다음 상태 중 하나를 가집니다:

  • submitted : 제출됨
  • working : 처리 중
  • input-required : 입력 필요
  • completed : 완료됨
  • canceled : 취소됨
  • failed : 실패
  • unknown : 알 수 없음

서버 구현

핵심 컴포넌트

서버 구현은 다음 컴포넌트로 구성됩니다:

  1. 서버 (server.ts) : HTTP 요청을 처리하는 메인 서버
  2. 핸들러 (handler.ts) : A2A 메시지를 처리하는 요청 핸들러
  3. 저장소 (store.ts) : 작업 데이터 저장 및 관리
  4. 유틸리티 (utils.ts) : 공통 유틸리티 함수
  5. 에러 처리 (error.ts) : 에러 정의 및 처리 로직

기본 사용 예시

import {
  A2AServer,
  InMemoryTaskStore,
  TaskContext,
  TaskYieldUpdate,
} from "./index";

// 1. TaskHandler 형태로 에이전트 로직 정의
async function* myAgentLogic(
  context: TaskContext
): AsyncGenerator<TaskYieldUpdate> {
  console.log(`작업 처리 중: ${context.task.id}`);
  yield {
    state: "working",
    message: { role: "agent", parts: [{ text: "처리 중..." }] },
  };

  // 작업 시뮬레이션
  await new Promise((resolve) => setTimeout(resolve, 1000));

  if (context.isCancelled()) {
    console.log("작업 취소됨!");
    yield { state: "canceled" };
    return;
  }

  // 결과 산출물 생성
  yield {
    name: "result.txt",
    mimeType: "text/plain",
    parts: [{ text: `작업 ${context.task.id} 완료.` }],
  };

  // 최종 상태 전달
  yield {
    state: "completed",
    message: { role: "agent", parts: [{ text: "완료!" }] },
  };
}

// 2. 서버 생성 및 시작
const store = new InMemoryTaskStore(); // 또는 new FileStore()
const server = new A2AServer(myAgentLogic, { taskStore: store });

server.start(); // 기본 포트 41241에서 수신 시작
console.log("A2A 서버 시작됨.");

클라이언트 구현

주요 특징

  • JSON-RPC 통신 : JSON-RPC 2.0 사양에 따라 요청 전송 및 응답 수신 처리 (표준 응답 및 SSE 스트리밍 응답 지원)
  • A2A 메서드 : sendTask, sendTaskSubscribe, getTask, cancelTask, setTaskPushNotification, getTaskPushNotification, resubscribeTask 구현
  • 에러 처리 : 네트워크 문제 및 JSON-RPC 에러에 대한 기본 처리 제공
  • 스트림 지원 : SSE(Server-Sent Events)를 통한 실시간 작업 업데이트 처리 (sendTaskSubscribe, resubscribeTask)
  • 확장성 : Node.js 등 다양한 환경을 위한 커스텀 fetch 구현 지원

기본 사용 예시

import { A2AClient, Task, TaskQueryParams, TaskSendParams } from "./client";
import { v4 as uuidv4 } from "uuid";

const client = new A2AClient("http://localhost:41241");

async function run() {
  try {
    // 간단한 작업 전송
    const taskId = uuidv4();
    const sendParams: TaskSendParams = {
      id: taskId,
      message: { role: "user", parts: [{ text: "안녕, 에이전트!" }] },
    };
    const taskResult: Task | null = await client.sendTask(sendParams);
    console.log("작업 전송 결과:", taskResult);

    // 작업 상태 조회
    const getParams: TaskQueryParams = { id: taskId };
    const getTaskResult: Task | null = await client.getTask(getParams);
    console.log("작업 조회 결과:", getTaskResult);
  } catch (error) {
    console.error("A2A 클라이언트 에러:", error);
  }
}

run();

스트리밍 사용 예시

import {
  A2AClient,
  TaskStatusUpdateEvent,
  TaskArtifactUpdateEvent,
  TaskSendParams,
} from "./client";
import { v4 as uuidv4 } from "uuid";

const client = new A2AClient("http://localhost:41241");

async function streamTask() {
  const streamingTaskId = uuidv4();
  try {
    console.log(`\n--- 스트리밍 작업 시작: ${streamingTaskId} ---`);
    const streamParams: TaskSendParams = {
      id: streamingTaskId,
      message: { role: "user", parts: [{ text: "업데이트를 스트리밍해 주세요!" }] },
    };
    const stream = client.sendTaskSubscribe(streamParams);

    for await (const event of stream) {
      if ("status" in event) {
        const statusEvent = event as TaskStatusUpdateEvent;
        console.log(
          `[${streamingTaskId}] 상태 업데이트: ${statusEvent.status.state} - ${
            statusEvent.status.message?.parts[0]?.text ?? "메시지 없음"
          }`
        );
        if (statusEvent.final) {
          console.log(`[${streamingTaskId}] 스트림 종료 신호 수신.`);
          break;
        }
      } else if ("artifact" in event) {
        const artifactEvent = event as TaskArtifactUpdateEvent;
        console.log(
          `[${streamingTaskId}] 산출물 업데이트: ${
            artifactEvent.artifact.name ?? `인덱스 ${artifactEvent.artifact.index}`
          } - 파트 수: ${artifactEvent.artifact.parts.length}`
        );
      } else {
        console.warn("알 수 없는 이벤트 구조:", event);
      }
    }
    console.log(`--- 스트리밍 작업 ${streamingTaskId} 완료 ---`);
  } catch (error) {
    console.error(`스트리밍 작업 ${streamingTaskId} 에러:`, error);
  }
}

streamTask();

에러 처리

A2A 프로토콜은 다음과 같은 표준 에러 코드를 정의합니다:

  • -32700 : 파싱 에러
  • -32600 : 잘못된 요청
  • -32601 : 메서드를 찾을 수 없음
  • -32602 : 잘못된 파라미터
  • -32603 : 내부 에러
  • -32000 : 작업을 찾을 수 없음
  • -32001 : 작업을 취소할 수 없음
  • -32002 : 푸시 알림 미지원
  • -32003 : 지원되지 않는 작업

모범 사례

  1. 에러 처리 : 모든 A2A 메서드에 대해 적절한 에러 처리를 구현하세요.
  2. 인증 : 안전한 통신을 위해 적절한 인증 메커니즘을 도입하세요.
  3. 작업 관리 : 작업 상태 관리와 정리 로직을 철저히 유지하세요.
  4. 푸시 알림 : 지원 가능한 경우 푸시 알림을 구현하여 실시간 업데이트를 제공하세요.
  5. 로깅 : 디버깅 및 모니터링을 위해 포괄적인 로깅을 구현하세요.

참고 자료

  • JSON-RPC 2.0 명세
  • A2A 프로토콜 패턴
  • 서버 구현 예제
  • 클라이언트 구현 예제
  • Coder 에이전트 구현 예제

태그: A2A json-rpc Agent-to-Agent TypeScript Node.js

5월 24일 16:39에 게시됨