코드 저장소.

일정추천 기능 고도화 - 일정추천 챗봇으로 고도화 본문

포폴/일정관리 프로젝트

일정추천 기능 고도화 - 일정추천 챗봇으로 고도화

slown 2026. 4. 27. 19:33

목차

1.고도화 이유

2.변경된 구조

3.후기

 

1.고도화 이유

지난 1편에서는 AWS LightSail 2GB VM이라는 제한된 인프라 환경에서 OpenAI API와의 연동을 위해 OpenFeign(블로킹)을 WebClient(논블로킹)로 전면 전환하는 뼈대 작업을 진행했었다. 당시 JMeter를 활용해 메인 인프라의 체력을 테스트했을 때, 일반 일정 생성(CRUD) API의 경우 동시 사용자 100명(100VU)까지 에러율 0%로 안정적으로 버텨내는 성능 방어선을 확인했었다.

우리 서버 자체의 일반적인 CRUD 처리 능력은 검증을 마쳤지만, OpenAI API를 직접 타고 들어오는 AI 일정 추천/챗봇 기능을 실제 서비스 운영 환경 관점으로 들이받았을 때는 여전히 해결해야 할 두 가지 한계가 찝찝하게 만들었습니다.

 

1-1. 사용자 경험(UX)의 한계: 멈춰있는 로딩 화면

기존 구조는 OpenAI가 긴 추천 일정 문장을 모두 구워낼 때까지 Mono 체인 내부에서 응답을 대기시켰다가 한방에 뱉어주는 방식이었습니다. 일반 일정 생성은 밀리초(ms) 단위로 끝나서 100VU도 짱짱하게 버티지만, AI 추천 응답은 전체 문장이 완성되기까지 수 초 이상 소요됩니다. 유저는 그 긴 시간 동안 멈춰있는 화면(로딩 스피너)을 보며 답답함을 느껴야 했습니다. ChatGPT처럼 AI가 글자를 실시간으로 타이핑하듯 출력하는 스트리밍(Streaming) 구조로의 변경을 하기로 했습니다.

1-2. 성능 및 결합도의 한계: 메인 비즈니스를 붙잡는 무거운 연산들

AI가 챗봇 응답을 완료하면 후처리로 두 가지 무거운 연산이 돌게 됩니다. 다음 대화의 맥락 유지를 위해 대화 이력을 데이터베이스에 영구 적재(MySQL)해야 하고, 유저가 주로 반응하는 시간대나 키워드를 추출하는 사용자 패턴 분석 연산(Redis 누적)을 수행해야 합니다. 일반 일정 생성처럼 가벼운 쿼리가 아니기 때문에, 이 무거운 후처리 로직들을 메인 응답 스레드선에 그대로 묶어두면 유저가 실시간 챗봇 응답을 받아보는 속도 자체가 뒤로 밀리게 되는 문제가 발생을 합니다.

 

이러한 이유로 외부 API 통신 계층과 우리 내부 인프라(DB/분석) 계층의 의존성을 완전히 비동기로 분리해야만 했습니다.

2.변경된 구조

이 문제를 해결하기 위해 Next.js 클라이언트와 WebSocket(STOMP) 커넥션을 뚫고, 메인 응답선은 OpenAI SSE 스트리밍 데이터를 WebFlux Flux로 받아 웹소켓(WebSocket STOMP) 채널로 유저에게 실시간 포워딩하고, Resilience4j 서킷 브레이커로 튜닝했으며, 후처리 영역은 Transactional Outbox + Kafka 멀티 컨슈머 파이프라인으로 결합도를 완벽히 해제했습니다. 아래는 이번에 변경된 일정추천 챗봇의 흐름입니다.

 

 

2-1. Flux<String> 스트리밍과 서킷 브레이커의 조화 (OpenAiWebClient)

오픈AI가 뱉어내는 텍스트 토큰(TEXT_EVENT_STREAM)을 Flux 리액티브 타입으로 받아 실시간 파싱합니다. 여기에 CircuitBreakerOperator를 바인딩하여, 스트리밍 도중 발생하는 네트워크 장애까지 서킷 브레이커가 완벽하게 추적하도록 했습니다.

 

public Flux<String> streamChatCompletion(OpenAiRequest request) {
    Flux<String> chatFlux = openAiWebClient.post()
            .uri("/chat/completions")
            .header("Authorization", "Bearer " + apiKey)
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.TEXT_EVENT_STREAM) // SSE 스트리밍 선언
            .bodyValue(request)
            .retrieve()
            .bodyToFlux(String.class)
            .filter(data -> !data.isBlank() && !data.equals("[DONE]"))
            .mapNotNull(this::extractToken);

    // Flux 리액티브 스트림 전체에 서킷 브레이커 결합
    return chatFlux
            .transformDeferred(CircuitBreakerOperator.of(circuitBreaker)) 
            .onErrorResume(e -> {
                log.error("[OpenAI 서킷 오픈 또는 에러 발생] Fallback 작동. 사유: {}", e.getMessage());
                return Flux.just("죄송합니다. 현재 AI 연결이 원활하지 않습니다.");
            });
}

 

