1. 개요

애니메이션이나 드라마를 보실 때 내가 몇 화까지 본지는 기억이 안 나지만 특정 장면만 떠오를 때가 있습니다. 이 문제를 해결하기 위해 사용자가 원하는 장면이 나온 회차를 쉽게 찾을 수 있는 검색 시스템을 만들어 보려 합니다.

그래서 이번 글의 목적은 실제 서비스에서도 써 볼 수 있는 애니메이션 장면 검색 RAG 시스템을 구축하는 것입니다. 다음 챕터들로 만들어봅시다.

2. 멀티모달 임베딩

멀티모달 임베딩 공간(Multimodal Embedding Space) 은 텍스트, 이미지, 동영상 프레임 등 서로 다른 형태의 데이터(모달리티)를 하나의 통일된 다차원 벡터 공간에 매핑하는 기술입니다.

멀티모달은 다양한 리소스를 의미하고 임베딩은 그 데이터를 의미가 보존된 저차원 벡터로 변환하는 과정을 말합니다. 벡터로 바꾸는 이유는 수학적으로 계산할 때 사용하기 위함이죠.

각종 미디어를 동일한 형태의 값으로 변환하기 때문에 교차 모달리티 검색이 가능하며, 다음과 같은 동작을 수행할 수 있습니다.

  • 텍스트로 이미지 검색
  • 이미지로 유사 영상 찾기
  • 영상 프레임으로 상황 인식

멀티모달 임베딩을 하기 위해서는 Contrastive Learning으로 비슷한 리소스끼리는 가깝게, 먼 것들은 멀게 배치하도록 모델을 최적화합니다. 이를 통해 모델은 데이터의 의미를 수치화된 거리로 파악하게 되며, 이렇게 거리 값을 비교하는 공간이 바로 Shared Embedding Space입니다.

이 공간에서는 서로 다른 모달리티의 데이터가 같은 좌표계에 위치하게 되어 수치로 비교할 수 있게 되죠. 하나의 공간에 데이터를 투영함으로써 다양한 동작을 수행할 수 있게 됩니다.

Contrastive Learning

대조 학습은 데이터 간의 유사도와 차이를 비교하며 특징을 학습하는 방식으로, 모델이 라벨 없이도 데이터의 의미를 스스로 파악할 수 있게 해주는 자기 지도 학습(Self-Supervised Learning)에서 주로 활용됩니다. 이런 방식이 아니라면 사람이 일일이 라벨링하며 데이터를 나눠야 했습니다. 예시로 대표적인 데이터셋인 ImageNet은 약 22,000개의 객체를 구분하기 위해 1,400만 장의 이미지와 25,000명 이상의 작업자가 필요했습니다.

핵심 아이디어는 매우 단순합니다.

  • Positive Pair (긍정 쌍): 같은 대상이거나 의미상 유사한 데이터 쌍입니다. 모델은 이들의 거리를 가깝게 만듭니다.
  • Negative Pair (부정 쌍): 서로 다른 대상이거나 관련 없는 데이터 쌍입니다. 모델은 이들의 거리를 멀게 만듭니다.

예를 들어 강아지 사진을 약간 회전시키거나 색감을 바꾼 사진은 원본에 가깝게 배치하고, 고양이 사진처럼 다른 대상의 사진은 멀리 떨어뜨려 놓는 훈련 방식이죠. 이 과정을 통해 모델은 “강아지"라는 객체의 특징을 스스로 깨닫게 됩니다.

이 점을 활용해서, 강아지 사진 한 장이 있다고 가정하면 이 사진을 자르거나, 회전시키거나, 색감을 바꾸는 식으로 여러 개의 변형된 사진을 만들어도 여전히 같은 강아지라는 Positive Pair로 인식하여 거리를 가깝게 합니다. 이로써 모델이 표면적인 변화에 흔들리지 않고 본질을 구별할 수 있게 됩니다.

Static Embedding Space, Shared Embedding Space

임베딩을 통해 데이터를 벡터로 표현하면, 공간 안에서의 상대적 위치를 통해 각 데이터의 의미를 파악할 수 있습니다. 거리와 각도를 통해 코사인 값으로 도출해서 벡터 값 사이의 유사도를 측정합니다.

다만 벡터의 숫자 값만으로는 의미를 직관적으로 이해하기 어렵기 때문에, 보통 좌표 공간상의 위치로 시각화하여 표현하죠. 기존 Word2Vec, GloVe, FastText 모델 등에서 사용된 방식은 공간에서 하나의 단어가 하나의 벡터에 1:1로 고정되는 Static Embedding이었습니다. 그래서 “배"라는 단어를 벡터화한 값이 먹는 배인지, 타는 배인지, 사람의 배인지를 구분할 수 없었죠. embeddings 위 이미지를 보면 3개의 축(dessertness, sandwichness, liquidness)으로 단어 사이의 관계를 표현했죠. 이를 통해 각 단어가 어떤 의미에 가까운지를 3차원 공간상에서 표현할 수 있게 됩니다. 샐러드, 피자, 핫도그, 샌드위치, 보르시(borscht) 등이 어느 쪽에 더 가까운지를 위치로 알 수 있죠.

하지만 이런 방식은 단일 종류의 모달리티(텍스트)에서만 동작할 수 있었습니다.

Shared Embedding Space이 이러한 한계를 개선한 방법입니다.
주로 멀티모달(Multimodal) 모델(ex: OpenAI의 CLIP)에서 중요한 개념으로, 서로 다른 형태의 데이터(이미지, 텍스트 등)가 하나의 공통된 공간에 함께 배치되어 이미지로서의 사과와 텍스트로서의 “사과"가 서로 가까운 위치의 벡터로 표현되도록 합니다. 그래서 데이터의 형태가 달라도 의미를 기준으로 검색할 수 있게 되죠.

CLIP 구조

파인튜닝 없이 바로 사용할 수 있고, 학습 시 보지 못한 키워드에 대해서도 추론이 가능한 모델입니다. 또한 이미지 인코더와 텍스트 인코더의 출력이 같은 Shared Embedding Space 위에 놓이기 때문에, 두 출력을 연결하기 위한 별도의 변환 레이어 없이도 곧바로 유사도를 비교할 수 있습니다.

CLIP은 자연어 지도 학습(Natural Language Supervision)을 통해 시각적 개념을 효율적으로 학습하는 신경망입니다. 인식하고자 하는 시각적 범주의 이름만 제공하면, GPT-2GPT-3의 “zero-shot” 기능처럼 다양한 시각 분류 벤치마크에 그대로 적용할 수 있습니다.

  • 신경망(Neural Network): 인간의 뇌가 정보를 처리하는 방식에서 영감을 받은 인공지능(AI) 및 머신러닝 모델
  • 제로샷(Zero-shot): AI 모델이 학습 과정에서 배우지 않았거나, 한 번도 본 적 없는 새로운 데이터/작업을 별도의 추가 학습(파인튜닝) 없이 바로 처리하는 능력

추후 Anime Search 프로젝트를 구성할 때 새로운 에피소드가 나올 때마다 모델을 재학습하는 건 비용적으로 손해이기 때문에 별도 학습 없이 곧바로 임베딩을 추출할 수 있는 CLIP이 적합합니다. 따라서 이를 베이스로 프로젝트를 구성하려 합니다.

Practice

허깅페이스로 로컬에서 돌려보기

여기서는 Hugging Face 모델 허브에서 CLIP 모델을 불러옵니다. 이 모델의 기능을 “이미지·텍스트 입력을 받아 고정 차원의 Float 배열(Vector)로 바꿔 반환하는 블랙박스 함수”로 정의하고, 함수에 걸리는 시간과 모델별 결과를 비교하며, 텍스트와 이미지의 유사도 측정까지 확인해 봅시다.

다음 코드로 로컬에서 이미지를 모델에 넣어 가장 가까운 텍스트들을 추론할 수 있습니다.

import time

import torch
from PIL import Image
from sentence_transformers import SentenceTransformer, util


def calculate_similarity(img_emb, text_embs):
    """ cosine similarity between two vectors """
    cosine_scores = util.cos_sim(img_emb, text_embs)
    return cosine_scores

class CLIPInferenceService:
    # https://huggingface.co/models 에서 모델 찾기
    def __init__(self, model_name='clip-ViT-L-14'):
        print(f"Loading model '{model_name}' into memory...")
        start_time = time.time()

        # NVIDIA GPU 가속이 가능하면 cuda, 아니면 cpu 사용
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = SentenceTransformer(model_name, device=self.device)

        print(f"Model loaded in {time.time() - start_time:.2f} seconds on {self.device}")

    def encode_image(self, image_path):
        """ image to vector """
        img = Image.open(image_path)
        start_time = time.time()
        embeddings = self.model.encode(img)
        print(f"Image encoding latency: {(time.time() - start_time)*1000:.2f} ms")
        return embeddings

    def encode_text(self, text_list):
        # text(or list) to vector
        start_time = time.time()
        embeddings = self.model.encode(text_list)
        print(f"Text encoding latency: {(time.time() - start_time)*1000:.2f} ms")
        return embeddings


if __name__ == "__main__":
    clip_service = CLIPInferenceService()
    sample_image_path = "./anime.jpg"

    queries = [
        "비가 내리는 우울한 장면",
        "주인공이 검을 휘두르는 화려한 액션",
        "등장인물들이 웃으며 식사하는 일상",
        "울며 점프하는 엘프를 보는 주인공 일행",
        "침대 위에서 울고 있는 장면",
        "어느 여관 안에 있는 모습"
    ]

    print("\n--- Extracting Embeddings ---")
    img_vec = clip_service.encode_image(sample_image_path)
    text_vec = clip_service.encode_text(queries)

    print(f"\nImage Vector Shape: {img_vec.shape}")
    # Float32, 차원은 모델마다 다름 (예: ViT-L-14는 768, ViT-B-32는 512)

    print("\n--- Cosine Similarity Results ---")
    scores = calculate_similarity(img_vec, text_vec)

    # sort
    results = list(zip(queries, scores[0]))
    results.sort(key=lambda x: x[1].item(), reverse=True)
    for i, (query, score) in enumerate(results):
        print(f"{i + 1}. Score: {score.item():.4f} -> Query: '{query}'")

아래 이미지로 임베딩해 돌려봤습니다. anime.jpg 결과는 다음과 같죠… 생각 이상으로 잘 못 찾는 모습을 보여줍니다. 이미지 해상도와 모델이 이미지를 나누는 패치(픽셀) 단위의 영향 때문일 가능성이 큽니다.

Loading model 'clip-ViT-L-14' into memory...
Model loaded in 7.83 seconds on cpu

--- Extracting Embeddings ---
Image encoding latency: 586.39 ms
Text encoding latency: 155.15 ms

Image Vector Shape: (768,)

--- Cosine Similarity Results ---
1. Score: 0.1655 -> Query: '어느 여관 안에 있는 모습'
2. Score: 0.1540 -> Query: '침대 위에서 울고 있는 장면'
3. Score: 0.1535 -> Query: '울며 점프하는 엘프를 보는 주인공 일행'
4. Score: 0.1477 -> Query: '주인공이 검을 휘두르는 화려한 액션'
5. Score: 0.1404 -> Query: '비가 내리는 우울한 장면'
6. Score: 0.1347 -> Query: '등장인물들이 웃으며 식사하는 일상'

이 표를 참고해 보면 대부분 ‘불일치’ 구간에 가깝게 나오지만, 그래도 가장 부합하는 키워드들이 차례로 1·2·3위를 차지하는 점은 나쁘지 않은 접근이라 느껴졌습니다.

아래는 참고용 표로 이 점수와 의미가 절대적이지 않습니다.

CLIP 점수 (예시)유사도 수준의미
0.3 이상 (높음)매우 높은 일치이미지 내 주요 객체와 속성을 텍스트가 정확히 묘사
0.2 ~ 0.3 (보통)일치이미지의 전반적인 상황을 텍스트가 설명함
0.2 미만 (낮음)연관성 낮음내용이 불일치하거나 관련 없는 내용

정확도가 더 낮은 모델인 clip-ViT-B-32로 바꿔 돌리면 다음과 같은 결과가 나옵니다.
점수는 전반적으로 올라갔지만, 1위 후보가 달라지는 등 전체 순위가 기대와 맞지 않는 부분이 보입니다.

Loading model 'clip-ViT-B-32' into memory...
Model loaded in 7.77 seconds on cpu

--- Extracting Embeddings ---
Image encoding latency: 96.44 ms
Text encoding latency: 76.44 ms

Image Vector Shape: (512,)

--- Cosine Similarity Results ---
1. Score: 0.2219 -> Query: '주인공이 검을 휘두르는 화려한 액션'
2. Score: 0.2109 -> Query: '침대 위에서 울고 있는 장면'
3. Score: 0.2097 -> Query: '어느 여관 안에 있는 모습'
4. Score: 0.1976 -> Query: '비가 내리는 우울한 장면'
5. Score: 0.1974 -> Query: '울며 점프하는 엘프를 보는 주인공 일행'
6. Score: 0.1949 -> Query: '등장인물들이 웃으며 식사하는 일상'

Process finished with exit code 0

이름에 붙은 32, 14는 ViT가 이미지를 나눌 때 쓰는 패치 한 변의 픽셀 수를 뜻합니다. clip-ViT-B-32는 32×32 픽셀 패치, clip-ViT-L-14는 14×14 픽셀 패치라서 같은 해상도에서도 후자가 격자를 더 촘촘히 봅니다. 이미지 크기가 224px(이미지 크기는 모델에서 리사이징함)이고 14px 패치라면 16 x 16 그리드 형태로 분석되는거죠.

점수가 만족스럽지 않았는데요, 이유는 모델이 한국어보다 영어 텍스트 중심으로 학습되었기 때문입니다. clip-ViT-L-14 모델을 유지한 채 영어로 queries를 바꿔 진행해 보면 점수도 오르고 정확도도 유지되는 걸 확인할 수 있었습니다.

다음처럼 queries를 바꿔봅시다. 애매하고 서로 비슷한 쿼리들을 넣어봤습니다.

