PDF 문서 로드 및 처리
| def load_pdf(): |
| """PDF 문서 로드""" |
| if not PDF_PATH or not os.path.exists(PDF_PATH): |
| raise FileNotFoundError(f"PDF 파일을 찾을 수 없습니다: {PDF_PATH}") |
| print("PDF 파일 로드 중...") |
| return PyMuPDFLoader(PDF_PATH).load() |
기술 설명:
- PyMuPDFLoader: LangChain에서 제공하는 PDF 로더로, PDF 파일을 텍스트 형태로 추출
- 예외 처리: 파일이 없는 경우를 대비한 FileNotFoundError 처리
PDF 문서 벡터화 및 저장
| def create_pdf_vectorstore(): |
| """PDF 문서를 벡터로 변환하여 저장""" |
| documents = load_pdf() |
| print(f"PDF 문서 분할 중... (총 {len(documents)}개 문서)") |
| |
| text_splitter = CharacterTextSplitter( |
| chunk_size=2000, chunk_overlap=200, separator="\\n" |
| ) |
| texts = text_splitter.split_documents(documents) |
| |
| print(f"PDF 문서 임베딩 중... (총 {len(texts)}개 조각)") |
| embeddings = OpenAIEmbeddings(model="text-embedding-3-small") |
| |
| Chroma.from_documents( |
| documents=texts, |
| embedding=embeddings, |
| collection_name="startup_support_policies", |
| persist_directory=CHROMA_DB_DIR, |
| ) |
| print(f"PDF 벡터 저장 완료 ({len(texts)}개)") |
| return len(texts) |
기술 설명:
- CharacterTextSplitter: 대용량 문서를 처리 가능한 크기로 분할하는 LangChain 유틸리티
- chunk_size: 각 분할 조각의 최대 크기 (2000자)
- chunk_overlap: 연속성 유지를 위한 조각 간 중복 영역 (200자)
- OpenAI Embeddings: 텍스트를 벡터로 변환하는 임베딩 모델
- text-embedding-3-small: 크기와 속도 면에서 균형 잡힌 OpenAI의 임베딩 모델
- Chroma.from_documents: 벡터 데이터베이스에 문서를 직접 저장하는 편의 메서드
- collection_name: 벡터 데이터베이스 내 컬렉션 이름 지정
- persist_directory: 벡터 데이터를 디스크에 영구 저장할 경로
정부24 API 데이터 수집
| def get_gov_services(): |
| """정부24 공공서비스 목록 조회""" |
| url = f"{GOV24_BASE_URL}/serviceList" |
| all_services = [] |
| total_count = 0 |
| |
| print("정부24 데이터 수집 중...") |
| |
| |
| try: |
| first_page_params = { |
| "serviceKey": GOV24_API_KEY, |
| "page": 1, |
| "perPage": PAGE_SIZE, |
| } |
| first_response = requests.get(url, params=first_page_params) |
| if first_response.status_code == 200: |
| first_data = first_response.json() |
| total_count = first_data.get("totalCount", 0) |
| if "data" in first_data: |
| all_services.extend(first_data.get("data", [])) |
| |
| expected_pages = min(MAX_PAGES, (total_count + PAGE_SIZE - 1) // PAGE_SIZE) |
| print(f"정부24 총 정책 수: {total_count}, 예상 페이지 수: {expected_pages}") |
| |
| |
| for page in tqdm( |
| range(2, expected_pages + 1), |
| desc="정부24 데이터 수집", |
| dynamic_ncols=True, |
| ): |
| params = { |
| "serviceKey": GOV24_API_KEY, |
| "page": page, |
| "perPage": PAGE_SIZE, |
| } |
| response = requests.get(url, params=params) |
| if response.status_code == 200: |
| data = response.json() |
| services = data.get("data", []) |
| if not services: |
| break |
| all_services.extend(services) |
| else: |
| print(f"정부24 API 요청 실패: 상태 코드 {response.status_code}") |
| break |
| time.sleep(API_RATE_LIMIT_DELAY) |
| except Exception as e: |
| print(f"정부24 API 오류: {e}") |
| |
| print(f"정부24 데이터 수집 완료: {len(all_services)}개") |
| return all_services |
기술 설명:
- 페이지네이션 처리:
- 첫 페이지에서 총 개수를 확인하고 필요한 페이지 수 계산하는 기법
- (total_count + PAGE_SIZE - 1) // PAGE_SIZE "총 항목 수(total_count)를 페이지 크기(PAGE_SIZE)로 나누고 올림”
- 전체 데이터(10,000개 이상)를 한 번에 요청하면 서버의 메모리와 CPU 부하가 늘어나기 때문에 작은 단위로 나누어 요청
- API 속도 제한 처리:
- time.sleep(API_RATE_LIMIT_DELAY)로 요청 간 딜레이를 두어 API 서버 부하 방지
- 짧은 시간에 너무 많은 요청을 하면 API 제공자가 요청을 차단할 수 있어서 해야함(안하면 디도스 취급 받을지도 몰루)
- tqdm 활용:
- 장시간 실행되는, 많은 페이지 처리 작업의 진행 상황을 시각적으로 표시
- 이거 없으면 10분 이상 변화없는 터미널 보면서 손가락 빨고 있어야함
- dynamic_ncols=True 이거 넣으면 진행도 한줄로 예쁘게 나옴
- 3개의 API(정부24, 청년정책, 고용24)에서 데이터를 수집하는데 만약 첫 번째 API에서 오류가 발생했다고 해서 프로그램이 완전히 중단된다면 나머지 두 API에서 데이터를 수집할 기회를 잃게 됨. 그리고 페이지네이션 처리 중 특정 페이지에서 오류가 발생했을 때, 그 페이지만 건너뛰고 나머지 페이지의 데이터는 계속 수집하는 것이 더 효율적
- 예외 처리:
- 모든 API 요청을 try-except로 감싸 오류 발생 시에도 프로그램이 중단되지 않도록 처리
- 아잇 근데 기존 데이터를 삭제하거나 덮어쓰는 로직 넣으면 다시 실행해도 중복값 없이 들어갈테니까 이 부분은 나중에 보완하겠음.
청년정책 API 데이터 수집
| def get_youth_policies(): |
| """청년정책 목록 조회""" |
| all_policies = [] |
| total_count = 0 |
| |
| print("청년정책 데이터 수집 중...") |
| |
| |
| try: |
| first_page_params = { |
| "apiKeyNm": YOUTH_POLICY_API_KEY, |
| "pageNum": 1, |
| "pageSize": PAGE_SIZE, |
| "rtnType": "json", |
| } |
| first_response = requests.get( |
| YOUTH_POLICY_URL, params=first_page_params, timeout=10 |
| ) |
| if first_response.status_code == 200: |
| first_data = first_response.json() |
| total_count = ( |
| first_data.get("result", {}).get("pagging", {}).get("totCount", 0) |
| ) |
| policies = first_data.get("result", {}).get("youthPolicyList", []) |
| all_policies.extend(policies) |
기술 설명:
- 중첩 딕셔너리 안전 접근:
- get("result", {}).get("pagging", {}).get("totCount", 0)와 같이 기본값을 제공하여 KeyError 방지
- 요청 타임아웃 설정:
- timeout=10으로 API 요청이 무한정 대기하는 것을 방지
- JSON 응답 구조 처리:
- 각 API마다 다른 응답 구조 (여기서는 result > pagging > totCount)에 적응하는 유연한 코드
고용24 API 데이터 수집 (XML 처리)
| def get_employment_programs(): |
| """고용24 구직자취업역량강화 프로그램 조회""" |
| all_programs = [] |
| total_count = 0 |
| |
| print("고용24 데이터 수집 중...") |
| |
| |
| try: |
| first_page_params = { |
| "authKey": EMPLOYMENT_API_KEY, |
| "returnType": "XML", |
| "startPage": 1, |
| "display": PAGE_SIZE, |
| } |
| first_response = requests.get( |
| EMPLOYMENT_URL, params=first_page_params, timeout=15 |
| ) |
| if first_response.status_code == 200: |
| try: |
| root = ET.fromstring(first_response.text) |
| total_element = root.find(".//total") |
| if total_element is not None and total_element.text: |
| total_count = int(total_element.text) |
| |
| |
| items = root.findall(".//empPgmSchdInvite") |
| for item in items: |
| program = {} |
| for child in item: |
| program[child.tag] = child.text |
| |
| mapped_program = { |
| "title": program.get("pgmNm", "") |
| + ( |
| " - " + program.get("pgmSubNm", "") |
| if program.get("pgmSubNm") |
| else "" |
| ), |
| "content": program.get("pgmTarget", ""), |
| |
| } |
| all_programs.append(mapped_program) |
기술 설명:
- XML 파싱:
- ElementTree.fromstring으로 XML 문자열을 파싱하여 트리 구조로 변환
- XPath 활용:
- .//total, .//empPgmSchdInvite와 같은 XPath 표현식으로 XML 요소 탐색
- 데이터 정규화:
- 다양한 형식(여기서는 XML)의 데이터를 표준화된 구조로 변환
- 조건부 문자열 연결(title 필드에서 pgmSubNm이 있는 경우에만 추가)
API 데이터를 벡터DB에 저장
| def save_api_data_to_chroma(): |
| """API 데이터를 벡터DB에 저장""" |
| |
| gov_services = get_gov_services() |
| youth_policies = get_youth_policies() |
| employment_programs = get_employment_programs() |
| |
| documents = [] |
| print("문서 변환 중...") |
| |
| |
| print(f"정부24 데이터 처리 중... ({len(gov_services)}개)") |
| for item in tqdm(gov_services, desc="정부24 문서 변환", dynamic_ncols=True): |
| doc = Document( |
| page_content=f"{item.get('servNm', '이름 없음')}\\n{item.get('servDgst', '설명 없음')}", |
| metadata={"source": "정부24", "id": item.get("servId", "unknown_id")}, |
| ) |
| documents.append(doc) |
| |
| |
| batch_size = 100 |
| total_batches = (len(documents) + batch_size - 1) // batch_size |
| |
| print(f"벡터DB에 저장 중... (총 {total_batches}개 배치)") |
| for i in tqdm(range(total_batches), desc="벡터DB 저장 진행", dynamic_ncols=True): |
| start_idx = i * batch_size |
| end_idx = min((i + 1) * batch_size, len(documents)) |
| vectorstore.add_documents(documents[start_idx:end_idx]) |
| tqdm.write( |
| f" 배치 {i+1}/{total_batches} 완료 ({start_idx+1}-{end_idx}번 문서)" |
| ) |
| |
| print(f"API 데이터 {len(documents)}개 벡터DB 저장 완료") |
| return len(documents) |
기술 설명:
- Document 객체화:
- 각 정책 정보를 LangChain의 Document 객체로 변환하여 표준화
- metadata에 출처와 ID 정보를 포함하여 후속 검색 및 필터링을 용이하게 함
- 배치 처리:
- 배치 처리(Batch Processing)는 대량의 데이터를 한 번에 모두 처리하지 않고, 여러 개의 작은 그룹(배치)으로 나누어 순차적으로 처리하는 방식
- 메모리 효율성 향상 및 중간 실패 시 전체 작업 손실 방지
- 진행 상황 보고:
- tqdm을 통한 전체 진행률 표시와 tqdm.write()를 통한 배치별 상세 정보 출력
- 장시간 실행 작업에서 사용자에게 피드백 제공
메인 실행 함수
| if __name__ == "__main__": |
| try: |
| print("=== 데이터 로딩 시작 ===") |
| start_time = time.time() |
| |
| print("\\n[STEP 1/2] PDF 벡터 저장") |
| pdf_count = create_pdf_vectorstore() |
| |
| print("\\n[STEP 2/2] API 데이터 벡터 저장") |
| api_count = save_api_data_to_chroma() |
| |
| end_time = time.time() |
| elapsed_time = end_time - start_time |
| hours, remainder = divmod(elapsed_time, 3600) |
| minutes, seconds = divmod(remainder, 60) |
| |
| print("\\n=== 데이터 로딩 완료 ===") |
| print(f"PDF 문서: {pdf_count}개") |
| print(f"API 문서: {api_count}개") |
| print(f"총 문서: {pdf_count + api_count}개") |
| print(f"소요 시간: {int(hours)}시간 {int(minutes)}분 {int(seconds)}초") |
| except Exception as e: |
| print(f"데이터 로딩 실패: {e}") |
| import traceback |
| |
| traceback.print_exc() |
| |
기술 설명:
- 시간 측정:
- time.time()으로 시작 및 종료 시간을 기록하여 총 실행 시간 측정
- 단계별 실행:
- 전체 작업을 논리적 단계로 분리하여 순차적으로 실행
- 결과 요약:
- 작업 완료 후 총 처리된 문서 수와 소요 시간을 요약하여 보고
- 전역 예외 처리:
- 전체 코드를 try-except로 감싸 예외 발생 시에도 적절한 오류 메시지와 스택 트레이스 제공