2-2. 메인 응답선과 비동기 파이프라인의 격리 (ChatBotService)

유저에게 실시간으로 토큰을 쏴주는 흐름(clientStream)을 방해하지 않으면서, 뒤로는 전체 응답 문자열을 취합(collect(Collectors.joining()))하여 Outbox 테이블에 이벤트를 적재하는 파이프라인을 구축했습니다. 혹시 모를 Outbox 테이블 병목이 메인 흐름을 붙잡지 않도록 5초 타임아웃을 설정해 유저 응답을 최우선으로 보호했습니다.

 

public Flux<String> streamChat(Long memberId, String userMessage) {
    Mono<List<ChatMessage>> historyMono = Mono.fromCallable(() -> cacheService.getChatHistory(memberId)).subscribeOn(Schedulers.boundedElastic());
    Mono<List<SchedulesModel>> schedulesMono = Mono.fromCallable(() -> scheduleRepositoryPort.findAllByMemberId(memberId, Pageable.ofSize(5)).getContent()).subscribeOn(Schedulers.boundedElastic());

    return Mono.zip(historyMono, schedulesMono)
            .flatMapMany(tuple -> {
                OpenAiRequest request = buildChatRequest(tuple.getT1(), tuple.getT2(), userMessage);
                return openAiWebClient.streamChatCompletion(request)
                        .publish(sharedFlux -> {
                            Flux<String> clientStream = sharedFlux; // 실시간 반환선
                            
                            // 데이터 저장은 메인 흐름과 격리하여 Outbox 파이프라인으로 위임
                            Mono<Void> saveOutboxMono = sharedFlux
                                    .collect(Collectors.joining())
                                    .flatMap(fullResponse -> Mono.fromRunnable(() -> {
                                        ChatCompletedEvent event = ChatCompletedEvent.builder()
                                                .memberId(memberId).userMessage(userMessage)
                                                .assistantResponse(fullResponse).createdAt(LocalDateTime.now()).build();
                                        chatEventPort.publish(event); // Outbox 테이블 적재 트리거
                                    }).subscribeOn(Schedulers.boundedElastic()))
                                    .timeout(Duration.ofSeconds(5)) // 5초 타임아웃 가드
                                    .onErrorResume(e -> {
                                        log.error("[Outbox 저장 실패] 유저 응답은 보호합니다. 사유: {}", e.getMessage());
                                        return Mono.empty();
                                    }).then();
                                    
                            return clientStream.mergeWith(saveOutboxMono.thenMany(Flux.empty()));
                        });
            });
}

 

2-3. Transactional Outbox + Kafka 멀티 컨슈머 아키텍처

Outbox 테이블에 들어간 이벤트는 ShedLock 가드가 적용된 3초 주기 폴링 스케줄러에 의해 안전하게 퍼 올려져 Kafka chat-history 토픽으로 발행됩니다. 이 토픽을 두고 이력 저장 컨슈머(chat-history-save)와 패턴 분석 컨슈머(pattern-analysis)가 각자 독립된 groupId로 이벤트를 동시에 소비합니다. 향후 통계나 알림 요구사항이 새로 추가되더라도 기존 메인 코드는 단 한 줄도 건드리지 않는 완벽한 OCP(개방-폐쇄 원칙)를 달성했습니다.

 

1) 대화 이력 영구 적재 및 최신 컨텍스트 동기화 (ChatHistorySaveConsumer)

 

첫 번째 컨슈머는 이벤트를 가져와 데이터 유실을 방지하기 위한 At-least-once(수동 커밋) 전략을 펼칩니다. 비즈니스가 완전히 성공해야 오프셋을 커밋(ack.acknowledge())하며, processedEventService를 통한 eventId 기반 멱등성 검증으로 중복 저장을 차단합니다. 이후 MySQL 영구 저장과 차기 대화 맥락 유지를 위한 Redis 갱신을 동시에 수행합니다.

 