queries = [
    "A gloomy scene with falling rain", # 비가 내리는 우울한 장면
    "Flashy action of the protagonist swinging a sword", # 주인공이 검을 휘두르는 화려한 액션
    "Characters having a meal and laughing together", # 등장인물들이 웃으며 식사하는 일상
    "The protagonist's party watching an elf jumping while crying", # 울며 점프하는 엘프를 보는 주인공 일행
    "A scene of someone crying on the bed", # 침대 위에서 울고 있는 장면
    "A scene inside an inn", # 어느 여관 안에 있는 모습
    "There are four people inside the inn", # 여관 안에 사람이 4명이 있다
    "There are three people inside the inn", # 여관 안에 사람이 3명이 있다
    "There are two people inside the inn", # 여관 안에 사람이 2명이 있다
    "There is a wardrobe and a bed in the room", # 방 안에 장롱과 침대가 있다
    "There is a bed and a wardrobe in the room" # 방 안에 침대와 장롱이 있다
]

결과는 다음과 같이 나옵니다. 울고 있는 모습을 정확히 캐치한 점이 흥미롭고, There is a wardrobe and a bed in the roomThere is a bed and a wardrobe in the room의 점수 차이를 보면 이미지를 파악하는 순서나 단어의 위치에 따라 벡터값이 달라지는 걸 볼 수 있습니다.

Loading model 'clip-ViT-L-14' into memory...
Model loaded in 7.57 seconds on cpu

--- Extracting Embeddings ---
Image encoding latency: 552.03 ms
Text encoding latency: 441.05 ms

Image Vector Shape: (768,)

--- Cosine Similarity Results ---
1. Score: 0.2711 -> Query: 'A scene of someone crying on the bed'
2. Score: 0.2227 -> Query: 'A scene inside an inn'
3. Score: 0.2223 -> Query: 'There is a bed and a wardrobe in the room'
4. Score: 0.2205 -> Query: 'There are four people inside the inn'
5. Score: 0.2205 -> Query: 'There are three people inside the inn'
6. Score: 0.2161 -> Query: 'The protagonist's party watching an elf jumping while crying'
7. Score: 0.2159 -> Query: 'There are two people inside the inn'
8. Score: 0.2094 -> Query: 'There is a wardrobe and a bed in the room'
9. Score: 0.2020 -> Query: 'Characters having a meal and laughing together'
10. Score: 0.1944 -> Query: 'Flashy action of the protagonist swinging a sword'
11. Score: 0.1459 -> Query: 'A gloomy scene with falling rain'

Colab 환경에서 OpenAI CLIP 써보기

이전에는 허깅페이스를 사용해 쉽게 모델에 접근했고 로컬에서 작업을 했습니다. 이번에는 Colab 환경에서 OpenAI CLIP을 사용해서 sentence_transformers 없이 CLIP으로 직접 특징 벡터를 추출하고, 이미지와 텍스트가 어떻게 비교되는지 시각화해봅니다. Colab은 GPU 런타임을 선택하면 CUDA 환경에서 모델 추론과 행렬 연산을 더 효율적으로 테스트할 수 있고 편이해서 이번에는 이 환경에서 진행해보려고 합니다.

  • Colab: Google Colab(Colaboratory)로 브라우저 기반의 무료 파이썬(Python) 개발 샌드박스 환경
  • CUDA(Compute Unified Device Architecture): 엔비디아(NVIDIA)가 개발한 병렬 컴퓨팅 플랫폼 및 프로그래밍 모델

CLIP이 이미지와 텍스트의 유사도를 계산하는 부분은 앞에서 Hugging Face를 통해 간략하게 1:N으로 진행했었는데, 이번에는 이미지와 텍스트를 N:M으로 비교해봅시다.

곧 나올 코드의 흐름은 다음과 같습니다.

이미지 파일 로드 → CLIP 전처리 → 배치 Tensor 생성 → GPU 이동 → 이미지 벡터 추출
텍스트 쿼리 → CLIP 토큰화 → GPU 이동 → 텍스트 벡터 추출
이미지 벡터 N개 × 텍스트 벡터 M개 → N:M 유사도 행렬 → 히트맵 시각화

이번 예시로 아래 3가지를 파악합니다.

  • 벡터 추출: 이미지와 텍스트가 각각 어떤 특징 벡터로 바뀌는지 확인
  • N:M 유사도 비교: 1:1 반복 비교가 아니라 행렬곱 한 번으로 모든 이미지-텍스트 조합을 비교
  • 시각화(Heatmap): 코사인 유사도와 softmax 상대 점수를 함께 보며 쿼리별 반응을 해석

이를 위해서 아래 사이트들을 참고할 수 있죠.

다음 코드를 Colab에 넣어봅시다.

  1. 필요한 라이브러리와 영상을 다운로드해봅시다.
!pip install yt-dlp
!pip install git+https://github.com/openai/CLIP.git
# yt-dlp로 프레임 추출에 사용할 영상 다운로드
!yt-dlp -f "bestvideo[ext=mp4]/best[ext=mp4]/best" "https://www.youtube.com/watch?v=qDFeurdWDNE" -o sample_anime.mp4
  1. 동영상에서 프레임 추출
# ffmpeg로 1차 리사이징 및 프레임 추출 (I/O 병목 방지)
# ViT-L/14 모델의 기본 입력은 224px이므로 scale=-1:224로 맞춤
!mkdir -p frames
!ffmpeg -i sample_anime.mp4 -vf "fps=2.5,scale=-1:224" -q:v 2 frames/frame_%04d.jpg
  1. 프레임 이미지와 텍스트를 벡터로 바꾼 뒤 N:M 유사도 비교 진행

아래 코드는 크게 네 단계로 나뉩니다.

  • 전체 프레임 중 일부를 균등하게 샘플링
  • 이미지와 텍스트를 각각 CLIP 입력 형태로 변환
  • CLIP으로 이미지/텍스트 특징 벡터 추출
  • 벡터 간 유사도를 행렬곱으로 계산하고 히트맵으로 시각화
import os
import time
import torch # 딥러닝 프레임워크
import clip # 신경망
from PIL import Image # 이미지 가공
import matplotlib.pyplot as plt # 시각화
import seaborn as sns # 데이터 통계 시각화
import matplotlib.gridspec as gridspec # grid 세팅

# CLIP 모델과 이미지 전처리 함수 로드
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Loading ViT-L/14 model...")
# 이 모델은 224px 이미지를 14px 패치로 나누므로 16 x 16 = 256개 패치를 사용함
# 이미지 크기를 224에서 336으로 늘리면 패치 수가 증가해 GPU 비용도 함께 증가함
# 224px이어도 CLIP은 색감, 구도, 객체, 분위기 같은 정보를 의미적 특징 벡터로 인코딩함
model, preprocess = clip.load("ViT-L/14", device=device)
model.eval() # 모델을 추론 모드로 설정

# CUDA 연산은 비동기로 실행되므로 시간 측정 전후에 사용할 동기화 함수
def sync_if_cuda():
  if device == "cuda":
    torch.cuda.synchronize()

# 전체 프레임 중 시각화할 이미지 샘플링
image_count = 20
all_frame_files = sorted([os.path.join("frames", f) for f in os.listdir("frames") if f.endswith(".jpg")])

if len(all_frame_files) > image_count and image_count > 1:
  sampled_indices = [
      round(i * (len(all_frame_files) - 1) / (image_count - 1))
      for i in range(image_count)
  ]
  frame_files = [all_frame_files[i] for i in sampled_indices]
else:
  frame_files = all_frame_files[:image_count]
print(f"Extracting features for {len(frame_files)} frames...")

# 샘플링한 이미지들을 전처리한 뒤 하나의 배치 Tensor로 묶어 GPU로 이동
image_tensors = torch.stack([preprocess(Image.open(f)) for f in frame_files]).to(device)
# 텍스트 토큰화, sentence-transformers를 쓰지 않으므로 CLIP 토크나이저를 직접 사용
queries = [
    "Netflix",
    "Sad scene",
    "Neon-lit Night City",
    "Giant Hologram Advertisement",
    "Rainy Dystopian Street",
    "Cybernetic Body Enhancements",
    "Flying Vehicle in the Sky",
    "Digital Glitch Effect",
    "High-tech Hacking Interface",
    "Wires and Cables Underground",
    "Cyberpunk Techwear Fashion",
    "Melancholic Loneliness"
]
text_tokens = clip.tokenize(queries).to(device)

# 특징 추출 (Feature Extraction & Normalization)
with torch.no_grad(): # 그래디언트 추적을 꺼서 추론 속도와 VRAM 사용량을 개선
  # 이미지 배치와 텍스트 배치를 각각 한 번에 인코딩
  sync_if_cuda()
  encode_start = time.perf_counter()
  image_features = model.encode_image(image_tensors)
  text_features = model.encode_text(text_tokens)
  sync_if_cuda()
  encode_time = time.perf_counter() - encode_start

  # L2 정규화: 벡터의 길이를 1로 맞춰 방향성(Cosine Similarity)만 비교할 수 있게 함, 비교하기 위해 정규화를 적용했다고 이해하면 될 것 같습니다.
  image_features /= image_features.norm(dim=-1, keepdim=True)
  text_features /= text_features.norm(dim=-1, keepdim=True)

  # 1:1 방식: 각 이미지-텍스트 쌍을 하나씩 비교
  sync_if_cuda()
  loop_start = time.perf_counter()
  loop_scores = []
  for image_feature in image_features:
    row = []
    for text_feature in text_features:
      row.append(image_feature @ text_feature)
    loop_scores.append(torch.stack(row))
  loop_scores = torch.stack(loop_scores)
  sync_if_cuda()
  loop_time = time.perf_counter() - loop_start

  # N:M 방식: 행렬곱 한 번으로 전체 유사도 행렬 계산
  # image_features (프레임 수, 768) @ text_features.T (768, 쿼리 수) = cos_sim (프레임 수, 쿼리 수)
  sync_if_cuda()
  matrix_start = time.perf_counter()
  cos_sim_tensor = image_features @ text_features.t()
  sync_if_cuda()
  matrix_time = time.perf_counter() - matrix_start

  # 유사도 점수를 CLIP의 logits와 softmax 상대 점수로 변환
  logit_scale = model.logit_scale.exp()
  # logits은 코사인 유사도에 CLIP이 학습한 스케일 값을 곱한 점수
  logits = logit_scale * cos_sim_tensor

  # softmax는 전체 정답 확률이 아니라 현재 쿼리 목록 안에서의 상대 점수
  probs = logits.softmax(dim=-1).cpu().numpy()
  # 순수 코사인 유사도
  # cpu().numpy(): GPU 텐서를 CPU 메모리로 옮긴 뒤 NumPy 배열로 변환
  cos_sim = cos_sim_tensor.cpu().numpy()

# 이미지/텍스트를 CLIP 벡터로 바꾸는 모델 추론 시간이 어느 정도인지 확인
print(f"CLIP feature encoding time: {encode_time:.6f}s")
# 이미지-텍스트 쌍을 하나씩 비교하면 얼마나 걸리는지 확인
print(f"1:1 loop similarity time: {loop_time:.6f}s")
# 같은 비교를 N:M 행렬곱 한 번으로 처리하면 얼마나 걸리는지 확인
print(f"N:M matrix similarity time: {matrix_time:.6f}s")
# 전체 비교 기준으로 행렬곱 방식이 1:1 반복 방식보다 몇 배 빠른지 확인
print(f"Matrix speedup: {loop_time / max(matrix_time, 1e-12):.2f}x")

# 두 방식이 같은 코사인 유사도 행렬을 만드는지 최대 오차로 체크
max_diff = (loop_scores - cos_sim_tensor).abs().max().item()
print(f"Max difference between two methods: {max_diff:.8f}")

# 시각화 설정
num_frames = len(frame_files)
fig = plt.figure(figsize=(16, num_frames * 1.5))
# grid 설정
gs = gridspec.GridSpec(1, 2, width_ratios=[3, 1])

# 코사인 유사도와 softmax 상대 점수를 함께 표시
heatmap_labels = [
    [f"{cos_sim[i, j]:.3f} ({probs[i, j]:.2f})" for j in range(len(queries))]
    for i in range(num_frames)
]
# 히트맵 색상은 cos_sim 기준, 셀 텍스트에는 cos_sim과 probs를 함께 표시
ax_cos = plt.subplot(gs[0])
sns.heatmap(cos_sim, annot=heatmap_labels, cmap='Blues', fmt="",
            xticklabels=queries,
            yticklabels=[f"Frame {i+1}" for i in range(num_frames)],
            ax=ax_cos, cbar_kws={"shrink": 0.5})
ax_cos.set_title(
    "CLIP Cosine Similarity (Softmax Relative Score)", 
    fontsize=15, 
    pad=20
)

# 오른쪽에 이미지 그리기 
gs_img = gridspec.GridSpecFromSubplotSpec(num_frames, 1, subplot_spec=gs[1])
for i, img_path in enumerate(frame_files):
  ax_img = plt.subplot(gs_img[i])
  img = Image.open(img_path)
  ax_img.imshow(img)
  ax_img.axis('off') # 축 숨기기
  ax_img.set_title(f"Frame {i+1}", fontsize=9)

plt.tight_layout()
plt.show()

여기서 시간 비교는 CLIP 모델 자체의 성능 벤치마크가 아니라, 이미 추출된 특징 벡터들을 비교하는 방식의 차이를 보기 위한 것입니다. 작은 예제에서는 차이가 크게 보이지 않을 수 있지만, 이미지 수와 쿼리 수가 늘어날수록 1:1 반복 비교보다 N:M 행렬곱의 효율성이 더 커집니다.

정리하면, 이 코드에서 봐야 하는 핵심은 다음과 같습니다.

  • image_features: 이미지들을 의미 벡터로 바꾼 결과
  • text_features: 텍스트 쿼리들을 같은 공간의 의미 벡터로 바꾼 결과
  • cos_sim: 이미지 벡터와 텍스트 벡터의 방향이 얼마나 비슷한지 보는 값
  • logits: 코사인 유사도에 CLIP이 학습한 스케일 값(logit_scale)을 곱한 점수
  • logit_scale: softmax에 들어가기 전 점수 차이를 더 명확히 만드는 값
  • softmax: 한 프레임 안에서 현재 쿼리 후보들끼리 상대 비교한 값
  • N:M 비교: N개의 이미지와 M개의 텍스트를 행렬곱 한 번으로 비교하는 구조

위 코드를 실행한 결과는 다음과 같습니다.

Loading ViT-L/14 model...
Extracting features for 20 frames...
CLIP feature encoding time: 70.832849s
1:1 loop similarity time: 0.002893s
N:M matrix similarity time: 0.000110s
Matrix speedup: 26.31x
Max difference between two methods: 0.00000007

CLIP에서 특징을 추출하는 데 70.832849s로 가장 시간이 많이 걸렸습니다. 항상 이렇게 오래 걸리는 게 아닌 첫 실행일 경우 모델에서 인코딩 비용 이외에도 CUDA 초기화나 GPU 워밍업, 첫 커널 실행 같은 콜드 스타트들이 섞인 경우가 많으니 참고 부탁드립니다. 즉 첫 실행에는 오버헤드가 발생하는 거죠.

