PDF 문서 로드 및 처리
def load_pdf():
"""PDF 문서 로드"""
if not PDF_PATH or not os.path.exists(PDF_PATH):
raise FileNotFoundError(f"PDF 파일을 찾을 수 없습니다: {PDF_PATH}")
print("PDF 파일 로드 중...")
return PyMuPDFLoader(PDF_PATH).load()
기술 설명:
- PyMuPDFLoader: LangChain에서 제공하는 PDF 로더로, PDF 파일을 텍스트 형태로 추출
- 예외 처리: 파일이 없는 경우를 대비한 FileNotFoundError 처리
PDF 문서 벡터화 및 저장
def create_pdf_vectorstore():
"""PDF 문서를 벡터로 변환하여 저장"""
documents = load_pdf()
print(f"PDF 문서 분할 중... (총 {len(documents)}개 문서)")
text_splitter = CharacterTextSplitter(
chunk_size=2000, chunk_overlap=200, separator="\\n"
)
texts = text_splitter.split_documents(documents)
print(f"PDF 문서 임베딩 중... (총 {len(texts)}개 조각)")
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
Chroma.from_documents(
documents=texts,
embedding=embeddings,
collection_name="startup_support_policies",
persist_directory=CHROMA_DB_DIR,
)
print(f"PDF 벡터 저장 완료 ({len(texts)}개)")
return len(texts)
기술 설명:
- CharacterTextSplitter: 대용량 문서를 처리 가능한 크기로 분할하는 LangChain 유틸리티
- chunk_size: 각 분할 조각의 최대 크기 (2000자)
- chunk_overlap: 연속성 유지를 위한 조각 간 중복 영역 (200자)
- OpenAI Embeddings: 텍스트를 벡터로 변환하는 임베딩 모델
- text-embedding-3-small: 크기와 속도 면에서 균형 잡힌 OpenAI의 임베딩 모델
- Chroma.from_documents: 벡터 데이터베이스에 문서를 직접 저장하는 편의 메서드
- collection_name: 벡터 데이터베이스 내 컬렉션 이름 지정
- persist_directory: 벡터 데이터를 디스크에 영구 저장할 경로
정부24 API 데이터 수집
def get_gov_services():
"""정부24 공공서비스 목록 조회"""
url = f"{GOV24_BASE_URL}/serviceList"
all_services = []
total_count = 0
print("정부24 데이터 수집 중...")
# 첫 페이지 요청으로 총 개수 확인
try:
first_page_params = {
"serviceKey": GOV24_API_KEY,
"page": 1,
"perPage": PAGE_SIZE,
}
first_response = requests.get(url, params=first_page_params)
if first_response.status_code == 200:
first_data = first_response.json()
total_count = first_data.get("totalCount", 0)
if "data" in first_data:
all_services.extend(first_data.get("data", []))
expected_pages = min(MAX_PAGES, (total_count + PAGE_SIZE - 1) // PAGE_SIZE)
print(f"정부24 총 정책 수: {total_count}, 예상 페이지 수: {expected_pages}")
# 2페이지부터 데이터 수집
for page in tqdm(
range(2, expected_pages + 1),
desc="정부24 데이터 수집",
dynamic_ncols=True,
):
params = {
"serviceKey": GOV24_API_KEY,
"page": page,
"perPage": PAGE_SIZE,
}
response = requests.get(url, params=params)
if response.status_code == 200:
data = response.json()
services = data.get("data", [])
if not services:
break
all_services.extend(services)
else:
print(f"정부24 API 요청 실패: 상태 코드 {response.status_code}")
break
time.sleep(API_RATE_LIMIT_DELAY)
except Exception as e:
print(f"정부24 API 오류: {e}")
print(f"정부24 데이터 수집 완료: {len(all_services)}개")
return all_services
기술 설명:
- 페이지네이션 처리:
- 첫 페이지에서 총 개수를 확인하고 필요한 페이지 수 계산하는 기법
- (total_count + PAGE_SIZE - 1) // PAGE_SIZE "총 항목 수(total_count)를 페이지 크기(PAGE_SIZE)로 나누고 올림”
- 전체 데이터(10,000개 이상)를 한 번에 요청하면 서버의 메모리와 CPU 부하가 늘어나기 때문에 작은 단위로 나누어 요청
- API 속도 제한 처리:
- time.sleep(API_RATE_LIMIT_DELAY)로 요청 간 딜레이를 두어 API 서버 부하 방지
- 짧은 시간에 너무 많은 요청을 하면 API 제공자가 요청을 차단할 수 있어서 해야함(안하면 디도스 취급 받을지도 몰루)
- tqdm 활용:
- 장시간 실행되는, 많은 페이지 처리 작업의 진행 상황을 시각적으로 표시
- 이거 없으면 10분 이상 변화없는 터미널 보면서 손가락 빨고 있어야함
- dynamic_ncols=True 이거 넣으면 진행도 한줄로 예쁘게 나옴
- 3개의 API(정부24, 청년정책, 고용24)에서 데이터를 수집하는데 만약 첫 번째 API에서 오류가 발생했다고 해서 프로그램이 완전히 중단된다면 나머지 두 API에서 데이터를 수집할 기회를 잃게 됨. 그리고 페이지네이션 처리 중 특정 페이지에서 오류가 발생했을 때, 그 페이지만 건너뛰고 나머지 페이지의 데이터는 계속 수집하는 것이 더 효율적
- 예외 처리:
- 모든 API 요청을 try-except로 감싸 오류 발생 시에도 프로그램이 중단되지 않도록 처리
- 아잇 근데 기존 데이터를 삭제하거나 덮어쓰는 로직 넣으면 다시 실행해도 중복값 없이 들어갈테니까 이 부분은 나중에 보완하겠음.
청년정책 API 데이터 수집
def get_youth_policies():
"""청년정책 목록 조회"""
all_policies = []
total_count = 0
print("청년정책 데이터 수집 중...")
# 첫 페이지 요청으로 총 개수 확인
try:
first_page_params = {
"apiKeyNm": YOUTH_POLICY_API_KEY,
"pageNum": 1,
"pageSize": PAGE_SIZE,
"rtnType": "json",
}
first_response = requests.get(
YOUTH_POLICY_URL, params=first_page_params, timeout=10
)
if first_response.status_code == 200:
first_data = first_response.json()
total_count = (
first_data.get("result", {}).get("pagging", {}).get("totCount", 0)
)
policies = first_data.get("result", {}).get("youthPolicyList", [])
all_policies.extend(policies)
기술 설명:
- 중첩 딕셔너리 안전 접근:
- get("result", {}).get("pagging", {}).get("totCount", 0)와 같이 기본값을 제공하여 KeyError 방지
- 요청 타임아웃 설정:
- timeout=10으로 API 요청이 무한정 대기하는 것을 방지
- JSON 응답 구조 처리:
- 각 API마다 다른 응답 구조 (여기서는 result > pagging > totCount)에 적응하는 유연한 코드
고용24 API 데이터 수집 (XML 처리)
def get_employment_programs():
"""고용24 구직자취업역량강화 프로그램 조회"""
all_programs = []
total_count = 0
print("고용24 데이터 수집 중...")
# 첫 페이지 요청으로 총 개수 확인
try:
first_page_params = {
"authKey": EMPLOYMENT_API_KEY,
"returnType": "XML",
"startPage": 1,
"display": PAGE_SIZE,
}
first_response = requests.get(
EMPLOYMENT_URL, params=first_page_params, timeout=15
)
if first_response.status_code == 200:
try:
root = ET.fromstring(first_response.text)
total_element = root.find(".//total")
if total_element is not None and total_element.text:
total_count = int(total_element.text)
# 첫 페이지 데이터 처리
items = root.findall(".//empPgmSchdInvite")
for item in items:
program = {}
for child in item:
program[child.tag] = child.text
mapped_program = {
"title": program.get("pgmNm", "")
+ (
" - " + program.get("pgmSubNm", "")
if program.get("pgmSubNm")
else ""
),
"content": program.get("pgmTarget", ""),
# 기타 필드 매핑...
}
all_programs.append(mapped_program)
기술 설명:
- XML 파싱:
- ElementTree.fromstring으로 XML 문자열을 파싱하여 트리 구조로 변환
- XPath 활용:
- .//total, .//empPgmSchdInvite와 같은 XPath 표현식으로 XML 요소 탐색
- 데이터 정규화:
- 다양한 형식(여기서는 XML)의 데이터를 표준화된 구조로 변환
- 조건부 문자열 연결(title 필드에서 pgmSubNm이 있는 경우에만 추가)
API 데이터를 벡터DB에 저장
def save_api_data_to_chroma():
"""API 데이터를 벡터DB에 저장"""
# 데이터 수집
gov_services = get_gov_services()
youth_policies = get_youth_policies()
employment_programs = get_employment_programs()
documents = []
print("문서 변환 중...")
# 정부24 데이터 추가
print(f"정부24 데이터 처리 중... ({len(gov_services)}개)")
for item in tqdm(gov_services, desc="정부24 문서 변환", dynamic_ncols=True):
doc = Document(
page_content=f"{item.get('servNm', '이름 없음')}\\n{item.get('servDgst', '설명 없음')}",
metadata={"source": "정부24", "id": item.get("servId", "unknown_id")},
)
documents.append(doc)
# 배치 처리
batch_size = 100
total_batches = (len(documents) + batch_size - 1) // batch_size
print(f"벡터DB에 저장 중... (총 {total_batches}개 배치)")
for i in tqdm(range(total_batches), desc="벡터DB 저장 진행", dynamic_ncols=True):
start_idx = i * batch_size
end_idx = min((i + 1) * batch_size, len(documents))
vectorstore.add_documents(documents[start_idx:end_idx])
tqdm.write(
f" 배치 {i+1}/{total_batches} 완료 ({start_idx+1}-{end_idx}번 문서)"
)
print(f"API 데이터 {len(documents)}개 벡터DB 저장 완료")
return len(documents)
기술 설명:
- Document 객체화:
- 각 정책 정보를 LangChain의 Document 객체로 변환하여 표준화
- metadata에 출처와 ID 정보를 포함하여 후속 검색 및 필터링을 용이하게 함
- 배치 처리:
- 배치 처리(Batch Processing)는 대량의 데이터를 한 번에 모두 처리하지 않고, 여러 개의 작은 그룹(배치)으로 나누어 순차적으로 처리하는 방식
- 메모리 효율성 향상 및 중간 실패 시 전체 작업 손실 방지
- 진행 상황 보고:
- tqdm을 통한 전체 진행률 표시와 tqdm.write()를 통한 배치별 상세 정보 출력
- 장시간 실행 작업에서 사용자에게 피드백 제공
메인 실행 함수
if __name__ == "__main__":
try:
print("=== 데이터 로딩 시작 ===")
start_time = time.time()
print("\\n[STEP 1/2] PDF 벡터 저장")
pdf_count = create_pdf_vectorstore()
print("\\n[STEP 2/2] API 데이터 벡터 저장")
api_count = save_api_data_to_chroma()
end_time = time.time()
elapsed_time = end_time - start_time
hours, remainder = divmod(elapsed_time, 3600)
minutes, seconds = divmod(remainder, 60)
print("\\n=== 데이터 로딩 완료 ===")
print(f"PDF 문서: {pdf_count}개")
print(f"API 문서: {api_count}개")
print(f"총 문서: {pdf_count + api_count}개")
print(f"소요 시간: {int(hours)}시간 {int(minutes)}분 {int(seconds)}초")
except Exception as e:
print(f"데이터 로딩 실패: {e}")
import traceback
traceback.print_exc()
기술 설명:
- 시간 측정:
- time.time()으로 시작 및 종료 시간을 기록하여 총 실행 시간 측정
- 단계별 실행:
- 전체 작업을 논리적 단계로 분리하여 순차적으로 실행
- 결과 요약:
- 작업 완료 후 총 처리된 문서 수와 소요 시간을 요약하여 보고
- 전역 예외 처리:
- 전체 코드를 try-except로 감싸 예외 발생 시에도 적절한 오류 메시지와 스택 트레이스 제공
'Today I Learned' 카테고리의 다른 글
BM25, MultiQueryRetriever, Singleton 패턴 (1) | 2025.03.21 |
---|---|
LAG를 왜 공부해야할까? (0) | 2025.03.10 |
내가 보려고 정리한 협업 깃 명령어 (2) | 2025.03.05 |
openai로 내용 보강법 (0) | 2025.02.21 |
OpenAI API의 멀티턴 대화 (0) | 2025.02.21 |