@KafkaListener(topics = "chat-history", groupId = "chat-history-save", containerFactory = "chatKafkaListenerFactory")
@Override
@Transactional
public void handle(ChatCompletedEvent event, Acknowledgment ack) {
    if (processedEventService.isAlreadyProcessed(event.getEventId())) {
        log.info("⚠️ 이미 처리된 이벤트 (Skip): {}", event.getEventId());
        ack.acknowledge(); // 멱등성 가드: 중복은 커밋 후 넘김
        return;
    }
    try {
        KafkaMDCUtil.initMDC(event);
        processedEventService.saveProcessedEvent(event.getEventId());
        
        // MySQL 영구 저장 및 다음 대화용 Redis 최신 이력 갱신
        chatHistoryRepository.save(ChatHistoryModel.builder().memberId(event.getMemberId()).userMessage(event.getUserMessage()).assistantResponse(event.getAssistantResponse()).createdAt(event.getCreatedAt()).build());
        cacheService.appendChatMessage(event.getMemberId(), ChatMessage.builder().role("user").content(event.getUserMessage()).createdAt(event.getCreatedAt()).build());
        cacheService.appendChatMessage(event.getMemberId(), ChatMessage.builder().role("assistant").content(event.getAssistantResponse()).createdAt(event.getCreatedAt()).build());

        ack.acknowledge(); // 수동 오프셋 커밋 보장
    } catch (Exception e) {
        log.error("[ChatHistorySaveConsumer] 실패: {}", e.getMessage(), e);
        throw e; // 에러 시 완충 구역(DLQ)으로 격리 이송
    } finally { KafkaMDCUtil.clear(); }
}

 

 

2) 사용자 행동 분석 및 선호 통계 누적 (PatternAnalysisConsume)

두 번째 컨슈머는 동일한 이벤트를 완전히 독립된 스레드 영역(groupId = "pattern-analysis")으로 긁어옵니다. 유저가 언제 챗봇과 대화했는지 선호 시간대를 판단하고, 자연어 메시지 내에 포함된 특정 키워드(운동, 공부 등) 분포도를 Redis의 HINCRBY(increment) 연산으로 비동기 누적합니다.이때 메인 로직에는 어떠한 사이드 이펙트도 주지 않으며, 추후 AI 통계 알림 요건이 추가되더라도 이 그룹 ID를 가진 컨슈머만 새로 개통하면 되므로 완벽한 OCP(개방-폐쇄 원칙)가 실현됩니다.

 

@KafkaListener(topics = "chat-history", groupId = "pattern-analysis", containerFactory = "chatKafkaListenerFactory")
@Override
@Transactional
public void handle(ChatCompletedEvent event, Acknowledgment ack) {
    if (processedEventService.isAlreadyProcessed(event.getEventId())) { ack.acknowledge(); return; }
    try {
        KafkaMDCUtil.initMDC(event);
        processedEventService.saveProcessedEvent(event.getEventId());

        // 핵심 통계 분석 로직 격리 작동
        analyzeTimePreference(event.getMemberId(), event.getCreatedAt()); // morning/afternoon 빈도 누적
        analyzeMessageContext(event.getMemberId(), event.getUserMessage()); // 대화 키워드 관심사 추출 (HINCRBY)

        ack.acknowledge();
    } catch (Exception e) { log.error("[ChatPatternAnalysisConsumer] 실패: {}", e.getMessage(), e); throw e; }
    finally { KafkaMDCUtil.clear(); }
}

 

3.후기

 

WebClient 전환이 단순한 '자원 효율성 확보와 단일 API 성능 개척'이었다면, 이번 3차 고도화는 "통제할 수 없는 외부 장애로부터 내 핵심 도메인을 완벽히 격리하고 시스템을 어떻게 생존시킬 것인가"에 대한 치열한 고민의 결과물이었습니다. 비동기 분산 시스템을 구축하면서 백엔드 엔지니어로서 얻은 핵심 인사이트는 다음과 같습니다.

  • 진정한 비동기는 흐름의 격리에서 온다
    • 일반적인 일정 생성이 잘 나온다고 방심하면 안 된다는 것을 배웠다. 외부 API 통신과 내부 저장 로직을 Outbox 테이블과 Kafka로 완전히 분리해 내면서, 메인 비즈니스 스레드는 외부 장애나 DB 병목에 전혀 영향을 받지 않는 탄탄한 인프라 체력을 갖추게 되었습니다.
  • OCP(개방-폐쇄 원칙)의 실전 적용
    • Kafka 멀티 컨슈머 그룹 아키텍처를 도입해 보니, 새로운 분석 요건이나 부가 기능이 추가되어도 기존 챗봇 핵심 로직을 전혀 건드리지 않아도 된다는 분산 인프라의 막강한 확장성을 체감했습니다.
  • 아직 남아있는 실전 검증 과제
    • 코드와 아키텍처 구조는 무결하게 심어두었지만, 상용 프로덕션 환경에 나가기 전 인위적인 카오스 주입(네트워크 단절 및 에러 레이트 조작)을 통해 @CircuitBreaker AOP 프록시가 리액티브 컨텍스트의 실행 시점 예외를 정확히 낚아채는지, CircuitBreakerOperator가 스트리밍 도중 끊기는 장애 윈도우 슬라이딩 카운트를 정상적으로 인지하는지 끝까지 부하 테스트로 검증해 볼 계획입니다.