MVP 벡터 DB 로드 코드

PDF 문서 로드 및 처리

def load_pdf():
    """PDF 문서 로드"""
    if not PDF_PATH or not os.path.exists(PDF_PATH):
        raise FileNotFoundError(f"PDF 파일을 찾을 수 없습니다: {PDF_PATH}")
    print("PDF 파일 로드 중...")
    return PyMuPDFLoader(PDF_PATH).load()

기술 설명:

  • PyMuPDFLoader: LangChain에서 제공하는 PDF 로더로, PDF 파일을 텍스트 형태로 추출
  • 예외 처리: 파일이 없는 경우를 대비한 FileNotFoundError 처리

PDF 문서 벡터화 및 저장

def create_pdf_vectorstore():
    """PDF 문서를 벡터로 변환하여 저장"""
    documents = load_pdf()
    print(f"PDF 문서 분할 중... (총 {len(documents)}개 문서)")

    text_splitter = CharacterTextSplitter(
        chunk_size=2000, chunk_overlap=200, separator="\\n"
    )
    texts = text_splitter.split_documents(documents)

    print(f"PDF 문서 임베딩 중... (총 {len(texts)}개 조각)")
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

    Chroma.from_documents(
        documents=texts,
        embedding=embeddings,
        collection_name="startup_support_policies",
        persist_directory=CHROMA_DB_DIR,
    )
    print(f"PDF 벡터 저장 완료 ({len(texts)}개)")
    return len(texts)

기술 설명:

  • CharacterTextSplitter: 대용량 문서를 처리 가능한 크기로 분할하는 LangChain 유틸리티
    • chunk_size: 각 분할 조각의 최대 크기 (2000자)
    • chunk_overlap: 연속성 유지를 위한 조각 간 중복 영역 (200자)
  • OpenAI Embeddings: 텍스트를 벡터로 변환하는 임베딩 모델
    • text-embedding-3-small: 크기와 속도 면에서 균형 잡힌 OpenAI의 임베딩 모델
  • Chroma.from_documents: 벡터 데이터베이스에 문서를 직접 저장하는 편의 메서드
    • collection_name: 벡터 데이터베이스 내 컬렉션 이름 지정
    • persist_directory: 벡터 데이터를 디스크에 영구 저장할 경로

정부24 API 데이터 수집

