104 lines
3.1 KiB
Python
104 lines
3.1 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
from types import SimpleNamespace
|
|
from pathlib import Path
|
|
|
|
import sqlite3
|
|
|
|
from funkyjuicerecipes.data.db import (
|
|
DB_FILENAME,
|
|
init_database,
|
|
get_db,
|
|
apply_migrations,
|
|
)
|
|
import funkyjuicerecipes.data.db as dbmod
|
|
|
|
|
|
class DummyPaths(SimpleNamespace):
    """Stand-in for an app's ``paths`` object that exposes only ``data``.

    The directory is kept on the private ``_data`` attribute; whatever is
    assigned to ``data`` is normalised to a ``pathlib.Path`` on the way in.
    """

    @property
    def data(self) -> Path:  # type: ignore[override]
        """Return the data directory stored by the setter."""
        return self._data

    @data.setter
    def data(self, new_dir: Path) -> None:
        """Store ``new_dir`` as a ``Path`` (accepts anything ``Path()`` does)."""
        self._data = Path(new_dir)
|
|
|
|
|
|
class DummyApp(SimpleNamespace):
    """Minimal application stand-in carrying just a ``paths.data`` directory."""

    def __init__(self, data_dir: Path) -> None:
        """Build the fake app.

        Args:
            data_dir: Directory to expose as ``self.paths.data``.
        """
        super().__init__()
        # Populate the paths object first, then attach it to the app.
        fake_paths = DummyPaths()
        fake_paths.data = data_dir  # the DummyPaths setter coerces to Path
        self.paths = fake_paths
|
|
|
|
|
|
def _pkg_db_path() -> Path:
    """Return the database location reported by ``dbmod._db_path_for_app``.

    Per the note kept from the original test: the DB is now stored next to
    the data module and ``_db_path_for_app`` ignores its app argument, which
    is why ``None`` is passed here.
    """
    raw_location = dbmod._db_path_for_app(None)  # type: ignore[arg-type]
    return Path(raw_location)
|
|
|
|
|
|
def _cleanup_pkg_db() -> None:
    """Best-effort removal of the package-level DB file before a test runs.

    The file is unlinked directly rather than checking ``exists()`` first, so
    there is no window between the check and the delete. Any filesystem
    error (file already absent, file locked, permission denied, ...) is
    deliberately ignored: cleanup must never be the reason a test errors out.
    """
    db_file = _pkg_db_path()
    try:
        db_file.unlink()
    except OSError:
        # Covers FileNotFoundError (nothing to clean up) as well as
        # PermissionError and friends; cleanup stays best-effort.
        pass
|
|
|
|
|
|
def _tables(conn: sqlite3.Connection) -> set[str]:
    """Return the names of all tables listed in ``sqlite_master``.

    Args:
        conn: An open SQLite connection to inspect.

    Returns:
        The set of table names (rows of ``type='table'`` only).
    """
    query = "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    # Each row is a 1-tuple holding the table name; iterate the cursor directly.
    return {table_name for (table_name,) in conn.execute(query)}
|
|
|
|
|
|
def test_init_database_creates_file_and_tables(tmp_path: Path):
    """``init_database`` must create the DB file, the tables and migration rows.

    Checks, in order: the DB file exists at the package-level path, the
    ``migrations`` table plus the domain tables are present, and the
    ``migrations`` table records the expected migration files, all applied in
    a single batch.
    """
    _cleanup_pkg_db()
    app = DummyApp(tmp_path / "data")

    init_database(app)

    db_path = _pkg_db_path()
    assert db_path.exists(), "DB file should be created next to the package (data folder)"

    # Use sqlite3 to introspect tables. ``with sqlite3.connect(...)`` would
    # only scope a transaction and leave the connection open, so close it
    # explicitly; a lingering handle can keep the file locked for later cleanup.
    conn = sqlite3.connect(str(db_path))
    try:
        names = _tables(conn)
        # migrations + our domain tables (including nicotine)
        assert {"migrations", "flavor", "recipe", "recipe_flavor", "nicotine"}.issubset(names)

        # migrations table should have entries for all applied migrations
        rows = conn.execute(
            "SELECT filename, batch FROM migrations ORDER BY filename"
        ).fetchall()
    finally:
        conn.close()

    # We now have more migrations over time; ensure at least the base set were applied
    assert len(rows) >= 3
    filenames = [r[0] for r in rows]
    # Rows come back ordered by filename
    assert filenames == sorted(filenames)

    # Ensure expected filenames are present (base set + nicotine/inventory)
    expected_markers = (
        "flavor_table_created",
        "recipe_table_created",
        "recipe_flavor_table_created",
        "nicotine_table_created",
    )
    for marker in expected_markers:
        assert any(marker in f for f in filenames), f"missing migration: {marker}"

    # All migrations applied in one run should share the same batch
    batches = {r[1] for r in rows}
    assert len(batches) == 1
|
|
|
|
|
|
def test_apply_migrations_is_idempotent(tmp_path: Path):
    """Re-running ``apply_migrations`` on an initialised DB must change nothing.

    The second run has to report no pending migrations, and the number of
    rows in the ``migrations`` table must be the same before and after it.
    """
    _cleanup_pkg_db()
    app = DummyApp(tmp_path / "data2")
    db = init_database(app)
    db_path = _pkg_db_path()

    def migration_count() -> int:
        """Count rows in ``migrations`` using a short-lived connection."""
        # Close explicitly: sqlite3's ``with`` block would leave the handle open.
        conn = sqlite3.connect(str(db_path))
        try:
            (count,) = conn.execute("SELECT COUNT(*) FROM migrations").fetchone()
        finally:
            conn.close()
        return count

    count_before = migration_count()

    # Second application should detect none pending
    pending = apply_migrations(db)
    assert pending == []

    # migrations count remains stable after re-run
    count_after = migration_count()
    assert count_after >= 3
    assert count_after == count_before
|