나중에 워밍업을 한 뒤 실행했을 때의 인코딩 속도를 표로 보여줄 예정이니 정확한 시간 분리는 넘어가도록 합니다.

    image_features = model.encode_image(image_tensors)
    text_features = model.encode_text(text_tokens)

인코딩 후 나온 특징 벡터로 유사도를 계산할 때는 아직 데이터 양이 작아 0.002893s0.000110s로 절대적인 시간 차이가 작게 발생하지만, 데이터가 늘수록 시간 차이는 더 커집니다.

1:1 반복 방식과 N:M 행렬곱 방식의 정합성을 확인하기 위해 결과 차이를 비교했을 때 Max difference0.00000007로 매우 작은 걸 확인할 수 있는데, 이는 사실상 같은 코사인 유사도 행렬을 만든다고 볼 수 있습니다. 이 정도 차이가 발생하는 이유는 GPU/부동소수점 연산 순서 차이에서 생길 수 있는 미세한 오차입니다. 그리고 N:M으로 행렬곱을 했을 때 1:1 방식보다 26.31배 빠르게 동작하는 것도 볼 수 있었습니다.

마지막으로 이를 실행한 시각화 결과는 아래와 같습니다. 유튜브 영상을 다운받고 프레임별로 가장 부합하는 키워드들을 찾아주는 걸 확인할 수 있습니다. colab_result_grid.webp

3. 대용량 전처리 파이프라인

RAG 시스템으로 만들기 위해서는 우선 동영상에서 뽑은 수많은 프레임을 벡터화해서 저장하는 전처리 작업이 필요합니다. 하지만 유튜브 영상의 전체 프레임을 뽑을 때 모든 프레임이 유의미한 차이를 지니진 않습니다. 또한 애니메이션은 보통 24fps지만 실제 애니메이션 제작 과정에서 비용 절감을 위해 아래 방법을 사용합니다.

  • 1-frame (Full, 24fps): 매 프레임 그림이 바뀜 (움직임이 매우 부드러움)
  • 2-frame (On twos, 12fps): 1장의 그림을 2프레임 동안 유지
  • 3-frame (On threes, 8fps): 1장의 그림을 3프레임 동안 유지.

그래서 1초당 8장의 이미지를 추출할 경우 이미지 양은 매우 늘어나지만 정작 1초 안에 프레임 간의 큰 차이가 발생하지 않을 가능성이 크기에 합리적으로 접근하기 위해 1초에 1장을 추출하는 걸로 전제로 두고 해봅시다.

2초에 한 번, 3초에 한 번 같은 방식으로 더 늘릴 수 있지만 현재는 정확한 결과를 도출하는데 목적을 두고 진행합니다.

프레임을 뽑을 동영상은 1분짜리 티저가 아닌 20분짜리 애니메이션 한 편을 통째로 써서 약 1,420 프레임을 분석하는 작업을 최대한 빠르게 끝내는 걸 목적으로 둡시다.

이번 챕터의 목적은 제 맥북 M1 Air에서 가장 빠른 batch_size, num_workers 조합을 파악해보는 걸로 둡시다.

  • batch_size: 한 번에 모델(encode_image)에 넘기는 이미지 장 수입니다.
  • num_workers: DataLoader가 쓰는 데이터 로딩·전처리용 서브프로세스(워커) 수로, 메인 프로세스 부담을 덜어 주기 위해 메인 프로세스에 데이터를 공급하는 역할을 합니다.
  • DataLoader: 많은 프레임(Dataset)을 배치로 빠르게 돌리고 로딩을 병렬화하기 위해 사용합니다.

이 세팅 값을 알기 위해 로컬에 가중치가 들어간 ViT-L-14.pt를 받아서 테스트해봅니다.

  • 가중치: 인공지능이 데이터를 처리하고 예측을 내리는 데 필요한 핵심적인 학습 값

ViT-L-14.pt는 이미지·텍스트를 처리하는 거대한 네트워크의 모든 학습된 숫자가 들어 있는 체크포인트(checkpoint)로 보시면 됩니다. 이를 PyTorch에서는 보통 .pt / .pth로 많이 저장해서 사용합니다. 이 모델을 기준으로 제 로컬에서 배치 설정값을 실행했을 때 다음 결과를 보여줍니다.

항목
model_nameViT-L/14
devicemps
num_frames1420

batch_sizenum_workers는 다음처럼 사용됩니다.

dataloader = DataLoader(
    dataset,                   # 배치마다 샘플을 꺼내는 Dataset
    batch_size=batch_size,     # 한 스텝에서 묶어서 모델에 넘길 이미지 개수
    shuffle=False,             # 순서 섞기 여부
    num_workers=num_workers,   # 디스크 로딩·전처리용 서브프로세스 수 (0이면 메인만)
    pin_memory=pin,            # pinned memory 사용 (CUDA→GPU 복사 최적화 등, MPS에선 효과 제한적일 수 있음)
)
  • MPS(Apple Silicon용 Metal Performance Shaders): PyTorch 등 딥러닝 프레임워크가 애플 실리콘 Mac의 내장 GPU를 활용하여 딥러닝 모델 학습 및 추론 속도를 높이는 백엔드 기술입니다.

batch_size가 32,64,128 인 이유는 가장 많이 사용하는 수이기에 사용했습니다.
많은 CUDA 커널·텐서 연산이 워프(32 스레드) 단위로 돌아가고, 메모리나 캐시 사용도 2의 거듭제곱 크기에서 효율이 나오는 경우가 있어서 저 수치를 고른 것이기에 24,48,96 등 다른 수를 사용해도 되죠.

batch_sizenum_workers초당 처리량전체 소요시간
6403.9502359.4768
3203.8425369.5502
6443.7334380.3494
12843.6364390.4925
3243.5383401.3276
12803.5074404.8612
12883.463410.0507
6483.3836419.6772
3283.2579435.8696
128163.2458437.4923
32162.9983473.6013
64162.9033489.1044
25602.8655495.5498
25642.2234638.6575
25680.53112673.9189
256160.48232944.1662

제 PC 환경에서는 batch_size=64, num_workers=0일 때, 워커용 서브프로세스를 아예 안 두는 쪽이 가장 빨랐네요. 서브프로세스가 필요 없는 이유는, 워커를 만들어 스케줄링해서 배치하는 효율보다 그걸 돌리면서 생기는 오버헤드가 더 컸기 때문입니다. 그래서 메인 단일 프로세스에 CPU 자원을 몰아주는 게 더 빨랐던 거죠.

아마 CPU나 GPU 자원이 훨씬 넉넉한 환경에서는 워커가 체감 차이를 만들 수도 있었겠지만, 제 환경에서는 그런 제약 때문에 이런 결과가 나온 걸로 보입니다.

4. 벡터 검색 엔진 구축

RAG 시스템 구현을 위해 텍스트 쿼리와 프레임 이미지를 같은 CLIP 임베딩 공간에 올려 두고 사용자가 입력한 쿼리 벡터와 가까운 프레임 벡터를 찾는 의미(시맨틱) 검색을 만들어야 합니다. 이때 고차원 벡터를 빠르게 근사 최근접 이웃(ANN) 탐색해 주는 저장소가 필요하게 되는데 Qdrant가 그중 하나입니다.

Qdrant

Qdrant는 벡터를 컬렉션에 적재하고, 거리(코사인, 유클리드 등) 기준으로 가까운 값을 찾아주는 벡터 DB입니다. 수백만 개의 고차원 벡터 사이에서 쿼리와 가까운 것을 빠르게 찾기 위해 ANN(근사 최근접 이웃) 방식을 씁니다.

  • ANN (근사 최근접 이웃): 쿼리 벡터와 아주 가까운 벡터를 찾되, 매번 모든 값이랑 비교하지 않고 그래프·트리 같은 구조를 타고 가며 가까운 후보를 빠르게 탐색
  • k-NN (정확 최근접): 가능한 모든 벡터와 거리를 비교해 진짜로 가장 가까운 이웃을 탐색 k-NN 방식은 데이터가 많으면 비용이 커서 벡터 DB에서는 ANN을 많이 택합니다.

Qdrant는 다음 명령어로 간단하게 도커에 DB 서버를 띄울 수 있습니다. (제 로컬 기준)

docker pull qdrant/qdrant
docker run -p 6333:6333 -p 6334:6334 \
    -v "$(pwd)/qdrant_storage:/qdrant/storage:z" \
    qdrant/qdrant

우선 Collection(Table과 비슷한 개념)의 형식을 정의합니다. Qdrant에서 Collection은 포인트(벡터+메타데이터)를 저장하고 관리하는 최상위 논리 단위입니다. RDBMS(관계형 데이터베이스)의 테이블(Table)이나 데이터베이스(Database)와 유사한 개념으로, 특정 벡터 검색 쿼리가 실행되는 대상 공간을 의미합니다.

  • Points: 컬렉션 안에 저장되는 한 건(row) 단위의 레코드
curl -X PUT http://localhost:6333/collections/test_collection \
  -H 'Content-Type: application/json' \
  -d '{
    "vectors": { "size": 4, "distance": "Cosine" }
  }'

아래처럼 데이터를 삽입한 후

curl -X PUT http://localhost:6333/collections/test_collection/points \
  -H 'Content-Type: application/json' \
  -d '{
    "points": [
      { "id": 1, "vector": [0.05, 0.61, 0.76, 0.74], "payload": {"city": "Seoul", "age": 26} },
      { "id": 2, "vector": [0.19, 0.81, 0.75, 0.11], "payload": {"city": "Tokyo", "age": 30} }
    ]
  }'

쿼리 벡터와 가까운 점을 찾으면서 페이로드(메타데이터) 에도 조건을 걸 수 있습니다.

curl -X POST http://localhost:6333/collections/test_collection/points/search \
  -H 'Content-Type: application/json' \
  -d '{
    "vector": [0.2, 0.1, 0.9, 0.7],
    "filter": {
        "must": [ { "key": "city", "match": { "value": "Seoul" } } ]
    },
    "limit": 3
  }'

Qdrant는 고차원 벡터 검색에서 많이 쓰는 HNSW(Hierarchical Navigable Small World) 같은 그래프 기반 인덱스를 사용할 수 있습니다.

이는 벡터를 노드처럼 잇고 레이어를 나눠 둡니다. 찾는 벡터 값과 멀리 있는 후보는 건너뛰고 가까운 쪽만 따라가며 탐색할 수 있어서 매번 전부와 거리 비교하지 않아도 실용적인 속도가 나오는 편이라 벡터 DB에서 흔하게 선택하는 방식입니다.

프로젝트 Collection 구조 정하기

이제 RAG 시스템에서 쓰일 Collection 설계를 잡아봅시다.

Collection에는 다음 2개의 구조가 들어갑니다.

  • 벡터 쪽 설정(차원·거리 측정): 컬렉션을 만들 때 API에 넣는 값으로 포인트 payload 메타데이터와는 다른 층
    • 위 예제에서 쓰인 size, distance
  • 페이로드: 포인트마다 붙는 메타데이터 키

벡터 설정(컬렉션 생성 시)
PUT /collections/{name} JSON 안의 vectors 블록에 해당합니다. 아래 표에 있는 값은 작품 제목·화수·장르 같은 메타데이터 영역이 아니라 이 컬렉션의 벡터 칸이 몇 차원이고 어떤 거리로 비교할지를 고정하는 설정입니다.

설정추천설명
vectors.sizeCLIP 등 선택한 인코더의 출력 차원예: 모델마다 512·768 등으로 고정. 틀리면 upsert 단계에서 바로 깨짐
vectors.distanceCosineCLIP 계열 벡터는 보통 길이를 맞춰 두고 각도만 비교하는 경우가 많아 코사인을 자주 사용
멀티 벡터(named vectors)필요 시 멀티 벡터로 사용벡터 값을 하나가 아닌 여러 개로 두고 싶을 때 사용

단일 벡터만 쓸 때는 포인트가 "vector": [0.1, 0.2, ...]처럼 배열 하나이고, named vectors일 때는 "vector": { "키이름": [...] } 형태가 됩니다.

페이로드
벡터로 검색한 후 검색 결과가 “어느 작품·몇 화·몇 초 장면인지”로 확인할 때 쓰거나, Qdrantfilter로 조건으로 쓸 메타데이터입니다. RDB를 어떻게 설계하느냐에 따라 사용법은 많이 달라질 수 있지만, 일단은 아래 형태를 기초로 둡니다.

필드(키)Qdrant 타입용도비고
anime_idkeyword작품 식별(슬러그)filter로 특정 작품만 검색
episodeinteger에피소드(화) 번호특정 화 안에서만 검색할 때 사용
genrekeyword장르 슬러그RDB와 조인해도 되고, Qdrant filter에도 사용
frame_indexinteger그 화 안에서의 프레임 순번추출·upsert 시 부여한 인덱스
frame_filekeyword추출된 프레임 파일 경로디버깅·재처리·원본 장면 추적
job_public_idkeywordingest 잡 UUID한 번의 적재 작업 단위 구분, 포인트 id 생성에도 사용
timestamp_sec(float)재생 시각(초)upsert 시 함께 넣음. 아래 컬렉션 정보 조회payload_schema에는 인덱스를 건 키만 나타남

포인트(Points)의 id는 컬렉션 안에서 중복 없이 유일해야 합니다. job_public_idframe_index를 조합해 만든 UUID를 문자열 id로 씁니다.

벡터 정보 저장하기

이제 동영상에서 프레임을 뽑고 이를 사전 학습된 모델로 데이터를 벡터화했죠. 이 벡터화한 값을 가지고 유사도 검사를 했었습니다. 저는 이를 이용해서 애니메이션 장면을 파악해서 검색하는 기능을 만들고 싶습니다. 이를 구현하기 위해서 벡터 DB에 저장해서 나중에 RAG 시스템에서 이용할 수 있도록 구성합시다.

아래는 3번까지 진행한 파이프라인입니다.

flowchart TB subgraph step3["~ 3번"] direction LR v1[동영상] --> f1[프레임 추출] --> e1[벡터화] --> sim[유사도 계산] end

이제 아래처럼 진행하지만, 전체 코드 양은 방대하므로 컨셉이 담긴 핵심 코드로 정리합니다.

flowchart TB subgraph step4["4장"] direction LR v2[동영상] --> f2[프레임 추출] --> e2[벡터화] --> db[벡터 DB에 저장] end

