Source code for scitex_dataset.database

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Timestamp: "2026-01-29 22:50:00 (ywatanabe)"
# File: /home/ywatanabe/proj/scitex-dataset/src/scitex_dataset/database.py

"""
Local SQLite database for fast dataset searching.

Usage:
    >>> from scitex_dataset import database as db
    >>> db.build()  # Fetch all sources and build database
    >>> results = db.search("alzheimer EEG", min_subjects=20)
"""

from __future__ import annotations

import json
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from scitex_dev.decorators import supports_return_as

from ._config import runtime_dir


def _default_db_path() -> Path:
    """Return the default location of the local SQLite database file.

    The file is always named ``datasets.db`` and is placed directly inside
    the directory reported by ``runtime_dir()`` at call time, so the result
    is recomputed on every call rather than cached.

    Directory resolution itself is delegated to ``runtime_dir()``. Per the
    SciTeX local-state layout (see
    ``general/01_ecosystem_06_local-state-directories``), project scope wins
    over user scope and ``SCITEX_DIR`` relocates the user scope.

    Returns
    -------
    Path
        ``<runtime dir>/datasets.db``.
    """
    db_dir = runtime_dir()
    return db_dir / "datasets.db"


# Snapshot of the default database path, evaluated once when this module is
# imported.
# NOTE(review): the stated intent is "do not freeze at import time", yet this
# assignment does exactly that. Code that needs an up-to-date location should
# call ``_default_db_path()``, which re-resolves the path on every call.
DEFAULT_DB_PATH = _default_db_path()

# Names exported by ``from scitex_dataset.database import *``.
# NOTE(review): ``search`` is listed here but is not defined in the visible
# part of this module — confirm it is defined or imported elsewhere.
__all__ = [
    "build",
    "update",
    "search",
    "get_stats",
    "get_db_path",
    "clear",
]


def get_db_path() -> Path:
    """Get the database file path.

    The path is resolved on every call through ``_default_db_path()``, the
    same resolver that ``_get_connection``, ``get_stats`` and ``clear`` fall
    back to when no explicit ``db_path`` is given. Returning the module-level
    ``DEFAULT_DB_PATH`` snapshot instead could report a location different
    from the one those functions actually use if the runtime directory
    changes after this module has been imported.

    Returns
    -------
    Path
        Default location of the SQLite database file.
    """
    return _default_db_path()
# Schema for the dataset catalogue: the main ``datasets`` table, a key/value
# ``metadata`` table, lookup indexes, and an external-content FTS5 index that
# is kept in sync with ``datasets`` through triggers.
_SCHEMA_SQL = """
    CREATE TABLE IF NOT EXISTS datasets (
        id TEXT PRIMARY KEY,
        source TEXT NOT NULL,
        name TEXT,
        created TEXT,
        modified TEXT,
        n_subjects INTEGER DEFAULT 0,
        size_gb REAL DEFAULT 0,
        downloads INTEGER DEFAULT 0,
        views INTEGER DEFAULT 0,
        readme TEXT,
        license TEXT,
        doi TEXT,
        url TEXT,
        modalities TEXT,  -- JSON array
        tasks TEXT,       -- JSON array
        primary_modality TEXT,
        data_json TEXT,   -- Full dataset as JSON
        indexed_at TEXT
    );

    CREATE TABLE IF NOT EXISTS metadata (
        key TEXT PRIMARY KEY,
        value TEXT
    );

    CREATE INDEX IF NOT EXISTS idx_source ON datasets(source);
    CREATE INDEX IF NOT EXISTS idx_modalities ON datasets(modalities);
    CREATE INDEX IF NOT EXISTS idx_n_subjects ON datasets(n_subjects);
    CREATE INDEX IF NOT EXISTS idx_downloads ON datasets(downloads);

    CREATE VIRTUAL TABLE IF NOT EXISTS datasets_fts USING fts5(
        id, name, readme, tasks,
        content='datasets',
        content_rowid='rowid'
    );

    -- Triggers to keep FTS in sync
    CREATE TRIGGER IF NOT EXISTS datasets_ai AFTER INSERT ON datasets BEGIN
        INSERT INTO datasets_fts(rowid, id, name, readme, tasks)
        VALUES (new.rowid, new.id, new.name, new.readme, new.tasks);
    END;

    CREATE TRIGGER IF NOT EXISTS datasets_ad AFTER DELETE ON datasets BEGIN
        INSERT INTO datasets_fts(datasets_fts, rowid, id, name, readme, tasks)
        VALUES ('delete', old.rowid, old.id, old.name, old.readme, old.tasks);
    END;

    CREATE TRIGGER IF NOT EXISTS datasets_au AFTER UPDATE ON datasets BEGIN
        INSERT INTO datasets_fts(datasets_fts, rowid, id, name, readme, tasks)
        VALUES ('delete', old.rowid, old.id, old.name, old.readme, old.tasks);
        INSERT INTO datasets_fts(rowid, id, name, readme, tasks)
        VALUES (new.rowid, new.id, new.name, new.readme, new.tasks);
    END;
"""

