계기
진행중인 프로젝트에서 AWS Lambda는 ChatGPT API 를 호출하는 핵심적인 역할을 담당한다. 사용자 요청을 받아 ChatGPT API를 호출하고, 그 결과를 분석해 반환하게 되는데, 이 기능은 프로젝트의 핵심 서비스인만큼 사용자들이 주로 사용하는 기능으로 가장 많은 요청을 받을 것으로 예상된다. 이 때문에 Lambda 함수가 요청을 처리하는 동안 대기해야하고 이 과정에서 문제점이 발생할 수 있다.
AWS Lambda는 여러 요청이 들어오면 자동으로 실행 환경 수를 조정해 동시성 한도까지 스케일링을 한다. 그러나 이 동시성에는 제한이 있는데 AWS 리전 내에서 최대 1,000개의 동시 실행 환경을 제공하지만 이 한도를 초과한다면 새로운 요청이 거부되거나 대기 시간이 발생할 수 있다.
프로젝트가 성공적으로 진행되어 많은 사용자들이 동시에 이 API 를 호출한다고 가정하면, 동시성의 한도를 고려하여야 한다.
또한, 시스템에 장애나 오류가 발생했을 때의 데이터 손실 위험도 있다.
요청은 전달됐으나 Lambda 가 해당 요청을 처리하다가 실패한다면 데이터가 손실될 가능성이 있고 이는 프로젝트의 주된 서비스에서 적절한 결과를 받지 못해 사용자에게 아주 큰 불편을 초래할 수 있다.
왜 SQS?
이러한 문제들을 예방하기 위해 AWS SQS 를 적용하기로 했다.
SQS 는 메시지를 큐에 저장하고, Lambda 함수가 이를 처리할 수 있도록 연결해주는 서비스이다.
SQS 는 요청이 들어오면 이를 큐에 저장한 후, Lambda 함수가 처리 가능할 때에 메시지를 전달한다. 이 때문에 Lambda 함수가 동시성 한도에 도달하지 않도록 해서, 트래픽이 급증하더라도 안정적으로 요청을 처리할 수 있게 한다.
이는 결국 Lambda 가 처리할 수 있는 속도로 메시지를 전달하기 때문에 Lmabda 의 스케일링 지연 문제를 방지할 수 있다.
또한, SQS 는 Lambda 에 메시지를 전달했더라도 Lambda 가 해당 요청을 처리하지 못했을 때, 해당 메시지(요청)를 삭제하지 않고 큐에 대기 상태로 남겨두기 때문에 이후에 재시도할 수 있다. 이는 데이터 손실을 방지하고, Lambda 또는, GPT API 에 장애 발생 시에도 안정적으로 복구될 수 있도록 할 수 있다.
추가적으로, SQS 는 작업의 우선순위를 조정할 수 있는 기능도 있어 메시지 처리 순서를 보장할 수 있다.
SQS 적용
1 대기열 생성
AWS Simple Queue Service 에서 새로운 대기열을 생성한다.
2 대기열 유형
대기열 유형을 선택한다.
표준 대기열
- 순서 비보장: 순서가 중요한 것이 아닌 대량의 메시지를 처리하는 경우에 적합 (로그 처리, 대규모 데이터 처리 등)
- 중복 가능성: 메시지 중복 처리에 대해 서비스 자체적으로 대응할 수 있는 경우 적합
- 높은 처리량: 초당 수천 건의 메시지 처리 가능
FIFO
- 순서 보장: 메시지가 전송된 순서대로 정확히 전달될 것을 보장
- 중복 없음: 메시지가 반드시 한 번만 처리되어야 하는 경우에 적합
- 제한된 처리량: 표준 대기열에 비해 상대적으로 처리량이 낮은데 이는 순서를 유지하고 중복을 방지하기 위함
3 구성
표시 제한 시간: 연결된 곳 (Lambda 함수 등)에 전달되면 다른 곳에서는 그 메시지를 처리하지 못하도록 잠그는 시간. 너무 짧으면 메시지가 중복 처리될 수 있고, 너무 길면 메시지 처리 지연이 발생할 수 있다.
전송 지연: 메시지가 SQS 큐에 도착한 후, 지정된 시간 동안 대기했다가 전달되도록 하는 시간. 메시지 처리를 지연시키고 싶을 때. 0초면 즉시 처리됨
메시지 수신 대기 시간: 큐가 비어있을 경우 새로운 메시지가 도착할 때까지 기다리는 최대 시간. 0일 경우 메시지가 없을 땐 즉시 반환됨. 이 시간을 늘리면 새로운 메시지가 도착할 때까지 대기함으로써 생기는 불필요한 요청을 줄일 수 있음
메시지 보존 기간: 큐에 저장된 메시지가 삭제되지 않고 유지되는 최대 시간
-> 메시지 보존 기간 동안 재시도가 이루어진다.
최대 메시지 크기: 최대 256 KB
현재 람다함수의 타임아웃을 60초, SQS 의 표시 제한 시간을 70초로 설정해놓았다.
람다 함수 실패 + 람다 함수 호출 시작으로부터 70초가 경과하면 메시지가 다시 전송되고
이 과정에 메시지 보존 기간동안 반복된다.
람다 함수가 실패하는 경우는
프롬프트 결과의 파싱 에러가 대부분을 차지하고 그 외에는 단순 로직 오류 였다.
파싱 에러의 경우 재시도 하면 성공할 가능성이 있고 결과를 받아내는 것이 가장 중요하기 때문에 계속 요청을 넣는 것으로 하였고
단순 로직 오류의 경우도 마찬가지로 로그 그룹에서 확인해서 오류를 수정하는 작업을 한다.
4 암호화 설정
비활성화: 메시지가 평문으로 저장됨
SSE-SQS: SQS 에서 자동 생성, 관리 암호화. 사용자의 별도 설정 없이 가능
SSE-KMS: 보다 높은 수준의 보안 관리와 유연성 제공
5 액세스 정책 설정
6 Lambda 트리거 설정
SQS 생성 후, Lambda 트리거 탭에서 새로운 구성을 만든다.
트리거가 될 Lambda 함수를 지정해서 저장
코드에서 SQS 호출하기 (NestJS && SDK)
npm install aws-sdk
aws-sdk 설치
@Injectable()
export class SqsService {
private sqs: SQS;
private queueUrl: string;
constructor(private configService: ConfigService, private prisma: PrismaService) {
this.sqs = new SQS({
region: this.configService.get<string>('AWS_REGION'),
accessKeyId: this.configService.get<string>('AWS_ACCESS_KEY_ID'),
secretAccessKey: this.configService.get<string>('AWS_SECRET_ACCESS_KEY'),
});
this.queueUrl = this.configService.get<string>('SQS_QUEUE_URL');
}
...
}
SQS 를 이용할 서비스단에서 생성자 로직에 SQS 설정 정보 추가
private async sendMessage(messageBody: any): Promise<void> {
const params = {
QueueUrl: this.queueUrl,
MessageBody: JSON.stringify(messageBody),
};
await this.sqs.sendMessage(params).promise();
}
SQS 의 sendMessage 메서드에 인자로 전달한 데이터를 보낸다.
이 때 요청을 보낼 sqs 의 url 과 데이터 본문을 객체로 전달하는데 이 경우에는 Lambda 함수로 트리거되니 lambda_function 에서 필요로 하고 있는 GPT API 에 전달될 프롬프트, 모델 등이 전달된다.
MessageBody 를 JSON 형식으로 변환함으로써 구조화된 데이터를 전송할 수 있다.
람다가 잘 처리되고 있는지
ChatGPT API 플로우
SQS 호출 -> 큐에 쌓이는 메시지 -> Lambda 함수가 트리거되어 작동 -> 메시지 전달 -> GPT API 요청
Lambda 함수 로직 (Python)
# SQS 메시지 처리
for record in event['Records']:
message_body = record['body']
print("Message Body:", message_body)
try:
data = json.loads(message_body) # JSON 문자열 처리
print("Data:", data)
task_id = data['id']
task_type = data.get('type')
table_name = '"GptImageResult"' if task_type == 'image' else '"GptTextResult"'
# 작업 시작 상태 업데이트
with conn.cursor() as cursor:
cursor.execute(
f"UPDATE {table_name} SET status = %s WHERE id = %s",
('IN_PROGRESS', task_id)
)
conn.commit()
...
메시지를 처리하기 시작할 때, 세션 생성된 DB 에 'IN PROGRESS' 로 상태를 변경한다.
이후 GPT ChatCompletion 또는 Image 에서 create 메서드를 통해 요청을 보낸 뒤
with conn.cursor() as cursor:
cursor.execute(
f"UPDATE {table_name} SET result = %s, status = %s WHERE id = %s",
(result, 'COMPLETED', task_id)
)
conn.commit()
print("Response:", result_value)
except Exception as e:
print("Error processing message:", e)
with conn.cursor() as cursor:
cursor.execute(
f"UPDATE {table_name} SET status = %s WHERE id = %s",
('FAILED', task_id)
)
conn.commit()
정상적으로 response 를 받으면 DB 에 결과를 업데이트 후'COMPLETED' 상태 변경, 실패하면 'FAILED' 로 상태 변경
모니터링
람다 함수 구성 후 람다 함수 콘솔에서 모니터링할 수 있다. -> CloudWatch Logs
각각의 로그 파일에서 Lambda 함수 로그를 확인할 수 있다.
AWS 의 가이드에 따르면 일반적으로 Lambda 의 제한 사항에 대해 걱정할 필요가 없을 정도로 스케일 속도는 대부분의 사용 사례에 충분하다고 한다. (10초당 1,000개의 실행 환경 인스턴스)
그러나 AWS 서비스에 들어오는 요청을 비동기적으로 처리하는 방법을 체험할 수 있었고 CS 자료구조 학습할 때만 접했던 , FIFO 등의 개념도 직접 접할 수 있어서 좋았다. 그리고 ChatGPT 를 활용하면서 급증할 수 있는 트래픽에 대비해 메시지 큐를 사용해 트래픽을 조절하는 방법이 있다는 것을 알게 됐고 AWS 서비스 간의 연계작업을 경험해본 것도 좋은 일이다.
SQS 실패 시 다른 큐로 넘겨서 처리할 수 있다고 하는 DLQ 를 적용해보았는데
보존 기간동안 계속 요청들어가는 점 + 기존 SQS 와 DLQ 둘 다 동일한 람다 함수와 연결해놓았다는 점이
하나의 SQS 를 쓰는 것과 어떤 차이가 있는지 파악하지 못해 일단 보류해두었다.
람다 자체에서 재시도를 시도하는 부분도 있긴 하지만 람다의 인스턴스 관련 문제가 아니더라도 예상하지 못한 오류로 실행이 불가한 상황이 발생했을 때 SQS 로 관리할 수 있다는 점이 의미가 큰 것 같다.
참고 : https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/lambda-concurrency.html
https://devblog.kakaostyle.com/ko/2017-05-13-1-aws-serverless-1/