테스트는 REST API를 통한 HTTP로 동기적으로 진행되는데, 실제 서비스에서는 비동기로 워커가 돌게끔 구상합니다 이는 5장에서 구현합니다.

프레임 추출

subprocessffmpeg를 사용해 프레임을 추출하겠습니다.
OpenCV로도 가능하지만, ffmpeg로 JPG까지 넘기는 방식이 단순하고 BGR/RGB 같은 색 공간도 신경 쓰지 않아도 되며, 추출 과정을 ffmpeg 명령으로 외부 프로세스 하나에 맡겨 관리하기 편하다는 여러 이유 때문에 이 방법을 선택했습니다.

다음 함수로 프레임을 추출했습니다.

def extract_frames_ffmpeg(
    *,  # 키워드 전용
    video_path: Path,  # 입력 동영상 경로
    output_dir: Path,  # 추출 JPG 저장 디렉터리
    ffmpeg_bin: str | None = None,  # ffmpeg 실행 경로(None이면 자동 해석)
    fps: float | None = None,  # 초당 추출 장수(-vf fps, None이면 생략)
    max_frames: int | None = None,  # 최대 장수(None이면 제한 없음)
    jpeg_quality: int = 2,  # JPEG -q:v(작을수록 보통 고화질)
) -> int:  # 성공 시 frame_*.jpg 개수
    if not video_path.is_file():
        raise FileNotFoundError(f"동영상 파일이 없습니다: {video_path}")

    output_dir.mkdir(parents=True, exist_ok=True)
    for old in output_dir.glob("frame_*.jpg"):
        old.unlink()

    exe = resolve_ffmpeg_exe(ffmpeg_bin)
    # ffmpeg 옵션
    cmd: list[str] = [
        exe,
        "-hide_banner",
        "-loglevel",
        "error",
        "-y",
        "-i",
        str(video_path),
    ]
    if fps is not None and fps > 0:
        cmd.extend(["-vf", f"fps={fps}"])
    cmd.extend(["-q:v", str(jpeg_quality)])
    if max_frames is not None and max_frames > 0:
        cmd.extend(["-frames:v", str(max_frames)])
    cmd.append(str(output_dir / "frame_%06d.jpg"))

    # 추출 수행
    proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
    if proc.returncode != 0:
        err = (proc.stderr or proc.stdout or "").strip()
        raise RuntimeError(f"ffmpeg 실패 (exit {proc.returncode}): {err or 'no stderr'}")

    return len(list(output_dir.glob("frame_*.jpg")))

벡터화

벡터화하는 작업은 이전의 코드와 비슷합니다.

def extract_features(
        self,  # CLIP 서비스(self.model·preprocess·device)
        frames_dir: str,  # 프레임 JPG 디렉터리
        *,  # 키워드 전용
        batch_size: int,  # 배치 크기(한 스텝에 넣을 이미지 수)
        num_workers: int,  # DataLoader 워커 수(0이면 메인만)
        verbose: bool = True,  # 로그·요약 출력 여부
        log_batches_every: int = 10,  # N 배치마다 진행 로그(0이면 생략)
    ) -> Tuple[np.ndarray, float, float]:
        # jpg 목록 가져오기
        all_frame_files = list_frame_jpgs(frames_dir)
        if not all_frame_files:
            return self._empty_features(), 0.0, 0.0

        # dataset에서 인덱스마다 파일을 읽고 그때 self.preprocess를 적용하게 생성
        dataset = AnimeFrameDataset(all_frame_files, self.preprocess)
        pin = self.device == "cuda"
        # Dataloader로 배치, 병렬처리 지정
        dataloader = DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=False,
            num_workers=num_workers,
            pin_memory=pin,
        )

        all_features: List[np.ndarray] = []
        start_time = time.perf_counter()
        # 추론할 때 그래프/그래디언트 비활성화해서 속도 향상
        # 학습을 위해 켜지는 옵션을 비활성화해서 추론 비용을 가볍게 처리
        with torch.no_grad():
            # 프레임 파일을 배치 단위로 순회
            for batch_idx, images in enumerate(dataloader):
                images = images.to(self.device)
                # 모델로 벡터(임베딩) 추출
                features = self.model.encode_image(images)
                # L2 정규화
                features /= features.norm(dim=-1, keepdim=True)
                all_features.append(features.cpu().numpy())

                if verbose and log_batches_every > 0 and (batch_idx + 1) % log_batches_every == 0:
                    print(f"Processed batch {batch_idx + 1}/{len(dataloader)}...")

        _sync_device(self.device)
        # 배치 단위로 나뉜 임베딩을 “프레임 N개 × 차원 D”처럼 하나의 레이아웃으로 합침
        final_matrix = np.vstack(all_features)
        elapsed = time.perf_counter() - start_time
        frames_per_sec = len(all_frame_files) / elapsed if elapsed > 0 else 0.0

        if verbose:
            print(f"  >> Inference Time: {elapsed:.2f}s ({frames_per_sec:.2f} frames/sec)")
        return final_matrix, elapsed, frames_per_sec

저장

Qdrant에서 컬렉션을 생성한 뒤 다음 코드로 벡터 결과를 Qdrant에 저장할 수 있습니다.

벡터만 저장하면 되니 postgresql처럼 벡터 확장(pgvector 등)이 있는 걸 택하는 선택지도 있지만, 이 글에서는 전용 벡터 DB를 쓰기로 했습니다. 대략적인 차이는 아래 표로 정리할 수 있습니다.

비교 항목일반 DB + 벡터 확장 (예: pgvector)전용 벡터 DB (Qdrant)
적합한 상황기존 DB를 쓰면서 가볍게 벡터 검색 추가대규모 데이터, 초고속 검색, 복잡한 필터링
성능보통 (데이터가 많아지면 느려질 수 있음)최상 (Rust 엔진 등으로 최적화되는 편)
기능기본적인 벡터 연산 위주양자화, 하이브리드 검색, 멀티테넌시 등
복잡도낮음 (기존 SQL 활용)중간 (별도 인프라 운영)
def upsert_job_frame_vectors(
    *,  # 키워드 전용
    job_public_id: uuid.UUID,  # 잡 단위 UUID — 포인트 id 생성에 사용
    anime_slug: str,  # 작품 슬러그
    episode: int | None,  # 화 번호(없으면 None)
    genre_slugs: list[str],  # 장르 슬러그 목록
    frame_indices: list[int],  # 프레임 인덱스(N개)
    vectors: Any,  # 임베딩 (N, D)
    timestamps: list[float],  # 재생 시각 초(N개)
    frame_files: list[str],  # 프레임 파일 경로(N개)
) -> None:
    # _get_client: Qdrant Client 로드
    pair = _get_client()
    if pair[0] is None:
        return
    client, collection = pair
    try:
        # PointStruct 포인트를 만들 때 쓰는 클래스
        from qdrant_client.models import PointStruct
    except ImportError:
        return

    n = len(frame_indices)
    if n == 0:
        return
    if len(timestamps) != n or len(frame_files) != n:
        raise ValueError("frame_indices, timestamps, frame_files 길이가 일치해야 합니다.")

    # 필요한 경우에만 로드, 앞에서 반환할 수 있으니
    import numpy as np

    mat = np.asarray(vectors, dtype=np.float32)
    if mat.ndim != 2 or mat.shape[0] != n:
        raise ValueError(f"vectors 는 (N, D) 형태여야 합니다. N={n}, shape={mat.shape}")

    dim = int(mat.shape[1])
    # 컬렉션 생성 및 페이로드에 대한 인덱스 생성
    ensure_collection_and_indexes(vector_dim=dim)

    points: list[Any] = []
    for i in range(n):
        fid = frame_indices[i]
        vec = mat[i]
        # 포인트 id 생성
        pid = frame_point_uuid(job_public_id, fid)
        payload = build_frame_payload(
            anime_slug=anime_slug,
            job_public_id=job_public_id,
            frame_index=fid,
            frame_file=frame_files[i],
            episode=episode,
            genre_slugs=genre_slugs,
            timestamp_sec=float(timestamps[i]),
        )
        # 포인트: Qdrant의 데이터 단위
        points.append(
            PointStruct(
                id=str(pid),
                vector=vec.tolist(),
                payload=payload,
            )
        )

    client.upsert(collection_name=collection, points=points, wait=True)

핵심 로직은 위와 같이 정의했습니다.
현재는 Qdrant에 저장하지 않아서 아무런 포인트나 컬렉션이 없는 걸 볼 수 있습니다.

curl -sS 'http://127.0.0.1:6333/collections/anime_clip'
{"status":{"error":"Not found: Collection `anime_clip` doesn't exist!"},"time":0.000179333}%

프로세스를 실행해서 값을 저장했을 경우 다음처럼 볼 수 있죠.

컬렉션 정보 조회

QdrantGET /collections/{collection_name}은 컬렉션 전체 상태를 한 번에 보는 관리·점검용 API입니다. 쉽게 확인할 수 있다는 점이 편하죠. 개별 포인트의 벡터 값은 나열되지 않지만, 컬렉션 상태·설정·데이터 수 등을 요약해서 보여 줍니다.

curl -sS 'http://127.0.0.1:6333/collections/anime_clip' | jq
{
  "result": {
    "status": "green",
    "optimizer_status": "ok",
    "indexed_vectors_count": 0,
    "points_count": 1542,
    "segments_count": 4,
    "config": {
      "params": {
        "vectors": { "size": 768, "distance": "Cosine" },
        "shard_number": 1,
        "replication_factor": 1,
        "write_consistency_factor": 1,
        "on_disk_payload": true
      },
      "hnsw_config": {
        "m": 16,
        "ef_construct": 100,
        "full_scan_threshold": 10000,
        "max_indexing_threads": 0,
        "on_disk": false
      },
      "optimizer_config": {
        "deleted_threshold": 0.2,
        "vacuum_min_vector_number": 1000,
        "default_segment_number": 0,
        "max_segment_size": null,
        "memmap_threshold": null,
        "indexing_threshold": 10000,
        "flush_interval_sec": 5,
        "max_optimization_threads": null,
        "prevent_unoptimized": null
      },
      "wal_config": {
        "wal_capacity_mb": 32,
        "wal_segments_ahead": 0,
        "wal_retain_closed": 1
      },
      "quantization_config": null
    },
    "payload_schema": {
      "frame_index": { "data_type": "integer", "points": 1542 },
      "frame_file": { "data_type": "keyword", "points": 1542 },
      "anime_id": { "data_type": "keyword", "points": 1542 },
      "genre": { "data_type": "keyword", "points": 1542 },
      "episode": { "data_type": "integer", "points": 1542 },
      "job_public_id": { "data_type": "keyword", "points": 1542 }
    },
    "update_queue": { "length": 0 }
  },
  "status": "ok",
  "time": 0.002697042
}

위 응답에서 봐야 할 부분은 아래 테이블처럼 정리됩니다. 앞에서 말한 HNSW 관련 설정(hnsw_config, indexing_threshold 등)도 이 JSON에 들어 있습니다.

필드예시 값의미
result.statusgreen컬렉션 상태로 green이면 읽기·쓰기·검색에 문제 없음
result.optimizer_statusok세그먼트 병합·인덱싱 등 백그라운드 최적화가 정상
result.points_count1542저장된 포인트 수(벡터 + 페이로드)로 upsert가 반영됐는지 확인할 때 본다
result.indexed_vectors_count0HNSW 그래프에 올라간 벡터 수로 points_count와 다를 수 있음
result.segments_count4내부 저장 단위(세그먼트) 개수. 여러 번 upsert하면 늘 수 있음
result.config.params.vectorssize: 768, distance: Cosine이 컬렉션의 벡터 차원·거리 함수(설계 표의 vectors.size / vectors.distance)
result.config.optimizer_config.indexing_threshold10000이 개수 미만이면 HNSW 인덱스를 미룸 → indexed_vectors_count가 0일 수 있음
result.config.hnsw_config.full_scan_threshold10000벡터가 이보다 적으면 검색 시 전수 탐색
result.payload_schema아래 표 참고페이로드 인덱스가 걸린 키와 타입·포인트 수. filter에 쓸 필드가 맞는지 점검
result.update_queue.length0아직 반영 대기 중인 업데이트가 없음
status / timeok, 0.002…HTTP API 자체 성공 여부·응답 시간

points_count는 저장된 포인트 수이고 indexed_vectors_count가 0인 이유는 벡터 개수(1542)가 indexing_threshold(10000)보다 작아 HNSW 인덱스를 아직 만들지 않았기 때문입니다. 벡터만 있고, 인덱스가 없어 전수 탐색으로 데이터를 찾는 게 현재 상태죠.

payload_schema는 위 페이로드 표와 대응합니다. points는 그 키를 가진 포인트 수입니다.

payload_schemadata_typepoints의미
anime_idkeyword1542작품 식별(예시 "123"은 내부 id)
episodeinteger1542화 번호
genrekeyword1540장르. 2건은 genre 없음(또는 빈 값)
frame_indexinteger1542프레임 순번
frame_filekeyword1542프레임 파일 경로
job_public_idkeyword1542ingest 잡 UUID

위는 전체 구조이고 아래처럼 개별에 대한 API endpoint를 제공해서 사용해보거나 도커에서 로컬로 마운트해서 파일 시스템으로 접근해서 볼 수도 있겠죠.

여기서는 API로 파악해봅시다.
exact는 요청 데이터 바디에 넣어서 사용하는데, 이 옵션은 근사치가 아닌 정확한 개수를 요청할 수 있습니다.

# 저장 건수만 다시 확인
curl -sS -X POST 'http://127.0.0.1:6333/collections/anime_clip/points/count' \
  -H 'Content-Type: application/json' \
  -d '{"exact": true}' | jq

결과는 아래와 같죠.

{
  "result": {
    "count": 1542
  },
  "status": "ok",
  "time": 0.0015215
}

아래 명령어로 개별 point를 볼 수 있습니다.
with_vectortrue로 두면 768차원 float 배열(예: 0.02366, -0.00133, -0.08224, …)이 함께 옵니다. 여기서는 payload 확인이 목적이므로 with_vector: false로 샘플 3건만 봅시다.