# Column order shared by the INSERT column list and the parameter tuple built
# in ``_insert_dataset``; ``id`` must stay first (it is the conflict target).
_DATASET_COLUMNS = (
    "id",
    "source",
    "name",
    "created",
    "modified",
    "n_subjects",
    "size_gb",
    "downloads",
    "views",
    "readme",
    "license",
    "doi",
    "url",
    "modalities",
    "tasks",
    "primary_modality",
    "data_json",
    "indexed_at",
)

# Upsert instead of ``INSERT OR REPLACE``: REPLACE removes the conflicting row
# without firing the AFTER DELETE trigger (recursive triggers are off by
# default), which leaves stale tokens in ``datasets_fts``. ``DO UPDATE`` fires
# the AFTER UPDATE trigger and keeps the row's rowid, so the FTS index stays
# consistent. Requires SQLite >= 3.24.
_UPSERT_DATASET_SQL = (
    "INSERT INTO datasets ({cols}) VALUES ({marks}) "
    "ON CONFLICT(id) DO UPDATE SET {updates}"
).format(
    cols=", ".join(_DATASET_COLUMNS),
    marks=", ".join("?" for _ in _DATASET_COLUMNS),
    updates=", ".join(f"{col} = excluded.{col}" for col in _DATASET_COLUMNS[1:]),
)


def _get_connection(db_path: Optional[Path] = None) -> sqlite3.Connection:
    """Get database connection, creating tables if needed.

    Parameters
    ----------
    db_path : Path, optional
        Database file path. Falls back to ``_default_db_path()`` when omitted.
        Missing parent directories are created.

    Returns
    -------
    sqlite3.Connection
        Open connection whose rows are ``sqlite3.Row`` objects. The caller is
        responsible for closing it.

    Raises
    ------
    sqlite3.Error
        If the schema cannot be created (for example when the SQLite build
        lacks FTS5). The connection is closed before the error propagates.
    """
    path = db_path or _default_db_path()
    path.parent.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(path)
    try:
        conn.row_factory = sqlite3.Row
        # Every statement uses IF NOT EXISTS, so this is safe to re-run on an
        # already initialised database.
        conn.executescript(_SCHEMA_SQL)
    except Exception:
        # Do not leak the handle when schema setup fails.
        conn.close()
        raise
    return conn


def _insert_dataset(
    conn: sqlite3.Connection,
    dataset: Dict[str, Any],
    source: str,
) -> None:
    """Insert or update a dataset in the database.

    The row key is ``"<source>:<dataset id>"``. When a row with that key
    already exists it is updated in place, so the full-text index entry for
    the old content is replaced rather than left behind.

    Parameters
    ----------
    conn : sqlite3.Connection
        Open connection to a database initialised by ``_get_connection``.
        The statement is not committed here.
    dataset : dict
        Formatted dataset record. Missing keys fall back to ``None`` (text
        fields), ``0`` (numeric fields) or ``[]`` (``modalities``/``tasks``).
    source : str
        Name of the catalogue the dataset came from.
    """
    row = (
        f"{source}:{dataset.get('id', '')}",
        source,
        dataset.get("name"),
        dataset.get("created"),
        dataset.get("modified"),
        dataset.get("n_subjects", 0),
        dataset.get("size_gb", 0),
        dataset.get("downloads", 0),
        dataset.get("views", 0),
        dataset.get("readme"),
        dataset.get("license"),
        dataset.get("doi"),
        dataset.get("url"),
        json.dumps(dataset.get("modalities", [])),
        json.dumps(dataset.get("tasks", [])),
        dataset.get("primary_modality"),
        json.dumps(dataset),
        datetime.now().isoformat(),
    )
    conn.execute(_UPSERT_DATASET_SQL, row)
# Package-relative module that provides ``fetch_all_datasets`` and
# ``format_dataset`` for each supported source name.
_SOURCE_MODULES = {
    "openneuro": ".neuroscience.openneuro",
    "dandi": ".neuroscience.dandi",
    "physionet": ".neuroscience.physionet",
    "zenodo": ".general.zenodo",
    "figshare": ".general.figshare",
    "openml": ".general.openml",
    "geo": ".biology.geo",
    "chembl": ".pharmacology.chembl",
    "moleculenet": ".pharmacology.moleculenet",
    "clinicaltrials": ".medical.clinicaltrials",
    "huggingface": ".general.huggingface",
}


