core-admin · AI context viewer
읽기 전용 컨텍스트 · 실제 수정은 GitHub repo 에서만 한다 · index
운영판/server.py
server.py
원본 경로: 운영판/server.py
#!/usr/bin/env python3
"""
정야 운영판 v0.3 — 단일 Flask 서버 (port 5002)
- 상단 탭: 수집 / 운영
- 폴더 모델: 선별 대기 / 아카이브 / _휴지통
- 분류: type 4기본+확장, category 배열 (워크스페이스 모델)
- 카테고리 안내 글: _정야총괄/카테고리/{이름}.md
- 외부 LLM API 직접 호출 X (프롬프트는 클립보드까지)
"""
import os
import re
import io
import json
import uuid
import shutil
import zipfile
import traceback
import urllib.request
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse, unquote
from flask import Flask, request, jsonify, send_from_directory
# ── 경로 ─────────────────────────────────────────────────────
BASE = Path("D:/정야프로젝트")
SELECTION_DIR = BASE / "선별 대기"
ARCHIVE_DIR = BASE / "아카이브"
TRASH_DIR = BASE / "_휴지통"
CATEGORY_DIR = BASE / "_정야총괄" / "카테고리"
OPDB = Path(__file__).parent
DATA_DIR = OPDB / "data"
PROMPTS_FILE = DATA_DIR / "prompts.json"
ERROR_LOG = DATA_DIR / "error.log"
STATIC_DIR = OPDB / "static"
# v0.2 type 4기본 (확장 가능). capture가 박는 platform 값은 type이 아님
TYPE_BASE = ["외부자료", "메모", "컨텐츠", "프롬프트"]
# 구버전(capture) type → v0.2 type 매핑
LEGACY_TYPE_TO_V2 = {
"youtube": "외부자료",
"youtube_shorts": "외부자료",
"notion": "외부자료",
"instagram": "외부자료",
"instagram_reel": "외부자료",
"naver": "외부자료",
"slashpage": "외부자료",
"misc": "외부자료",
"keep": "메모",
"memo": "메모",
}
app = Flask(__name__, static_folder=str(STATIC_DIR))
# ── 초기화 ───────────────────────────────────────────────────
def init_data():
DATA_DIR.mkdir(exist_ok=True)
ERROR_LOG.touch(exist_ok=True)
SELECTION_DIR.mkdir(exist_ok=True)
ARCHIVE_DIR.mkdir(exist_ok=True)
TRASH_DIR.mkdir(exist_ok=True)
CATEGORY_DIR.mkdir(parents=True, exist_ok=True)
if not PROMPTS_FILE.exists():
seed = {
"stages": [
{"id": "draft_gpt", "label": "생각 정리 (GPT)", "model_hint": "GPT 5.5",
"body": "역할: 너는 초안 작성자다.\n톤 reference: 첨부한 raw의 색을 죽이지 말 것.\n\n작업:\n1. 핵심 키워드 5-7개 추출\n2. N자 분량으로 구조화 (목표: ____자)\n3. 필요한 자료 목록 (최대 5개)\n4. 사용자 확인 필요 사항 (최대 3개)\n\n하지 말 것:\n- 거친 표현 다듬지 말 것\n- 빈 칸을 일반론으로 채우지 말 것 → 모르면 [확인 필요] 표시\n- 너 의견 넣지 말 것 (정리자, 평가자 아님)"},
{"id": "research_perplexity", "label": "실시간 자료 (Perplexity)", "model_hint": "Perplexity",
"body": "다음 주제 최신 정보 검색. 출처 3개 이상. 2026년 자료 우선.\n핵심 5줄 요약. 감성 빼고 데이터 중심.\n\n주제: ____"},
{"id": "research_gemini", "label": "긴 자료 분석 (Gemini)", "model_hint": "Gemini Plus",
"body": "첨부 자료를 분석. 감성 빼고 데이터 중심으로 정리.\n출력 형식: 핵심 요지 → 근거 → 빠진 시각\n\n주제: ____"},
{"id": "direction_opus", "label": "방향 점검 (Opus)", "model_hint": "Claude Opus",
"body": "외부 자문 임원. 칭찬 빼고 비판만.\n\n5분짜리 검토. 3개만 답할 것:\n1. 방향 자체가 잘못된 게 있나? (Y/N + 한 줄 이유)\n2. 빠진 시각 있나? (있으면 1-2개)\n3. 구조에서 위험한 부분? (있으면 1-2개)\n\n하지 말 것: 칭찬, 응원, 디테일 수정 제안 (방향만)"},
{"id": "polish_sonnet", "label": "톤 정리 (Sonnet)", "model_hint": "Claude Sonnet",
"body": "최종 편집자.\n톤 우선순위: raw > 초안 > 평소 내 글 패턴\n\n해야 할 것:\n- 문장 결 다듬기 (의미 유지)\n- 톤 흔들린 구간 통일\n- 거친 표현/비유 복원\n\n하지 말 것:\n- 안전하게 다듬지 말 것\n- 내가 '이게 맞나?' 물어도 너 판단 다르면 굽히지 말고 근거 대라\n- 구조 자체 바꾸지 말 것 (편집 권한, 재기획 권한 아님)"},
{"id": "final_check_opus", "label": "마감 검수 (Opus)", "model_hint": "Claude Opus",
"body": "최종 발행 전 마감 검수.\n\n검수 항목:\n1. 발행해도 되는 완성도인가? (Y/N + 이유)\n2. raw의 색이 살아 있나? (Y/N + 휘발된 부분 표시)\n3. 사실관계 오류? (있으면 위치)\n4. 톤 일관성? (점프 있는 구간 위치)\n5. 빠뜨린 핵심 메시지?\n\n수정 필요 → 구체적 위치 + 변경안.\n완료 → 'OK'만."},
{"id": "execute_code", "label": "구현 (Claude Code)", "model_hint": "Claude Code",
"body": "다음 스펙을 정확히 따라 구현할 것.\n스펙에 없는 기능 추가 금지.\n오류 발생 시 수정 지점과 로그를 남길 것.\n\n스펙 위치: ____"}
],
"custom": []
}
PROMPTS_FILE.write_text(json.dumps(seed, ensure_ascii=False, indent=2), encoding="utf-8")
# ── 공용 유틸 ────────────────────────────────────────────────
def log_error(endpoint: str, exc: Exception):
ts = datetime.now().isoformat(timespec="seconds")
line = f"[{ts}] [{endpoint}] {type(exc).__name__}: {exc}\n"
try:
with ERROR_LOG.open("a", encoding="utf-8") as f:
f.write(line)
f.write(traceback.format_exc() + "\n")
except Exception:
pass
def safe_resolve(rel: str) -> Path | None:
"""rel을 BASE 하위의 절대경로로 변환. 경로 탈출 거부."""
if not rel or ".." in rel or os.path.isabs(rel):
return None
target = (BASE / rel.replace("\\", "/")).resolve()
base_resolved = BASE.resolve()
try:
target.relative_to(base_resolved)
except ValueError:
return None
return target
def rel_to_base(p: Path) -> str:
return str(p.relative_to(BASE)).replace("\\", "/")
def make_slug(title: str) -> str:
slug = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', title or "")
slug = slug.replace(' ', '_')
return slug[:50]
def unique_path(folder: Path, filename: str) -> Path:
p = folder / filename
if not p.exists():
return p
stem, ext = (filename.rsplit('.', 1) if '.' in filename else (filename, ''))
i = 2
while True:
cand = folder / (f"{stem}_{i}.{ext}" if ext else f"{stem}_{i}")
if not cand.exists():
return cand
i += 1
# ── frontmatter ─────────────────────────────────────────────
def parse_frontmatter(text: str) -> tuple[dict, str]:
"""간이 YAML frontmatter 파싱. 실패 시 ({}, text)."""
fm = {}
body = text
if not text.startswith("---"):
return fm, body
end = text.find("\n---", 3)
if end == -1:
return fm, body
raw_fm = text[3:end].lstrip("\n")
body = text[end + 4:].lstrip("\n")
for line in raw_fm.splitlines():
m = re.match(r'^(\w+):\s*(.*)$', line)
if not m:
continue
key, val = m.group(1), m.group(2).strip()
if val.startswith('"') and val.endswith('"'):
val = val[1:-1]
elif val.startswith('[') and val.endswith(']'):
inner = val[1:-1].strip()
if not inner:
val = []
else:
items = []
for v in inner.split(','):
v = v.strip().strip('"').strip("'")
if v:
items.append(v)
val = items
elif val.lower() in ("true", "false"):
val = (val.lower() == "true")
fm[key] = val
return fm, body
def yaml_escape(s: str) -> str:
return (s or "").replace("\\", "\\\\").replace('"', '\\"')
def serialize_frontmatter(fm: dict, body: str) -> str:
"""fm dict + body → 전체 md 문자열. 알려진 키 순서 유지."""
order = ["title", "type", "category", "captured_at", "tags",
"source_url", "memo", "platform", "needs_review"]
seen = set()
lines = ["---"]
for key in order:
if key in fm:
seen.add(key)
lines.append(_fm_line(key, fm[key]))
# 알려지지 않은 키들도 보존
for key, val in fm.items():
if key not in seen:
lines.append(_fm_line(key, val))
lines.append("---")
lines.append("")
return "\n".join(lines) + (body if body.startswith("\n") else "\n" + body)
def _fm_line(key: str, val) -> str:
if isinstance(val, list):
items = ", ".join(f'"{yaml_escape(str(v))}"' for v in val)
return f"{key}: [{items}]"
if isinstance(val, bool):
return f"{key}: {'true' if val else 'false'}"
if isinstance(val, str):
return f'{key}: "{yaml_escape(val)}"'
return f"{key}: {val}"
def normalize_type(raw_type, platform_field: str = "") -> tuple[str, str]:
"""frontmatter type을 v0.2 4종으로 정규화. (v2_type, platform) 반환."""
if not raw_type:
return ("", platform_field or "")
raw_str = str(raw_type)
if raw_str in TYPE_BASE or raw_str in ("외부자료", "메모", "컨텐츠", "프롬프트"):
return (raw_str, platform_field or "")
# 구버전 type이면 platform으로
if raw_str in LEGACY_TYPE_TO_V2:
return (LEGACY_TYPE_TO_V2[raw_str], platform_field or raw_str)
# 사용자가 자유 type 박은 경우 그대로 둠
return (raw_str, platform_field or "")
# ── 폴더 스캔 ────────────────────────────────────────────────
# 캐시는 v0.2에선 안 씀. 함수만 분리해두고 1000개+에서 mtime 캐시 끼움.
def scan_folder(folder: Path) -> list[dict]:
items = []
if not folder.exists():
return items
for f in sorted(folder.glob("*.md"), key=lambda x: x.name, reverse=True):
try:
text = f.read_text(encoding="utf-8", errors="replace")
fm, body = parse_frontmatter(text)
v2_type, platform = normalize_type(fm.get("type", ""), fm.get("platform", ""))
preview = " ".join(body.split())[:240]
items.append({
"filename": f.name,
"folder": rel_to_base(folder),
"rel_path": rel_to_base(f),
"title": fm.get("title", f.stem),
"type": v2_type,
"category": fm.get("category", []) if isinstance(fm.get("category"), list) else [],
"platform": platform,
"captured_at": fm.get("captured_at", ""),
"tags": fm.get("tags", []) if isinstance(fm.get("tags"), list) else [],
"memo": fm.get("memo", ""),
"source_url": fm.get("source_url", ""),
"needs_review": bool(fm.get("needs_review", False)),
"preview": preview,
"mtime": f.stat().st_mtime,
})
except Exception as e:
log_error(f"scan_folder({folder.name})", e)
items.append({
"filename": f.name,
"folder": rel_to_base(folder),
"rel_path": rel_to_base(f),
"title": f.stem,
"type": "",
"category": [],
"platform": "",
"captured_at": "",
"tags": [],
"memo": "",
"source_url": "",
"needs_review": False,
"preview": "",
"parse_error": True,
"mtime": 0,
})
return items
# ── 자료 라우트 ──────────────────────────────────────────────
@app.route("/")
def index():
return send_from_directory(str(STATIC_DIR), "index.html")
@app.route("/api/inbox")
def api_inbox():
"""선별 대기 폴더 스캔."""
try:
return jsonify({"items": scan_folder(SELECTION_DIR)})
except Exception as e:
log_error("GET /api/inbox", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/archive")
def api_archive():
try:
return jsonify({"items": scan_folder(ARCHIVE_DIR)})
except Exception as e:
log_error("GET /api/archive", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/trash")
def api_trash():
try:
return jsonify({"items": scan_folder(TRASH_DIR)})
except Exception as e:
log_error("GET /api/trash", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/file")
def api_file():
rel = request.args.get("path", "")
try:
target = safe_resolve(rel)
if target is None:
return jsonify({"ok": False, "error": "잘못된 경로"}), 400
if not target.exists():
return jsonify({"ok": False, "error": "파일 없음"}), 404
return jsonify({"content": target.read_text(encoding="utf-8", errors="replace")})
except Exception as e:
log_error("GET /api/file", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/move", methods=["POST"])
def api_move():
"""파일을 폴더 간 이동. dest_folder는 '선별 대기'|'아카이브'|'_휴지통' 중 하나."""
try:
body = request.get_json(force=True)
rel = body.get("rel_path", "")
dest_folder = body.get("dest_folder", "")
src = safe_resolve(rel)
if src is None or not src.exists():
return jsonify({"ok": False, "error": "원본 없음"}), 404
if dest_folder not in ("선별 대기", "아카이브", "_휴지통"):
return jsonify({"ok": False, "error": "dest_folder 잘못됨"}), 400
dest_dir = BASE / dest_folder
dest_dir.mkdir(exist_ok=True)
dest = unique_path(dest_dir, src.name)
shutil.move(str(src), str(dest))
return jsonify({"ok": True, "new_rel_path": rel_to_base(dest)})
except Exception as e:
log_error("POST /api/move", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/save-frontmatter", methods=["POST"])
def api_save_frontmatter():
"""단일 md의 frontmatter 갱신. body의 fm은 키-값 dict."""
try:
body = request.get_json(force=True)
rel = body.get("rel_path", "")
new_fm = body.get("fm", {})
target = safe_resolve(rel)
if target is None or not target.exists():
return jsonify({"ok": False, "error": "파일 없음"}), 404
text = target.read_text(encoding="utf-8", errors="replace")
old_fm, body_text = parse_frontmatter(text)
old_fm.update(new_fm)
# tags/category는 리스트로 강제
for k in ("tags", "category"):
if k in old_fm and not isinstance(old_fm[k], list):
if isinstance(old_fm[k], str) and old_fm[k]:
old_fm[k] = [s.strip() for s in old_fm[k].split(",") if s.strip()]
else:
old_fm[k] = []
new_text = serialize_frontmatter(old_fm, body_text)
target.write_text(new_text, encoding="utf-8")
return jsonify({"ok": True})
except Exception as e:
log_error("POST /api/save-frontmatter", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/delete-permanent", methods=["POST"])
def api_delete_permanent():
"""휴지통 파일 완전 삭제. 휴지통 하위만 허용."""
try:
body = request.get_json(force=True)
rel = body.get("rel_path", "")
target = safe_resolve(rel)
if target is None or not target.exists():
return jsonify({"ok": False, "error": "파일 없음"}), 404
try:
target.relative_to(TRASH_DIR.resolve())
except ValueError:
return jsonify({"ok": False, "error": "휴지통 외 파일은 완전삭제 불가"}), 400
target.unlink()
return jsonify({"ok": True})
except Exception as e:
log_error("POST /api/delete-permanent", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/categories-in-use")
def api_categories_in_use():
"""선별 대기 + 아카이브에서 실제 찍힌 category 값들 수집해 반환 (필터 옵션용)."""
try:
all_items = scan_folder(SELECTION_DIR) + scan_folder(ARCHIVE_DIR)
categories = set()
types = set()
for it in all_items:
if isinstance(it.get("category"), list):
for c in it["category"]:
if c:
categories.add(str(c))
if it.get("type"):
types.add(it["type"])
return jsonify({
"categories": sorted(categories),
"types": sorted(types | set(TYPE_BASE)),
})
except Exception as e:
log_error("GET /api/categories-in-use", e)
return jsonify({"ok": False, "error": str(e)}), 500
# ── 카테고리(워크스페이스) 라우트 ────────────────────────────
def _validate_cat_name(name: str) -> bool:
if not name or not name.strip():
return False
return bool(re.fullmatch(r"[^/\\:*?\"<>|]+", name))
def _count_items_by_category() -> dict:
"""선별 대기 + 아카이브 전체 자료의 category별 카운트 반환."""
counts: dict = {}
for folder in (SELECTION_DIR, ARCHIVE_DIR):
if not folder.exists():
continue
for f in folder.glob("*.md"):
try:
fm, _ = parse_frontmatter(f.read_text(encoding="utf-8", errors="replace"))
for cat in (fm.get("category") or []):
if cat:
counts[cat] = counts.get(cat, 0) + 1
except Exception:
pass
return counts
def _read_guide_fm(name: str) -> dict:
"""카테고리 안내 글 frontmatter 반환. 없으면 {}."""
f = CATEGORY_DIR / f"{name}.md"
if not f.exists():
return {}
try:
fm, _ = parse_frontmatter(f.read_text(encoding="utf-8", errors="replace"))
return fm
except Exception:
return {}
def _build_tree_data() -> dict:
"""guide_names, guide_parents(name→parent|None), children_map(name→[child...]) 빌드."""
guide_names: set = set()
guide_parents: dict = {}
if CATEGORY_DIR.exists():
for f in CATEGORY_DIR.glob("*.md"):
if f.name.lower() == "readme.md":
continue
n = f.stem
guide_names.add(n)
try:
fm, _ = parse_frontmatter(f.read_text(encoding="utf-8", errors="replace"))
p = (fm.get("parent") or "").strip()
guide_parents[n] = p if p else None
except Exception:
guide_parents[n] = None
children_map: dict = {n: [] for n in guide_names}
for n, p in guide_parents.items():
if p and p in children_map:
children_map[p].append(n)
return {"guide_names": guide_names, "guide_parents": guide_parents, "children_map": children_map}
def _update_guide_fm_parent(guide_path, new_parent: str | None):
"""안내 글 파일의 parent 필드만 갱신. 나머지 내용 보존."""
try:
text = guide_path.read_text(encoding="utf-8", errors="replace")
except FileNotFoundError:
text = ""
fm, body = parse_frontmatter(text)
if new_parent:
fm["parent"] = new_parent
else:
fm.pop("parent", None)
if fm:
guide_path.write_text(serialize_frontmatter(fm, body), encoding="utf-8")
else:
guide_path.write_text(body, encoding="utf-8")
@app.route("/api/categories", methods=["GET"])
def api_categories_get():
"""카테고리 목록. 2단계 트리 구조. 부모 먼저, 자식이 부모 뒤에 오는 평면 배열 반환."""
try:
tree = _build_tree_data()
guide_names = tree["guide_names"]
guide_parents = tree["guide_parents"]
children_map = tree["children_map"]
counts = _count_items_by_category()
# 안내 글 없이 자료에만 박힌 이름, 또는 parent로 참조됐지만 파일 없는 이름도 포함
all_names = guide_names | set(counts.keys())
for n in list(guide_names):
p = guide_parents.get(n)
if p and p not in guide_names:
all_names.add(p)
# 각 카테고리 dict 빌드
cat_map: dict = {}
for name in all_names:
parent = guide_parents.get(name)
has_guide = name in guide_names
item_count_self = counts.get(name, 0)
children = sorted(children_map.get(name, []))
item_count_total = item_count_self + sum(counts.get(c, 0) for c in children)
orphan = False
if not has_guide:
orphan = True
elif parent and parent not in guide_names:
orphan = True
cat_map[name] = {
"name": name,
"parent": parent,
"has_guide": has_guide,
"guide_path": f"_정야총괄/카테고리/{name}.md" if has_guide else None,
"item_count_self": item_count_self,
"item_count_total": item_count_total,
"children": children,
"orphan": orphan,
}
# 정렬: 부모 먼저, 자식이 부모 바로 뒤
result: list = []
added: set = set()
def _add(name):
if name in added or name not in cat_map:
return
added.add(name)
result.append(cat_map[name])
for child in sorted(children_map.get(name, [])):
_add(child)
for n in sorted(n for n in cat_map if not cat_map[n]["parent"]):
_add(n)
for n in sorted(cat_map):
_add(n)
return jsonify({"categories": result})
except Exception as e:
log_error("GET /api/categories", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/categories", methods=["POST"])
def api_categories_post():
"""카테고리 신규 등록. parent 검증(2단계 제한) 포함."""
try:
body = request.get_json(force=True)
name = (body.get("name") or "").strip()
parent = (body.get("parent") or "").strip()
guide = (body.get("guide") or "").strip()
if not _validate_cat_name(name):
return jsonify({"ok": False, "error": "유효하지 않은 카테고리 이름"}), 400
target = CATEGORY_DIR / f"{name}.md"
if target.exists():
return jsonify({"ok": False, "error": "이미 존재하는 카테고리"}), 409
if parent:
if not (CATEGORY_DIR / f"{parent}.md").exists():
return jsonify({"ok": False, "error": f"부모 카테고리 '{parent}'가 존재하지 않습니다. 먼저 등록해주세요"}), 400
if (_read_guide_fm(parent).get("parent") or "").strip():
return jsonify({"ok": False, "error": f"'{parent}'는 이미 하위 카테고리입니다. 3단계 이상은 지원되지 않습니다"}), 400
if not guide:
guide = f"# {name}\n\n_안내 글 비어있음 — 이 카테고리가 뭔지·뭐 하려는지 적어주세요._"
if parent:
content = f'---\nparent: "{yaml_escape(parent)}"\n---\n{guide}'
else:
content = guide
target.write_text(content, encoding="utf-8")
return jsonify({"ok": True, "name": name})
except Exception as e:
log_error("POST /api/categories", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/categories/<name>", methods=["PUT"])
def api_category_update(name):
"""이름 변경 및/또는 부모 변경. 자료·자식 안내 글 일괄 갱신."""
try:
body = request.get_json(force=True)
new_name = (body.get("new_name") or "").strip() or name
new_parent_provided = "new_parent" in body
new_parent = (body.get("new_parent") or "").strip() if new_parent_provided else None
old_file = CATEGORY_DIR / f"{name}.md"
if not old_file.exists():
return jsonify({"ok": False, "error": "카테고리 없음"}), 404
# ── 이름 변경 검증
if new_name != name:
if not _validate_cat_name(new_name):
return jsonify({"ok": False, "error": "유효하지 않은 새 이름"}), 400
if (CATEGORY_DIR / f"{new_name}.md").exists():
return jsonify({"ok": False, "error": "새 이름이 이미 존재함"}), 409
# ── 부모 변경 검증
if new_parent_provided and new_parent:
if new_parent == name:
return jsonify({"ok": False, "error": "자기 자신을 부모로 설정할 수 없습니다"}), 400
if not (CATEGORY_DIR / f"{new_parent}.md").exists():
return jsonify({"ok": False, "error": f"부모 카테고리 '{new_parent}'가 존재하지 않습니다"}), 400
if (_read_guide_fm(new_parent).get("parent") or "").strip():
return jsonify({"ok": False, "error": f"'{new_parent}'는 이미 하위 카테고리입니다. 3단계 이상은 지원되지 않습니다"}), 400
tree = _build_tree_data()
if new_parent in tree["children_map"].get(name, []):
return jsonify({"ok": False, "error": "자식 카테고리를 부모로 설정할 수 없습니다 (순환 금지)"}), 400
# ── 1. 안내 글 parent 필드 갱신
if new_parent_provided:
_update_guide_fm_parent(old_file, new_parent if new_parent else None)
# ── 2. 파일 rename
if new_name != name:
new_file = CATEGORY_DIR / f"{new_name}.md"
old_file.rename(new_file)
# ── 3. 자료 category 배열 일괄 치환
migrated_count = 0
if new_name != name:
for folder in (SELECTION_DIR, ARCHIVE_DIR):
if not folder.exists():
continue
for f in folder.glob("*.md"):
try:
text = f.read_text(encoding="utf-8", errors="replace")
fm, body_text = parse_frontmatter(text)
cats = fm.get("category", [])
if isinstance(cats, list) and name in cats:
fm["category"] = [new_name if c == name else c for c in cats]
f.write_text(serialize_frontmatter(fm, body_text), encoding="utf-8")
migrated_count += 1
except Exception as ie:
log_error(f"category_update:item({f.name})", ie)
# ── 4. 자식 안내 글 parent 필드 치환
child_repointed_count = 0
if new_name != name and CATEGORY_DIR.exists():
for f in CATEGORY_DIR.glob("*.md"):
if f.name.lower() == "readme.md" or f.stem == new_name:
continue
try:
fm, body_text = parse_frontmatter(f.read_text(encoding="utf-8", errors="replace"))
if (fm.get("parent") or "").strip() == name:
fm["parent"] = new_name
f.write_text(serialize_frontmatter(fm, body_text), encoding="utf-8")
child_repointed_count += 1
except Exception as ge:
log_error(f"category_update:guide({f.name})", ge)
return jsonify({"ok": True, "migrated_count": migrated_count, "child_repointed_count": child_repointed_count})
except Exception as e:
log_error(f"PUT /api/categories/{name}", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/categories/<name>", methods=["DELETE"])
def api_category_delete(name):
"""카테고리 삭제. 자식이 있으면 거부. 자료 frontmatter는 orphan으로 유지."""
try:
guide_file = CATEGORY_DIR / f"{name}.md"
if not guide_file.exists():
return jsonify({"ok": False, "error": "카테고리 없음"}), 404
children = sorted(_build_tree_data()["children_map"].get(name, []))
if children:
return jsonify({"ok": False, "error": "자식 카테고리가 있어 삭제할 수 없습니다. 먼저 자식을 삭제하거나 다른 부모로 이동하세요.", "children": children}), 400
orphaned_count = _count_items_by_category().get(name, 0)
guide_file.unlink()
return jsonify({"ok": True, "orphaned_count": orphaned_count})
except Exception as e:
log_error(f"DELETE /api/categories/{name}", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/categories/<name>/안내글", methods=["GET"])
def api_category_guide_get(name):
"""카테고리 안내 글 원문 반환."""
try:
guide_file = CATEGORY_DIR / f"{name}.md"
if not guide_file.exists():
return jsonify({"ok": False, "error": "안내 글 없음"}), 404
return jsonify({"name": name, "body": guide_file.read_text(encoding="utf-8", errors="replace")})
except Exception as e:
log_error(f"GET /api/categories/{name}/안내글", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/categories/<name>/안내글", methods=["PUT"])
def api_category_guide_put(name):
"""카테고리 안내 글 전체 덮어쓰기."""
try:
req_body = request.get_json(force=True)
body_text = req_body.get("body", "")
CATEGORY_DIR.mkdir(parents=True, exist_ok=True)
(CATEGORY_DIR / f"{name}.md").write_text(body_text, encoding="utf-8")
return jsonify({"ok": True})
except Exception as e:
log_error(f"PUT /api/categories/{name}/안내글", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/categories/<name>/자료", methods=["GET"])
def api_category_items(name):
"""이 카테고리에 연결된 자료 목록. ?include_children=true(기본) 시 자식 자료 누적."""
try:
include_children = request.args.get("include_children", "true").lower() != "false"
names_to_match = {name}
if include_children:
names_to_match.update(_build_tree_data()["children_map"].get(name, []))
all_items = scan_folder(SELECTION_DIR) + scan_folder(ARCHIVE_DIR)
matched = [it for it in all_items if names_to_match & set(it.get("category") or [])]
return jsonify({"items": matched})
except Exception as e:
log_error(f"GET /api/categories/{name}/자료", e)
return jsonify({"ok": False, "error": str(e)}), 500
# ── 프롬프트 라이브러리 ──────────────────────────────────────
@app.route("/api/prompts", methods=["GET"])
def api_prompts_get():
try:
return jsonify(json.loads(PROMPTS_FILE.read_text(encoding="utf-8")))
except Exception as e:
log_error("GET /api/prompts", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/prompts", methods=["POST"])
def api_prompts_post():
try:
body = request.get_json(force=True)
PROMPTS_FILE.write_text(json.dumps(body, ensure_ascii=False, indent=2), encoding="utf-8")
return jsonify({"ok": True})
except Exception as e:
log_error("POST /api/prompts", e)
return jsonify({"ok": False, "error": str(e)}), 500
# ── 수집 탭 (capture 흡수) ────────────────────────────────────
HTTP_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept-Language": "ko-KR,ko;q=0.9",
}
TRACKING_PARAMS = {
"fbclid", "si", "igsh", "img_index",
"utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
"source", "proxyReferer", "trackingCode",
}
def clean_url(url):
parsed = urlparse(url)
params = parse_qs(parsed.query, keep_blank_values=True)
cleaned = {k: v for k, v in params.items() if k not in TRACKING_PARAMS}
return urlunparse(parsed._replace(query=urlencode(cleaned, doseq=True)))
def detect_platform(url):
parsed = urlparse(url)
domain = parsed.netloc.lower()
path = parsed.path
if "notion.site" in domain or "notion.so" in domain:
return "notion"
if "youtube.com" in domain or "youtu.be" in domain:
return "youtube_shorts" if "/shorts/" in path else "youtube"
if "instagram.com" in domain:
return "instagram_reel" if "/reel/" in path else "instagram"
if "blog.naver.com" in domain or "m.blog.naver.com" in domain:
return "naver"
if "slashpage.com" in domain:
return "slashpage"
return "misc"
def slug_title(url, platform):
path = unquote(urlparse(url).path).strip("/")
if platform == "notion":
t = re.sub(r'-?[0-9a-f]{32}$', '', path)
t = re.sub(r'-?[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$', '', t)
t = re.sub(r'[-_]+', ' ', t).strip()
return t or "(제목 없음)"
if platform == "youtube_shorts":
return "YouTube Shorts"
if platform == "youtube":
return "YouTube"
if platform == "instagram_reel":
return "Instagram Reel"
if platform == "instagram":
return "Instagram"
if platform == "naver":
return "Naver Blog"
if platform == "slashpage":
seg = path.split("/")[-1]
return seg if seg else "Slashpage"
seg = path.split("/")[-1]
return seg if seg else urlparse(url).netloc
def fetch_youtube_title(url, timeout=5):
try:
req = urllib.request.Request(
f"https://www.youtube.com/oembed?url={url}&format=json",
headers=HTTP_HEADERS,
)
with urllib.request.urlopen(req, timeout=timeout) as r:
return json.loads(r.read()).get("title")
except Exception:
return None
def fetch_html_title(url, timeout=6):
try:
req = urllib.request.Request(url, headers=HTTP_HEADERS)
with urllib.request.urlopen(req, timeout=timeout) as r:
html = r.read(80000).decode(r.headers.get_content_charset() or "utf-8", errors="ignore")
m = re.search(r'<title[^>]*>([^<]{1,200})</title>', html, re.IGNORECASE)
if m:
t = re.sub(r'\s+', ' ', m.group(1)).strip()
t = re.split(r'\s*[|\-–]\s*(?:Naver|네이버|블로그|Blog|Slashpage).*$', t, flags=re.IGNORECASE)[0].strip()
return t or None
except Exception:
pass
return None
def parse_kakao_text(text):
entries = []
current_date = None
date_pat = re.compile(r'^(\d{4})년\s+(\d{1,2})월\s+(\d{1,2})일\s+\S+요일\s*$')
msg_pat = re.compile(r'^\[(.+?)\]\s+\[(오전|오후)\s+(\d+):(\d+)\]\s+(https?://\S+)\s*$')
for line in text.strip().splitlines():
line = line.strip()
if not line:
continue
m = date_pat.match(line)
if m:
y, mo, d = m.groups()
current_date = f"{y}-{int(mo):02d}-{int(d):02d}"
continue
m = msg_pat.match(line)
if m:
ampm, h, mi = m.group(2), int(m.group(3)), int(m.group(4))
if ampm == "오후" and h != 12:
h += 12
elif ampm == "오전" and h == 12:
h = 0
url = clean_url(m.group(5))
plat = detect_platform(url)
date_part = current_date or datetime.now().strftime("%Y-%m-%d")
captured_at = f"{date_part}T{h:02d}:{mi:02d}:00"
entries.append({
"id": str(uuid.uuid4()),
"title": slug_title(url, plat),
"source_url": url,
"platform": plat,
"v2_type": "외부자료",
"captured_at": captured_at,
"content": "",
"tags": [],
"memo": "",
})
return entries
def entry_to_md(entry):
fm = {
"title": entry.get("title", ""),
"type": entry.get("v2_type") or "외부자료",
"category": [],
"captured_at": entry.get("captured_at") or datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
"tags": entry.get("tags", []) or [],
"source_url": entry.get("source_url", "") or "",
"memo": entry.get("memo", "") or "",
}
plat = entry.get("platform", "")
if plat:
fm["platform"] = plat
content = entry.get("content", "") or ""
body = f"# {fm['title']}\n"
if content:
body += f"\n{content}\n"
return serialize_frontmatter(fm, body)
def filename_for(captured_at, title):
dt_part = (captured_at or "").replace("T", "_").replace(":", "")
if not dt_part:
dt_part = datetime.now().strftime("%Y-%m-%d_%H%M%S")
slug = make_slug(title)
return f"{dt_part}_{slug}.md" if slug else f"{dt_part}.md"
@app.route("/api/capture/kakao", methods=["POST"])
def api_capture_kakao():
try:
data = request.get_json(force=True)
return jsonify({"entries": parse_kakao_text(data.get("text", ""))})
except Exception as e:
log_error("POST /api/capture/kakao", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/capture/fetch", methods=["POST"])
def api_capture_fetch():
try:
data = request.get_json(force=True)
url = data.get("url", "")
plat = detect_platform(url)
title = None
status = "done"
if plat in ("youtube", "youtube_shorts"):
title = fetch_youtube_title(url)
if not title:
status = "failed"
elif plat in ("instagram", "instagram_reel"):
status = "skipped"
title = slug_title(url, plat)
else:
title = fetch_html_title(url)
if not title:
status = "failed"
if not title:
title = slug_title(url, plat)
if status == "done":
status = "failed"
return jsonify({"title": title, "platform": plat, "status": status})
except Exception as e:
log_error("POST /api/capture/fetch", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/capture/keep", methods=["POST"])
def api_capture_keep():
if "file" not in request.files:
return jsonify({"ok": False, "error": "파일 없음"}), 400
try:
f = request.files["file"]
entries = []
zf = zipfile.ZipFile(io.BytesIO(f.read()))
for name in zf.namelist():
if not re.search(r'Keep/[^/]+\.json$', name):
continue
with zf.open(name) as jf:
note = json.loads(jf.read().decode("utf-8"))
if note.get("isTrashed"):
continue
title = (note.get("title") or "").strip()
text_content = note.get("textContent", "") or ""
if not title:
first = text_content.strip().splitlines()[0] if text_content.strip() else ""
title = first[:100]
if "listContent" in note:
lines = []
for item in note["listContent"]:
checked = item.get("isChecked", False)
lines.append(f"- [{'x' if checked else ' '}] {item.get('text', '')}")
body_text = "\n".join(lines)
else:
body_text = text_content
tags = [lbl["name"] for lbl in note.get("labels", []) if "name" in lbl]
try:
sec = int(note.get("createdTimestampUsec", 0)) / 1_000_000
captured_at = datetime.utcfromtimestamp(sec).strftime("%Y-%m-%dT%H:%M:%S")
except Exception:
captured_at = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
entries.append({
"id": str(uuid.uuid4()),
"title": title or "(제목 없음)",
"source_url": "",
"platform": "keep",
"v2_type": "메모",
"captured_at": captured_at,
"content": body_text,
"tags": tags,
"memo": "",
})
return jsonify({"entries": entries})
except Exception as e:
log_error("POST /api/capture/keep", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/capture/text", methods=["POST"])
def api_capture_text():
try:
data = request.get_json(force=True)
text = data.get("text", "")
auto_split = data.get("auto_split", True)
now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
entries = []
if auto_split:
for chunk in re.split(r'\n(?=#)', text):
chunk = chunk.strip()
if not chunk.startswith("#"):
continue
lines = chunk.splitlines()
title = re.sub(r'^#+\s*', '', lines[0]).strip()
body_text = "\n".join(lines[1:]).strip()
entries.append({
"id": str(uuid.uuid4()),
"title": title or "(제목 없음)",
"source_url": "",
"platform": "text",
"v2_type": "메모",
"captured_at": now,
"content": body_text,
"tags": [],
"memo": "",
})
else:
manual_title = (data.get("title") or "").strip()
if not manual_title:
first = text.strip().splitlines()[0] if text.strip() else ""
manual_title = first[:100]
entries.append({
"id": str(uuid.uuid4()),
"title": manual_title or "(제목 없음)",
"source_url": "",
"platform": "text",
"v2_type": "메모",
"captured_at": now,
"content": text,
"tags": [],
"memo": "",
})
return jsonify({"entries": entries})
except Exception as e:
log_error("POST /api/capture/text", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/capture/memo", methods=["POST"])
def api_capture_memo():
try:
data = request.get_json(force=True)
title = (data.get("title") or "").strip()
body_text = (data.get("body") or "").strip()
if not body_text:
return jsonify({"ok": False, "error": "본문 필수"}), 400
if not title:
title = body_text.splitlines()[0][:50]
now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
entry = {
"title": title,
"source_url": "",
"platform": "memo",
"v2_type": "메모",
"captured_at": now,
"content": body_text,
"tags": [],
"memo": "",
}
fname = filename_for(now, title)
dest = unique_path(SELECTION_DIR, fname)
dest.write_text(entry_to_md(entry), encoding="utf-8")
return jsonify({"ok": True, "filename": dest.name})
except Exception as e:
log_error("POST /api/capture/memo", e)
return jsonify({"ok": False, "error": str(e)}), 500
@app.route("/api/capture/save", methods=["POST"])
def api_capture_save():
try:
data = request.get_json(force=True)
entries = data.get("entries", [])
saved = 0
failed = 0
files = []
for entry in entries:
try:
fname = filename_for(entry.get("captured_at", ""), entry.get("title", ""))
dest = unique_path(SELECTION_DIR, fname)
dest.write_text(entry_to_md(entry), encoding="utf-8")
files.append(dest.name)
saved += 1
except Exception as e:
log_error("capture/save:entry", e)
failed += 1
return jsonify({"ok": True, "saved": saved, "failed": failed, "files": files})
except Exception as e:
log_error("POST /api/capture/save", e)
return jsonify({"ok": False, "error": str(e)}), 500
# ── Main ─────────────────────────────────────────────────────
if __name__ == "__main__":
init_data()
print("정야 운영판 v0.3 → http://localhost:5002")
print(f" 선별 대기: {SELECTION_DIR}")
print(f" 아카이브: {ARCHIVE_DIR}")
print(f" 휴지통: {TRASH_DIR}")
print("종료: Ctrl+C")
app.run(host="127.0.0.1", port=5002, debug=False)