# payload만 샘플 3건 (id 확인용)
curl -sS -X POST 'http://127.0.0.1:6333/collections/anime_clip/points/scroll' \
  -H 'Content-Type: application/json' \
  -d '{"limit": 3, "with_payload": true, "with_vector": false}' | jq
{
  "result": {
    "points": [
      {
        "id": "0003386b-da20-5cd3-a190-c163c1dd8af7",
        "payload": {
          "anime_id": "123",
          "job_public_id": "0e0f35d4-bdcd-4a08-8f4d-746d861487c5",
          "frame_index": 1015,
          "frame_file": "frame_001016.jpg",
          "timestamp_sec": 42.291666666666664,
          "episode": 1,
          "genre": [
            "g-612048e5"
          ]
        }
      },
      {
        "id": "0077db65-24ab-5325-b904-f49ff6937df8",
        "payload": {
          "anime_id": "123",
          "job_public_id": "0e0f35d4-bdcd-4a08-8f4d-746d861487c5",
          "frame_index": 1393,
          "frame_file": "frame_001394.jpg",
          "timestamp_sec": 58.041666666666664,
          "episode": 1,
          "genre": [
            "g-612048e5"
          ]
        }
      },
      {
        "id": "00b12149-49e8-505c-9af6-44e0dfd38f5e",
        "payload": {
          "anime_id": "123",
          "job_public_id": "0e0f35d4-bdcd-4a08-8f4d-746d861487c5",
          "frame_index": 327,
          "frame_file": "frame_000328.jpg",
          "timestamp_sec": 13.625,
          "episode": 1,
          "genre": [
            "g-612048e5"
          ]
        }
      }
    ],
    "next_page_offset": "00b80937-7c2c-5bb9-9b12-e262b2350f31"
  },
  "status": "ok",
  "time": 0.011248125
}

5. 비동기 시스템 구조 (Celery, Job Worker & Redis Queue)

4장까지로 프레임 추출·벡터화·Qdrant 적재 같은 핵심 파이프라인은 갖춰졌습니다. 다만 지금은 HTTP로 직접 돌리거나 로컬에서 한 번에 실행하는 수준이고, 이를 그대로 서비스에 올리기엔 아직 부족한 부분이 많죠. 실서비스에서 부족한 점은, 관리자가 동영상을 올린 뒤 임베딩이 끝날 때까지 API 요청이 붙잡혀 있으면 타임아웃이 나기 쉽고, 웹 서버와 무거운 추론 작업이 한 프로세스에 섞이면 검색 API까지 같이 느려질 수 있다는 것입니다.

맥북 M1에서 로컬로 돌릴 때도 마찬가지죠. OSCPU·GPU를 자동으로 나눠 주는 게 아니라, ffmpeg는 주로 CPU, CLIPmps(Metal)로 GPU를 씁니다. 임베딩 작업과 HTTP 서비스 등 여러 프로세스가 동일한 자원을 쓰면 웹 환경이 임베딩 작업에 영향을 받아 응답이 느려질 수 있습니다. 그래서 역할을 분리하고 응답 지연을 방지하기 위한 해결법이 필요하죠.

참고) 맥북 Air M1 사양

  • 총 코어: 8개(성능 4 + 효율 4)
  • 메모리: 16 GB
  • SoC: CPU·GPU 통합(Unified Memory), mps로 GPU 가속
  • MPS(Metal Performance Shaders): Apple Silicon GPU로 딥러닝·ML 연산을 가속하는 PyTorch 백엔드

5장에서는 Celery 워커와 Redis 큐로 비동기 잡 구조를 구현합니다. 이는 Task Queue 방식으로, Django가 run_embedding_job_task.delay(job_public_id)로 넣은 메시지를 워커가 Redis에서 꺼내 처리합니다.

임베딩 잡 UUID는 DB·코드에서는 job_public_id, API JSON에는 보통 public_id 키로 같은 값을 씁니다(4장 Qdrant payload의 job_public_id와 동일). 로컬에서 워커를 1개만 두면 대기열은 FIFO(먼저 넣은 잡부터)에 가깝고, 워커나 concurrency를 늘리면 동시에 여러 잡이 돌아가 끝나는 순서는 달라질 수 있습니다. 이 패턴은 로컬뿐 아니라 클라우드에도 그대로 가져갈 수 있고, 분산 환경에서는 임베딩 워커는 늘리기 쉽지만 Redis 브로커는 클러스터·관리형 서비스를 고민해 봐야 합니다.

비동기 잡 구조가 필요한 구체적인 이유는 다음과 같습니다.

  • 업로드와 처리 분리: 파일만 먼저 받고, 메타·화수·장르 확인 후 임베딩 실행·취소를 선택.
  • 긴 작업 비동기화: ffmpeg·CLIP은 수 분 걸릴 수 있으니 public_id(job_public_id)를 바로 반환하고, pending / processing / running / done / failed로 단계 확인.
  • 실행 시점 조절: 피크 시간엔 스케줄로 미루거나, 한가한 시간에 몰아서 실행. (임베딩 전용 서버 분리는 실서비스 예시.)
  • 작업 묶음 처리: 여러 화를 모아 큐에 넣고 워커가 처리.
  • 실패·재시도: ffmpeg 오류, OOM 등 잡 단위 로그·재시도·실패 건만 재실행.
  • 부하 조절: 큐에 쌓아 두고 워커 수만큼만 동시 실행.
  • 확장: 클라우드로 확장 시 검색 API와 임베딩 워커를 분리해 워커만 늘리기 쉽게 확장 가능.

4장에서 프레임 추출부터 Qdrant 저장까지 검증했다면, 5장에서는 배포 가능한 서비스 형태로 옮기는 단계입니다.

Celery와 Redis 큐

임베딩 파이프라인처럼 오래 걸리는 작업은 Django 요청 안에서 돌리지 않고, Celery 태스크로 워커에 넘깁니다. 워커가 무엇을 할지 알려 주는 작업 메시지는 Redis 큐(브로커)에 쌓이고, 떠 있는 Celery worker 프로세스가 하나씩 꺼내 실행합니다.

아래 프로세스 시퀀스에서 전체 이벤트 순서를 볼 수 있습니다.

구성요소역할
Celery파이썬 태스크 큐. 임베딩 파이프라인을 백그라운드에서 실행
Redis (broker)“어떤 잡을 실행할지” 메시지를 잠깐 보관하는 큐
Redis (result backend, 선택)태스크 결과·상태를 잠깐 두는 저장소. 잡 상세는 DB가 source of truth
Celery worker큐에서 태스크를 꺼내 4장 파이프라인 실행
Django업로드 API, 잡 생성, delay()로 큐에 넣기
SQLiteDjango 기본 DB. 잡 상태·메타데이터 (EmbeddingJob)
Disk업로드 원본·프레임 JPG 스테이징 (jobs/{uuid}/input/ 등)
Qdrant벡터 저장 (4장과 동일)

RabbitMQ 대신 Redis를 쓰는 이유

Celery에 이벤트 메시지를 전달하는 브로커는 RabbitMQ, Redis, SQS 등을 고를 수 있습니다. 이 프로젝트는 초당 수만 건의 메시지보다 한 건이 수 분~수십 분 걸리는 임베딩 잡이 중심입니다. 즉 큐에 쌓이는 건 작업 지시 한 줄로 가볍지만 이로 인해 발생하는 무거운 연산은 워커가 따로 하죠.

이 부분 때문에 RabbitMQ에서 지원하는 강한 대량 메시지·복잡한 라우팅·강한 메시지 보장까지 필요하진 않아, 이 환경(Mac M1)에서는 redis-server 하나로 브로커를 띄우기 쉬운 Redis가 맞다고 생각하여 적용했습니다. 또 Redis는 아래 기능을 확장할 때도 사용할 수 있어서 좋습니다.

  • Cache: 인기 검색어, 장르별 Top-K 결과처럼 자주 읽히는 응답을 GET search:query_hash 형태로 캐싱해두면, Qdrant와 CLIP 호출을 줄일 수 있습니다.
  • Rate limit: 검색 API에 IP·계정당 분당 N회 제한을 INCR + TTL 키(ratelimit:user:{id}:60s)로 두면, 임베딩 워커 부하와 별도로 읽기 트래픽 폭주를 막을 수 있습니다.

다만 주의할 점은 Redis가 브로커일 때는 중간에 Redis가 죽거나, 설정·운영 실수가 나면 큐에 넣은 작업 지시가 손실될 수 있습니다. RabbitMQ는 그런 상황에서도 메시지를 디스크에 남기고, ack·재전달 기능이 있어 안정성이 더 강하죠. 그래서 더 안정성을 높이고 싶다면 Redis 구조를 유지하면서 실서비스로 옮길 때는 PostgreSQL에 핵심 데이터를 남기고 Redis는 브로커 역할에 초점을 두는 게 좋죠. 5장 로컬 개발에서는 Django 기본 SQLite(db.sqlite3)로 EmbeddingJob 상태만 두고, PostgreSQL은 아직 쓰지 않습니다.

필요한 프로세스

맥에서 개발할 때는 아래 다섯 가지를 같이 띄웁니다.
(QdrantRedis는 작업 편이를 위해 Docker로, DB는 Django가 SQLite 파일로 사용)

#프로세스설명
1RedisCelery 브로커
2Django (runserver 등)API, 업로드·잡 생성
3SQLiteDjango 내부 DB (db.sqlite3). EmbeddingJob 상태(pending / processing / running / done / failed)
4Celery worker임베딩 실행
5Qdrant벡터 DB

M1 16 GB 기준으로 워커는 메모리를 많이 쓰므로 --concurrency=1부터 두는 편이 좋습니다.

# Redis Docker
docker run -d --name redis -p 6379:6379 redis:7-alpine

# Celery worker (Django 프로젝트 루트, 가상환경 활성화 후)
celery -A anime_search worker -l info --concurrency=1

Celery 앱 설정

celery -A anime_search workerembeddings.run_embedding_job 태스크를 실행하려면, Django settings.pyCELERY_*와 프로젝트 루트의 celery.py가 먼저 맞춰져 있어야 합니다.

# settings.py — Celery (임베딩 잡 비동기 실행)
CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0").strip()
CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", CELERY_BROKER_URL).strip()
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = int(os.environ.get("CELERY_TASK_TIME_LIMIT", "3600"))
CELERY_WORKER_CONCURRENCY = int(os.environ.get("CELERY_WORKER_CONCURRENCY", "1"))
# 로컬(M1)에서는 위 값보다 `celery worker --concurrency=1` CLI가 실제 동시 실행 수를 정함
CELERY_TASK_ALWAYS_EAGER = os.environ.get("CELERY_TASK_ALWAYS_EAGER", "").strip().lower() in (
    "1",
    "true",
    "yes",
)

anime_search/celery.py(경로는 프로젝트마다 다를 수 있음)에서 앱 이름을 anime_search로 두고, 설정을 읽은 뒤 tasks 모듈을 자동 탐지합니다.

app = Celery("anime_search")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

autodiscover_tasks()INSTALLED_APPS 각 앱의 tasks.py를 스캔해 @shared_task로 붙인 함수(예: embeddings.tasks.run_embedding_job_task)를 워커에 등록합니다. task를 직접 지정하려면 CELERY_IMPORTS를 쓸 수 있지만, 여기서는 기본 자동 탐지를 씁니다.

이제 Django가 run_embedding_job_task.delay(job_public_id)로 큐에 넣으면, HTTP는 public_id를 바로 돌려줍니다. 응답 시점 DB는 pending일 수 있고, transaction.on_commitenqueue_run_job이 성공하면 processing으로 바뀝니다. 워커가 태스크를 집으면 running, 완료 시 done(실패 시 failed)로 SQLiteEmbeddingJob 행을 갱신하고, 관리 화면에서는 그 상태만 폴링하면 됩니다.

프로세스 시퀀스

위에서 보여준 구조에 이벤트를 더 상세히 보여줄 경우 다음과 같습니다.