def get_gov_services():
    """정부24 공공서비스 목록 조회"""
    url = f"{GOV24_BASE_URL}/serviceList"
    all_services = []
    total_count = 0

    print("정부24 데이터 수집 중...")

    # 첫 페이지 요청으로 총 개수 확인
    try:
        first_page_params = {
            "serviceKey": GOV24_API_KEY,
            "page": 1,
            "perPage": PAGE_SIZE,
        }
        first_response = requests.get(url, params=first_page_params)
        if first_response.status_code == 200:
            first_data = first_response.json()
            total_count = first_data.get("totalCount", 0)
            if "data" in first_data:
                all_services.extend(first_data.get("data", []))

            expected_pages = min(MAX_PAGES, (total_count + PAGE_SIZE - 1) // PAGE_SIZE)
            print(f"정부24 총 정책 수: {total_count}, 예상 페이지 수: {expected_pages}")

            # 2페이지부터 데이터 수집
            for page in tqdm(
                range(2, expected_pages + 1),
                desc="정부24 데이터 수집",
                dynamic_ncols=True,
            ):
                params = {
                    "serviceKey": GOV24_API_KEY,
                    "page": page,
                    "perPage": PAGE_SIZE,
                }
                response = requests.get(url, params=params)
                if response.status_code == 200:
                    data = response.json()
                    services = data.get("data", [])
                    if not services:
                        break
                    all_services.extend(services)
                else:
                    print(f"정부24 API 요청 실패: 상태 코드 {response.status_code}")
                    break
                time.sleep(API_RATE_LIMIT_DELAY)
    except Exception as e:
        print(f"정부24 API 오류: {e}")

    print(f"정부24 데이터 수집 완료: {len(all_services)}개")
    return all_services

기술 설명:

  • 페이지네이션 처리:
    • 첫 페이지에서 총 개수를 확인하고 필요한 페이지 수 계산하는 기법
    • (total_count + PAGE_SIZE - 1) // PAGE_SIZE "총 항목 수(total_count)를 페이지 크기(PAGE_SIZE)로 나누고 올림”
    • 전체 데이터(10,000개 이상)를 한 번에 요청하면 서버의 메모리와 CPU 부하가 늘어나기 때문에 작은 단위로 나누어 요청
  • API 속도 제한 처리:
    • time.sleep(API_RATE_LIMIT_DELAY)로 요청 간 딜레이를 두어 API 서버 부하 방지
    • 짧은 시간에 너무 많은 요청을 하면 API 제공자가 요청을 차단할 수 있어서 해야함(안하면 디도스 취급 받을지도 몰루)
  • tqdm 활용:
    • 장시간 실행되는, 많은 페이지 처리 작업의 진행 상황을 시각적으로 표시
    • 이거 없으면 10분 이상 변화없는 터미널 보면서 손가락 빨고 있어야함
    • dynamic_ncols=True 이거 넣으면 진행도 한줄로 예쁘게 나옴
      • 3개의 API(정부24, 청년정책, 고용24)에서 데이터를 수집하는데 만약 첫 번째 API에서 오류가 발생했다고 해서 프로그램이 완전히 중단된다면 나머지 두 API에서 데이터를 수집할 기회를 잃게 됨. 그리고 페이지네이션 처리 중 특정 페이지에서 오류가 발생했을 때, 그 페이지만 건너뛰고 나머지 페이지의 데이터는 계속 수집하는 것이 더 효율적
  • 예외 처리:
    • 모든 API 요청을 try-except로 감싸 오류 발생 시에도 프로그램이 중단되지 않도록 처리
    • 아잇 근데 기존 데이터를 삭제하거나 덮어쓰는 로직 넣으면 다시 실행해도 중복값 없이 들어갈테니까 이 부분은 나중에 보완하겠음.

청년정책 API 데이터 수집

def get_youth_policies():
    """청년정책 목록 조회"""
    all_policies = []
    total_count = 0

    print("청년정책 데이터 수집 중...")

    # 첫 페이지 요청으로 총 개수 확인
    try:
        first_page_params = {
            "apiKeyNm": YOUTH_POLICY_API_KEY,
            "pageNum": 1,
            "pageSize": PAGE_SIZE,
            "rtnType": "json",
        }
        first_response = requests.get(
            YOUTH_POLICY_URL, params=first_page_params, timeout=10
        )
        if first_response.status_code == 200:
            first_data = first_response.json()
            total_count = (
                first_data.get("result", {}).get("pagging", {}).get("totCount", 0)
            )
            policies = first_data.get("result", {}).get("youthPolicyList", [])
            all_policies.extend(policies)

기술 설명:

  • 중첩 딕셔너리 안전 접근:
    • get("result", {}).get("pagging", {}).get("totCount", 0)와 같이 기본값을 제공하여 KeyError 방지
  • 요청 타임아웃 설정:
    • timeout=10으로 API 요청이 무한정 대기하는 것을 방지
  • JSON 응답 구조 처리:
    • 각 API마다 다른 응답 구조 (여기서는 result > pagging > totCount)에 적응하는 유연한 코드

고용24 API 데이터 수집 (XML 처리)

def get_employment_programs():
    """고용24 구직자취업역량강화 프로그램 조회"""
    all_programs = []
    total_count = 0

    print("고용24 데이터 수집 중...")

    # 첫 페이지 요청으로 총 개수 확인
    try:
        first_page_params = {
            "authKey": EMPLOYMENT_API_KEY,
            "returnType": "XML",
            "startPage": 1,
            "display": PAGE_SIZE,
        }
        first_response = requests.get(
            EMPLOYMENT_URL, params=first_page_params, timeout=15
        )
        if first_response.status_code == 200:
            try:
                root = ET.fromstring(first_response.text)
                total_element = root.find(".//total")
                if total_element is not None and total_element.text:
                    total_count = int(total_element.text)

                # 첫 페이지 데이터 처리
                items = root.findall(".//empPgmSchdInvite")
                for item in items:
                    program = {}
                    for child in item:
                        program[child.tag] = child.text

                    mapped_program = {
                        "title": program.get("pgmNm", "")
                        + (
                            " - " + program.get("pgmSubNm", "")
                            if program.get("pgmSubNm")
                            else ""
                        ),
                        "content": program.get("pgmTarget", ""),
                        # 기타 필드 매핑...
                    }
                    all_programs.append(mapped_program)

기술 설명:

  • XML 파싱:
    • ElementTree.fromstring으로 XML 문자열을 파싱하여 트리 구조로 변환
  • XPath 활용:
    • .//total, .//empPgmSchdInvite와 같은 XPath 표현식으로 XML 요소 탐색
  • 데이터 정규화:
    • 다양한 형식(여기서는 XML)의 데이터를 표준화된 구조로 변환
    • 조건부 문자열 연결(title 필드에서 pgmSubNm이 있는 경우에만 추가)

API 데이터를 벡터DB에 저장

def save_api_data_to_chroma():
    """API 데이터를 벡터DB에 저장"""
    # 데이터 수집
    gov_services = get_gov_services()
    youth_policies = get_youth_policies()
    employment_programs = get_employment_programs()

    documents = []
    print("문서 변환 중...")

    # 정부24 데이터 추가
    print(f"정부24 데이터 처리 중... ({len(gov_services)}개)")
    for item in tqdm(gov_services, desc="정부24 문서 변환", dynamic_ncols=True):
        doc = Document(
            page_content=f"{item.get('servNm', '이름 없음')}\\n{item.get('servDgst', '설명 없음')}",
            metadata={"source": "정부24", "id": item.get("servId", "unknown_id")},
        )
        documents.append(doc)

    # 배치 처리
    batch_size = 100
    total_batches = (len(documents) + batch_size - 1) // batch_size

    print(f"벡터DB에 저장 중... (총 {total_batches}개 배치)")
    for i in tqdm(range(total_batches), desc="벡터DB 저장 진행", dynamic_ncols=True):
        start_idx = i * batch_size
        end_idx = min((i + 1) * batch_size, len(documents))
        vectorstore.add_documents(documents[start_idx:end_idx])
        tqdm.write(
            f"  배치 {i+1}/{total_batches} 완료 ({start_idx+1}-{end_idx}번 문서)"
        )

    print(f"API 데이터 {len(documents)}개 벡터DB 저장 완료")
    return len(documents)

기술 설명:

  • Document 객체화:
    • 각 정책 정보를 LangChain의 Document 객체로 변환하여 표준화
    • metadata에 출처와 ID 정보를 포함하여 후속 검색 및 필터링을 용이하게 함
  • 배치 처리:
    • 배치 처리(Batch Processing)는 대량의 데이터를 한 번에 모두 처리하지 않고, 여러 개의 작은 그룹(배치)으로 나누어 순차적으로 처리하는 방식
    • 메모리 효율성 향상 및 중간 실패 시 전체 작업 손실 방지
  • 진행 상황 보고:
    • tqdm을 통한 전체 진행률 표시와 tqdm.write()를 통한 배치별 상세 정보 출력
    • 장시간 실행 작업에서 사용자에게 피드백 제공

메인 실행 함수

if __name__ == "__main__":
    try:
        print("=== 데이터 로딩 시작 ===")
        start_time = time.time()

        print("\\n[STEP 1/2] PDF 벡터 저장")
        pdf_count = create_pdf_vectorstore()

        print("\\n[STEP 2/2] API 데이터 벡터 저장")
        api_count = save_api_data_to_chroma()

        end_time = time.time()
        elapsed_time = end_time - start_time
        hours, remainder = divmod(elapsed_time, 3600)
        minutes, seconds = divmod(remainder, 60)

        print("\\n=== 데이터 로딩 완료 ===")
        print(f"PDF 문서: {pdf_count}개")
        print(f"API 문서: {api_count}개")
        print(f"총 문서: {pdf_count + api_count}개")
        print(f"소요 시간: {int(hours)}시간 {int(minutes)}분 {int(seconds)}초")
    except Exception as e:
        print(f"데이터 로딩 실패: {e}")
        import traceback

        traceback.print_exc()

기술 설명:

  • 시간 측정:
    • time.time()으로 시작 및 종료 시간을 기록하여 총 실행 시간 측정
  • 단계별 실행:
    • 전체 작업을 논리적 단계로 분리하여 순차적으로 실행
  • 결과 요약:
    • 작업 완료 후 총 처리된 문서 수와 소요 시간을 요약하여 보고
  • 전역 예외 처리:
    • 전체 코드를 try-except로 감싸 예외 발생 시에도 적절한 오류 메시지와 스택 트레이스 제공