@supports_return_as
def build(
    sources: Optional[List[str]] = None,
    db_path: Optional[Path] = None,
    logger=None,
) -> Dict[str, int]:
    """Build the local database from all sources.

    Each source is fetched, formatted and written independently: a failure in
    one source is logged (when a logger is given), recorded as a count of
    ``0``, and does not stop the remaining sources.

    Parameters
    ----------
    sources : list, optional
        Sources to fetch, e.g. ["openneuro", "dandi", "physionet"].
        Default: all sources in ``CATALOG_SOURCES``. Unknown names are
        skipped with a warning and do not appear in the result.
    db_path : Path, optional
        Database file path.
        Default: $SCITEX_DIR/dataset/runtime/datasets.db
        (~/.scitex/dataset/runtime/datasets.db when SCITEX_DIR is unset).
    logger : optional
        Logger for progress messages.

    Returns
    -------
    dict
        Count of datasets indexed per source.
    """
    import importlib

    if sources is None:
        from ._sources import CATALOG_SOURCES

        # HuggingFace is NOT included by default — its catalog is unbounded
        # and would dominate the index. Pass `sources=["huggingface", ...]`
        # explicitly to opt in (uses query="" + max=1000 cap).
        sources = list(CATALOG_SOURCES)

    conn = _get_connection(db_path)
    counts: Dict[str, int] = {}

    # try/finally so the connection is released even if a metadata write or
    # the commit fails.
    try:
        for source in sources:
            if logger:
                logger.info(f"Fetching from {source}...")

            try:
                module_name = _SOURCE_MODULES.get(source)
                if module_name is None:
                    if logger:
                        logger.warning(f"Unknown source: {source}")
                    continue

                # Imported lazily so that only the requested sources' modules
                # (and their dependencies) are loaded.
                module = importlib.import_module(module_name, package=__package__)

                raw = module.fetch_all_datasets(logger=logger)
                datasets = [module.format_dataset(ds) for ds in raw]

                for ds in datasets:
                    _insert_dataset(conn, ds, source)

                counts[source] = len(datasets)
                if logger:
                    logger.info(f"Indexed {len(datasets)} from {source}")

            except Exception as exc:
                # Best effort: one broken source must not abort the build.
                if logger:
                    logger.error(f"Error fetching {source}: {exc}")
                counts[source] = 0

        # Update metadata. The total is read back from the table rather than
        # summed from this run's counts, so a partial run (e.g. ``update`` of
        # one source) does not overwrite it with a single-source figure.
        total = conn.execute("SELECT COUNT(*) FROM datasets").fetchone()[0]
        conn.execute(
            "INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)",
            ("last_build", datetime.now().isoformat()),
        )
        conn.execute(
            "INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)",
            ("total_datasets", str(total)),
        )
        conn.commit()
    finally:
        conn.close()

    return counts
def update(
    source: str,
    db_path: Optional[Path] = None,
    logger=None,
) -> int:
    """Re-index one source in the database.

    This is a convenience wrapper that runs ``build`` restricted to a single
    source and reports how many datasets that run indexed.

    Parameters
    ----------
    source : str
        Name of the source to refresh; it is forwarded to ``build`` as the
        only entry of ``sources``.
    db_path : Path, optional
        Database file path, forwarded to ``build``.
    logger : optional
        Logger for progress messages, forwarded to ``build``.

    Returns
    -------
    int
        Number of datasets indexed, or ``0`` when ``build`` reported no
        entry for ``source``.
    """
    per_source = build(sources=[source], db_path=db_path, logger=logger)
    indexed = per_source.get(source, 0)
    return indexed
@supports_return_as
def get_stats(db_path: Optional[Path] = None) -> Dict[str, Any]:
    """Get database statistics.

    Parameters
    ----------
    db_path : Path, optional
        Database file path. Falls back to ``_default_db_path()`` when omitted.

    Returns
    -------
    dict
        ``{"exists": False, "message": ...}`` when the database file is
        missing. Otherwise ``exists``, ``path``, ``total_datasets``,
        ``by_source`` (row count per source), ``last_build`` (``None`` if
        never recorded) and ``size_mb`` (file size, rounded to 2 decimals).
    """
    path = db_path or _default_db_path()

    # Check before connecting: opening a connection would create the file.
    if not path.exists():
        return {"exists": False, "message": "Database not built. Run: db.build()"}

    # Reuse the path resolved above so the existence check, the queries and
    # the size report all refer to the same file.
    conn = _get_connection(path)
    try:
        # Get counts per source
        cursor = conn.execute(
            "SELECT source, COUNT(*) as count FROM datasets GROUP BY source"
        )
        source_counts = {row["source"]: row["count"] for row in cursor}

        # Get metadata
        cursor = conn.execute("SELECT key, value FROM metadata")
        metadata = {row["key"]: row["value"] for row in cursor}

        # Get total
        cursor = conn.execute("SELECT COUNT(*) as total FROM datasets")
        total = cursor.fetchone()["total"]
    finally:
        # Release the handle even when one of the queries fails.
        conn.close()

    return {
        "exists": True,
        "path": str(path),
        "total_datasets": total,
        "by_source": source_counts,
        "last_build": metadata.get("last_build"),
        "size_mb": round(path.stat().st_size / (1024 * 1024), 2),
    }
def clear(db_path: Optional[Path] = None) -> bool:
    """Delete the database file.

    Parameters
    ----------
    db_path : Path, optional
        Database file path. Falls back to ``_default_db_path()`` when omitted.

    Returns
    -------
    bool
        True if deleted, False if didn't exist.
    """
    path = db_path or _default_db_path()
    # Attempt the removal directly instead of checking ``exists()`` first, so
    # a file that disappears between the check and the unlink cannot raise.
    try:
        path.unlink()
    except FileNotFoundError:
        return False
    return True
# EOF