sequenceDiagram participant Admin as 관리자 participant Django as Django (웹/API) participant DB as DB participant Disk as 스테이징·미디어 디스크 participant Redis as Redis (Celery) participant Worker as Celery Worker participant Qdrant as Qdrant Admin->>Django: 동영상 업로드 (slug, episode 등) Django->>DB: EmbeddingJob 생성 (pending) Django->>Disk: jobs/{uuid}/input/ 에 동영상 저장 Django-->>Admin: public_id 즉시 반환 (DB job_public_id) Note over Django: transaction.on_commit Django->>DB: 스테이징 OK면 processing Django->>Redis: run_embedding_job_task.delay(job_public_id) Redis->>Worker: 태스크 수신 Worker->>DB: running Worker->>Disk: ffmpeg → frames/*.jpg Worker->>Disk: media/{slug}/frames/ 로 승격 Worker->>Worker: CLIP 임베딩 Worker->>Qdrant: 벡터 upsert Worker->>DB: done (실패 시 failed)

위 시퀀스는 5장 전체 프로세스의 흐름입니다.
이어서 Django 쪽 데이터 모델과 업로드와 메시지를 적재하는 코드를 봅니다.

Model 구조

이 프로젝트는 다음 모델 구조로 진행합니다.

  • 장르(Genre) ↔ 애니메이션(Anime) — N:M
    한 장르에 여러 작품이 속할 수 있고, 한 작품도 여러 장르를 가질 수 있습니다. 장르 필터·검색 확장에도 씁니다.
    Django에서는 Animegenres = models.ManyToManyField(blank=True, related_name="animes", to="catalog.Genre")처럼 두면, 조인용 pivot(중간) 테이블이 마이그레이션으로 자동 생성되고 역참조는 genre.animes로 접근합니다.

  • 애니메이션 → 에피소드 — 1:N
    에피소드는 anime_id 외래 키로 부모에 종속됩니다. on_delete=models.CASCADE이므로 애니메이션을 지우면 해당 에피소드도 함께 삭제됩니다. (anime_id, number) 복합 unique로 같은 애니메이션에서 화수가 겹치지 않게 합니다.

  • 임베딩 잡(EmbeddingJob) — Anime·Episode FK
    업로드·Celery 처리 단위이므로 anime_idepisode_id를 모두 둡니다. on_delete=models.PROTECT로, 잡이 남아 있는 동안에는 연결된 애니메이션·에피소드를 실수로 삭제할 수 없습니다. API·워커·Qdrant payload에서 쓰는 public_id(UUID)는 이 테이블의 UK입니다.

erDiagram Genre }o--o{ Anime : "genres M2M(Django에서 pivot 테이블 생성)" Anime ||--o{ Episode : "anime_id FK CASCADE" Episode ||--o{ EmbeddingJob : "episode_id FK PROTECT" Anime ||--o{ EmbeddingJob : "anime_id FK PROTECT" Genre { int id PK string slug UK string label_ko string label_ko_norm UK int sort_order } Anime { int id PK string slug UK string title datetime created_at datetime updated_at } Episode { int id PK int anime_id FK int number "UK with anime_id" string title datetime created_at datetime updated_at } EmbeddingJob { int id PK uuid public_id UK int anime_id FK int episode_id FK string canonical_key string staging_rel_path string status text last_error string celery_task_id datetime created_at datetime updated_at datetime processed_at }

참고로 장르와 애니메이션 M2M 관계로 Django가 자동 생성하는 pivot 테이블(catalog_anime_genres) 구조는 다음과 같습니다. id 값과 index, unique 제약조건만 가진 최소한의 테이블을 자동으로 만들어 줘서 편하네요.

sqlite> .schema catalog_anime_genres
CREATE TABLE IF NOT EXISTS "catalog_anime_genres" (
    "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
    "anime_id" bigint NOT NULL
        REFERENCES "catalog_anime" ("id")
        DEFERRABLE INITIALLY DEFERRED,
    "genre_id" bigint NOT NULL
        REFERENCES "catalog_genre" ("id")
        DEFERRABLE INITIALLY DEFERRED
);

CREATE UNIQUE INDEX "catalog_anime_genres_anime_id_genre_id_c8680c2f_uniq"
    ON "catalog_anime_genres" ("anime_id", "genre_id");

CREATE INDEX "catalog_anime_genres_anime_id_c2f96e25"
    ON "catalog_anime_genres" ("anime_id");

CREATE INDEX "catalog_anime_genres_genre_id_8b6a7b27"
    ON "catalog_anime_genres" ("genre_id");

모델·ER 구조를 본 뒤, HTTP 업로드 API에서 EmbeddingJob을 만들고 Celery 큐에 넣는 코드로 이어집니다.

업로드 (Django API)

동영상 업로드 흐름은 핵심 코드만 짚어 봅니다. 먼저 catalog/urls.py입니다.

...

urlpatterns = [
    # 메인 페이지, 잡 목록 보기 페이지랑 뷰 겸용
    path("", views.job_console, name="catalog_home"),
    # 장르 리스트랑 수정, 등록 기능
    path("genres/", views.genre_list, name="catalog_genre_list"),
    path("genres/new/", views.genre_new, name="catalog_genre_new"),
    path("genres/bulk-new/", views.genre_bulk_new, name="catalog_genre_bulk_new"),
    path("genres/<int:pk>/edit/", views.genre_edit, name="catalog_genre_edit"),
    # 업로드
    path("upload/", views.anime_upload, name="catalog_anime_upload"),
    # 업로드 후 영상 미리보기
    path(
        "jobs/<uuid:public_id>/preview/<str:filename>",
        views.serve_uploaded_video,
        name="catalog_job_video_preview",
    ),
    # Job 목록 페이지
    path("jobs/", views.job_console, name="catalog_jobs"),
    # Job JSON API
    # Job 목록 가져오기
    path("api/jobs/", views.job_api_list, name="catalog_job_api_list"),
    # Job 수동 실행(자동 enqueue 외에 재실행·지연 실행용): job_api_run → enqueue_run_job
    path(
        "api/jobs/<uuid:public_id>/run/",
        views.job_api_run,
        name="catalog_job_api_run",
    ),
    # Job 삭제하기
    path(
        "api/jobs/<uuid:public_id>/delete/",
        views.job_api_delete,
        name="catalog_job_api_delete",
    ),
]

동영상이 없어 auto enqueue가 실패한 pending 잡은 api/jobs/<uuid>/run/(job_api_run)으로 enqueue_run_job을 다시 호출할 수 있습니다.

장르 등록과 편의성을 위한 Job 목록 API에 대한 설명은 생략하고, views.anime_upload 중심으로 봅니다. POST일 때 SQLiteAnime·Episode·EmbeddingJob을 쓰고, 동영상은 디스크 스테이징(jobs/…/input/)에 저장합니다. HTTP 응답을 보낸 뒤 transaction.on_commit으로 schedule_auto_enqueue_on_commit이 실행되며, 조건이 맞으면 run_embedding_job_task.delay(job_public_id)가 Celery 브로커(Redis)에 작업 메시지만 넣습니다.

catalog/views.pyanime_upload는 유효성 검사 → 스트리밍 저장 → 큐 예약 순입니다. transaction.atomic() TODO처럼 DB 행과 파일 저장의 원자성은 아직 완전히 묶지 않았고 추후 보완할 예정입니다.

@require_http_methods(["GET", "POST"])
def anime_upload(request: HttpRequest) -> HttpResponse:
    # 로그인 필요 여부 체크, 개발 단계에서는 체크 안함
    gate = _ui_gate(request)
    if gate:
        return gate

    # 애니메이션 시리즈 구분하기 위한 리스트 가져오기
    animes = list(Anime.objects.order_by("slug"))
    # 장르 리스트 가져오기
    genres = list(Genre.objects.order_by("sort_order", "slug"))

    # 업로드
    if request.method == "POST":
        slug = (request.POST.get("anime_slug") or "").strip()
        # slug 유효성 검사
        if not is_valid_anime_id(slug):
            msg = "시리즈 slug는 영문·숫자·_- 만 1~255자여야 합니다."
            if _wants_json(request):
                return JsonResponse({"detail": msg}, status=400)
            messages.error(request, msg)
            return redirect("catalog_anime_upload")

        # 애니메이션이 없으면 새로 만들고 있으면 그대로 사용
        title = (request.POST.get("title") or "").strip()
        anime, _ = Anime.objects.get_or_create(slug=slug, defaults={"title": title})
        # title은 선택(없으면 slug만으로 생성)
        if title:
            anime.title = title
            anime.save(update_fields=["title", "updated_at"])

        # 장르 N:M 연결
        ids = [int(x) for x in request.POST.getlist("genre_ids") if x.isdigit()]
        anime.genres.set(Genre.objects.filter(pk__in=ids))

        # 에피소드 유효성 검사
        ep_raw = (request.POST.get("episode") or "").strip()
        if not ep_raw:
            msg = "episode(화수)는 필수입니다."
            if _wants_json(request):
                return JsonResponse({"detail": msg}, status=400)
            messages.error(request, msg)
            return redirect("catalog_anime_upload")
        try:
            episode_number = int(ep_raw)
            if episode_number < 1:
                raise ValueError
        except ValueError:
            msg = "episode는 1 이상 정수여야 합니다."
            if _wants_json(request):
                return JsonResponse({"detail": msg}, status=400)
            messages.error(request, msg)
            return redirect("catalog_anime_upload")

        episode_row = get_or_create_episode(anime=anime, number=episode_number)
        # Job 생성(anime·episode FK로 연결)
        job = EmbeddingJob.objects.create(anime=anime, episode=episode_row)
        # 잡 스테이징 디렉터리(jobs/<id>/frames, input/) 생성
        frames_leaf = ensure_job_staging_dirs(job)
        input_dir = staging_input_dir_for_job_frames(frames_leaf)

        dest_name: str | None = None
        f = request.FILES.get("video")
        if f:
            try:
                dest_name = _safe_video_name(f.name)
            except ValueError as exc:
                job.delete()
                if _wants_json(request):
                    return JsonResponse({"detail": str(exc)}, status=400)
                messages.error(request, str(exc))
                return redirect("catalog_anime_upload")
            # pathlib로 경로 조합(문자열 split/join 대신)
            dest = input_dir / dest_name
            with dest.open("wb") as out:
                # Django의 chunks()로 스트리밍 저장(기본 청크 64KB)
                for chunk in f.chunks():
                    out.write(chunk)

        # TODO: DB·파일 저장을 transaction.atomic() 등으로 묶기
        # DB 커밋 후 Celery enqueue 예약(on_commit; atomic 블록 안이면 즉시 실행)
        schedule_auto_enqueue_on_commit(job.public_id)

        if _wants_json(request):
            payload: dict[str, str | None] = {
                "public_id": str(job.public_id),
                "filename": dest_name,
                "preview_url": _video_preview_url(request, job, dest_name) if dest_name else None,
                "jobs_url": request.build_absolute_uri(reverse("catalog_jobs")),
            }
            if dest_name:
                payload["message"] = f"작업 생성됨 · {job.public_id} · 동영상 저장됨 · 처리 큐에 넣는 중"
            else:
                payload["message"] = (
                    f"작업 생성됨 · {job.public_id} · input 폴더에 동영상을 넣은 뒤 "
                    f"/jobs/ 에서 실행하세요"
                )
            return JsonResponse(payload)

        if dest_name:
            messages.success(
                request,
                f"작업 생성됨 · {job.public_id} · 동영상 저장됨 · 처리 큐에 넣는 중",
            )
        else:
            messages.success(
                request,
                f"작업 생성됨 · {job.public_id} · input 폴더에 동영상을 넣은 뒤 /jobs/ 에서 실행하세요",
            )
        return redirect("catalog_jobs")

    # 업로드 페이지 보여주기
    return render(
        request,
        "catalog/anime_upload.html",
        {"animes": animes, "genres": genres},
    )

업로드가 DB에 커밋되면 schedule_auto_enqueue_on_committry_auto_enqueue_jobenqueue_run_job 순으로 이어지고, 마지막에 delay()Redis 브로커 큐에 작업 메시지를 넣습니다.

schedule_auto_enqueue_on_committransaction.on_commit으로 커밋 후 try_auto_enqueue_job을 호출합니다.

def schedule_auto_enqueue_on_commit(public_id: UUID) -> None:
    """DB 커밋 후 자동 enqueue."""
    pid = public_id

    def _enqueue() -> None:
        try_auto_enqueue_job(pid)

    transaction.on_commit(_enqueue)

try_auto_enqueue_jobenqueue_run_job을 호출합니다. on_commit 스케줄링과 enqueue 로직을 나눈 구조입니다.

def try_auto_enqueue_job(public_id: UUID) -> bool:
    """
    pending 잡을 Celery에 넣는다.
    input/ 동영상 없음 등으로 거부되면 False, enqueue 성공 시 True.
    """
    from embeddings.services.job_dispatch import JobDispatchError, enqueue_run_job

    try:
        enqueue_run_job(public_id)
    except JobDispatchError as exc:
        logger.info("auto enqueue skipped public_id=%s: %s", public_id, exc)
        return False
    except Exception:
        logger.exception("auto enqueue failed public_id=%s", public_id)
        return False
    return True

실제 enqueue는 embeddings/services/job_dispatch.pyenqueue_run_job에서 합니다.

...

class JobDispatchError(Exception):
    """enqueue 거부 (상태·중복 등)."""

    def __init__(self, message: str, *, status_code: int = 400) -> None:
        super().__init__(message)
        self.status_code = status_code


@dataclass(frozen=True)
class EnqueueResult:
    public_id: str
    celery_task_id: str
    status: str


def _save_task_id(job: EmbeddingJob, async_result) -> str:
    task_id = str(async_result.id)
    job.celery_task_id = task_id
    job.save(update_fields=["celery_task_id", "updated_at"])
    return task_id


def _require_input_video(job: EmbeddingJob) -> None:
    err = check_job_input_video(job)
    if err:
        raise JobDispatchError(err, status_code=400)


def enqueue_run_job(public_id: UUID) -> EnqueueResult:
    """
    pending 잡을 검증·processing 표시 후 Celery에 단건 실행을 넣음
    """
    from embeddings.tasks import run_embedding_job_task

    with transaction.atomic():
        # 잡 조회 (select_for_update로 행 잠금)
        job = (
            EmbeddingJob.objects.select_for_update()
            .filter(public_id=public_id)
            .first()
        )
        if job is None:
            raise JobDispatchError("작업을 찾을 수 없습니다.", status_code=404)

        # 잡 상태 검증
        if job.status == EmbeddingJob.Status.PROCESSING:
            raise JobDispatchError(
                f"이미 처리 중입니다 (task={job.celery_task_id or '—'})",
                status_code=409,
            )
        if job.status != EmbeddingJob.Status.PENDING:
            raise JobDispatchError(
                f"pending 만 실행 가능합니다 (현재 {job.status})",
                status_code=400,
            )
        # 스테이징 폴더(frames/, input/)만 생성
        ensure_job_staging_dirs(job)
        # 큐에 넣기 전 input/ 동영상 필수 체크
        _require_input_video(job)

        # 잡 상태 업데이트(processing)
        job.status = EmbeddingJob.Status.PROCESSING
        job.last_error = ""
        job.save(update_fields=["status", "last_error", "updated_at"])

    # Celery에 해당 작업을 넣음, Redis 브로커에 메시지 적재
    async_result = run_embedding_job_task.delay(str(public_id))
    task_id = _save_task_id(job, async_result)
    return EnqueueResult(
        public_id=str(public_id),
        celery_task_id=task_id,
        status=job.status,
    )

여기까지가 Django에서 큐에 넣는 부분입니다. enqueue_run_job이 끝나면 EmbeddingJobprocessing이고 celery_task_id가 저장되며, 브로커(Redis)에 run_embedding_job_task 메시지가 올라갑니다. 워커가 태스크를 집으면 running, 끝나면 done 또는 failed입니다. 상태만 다시 정리하면 아래와 같습니다.

상태누가·언제의미
pending업로드 직후 (Django)잡·스테이징 폴더만 있음. 동영상 없으면 수동 실행 대기
processingenqueue_run_job 성공 직후 (Django)Celery에 넣었고, 워커가 아직 안 집음
running워커가 태스크 시작 (Celery)ffmpeg·CLIP·Qdrant upsert 등 파이프라인 실행 중
done워커 완료벡터 적재·스테이징 정리까지 끝
failed워커 실패last_error에 원인 기록

Django에서 큐에 넣는 흐름까지 봤으니 이제 워커가 Redis에서 메시지를 가져와 4장 파이프라인을 실행하는 부분으로 넘어갑니다.

Celery worker & Redis (임베딩 실행)

Celery와 Redis 큐 절의 Producer–Broker–Consumer 흐름을 redis-cli MONITOR로 실제 Redis 명령에 맞춰 확인합니다.

다음은 작업(task) 기준으로 생산자, 메시지 브로커, 소비자 패턴을 나타냈습니다.

flowchart LR subgraph producer["발행 Producer"] Django["Django\nenqueue_run_job"] CeleryClient["Celery 클라이언트\n.delay()"] end subgraph broker["메시지 브로커 Message Broker"] Redis[("Redis\n작업 메시지 큐")] end subgraph consumer["소비 Consumer"] Worker["Celery worker\nrun_embedding_job_task"] Pipeline["4장 파이프라인\nffmpeg · CLIP · Qdrant"] end DB[("SQLite\nEmbeddingJob 상태")] Django --> CeleryClient CeleryClient -->|"publish\n태스크명 + public_id"| Redis Redis -->|"consume\nFIFO에 가깝게"| Worker Worker --> Pipeline Django -.->|"잡 메타·상태"| DB Worker -.->|"running → done/failed"| DB

앞에서 enqueue_run_job이 호출한 run_embedding_job_task.delay(...)는 지금 당장 run_embedding_job_task 본문을 실행하지 않습니다. Redis 브로커에 작업 메시지를 적재합니다.

delay() 직후 Redis에서 무엇이 오가는지 보려면 redis-cli MONITOR(Docker면 docker exec -it redis redis-cli MONITOR)를 켜두면 동영상을 업로드했을 때 Redis에서 실행되는 명령어들을 확인할 수 있는데, PING·CLIENT SETINFO는 연결 유지용이라 무시해도 되고 임베딩 잡과 직접 관련된 건 아래 세 가지입니다.

순서Redis 명령의미
1LPUSH celery …브로커 큐에 태스크 메시지 적재(delay() 본체). 워커가 실행할 일을 넣음
2HSET unacked / ZREM unacked워커가 메시지를 가져간 뒤 처리 완료 응답을 주지 않은 상태
3SETEX celery-task-meta-<task_id>Celerytask 상태를 Redis에 저장할 때 사용

LPUSH celery로 적재되는 메시지의 내용은 다음과 같습니다. 이 내용 전체가 Redis에 메시지로 저장됩니다.

{
  "body": "W1siMGQzOGFjNjMtZDM5OS00MzcxLWJjM2EtOTRlMjJhMjIyNGQwIl0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==",
  "content-encoding": "utf-8",
  "content-type": "application/json",
  "headers": {
    "lang": "py",
    "task": "embeddings.run_embedding_job",
    "id": "2ed7a75a-4c40-459a-9275-7c49f3cddf17",
    "shadow": null,
    "eta": null,
    "expires": null,
    "group": null,
    "group_index": null,
    "retries": 0,
    "timelimit": [null, null],
    "root_id": "2ed7a75a-4c40-459a-9275-7c49f3cddf17",
    "parent_id": null,
    "argsrepr": "('0d38ac63-d399-4371-bc3a-94e22a2224d0',)",
    "kwargsrepr": "{}",
    "origin": "gen32524@jiseunglyeol-ui-MacBookAir.local",
    "ignore_result": false,
    "replaced_task_nesting": 0,
    "stamped_headers": null,
    "stamps": {}
  },
  "properties": {
    "correlation_id": "2ed7a75a-4c40-459a-9275-7c49f3cddf17",
    "reply_to": "9950199d-4293-39ea-bd9e-5a400d686cef",
    "delivery_mode": 2,
    "delivery_info": {
      "exchange": "",
      "routing_key": "celery"
    },
    "priority": 0,
    "body_encoding": "base64",
    "delivery_tag": "88c17946-d9c8-4704-9035-d8e204de4c28"
  }
}

위 데이터에서 중점적으로 봐야 할 부분은 다음과 같습니다.

  • headers.task: @shared_task(name="embeddings.run_embedding_job")에 쓰인 값과 동일
  • headers.id / properties.correlation_id: Celery 태스크 ID (EmbeddingJob.celery_task_id, UUID 문자열). public_id와는 다름
  • headers.argsrepr: delay(public_id)에 넘긴 public_id를 읽기 쉽게 표시
  • body(base64): 위와 같은 public_id를 실행용으로 직렬화한 값
  • properties.delivery_info.routing_key: 기본 큐 이름 celery

이제 별도 프로세스로 돌고 있는 Celery 워커가 Rediscelery 리스트에서 메시지를 가져온 뒤, 아래 run_embedding_job_task 본문을 실행합니다.

run_embedding_job_task의 코드를 보면 우리가 embeddings/tasks.py에 작성한 일반 함수인데, @shared_task 데코레이터가 함수를 감싸서 Celery가 인식하는 작업(task)로 등록하고 .delay()로 브로커에 넣을 수 있게 합니다. 이름은 그대로 run_embedding_job_task이지만, Celery가 .delay()·.apply_async() 같은 메서드를 붙입니다.

그래서 Django에서는 delay()로 큐에 넣고, 워커에서는 같은 이름으로 본문이 돌아가는 거죠. 로컬에서 동기 테스트할 때는 run_embedding_job_task("uuid")처럼 괄호만 써서 직접 호출할 수도 있습니다.

run_embedding_job_task의 내부 로직입니다.
잡 조회 → 상태·입력 검증 → 파이프라인 실행 → 성공/실패 반환 순으로 진행됩니다.

@shared_task(name="embeddings.run_embedding_job")
def run_embedding_job_task(public_id: str) -> dict[str, str]:
    """단건 잡: enqueue_run_job에서 processing으로 올린 뒤 워커가 실행."""
    started = time.monotonic()
    uid = UUID(public_id)

    # job에서 select_related를 통해 anime와 같이 join 해서 가져옴
    job = EmbeddingJob.objects.select_related("anime").filter(public_id=uid).first()

    if job is None:
        logger.error("run_embedding_job_task: job not found public_id=%s", public_id)
        return {"public_id": public_id, "status": "not_found"}

    slug = job.anime.slug
    logger.info(
        "embedding job start public_id=%s anime=%s status=%s",
        public_id,
        slug,
        job.status,
    )

    try:
        # Job 상태 체크 
        if job.status != EmbeddingJob.Status.PROCESSING:
            logger.warning(
                "embedding job skip unexpected status public_id=%s status=%s",
                public_id,
                job.status,
            )
            return {"public_id": public_id, "status": job.status}

        # enqueue와 동일: input/ 동영상 필수(ffmpeg 추출 전 재검증)
        input_err = check_job_input_video(job)
        if input_err:
            _mark_failed(job, RuntimeError(input_err))
            return {"public_id": public_id, "status": EmbeddingJob.Status.FAILED}

        # 4장 파이프라인 (보통 running → done/failed는 여기서 갱신)
        run_single_embedding_job(job)
        # 파이프라인이 바꾼 status·필드를 DB에서 다시 읽음
        job.refresh_from_db()
        elapsed = time.monotonic() - started
        logger.info(
            "embedding job done public_id=%s anime=%s status=%s elapsed_sec=%.2f",
            public_id,
            slug,
            job.status,
            elapsed,
        )
        return {"public_id": public_id, "status": job.status}
    except Exception as exc:  # noqa: BLE001
        logger.exception(
            "embedding job failed public_id=%s anime=%s",
            public_id,
            slug,
        )
        job.refresh_from_db()
        if job.status != EmbeddingJob.Status.FAILED:
            _mark_failed(job, exc)
        return {"public_id": public_id, "status": EmbeddingJob.Status.FAILED}

run_embedding_job_task는 앞 Celery 앱 설정으로 등록된 embeddings.run_embedding_job 이름으로 워커에 전달되고, 본문에서 run_single_embedding_job으로 4장 파이프라인을 실행합니다.

정리하자면, Django가 EmbeddingJob을 만들고 enqueue_run_job으로 Celery·Redis에 넣은 뒤, 워커가 ffmpeg·CLIP·Qdrant까지 이어서 처리합니다.

6. 자연어 검색

이 장의 목표는 사용자가 자연어를 입력하면 일치하는 장면을 찾는 전체 프로세스를 완성시키는 겁니다. RAG 시스템답게 프로젝트의 클라이언트 레이어를 씌우는 거죠. 최종적으로 자연어 입력 → (Retrieval) Qdrant 유사도 검색 → LLM·UI로 표시까지 만드는 겁니다.

RAG

RAG(Retrieval-Augmented Generation)는 LLM이 데이터를 생성해 주는 것에서 더 나아가 먼저 정해진 영역에서 관련 자료를 찾고(Retrieval) 그 결과를 바탕으로 답하거나 보여 주는(Generation) 구조입니다.

  • 검색 (Retrieval): 사용자가 질문하면 AI가 사전에 연결된 문서나 데이터베이스에서 질문과 가장 관련 있는 정보를 찾습니다.
  • 증강 (Augmentation): 검색된 정보와 사용자의 질문을 결합하여 AI에게 맥락(Context)을 제공합니다.
  • 생성 (Generation): AI가 제공받은 정보를 바탕으로 정확하고 신뢰할 수 있는 답변을 만들어냅니다.

이 프로젝트에서 미리 준비하는 일에 해당하는 것은 장면을 색인하는 작업이었죠. 이 부분은 4장5장에서 진행했습니다. 프레임을 CLIP으로 벡터화하고 Qdrantupsert하면서 검색할 수 있는 형태로 DB에 쌓았죠. 벡터 DB 맥락에서 이 과정을 색인(indexing)이라고 부르며, GET /collections/...points_count·payload_schema로 색인된 데이터의 수를 파악할 수 있었습니다.

후에 추가 학습을 통해 더 검색을 정확히 하거나 알맞은 모델을 찾는 과정을 보며 퀄리티를 높일 수 있지만 아직은 RAG 시스템을 구현하는 부분까지를 목표로 합시다.

LLM 연결

애니메이션 장면을 찾을 때 챗봇이나 여타 다른 LLM 서비스처럼 채팅 형태로 검색하는 작업이 이뤄져야 합니다. 그러기 위해서는 LLM 모델 연결이 간단하고 직관적인 방법이죠. CLIP 같은 멀티 모달 모델은 LLM 모델보다 비교적 가볍기 때문에 내장하는 방향으로 진행되었습니다.

구분CLIP (표준 모델 기준)초경량 LLM (3B~8B 기준)
파라미터(매개변수) 수1.5억 ~ 4억 개 (150M ~ 400M)30억 ~ 80억 개 (3B ~ 8B)
필요 메모리(RAM)약 0.5GB ~ 1.5GB약 2.5GB ~ 6GB (4-bit 양자화 시)
연산 방식단발성 연산: 한 번에 쓱 읽고 특징(벡터)만 추출하고 끝납니다.순차적 생성: 단어를 하나씩 예측하며 출력하므로 연산이 계속 누적됩니다.
시스템 부하매우 가벼움매우 무거움 (병목의 주원인)

Google에서 무료 Gemini key를 발급해 주니 이를 이용해 보도록 합시다. 물론 제약사항이 존재하지만 MVP 단계에서는 충분히 사용을 고려해 볼 만합니다.

모델속도 제한 (RPM)일일 제한 (RPD)
Gemini Flash (예: 2.0 Flash 등)분당 최대 15회 요청하루 최대 1,500회 요청
Gemini Pro (예: 2.5 Pro 등)분당 최대 2회 요청하루 최대 50회 ~ 100회 요청 (모델 및 정책에 따라 변동 가능)

이렇게 가져온 LLM이 담당할 로직은 사용자가 입력한 텍스트를 분석해서, 결과적으로 벡터와 매칭할 수 있는 형태로 가공하는 겁니다. 예를 들면 다음 사진처럼 채팅을 통해 원하는 장면을 찾는 겁니다. chat

프로세스 흐름은 아래처럼 진행되며 채팅 내역을 모아서 사용자와 대화해주는 LLM 영역 하나와 대화 내용 전반을 전부 가져온 뒤 CLIP과 비교 가능한 형태로 가공하는 LLM 부분으로 총 2곳에 연결했습니다.

하나로도 충분히 제어가 되지만, 한국어 대화와 CLIP용 검색 문장을 나눠서 정확도를 높이고자 했습니다.

  1. 대화와 맥락을 보고 사용자가 원하는 장면을 search_query 문장으로 만듦 (G1)
  2. 그 문장을 CLIP이 비교하기 쉬운 영어 시각 묘사로 바꿈 (G2)
sequenceDiagram participant Browser participant Django as views.chat_api participant Orch as chat_orchestrator participant DB as ChatMessage participant G1 as Gemini_대화 participant Search as run_scene_search participant G2 as Gemini_번역 participant CLIP as Celery_CLIP participant Qdrant Browser->>Django: POST /api/search/chat/ {message, session_id?} Django->>Orch: get_or_create_session + run_chat_turn Orch->>DB: USER/ASSISTANT 최근 N건 조회 Orch->>Orch: _build_user_prompt (이전 대화 + 현재 메시지) Orch->>G1: generate_content(system + tools + user prompt) alt search_scenes 도구 호출 G1-->>Orch: function_call search_scenes Orch->>Search: run_scene_search(search_query_ko, ...) Search->>G2: translate_search_query_for_clip G2-->>Search: clip_query_en Search->>CLIP: encode_search_query_task CLIP->>Qdrant: search_segments Search-->>Orch: scenes JSON Orch->>G1: generate_content(user + function_call + tool 응답) end G1-->>Orch: reply 텍스트 Orch->>DB: USER / TOOL / ASSISTANT 저장 Orch-->>Django: reply, scenes Django-->>Browser: JSON

프론트 부분은 생략하고, 채팅 입력 후 서버는 chat_api에서 run_chat_turn을 호출합니다.

@csrf_exempt
@require_POST
def chat_api(request: HttpRequest) -> JsonResponse:
    # session·message 파싱, get_or_create_session (생략)

    try:
        reply, scenes = run_chat_turn(session=session, user_message=message, request=request)
    except ChatOrchestratorError as exc:
        return JsonResponse({"detail": str(exc)}, status=503)
    except Exception as exc:  # noqa: BLE001
        return JsonResponse({"detail": str(exc)}, status=500)

    return JsonResponse(
        {
            "session_id": str(session.id),
            "reply": reply,
            "scenes": scenes,
        }
    )

아래는 채팅 오케스트레이터 핵심만 발췌한 코드입니다.
유효성 검사·DB 저장 등은 # ... (생략)으로 표시합니다.

run_chat_turngoogle.genaigenerate_content를 호출합니다. search_scenes 도구 정의를 넘기고, 필요 시 Django가 run_scene_search로 CLIP·Qdrant 검색을 합니다. 영어 변환은 translate_search_query_for_clip에서 합니다.

검색 결과는 types.Part.from_function_response로 래핑해서 role="tool"과 함께 두 번째 generate_content에 넣어 최종 한국어 답(response.text)을 받습니다.

# google-genai SDK: Gemini 요청/응답 타입 (Content, Part, GenerateContentConfig 등)
from google.genai import types

...
# 한국어 대화 + search_scenes 호출 규칙 (search_query는 시각 묘사만)
SYSTEM_PROMPT = """당신은 애니메이션 장면 검색 도우미입니다.
- 사용자와 한국어로 대화합니다.
- 장면을 찾아야 할 때 search_scenes 도구를 호출하세요.
- search_query에는 잡담을 빼고 시각적으로 그릴 수 있는 장면만 한국어로 1~2문장 적으세요.
- CLIP 검색용 영어 변환은 서버 파이프라인에서 처리합니다. search_query는 한국어 시각 묘사만 넣으세요.
- 도구 결과에 없는 타임스탬프·URL·제목을 만들지 마세요.
"""

...

# generate_content 호출 시 공통 옵션
_GENERATE_CONFIG = types.GenerateContentConfig(
    tools=[SEARCH_SCENES_TOOL],  # search_scenes 도구 정의
    system_instruction=SYSTEM_PROMPT,
    # False → SDK가 도구를 대신 실행하지 않음. 서버에서 _execute_search_scenes로 실행
    automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)

...
# 채팅 분석·검색·반환까지의 오케스트레이터
def run_chat_turn(
    *,
    session: ChatSession,
    user_message: str,
    request: HttpRequest,
) -> tuple[str, list[dict[str, Any]]]:  # (채팅 답변 텍스트, 장면 목록 JSON)
    try:
        client = get_gemini_client()
    except RuntimeError as exc:
        raise ChatOrchestratorError(str(exc)) from exc
    model = get_gemini_model_id()

    prompt = _build_user_prompt(session, user_message)
    # types.Content: 대화 한 턴 (role=user|model|tool)
    user_content = types.Content(
        role="user",
        parts=[types.Part.from_text(text=prompt)],  # 텍스트 조각
    )

    scenes: list[dict[str, Any]] = []  # 프론트와 DB에 넘길 검색 결과 
    searched = False  # search_scenes가 한 번이라도 호출됐는지
    pending_tools: list[_PendingToolMessage] = []

    # Gemini의 텍스트 응답 또는 function_call(search_scenes)
    response = client.models.generate_content(
        model=model,
        contents=[user_content],
        config=_GENERATE_CONFIG,
    )

    # 도구 호출이 있으면 서버가 실행 후 결과를 다시 Gemini에 넘김 (최대 3라운드)
    for _ in range(3):
        fn_calls = list(response.function_calls or [])
        if not fn_calls:
            break

        function_call_content = response.candidates[0].content if response.candidates else None
        if function_call_content is None:
            break

        # 도구 실행 결과, Gemini에 다시 보낼 Part 모음
        tool_response_parts: list[types.Part] = []  

        for fc in fn_calls:
            name = fc.name or ""
            # search_scenes 도구 호출 시
            if name == "search_scenes":

                searched = True
                args = _function_call_args(fc)  # search_query 등 인자 파싱

                # 실질 Retrieval: 영어 변환·CLIP·Qdrant 검색
                tool_json, new_scenes, query_ko, query_en, trace_id = _execute_search_scenes(
                    args, request, session=session
                )
                scenes = _merge_scene_lists(scenes, new_scenes)

                # TOOL 메시지 DB 저장용 (생략)

                tool_response_parts.append(
                    types.Part.from_function_response(
                        name=name,
                        response={"result": tool_json},
                    )
                )
            # 지원하지 않는 도구 → error JSON (생략)
        if not tool_response_parts:
            break

        # 코드로 도구 실행 후 tool 결과를 LLM에 넘겨 최종 reply 생성
        response = client.models.generate_content(
            model=model,
            contents=[
                user_content,
                function_call_content,
                types.Content(role="tool", parts=tool_response_parts),
            ],
            config=_GENERATE_CONFIG,
        )

    reply_text = (response.text or "").strip()
    # 답이 비면 scenes 개수 기준 기본 문구 (생략)
    # USER / TOOL / ASSISTANT 순 ChatMessage 저장 (생략)

    return reply_text, scenes

모델에서 도구 search_scenes를 호출하면 _execute_search_scenes가 인자를 넘기고 run_scene_search가 번역·CLIP·Qdrant 등의 파이프라인을 돌립니다.

def _execute_search_scenes(
    args: dict[str, Any],
    request: HttpRequest,
    *,
    session: ChatSession,
) -> tuple[str, list[dict[str, Any]], str, str, UUID | None]:
    search_query = (args.get("search_query") or "").strip()
    # search_query 빈 값 early return (생략)
    # anime_id·episode·genre_slugs 인자 정규화 (생략)

    clip_query, scenes = run_scene_search(
        search_query=search_query,
        anime_slug=...,  # anime_id·episode·genre_slugs 정규화 결과
        episode=...,
        genre_slugs=...,
        tracer=...,
        request=request,
    )
    # SearchPipelineError 시 error JSON 반환 (생략)

    payload = {"count": len(scenes), "scenes": scenes, "search_query_ko": search_query, ...}
    return json.dumps(payload, ensure_ascii=False), scenes, search_query, clip_query, trace_id

run_scene_search에서 검색에 관련된 로직들을 실행합니다.
한글을 영어로 변경 → CLIP 인코딩 → 벡터 검색 → 시간대 묶기 → 점수와 개수로 걸러내기 → 반환 형태로 가공

def run_scene_search(
    *,
    search_query: str,
    anime_slug: str | None = None,
    episode: int | None = None,
    genre_slugs: list[str] | None = None,
    limit: int = 50,
    tracer: PipelineTracer | None = None,
    request: HttpRequest,
) -> tuple[str, list[dict]]:
    ko = (search_query or "").strip()

    clip_query = translate_search_query_for_clip(ko)  # CLIP용 영어 쿼리
    vec, _ = _encode_via_celery(clip_query)  # CLIP으로 인코딩

    hits = search_segments(
        query_vector=vec,
        limit=limit,
        anime_slug=anime_slug,
        episode=episode,
        genre_slugs=genre_slugs,
    )

    segments = merge_frame_hits(hits, gap_sec=..., max_segments=...)
    segments, _ = filter_segments_for_display(segments)

    scenes, _ = present_scenes(segments, request)

    return clip_query, scenes

run_scene_search 안에서 호출하는 핵심 함수는 아래와 같습니다.

  • hit: Qdrant에서 맞아 떨어진 프레임 1장 단위 결과(유사도 점수·메타데이터 포함)
  • segment: 시간이 가까운 hit를 묶은 장면 구간
함수기능
translate_search_query_for_clip한국어 search_query를 CLIP 검색용 영어 문장으로 변환 (시퀀스의 G2)
_encode_via_celeryCelery 워커에서 CLIP encode를 돌려 쿼리 벡터를 만듦
search_segmentsQdrant에서 벡터 유사도 검색, 프레임 단위 hit 목록 반환
merge_frame_hits시간적으로 가까운 hit을 구간(segment)으로 묶음
filter_segments_for_display최소 점수·최대 개수 등으로 후보 구간을 걸러냄
present_scenessegment를 UI용 scene 카드(JSON)로 변환 (URL·썸네일 등)

CLIP용 영문 변환은 translate_search_query_for_clip입니다.

_TRANSLATE_SYSTEM: Final[str] = """You convert anime scene search phrases into concise English visual descriptions for CLIP text-image matching.
Rules:
- Output English only, 1-2 short sentences.
- Describe only what can be seen in a frame (characters, pose, setting, lighting, colors).
- Do not include episode numbers, titles, URLs, or conversational filler.
- No quotes or markdown."""

def translate_search_query_for_clip(search_query: str) -> str:
    """CLIP용 영어 시각 묘사로 변환. 실패 시 원문 반환."""
    raw = (search_query or "").strip()
    # 파라미터 유효성 검사 (생략)

    try:
        client = get_gemini_client()
        # 위 영문 명령어로 영문 생성
        response = client.models.generate_content(
            model=get_gemini_model_id(),
            contents=raw,
            config=types.GenerateContentConfig(
                system_instruction=_TRANSLATE_SYSTEM,
                temperature=0.1,
            ),
        )
        translated = (response.text or "").strip()
        if translated:
            return translated
    except Exception as exc:  # noqa: BLE001
        logger.warning("search_query EN translate failed, using original: %s", exc)

    return raw

영문 문장을 CLIP으로 벡터화합니다. Celery task로 넣지만, 검색 경로에서는 get()으로 동기 대기합니다.

def _encode_via_celery(search_query: str) -> tuple[list[float], int]:
    q = (search_query or "").strip()
    # empty 검사, eager/timeout (생략)
    async_result = encode_search_query_task.apply_async(args=[q])
    vec = async_result.get(timeout=timeout)
    return vec, ms

...

@shared_task(name="discovery.encode_search_query")
def encode_search_query_task(search_query: str) -> list[float]:
    worker = get_vision_worker()
    return worker.encode_text_query(search_query).tolist()

메타데이터 필터를 걸고 Qdrant에서 벡터 유사도 검색합니다.

def search_segments(
    *,
    query_vector: list[float],
    limit: int = 20,
    anime_slug: str | None = None,
    episode: int | None = None,
    genre_slugs: list[str] | None = None,
) -> list[dict[str, Any]]:
    client, collection = get_qdrant_client_and_collection()
    if client is None or collection is None:
        return []
    flt = build_search_filter(anime_slug=anime_slug, episode=episode, genre_slugs=genre_slugs)
    lim = min(max(1, limit), 100)

    # qdrant 쿼리
    try:
        response = client.query_points(
            collection_name=collection,
            query=query_vector,
            limit=lim,
            query_filter=flt,
            with_payload=True,
        )
        points = list(response.points or [])
    except Exception as exc:  # noqa: BLE001
        logger.warning("Qdrant search 실패: %s", exc)
        return []

    # 딕셔너리로 변경
    out: list[dict[str, Any]] = []
    for point in points:
        out.append(
            {
                "id": str(point.id),
                "score": float(point.score or 0.0),
                "payload": dict(point.payload or {}),
            }
        )
    return out

merge_frame_hitssearch_segments가 돌려준 hit(프레임 1장)을 시간순으로 훑으며, 가까운 시각끼리 segment(장면 구간)로 묶습니다.

cluster는 지금 같은 장면으로 보는 hit를 임시로 쌓아 두는 리스트이고, flush는 cluster가 끊길 때(작품·화 변경, gap_sec보다 시각이 멀어짐, 마지막 순회 후) 모인 hit를 SceneSegment 한 구간으로 만들어 out에 넣는 함수입니다.

def merge_frame_hits(
    hits: list[dict[str, Any]],  # search_segments가 준 프레임 단위 hit 목록
    *,
    gap_sec: float = 1.5,  # 이 초보다 멀면 다른 장면 구간으로 봄 (설정 SEGMENT_MERGE_GAP_SEC)
    max_segments: int = 8,
) -> list[SceneSegment]:  # UI·filter에 넘길 segment(장면 구간) 목록
    # hit → (유사도 점수, payload). 재생 시각이 없는 포인트는 제외
    parsed: list[tuple[float, dict[str, Any]]] = []
    for h in hits:
        p = dict(h.get("payload") or {})
        if "timestamp_sec" not in p:
            continue
        parsed.append((float(h.get("score") or 0.0), p))
    if not parsed:
        return []

    # 같은 작품·화·잡끼리 모은 뒤, timestamp_sec 순으로 정렬
    parsed.sort(key=lambda x: (_key(x[1]), float(x[1]["timestamp_sec"])))
    out: list[SceneSegment] = []
    cluster: list[tuple[float, dict[str, Any]]] = []  # 지금 묶는 중인 hit 묶음
    ckey: tuple[str, int | None, str] | None = None  # _key(p): 작품·화·잡 식별

    def flush() -> None:
        nonlocal cluster, ckey
        if not cluster:
            return
        best_score, best_p = max(cluster, key=lambda x: x[0])  # 구간 대표 점수·프레임
        times = [float(x[1]["timestamp_sec"]) for x in cluster]
        out.append(
            SceneSegment(
                anime_id=str(best_p.get("anime_id") or ""),
                episode=int(best_p["episode"]) if best_p.get("episode") is not None else None,
                job_public_id=str(best_p.get("job_public_id") or ""),
                frame_file=str(best_p.get("frame_file") or ""),
                start_sec=min(times),  # 구간 시작·끝 재생 시각(초)
                end_sec=max(times),
                peak_sec=float(best_p["timestamp_sec"]),  # 썸네일·대표 프레임 시각
                score=best_score,
            )
        )
        cluster = []
        ckey = None

    for score, p in parsed:
        key = _key(p)
        ts = float(p["timestamp_sec"])
        if ckey is None:
            ckey, cluster = key, [(score, p)]
            continue
        if key != ckey:
            # 작품·화가 바뀌면 이전 구간 마무리 후 새 cluster
            flush()
            ckey, cluster = key, [(score, p)]
            continue
        if ts - float(cluster[-1][1]["timestamp_sec"]) > gap_sec:
            # 1.5초(기본)보다 떨어져 있으면 같은 장면이 아니라고 보고 구간 분리
            flush()
            ckey, cluster = key, [(score, p)]
        else:
            # 가까운 시각의 hit → 한 segment 후보에 계속 추가
            cluster.append((score, p))
    flush()  # 마지막 cluster 반영
    out.sort(key=lambda s: s.score, reverse=True)
    return out[:max_segments]  # 상위 max_segments개 구간만

filter_segments_for_display는 점수 미달·상위 N개 조건으로 구간을 걸러냅니다.

def filter_segments_for_display(
    segments: list[SceneSegment],
) -> tuple[list[SceneSegment], list[dict]]:
    """점수 미달 탈락 후 상위 N개만 반환. (kept, dropped with reason)."""
    min_score = float(getattr(settings, "SEARCH_MIN_SCORE", 0.24))
    max_scenes = int(getattr(settings, "SEARCH_MAX_SCENES", 12))
    kept: list[SceneSegment] = []
    dropped: list[dict] = []
    for seg in segments:
        if seg.score < min_score:
            row = _segment_summary(seg)
            row["reason"] = "below_min_score"
            dropped.append(row)
        else:
            kept.append(seg)
    overflow = kept[max_scenes:]
    kept = kept[:max_scenes]
    for seg in overflow:
        row = _segment_summary(seg)
        row["reason"] = "max_scenes_cap"
        dropped.append(row)
    return kept, dropped

위 파이프라인까지 구현해 6장 초반 화면처럼 자연어 검색이 동작합니다. 다만 아직 부족한 점도 있고, 실용성이 떨어지는 부분도 몇 군데 보였습니다.

  1. 애니메이션의 경우 자막을 쓰는 편이 더 효율적으로 보임
  2. 스포일러 방지, 사용자가 조건을 넣지 않으면 원치 않는 화나 장면이 노출될 수 있음
  3. 프레임 샘플링 한계, 성능을 올리면 리소스 사용량이 커져 실용성이 떨어짐