feat(db,models,dao,logic): nicotine inventory + flavor snapshots; drop legacy nic_pct; migrations, tests

• Migrations
	◦ Create nicotine table with NOCASE strings, base PG/VG check, strength_mg_per_ml (>0), soft-delete, and optional inventory metadata (bottle_size_ml, cost, purchase_date).
	◦ Alter recipe: add nic_target_mg_per_ml (NOT NULL), nic_strength_mg_per_ml (NOT NULL), nicotine_id (FK ON DELETE SET NULL); keep nic_base. Drop legacy nic_pct via rebuild migration.
	◦ Alter flavor: add bottle_size_ml, cost, purchase_date (nullable).
	◦ Alter recipe_flavor: add flavor snapshot columns (flavor_name_snapshot, flavor_company_snapshot NOCASE, flavor_base_snapshot PG/VG CHECK).
	◦ Ensure migration order runs “_created” before ALTERs.
• Models
	◦ Add Nicotine model.
	◦ Update Recipe with nic_target_mg_per_ml, nic_strength_mg_per_ml, nic_base, optional nicotine FK.
	◦ Update Flavor with optional inventory metadata.
	◦ Update RecipeFlavor with snapshot fields and unique index remains.
• DAOs
	◦ Add NicotineRepository (CRUD, list with soft-delete filters; hard_delete_when_safe blocked when referenced by recipes).
	◦ Update RecipeRepository:
		▪ create/update accept nic_target_mg_per_ml, nic_strength_mg_per_ml, nic_base, optional nicotine_id.
		▪ If nicotine_id provided, snapshot strength/base from inventory unless overridden.
		▪ replace_ingredients uses diff-based updates and populates flavor snapshots on insert.
	◦ Update FlavorRepository to edit inventory metadata; soft/undelete; hard delete only when safe.
• DB init
	◦ Bind Peewee models to runtime DB; tweak migration ordering logic.
• Logic
	◦ Add calculation engine (compute_breakdown, breakdown_for_view) supporting target nicotine mg/mL and stock strength mg/mL; rounding policy for display.
• Tests
	◦ Update/init DB tests to include nicotine table and flexible migration counts.
	◦ Add calculation tests (120 mL example, nic in VG, 50 mg/mL stock, validations).
	◦ Add DAO tests including flavor snapshots stability and replace_ingredients diff behavior.
	◦ All tests passing: 15 passed.
• Docs
	◦ Update SPECS and MILESTONES; strike through Milestones 3 and 4.

BREAKING CHANGE: remove Recipe.nic_pct field and any compatibility code. API changes in RecipeRepository.create/update now require mg/mL fields and use snapshot+FK model for nicotine.
This commit is contained in:
Funky Waddle 2025-12-18 19:55:22 -06:00
parent 8b8b04539e
commit c8655e24e4
20 changed files with 826 additions and 40 deletions

View file

@ -37,13 +37,13 @@ This document outlines the sequence of milestones to deliver the FunkyJuice Reci
--- ---
### Milestone 4 — Calculation engine ### ~~Milestone 4 — Calculation engine~~
- Implement pure functions in `logic/calculations.py` to compute PG/VG/Nic and per-flavor mL from inputs (percents + size). - ~~Implement pure functions in `logic/calculations.py` to compute PG/VG/Nic and per-flavor mL from inputs (percents + size).~~
- Include validation (non-negative PG/VG; base_pg + base_vg = 100; percent totals ≈ 100 within tolerance). - ~~Include validation (non-negative PG/VG; base_pg + base_vg = 100; percent totals ≈ 100 within tolerance).~~
- Define rounding/display policy (e.g., 1 decimal for View screen). - ~~Define rounding/display policy (e.g., 1 decimal for View screen).~~
Acceptance: ~~Acceptance:~~
- Tests cover the README example (120 mL case) and edge cases (nic in PG vs VG, excessive flavor %). - ~~Tests cover the README example (120 mL case) and edge cases (nic in PG vs VG, excessive flavor %).~~
--- ---

View file

@ -128,8 +128,10 @@ recipe(
size_ml INTEGER NOT NULL, size_ml INTEGER NOT NULL,
base_pg_pct REAL NOT NULL, base_pg_pct REAL NOT NULL,
base_vg_pct REAL NOT NULL, base_vg_pct REAL NOT NULL,
nic_pct REAL CHECK(nic_pct >= 0 AND nic_pct <= 100) NOT NULL, nic_target_mg_per_ml REAL CHECK(nic_target_mg_per_ml >= 0) NOT NULL,
nic_strength_mg_per_ml REAL CHECK(nic_strength_mg_per_ml > 0) NOT NULL,
nic_base TEXT CHECK(nic_base IN ('PG','VG')) NOT NULL, nic_base TEXT CHECK(nic_base IN ('PG','VG')) NOT NULL,
nicotine_id INTEGER NULL REFERENCES nicotine(id) ON DELETE SET NULL,
CHECK ( CHECK (
base_pg_pct >= 0 AND base_pg_pct >= 0 AND
base_vg_pct >= 0 AND base_vg_pct >= 0 AND
@ -169,12 +171,14 @@ migrations(
* Size (ml), * Size (ml),
* PG Base %, * PG Base %,
* VG base %, * VG base %,
* Nic Base (PG/VG) * Nic target strength (mg/mL),
* Nic %, * Nic stock strength (mg/mL),
* Nic Base (PG/VG),
* Flavor List * Flavor List
* Recipe equations are: * Recipe equations are:
* flavor_pg_pct = sum(pct for flavor in flavors if flavor.base == 'PG') * flavor_pg_pct = sum(pct for flavor in flavors if flavor.base == 'PG')
* flavor_vg_pct = sum(pct for flavor in flavors if flavor.base == 'VG') * flavor_vg_pct = sum(pct for flavor in flavors if flavor.base == 'VG')
* nic_pct = (nic_target_mg_per_ml / nic_strength_mg_per_ml) * 100 # nicotine volume percent of final mix
* pg_pct = base_pg_pct - flavor_pg_pct - (nic_pct if nic_base == 'PG' else 0) * pg_pct = base_pg_pct - flavor_pg_pct - (nic_pct if nic_base == 'PG' else 0)
* vg_pct = base_vg_pct - flavor_vg_pct - (nic_pct if nic_base == 'VG' else 0) * vg_pct = base_vg_pct - flavor_vg_pct - (nic_pct if nic_base == 'VG' else 0)
* PG_ml = pg_pct / 100 * size_ml * PG_ml = pg_pct / 100 * size_ml

View file

@ -36,7 +36,17 @@ class FlavorRepository:
def get(self, flavor_id: int) -> Flavor: def get(self, flavor_id: int) -> Flavor:
return Flavor.get_by_id(flavor_id) return Flavor.get_by_id(flavor_id)
def update(self, flavor_id: int, *, name: Optional[str] = None, company: Optional[str] = None, base: Optional[str] = None) -> Flavor: def update(
self,
flavor_id: int,
*,
name: Optional[str] = None,
company: Optional[str] = None,
base: Optional[str] = None,
bottle_size_ml: Optional[float] = None,
cost: Optional[float] = None,
purchase_date: Optional[dt.datetime] = None,
) -> Flavor:
f = self.get(flavor_id) f = self.get(flavor_id)
if name is not None: if name is not None:
f.name = name f.name = name
@ -44,6 +54,12 @@ class FlavorRepository:
f.company = company f.company = company
if base is not None: if base is not None:
f.base = base f.base = base
if bottle_size_ml is not None:
f.bottle_size_ml = bottle_size_ml
if cost is not None:
f.cost = cost
if purchase_date is not None:
f.purchase_date = purchase_date
f.save() f.save()
return f return f

View file

@ -0,0 +1,115 @@
from __future__ import annotations
import datetime as dt
from typing import Iterable, List, Optional
from peewee import DoesNotExist, IntegrityError
from ..models.nicotine import Nicotine
from ..models.recipe import Recipe
class NicotineRepository:
def create(
self,
*,
name: str,
company: str,
base: str,
strength_mg_per_ml: float,
bottle_size_ml: Optional[float] = None,
cost: Optional[float] = None,
purchase_date: Optional[dt.datetime] = None,
) -> Nicotine:
try:
return Nicotine.create(
name=name,
company=company,
base=base,
strength_mg_per_ml=strength_mg_per_ml,
bottle_size_ml=bottle_size_ml,
cost=cost,
purchase_date=purchase_date,
)
except IntegrityError as e:
raise e
def list(
self,
*,
include_deleted: bool = False,
only_deleted: bool = False,
order_by: Iterable = (Nicotine.company, Nicotine.name),
) -> List[Nicotine]:
q = Nicotine.select()
if only_deleted:
q = q.where(Nicotine.is_deleted == True)
elif not include_deleted:
q = q.where(Nicotine.is_deleted == False)
if order_by:
q = q.order_by(*order_by)
return list(q)
def get(self, nicotine_id: int) -> Nicotine:
return Nicotine.get_by_id(nicotine_id)
def update(
self,
nicotine_id: int,
*,
name: Optional[str] = None,
company: Optional[str] = None,
base: Optional[str] = None,
strength_mg_per_ml: Optional[float] = None,
bottle_size_ml: Optional[float] = None,
cost: Optional[float] = None,
purchase_date: Optional[dt.datetime] = None,
) -> Nicotine:
n = self.get(nicotine_id)
if name is not None:
n.name = name
if company is not None:
n.company = company
if base is not None:
n.base = base
if strength_mg_per_ml is not None:
n.strength_mg_per_ml = strength_mg_per_ml
if bottle_size_ml is not None:
n.bottle_size_ml = bottle_size_ml
if cost is not None:
n.cost = cost
if purchase_date is not None:
n.purchase_date = purchase_date
n.save()
return n
def soft_delete(self, nicotine_id: int) -> bool:
n = self.get(nicotine_id)
if not n.is_deleted:
n.is_deleted = True
n.deleted_at = dt.datetime.utcnow()
n.save()
return True
def undelete(self, nicotine_id: int) -> bool:
n = self.get(nicotine_id)
if n.is_deleted:
n.is_deleted = False
n.deleted_at = None
n.save()
return True
def hard_delete_when_safe(self, nicotine_id: int) -> bool:
# If referenced by recipes (FK), block hard delete
ref_exists = Recipe.select().where(Recipe.nicotine == nicotine_id).exists()
if ref_exists:
return False
try:
n = self.get(nicotine_id)
except DoesNotExist:
return True
n.delete_instance()
return True
__all__ = ["NicotineRepository"]

View file

@ -7,6 +7,7 @@ from peewee import IntegrityError
from ..models.recipe import Recipe from ..models.recipe import Recipe
from ..models.recipe_flavor import RecipeFlavor from ..models.recipe_flavor import RecipeFlavor
from ..models.flavor import Flavor from ..models.flavor import Flavor
from ..models.nicotine import Nicotine
Ingredient = Tuple[int, float] # (flavor_id, pct) Ingredient = Tuple[int, float] # (flavor_id, pct)
@ -20,17 +21,47 @@ class RecipeRepository:
size_ml: int, size_ml: int,
base_pg_pct: float, base_pg_pct: float,
base_vg_pct: float, base_vg_pct: float,
nic_pct: float, nic_target_mg_per_ml: float,
nic_base: str, nic_strength_mg_per_ml: Optional[float] = None,
nic_base: Optional[str] = None,
nicotine_id: Optional[int] = None,
ingredients: Optional[Sequence[Ingredient]] = None, ingredients: Optional[Sequence[Ingredient]] = None,
) -> Recipe: ) -> Recipe:
# If a nicotine inventory item is provided, use it to fill snapshot defaults
inv_strength: Optional[float] = None
inv_base: Optional[str] = None
inv_obj: Optional[Nicotine] = None
if nicotine_id is not None:
try:
inv_obj = Nicotine.get_by_id(nicotine_id)
inv_strength = float(inv_obj.strength_mg_per_ml)
inv_base = str(inv_obj.base)
except Exception:
inv_obj = None
inv_strength = None
inv_base = None
# Resolve snapshot values: explicit args take precedence, then inventory, then defaults
resolved_strength = (
float(nic_strength_mg_per_ml)
if nic_strength_mg_per_ml is not None
else (inv_strength if inv_strength is not None else 100.0)
)
resolved_base = (
str(nic_base)
if nic_base is not None
else (inv_base if inv_base is not None else "PG")
)
recipe = Recipe.create( recipe = Recipe.create(
name=name, name=name,
size_ml=size_ml, size_ml=size_ml,
base_pg_pct=base_pg_pct, base_pg_pct=base_pg_pct,
base_vg_pct=base_vg_pct, base_vg_pct=base_vg_pct,
nic_pct=nic_pct, nic_target_mg_per_ml=nic_target_mg_per_ml,
nic_base=nic_base, nic_strength_mg_per_ml=resolved_strength,
nic_base=resolved_base,
nicotine=inv_obj if inv_obj is not None else None,
) )
if ingredients: if ingredients:
self.replace_ingredients(recipe.id, ingredients) self.replace_ingredients(recipe.id, ingredients)
@ -60,8 +91,10 @@ class RecipeRepository:
size_ml: Optional[int] = None, size_ml: Optional[int] = None,
base_pg_pct: Optional[float] = None, base_pg_pct: Optional[float] = None,
base_vg_pct: Optional[float] = None, base_vg_pct: Optional[float] = None,
nic_pct: Optional[float] = None, nic_target_mg_per_ml: Optional[float] = None,
nic_strength_mg_per_ml: Optional[float] = None,
nic_base: Optional[str] = None, nic_base: Optional[str] = None,
nicotine_id: Optional[int] = None,
) -> Recipe: ) -> Recipe:
r = self.get(recipe_id) r = self.get(recipe_id)
if name is not None: if name is not None:
@ -72,10 +105,17 @@ class RecipeRepository:
r.base_pg_pct = base_pg_pct r.base_pg_pct = base_pg_pct
if base_vg_pct is not None: if base_vg_pct is not None:
r.base_vg_pct = base_vg_pct r.base_vg_pct = base_vg_pct
if nic_pct is not None: if nic_target_mg_per_ml is not None:
r.nic_pct = nic_pct r.nic_target_mg_per_ml = nic_target_mg_per_ml
if nic_strength_mg_per_ml is not None:
r.nic_strength_mg_per_ml = nic_strength_mg_per_ml
if nic_base is not None: if nic_base is not None:
r.nic_base = nic_base r.nic_base = nic_base
if nicotine_id is not None:
try:
r.nicotine = Nicotine.get_by_id(nicotine_id)
except Exception:
r.nicotine = None
r.save() r.save()
return r return r
@ -133,7 +173,20 @@ class RecipeRepository:
) )
for fid in to_add: for fid in to_add:
RecipeFlavor.create(recipe=recipe_id, flavor=fid, pct=desired[fid]) # Populate flavor snapshots at time of insert
try:
fl = Flavor.get_by_id(fid)
RecipeFlavor.create(
recipe=recipe_id,
flavor=fid,
pct=desired[fid],
flavor_name_snapshot=fl.name,
flavor_company_snapshot=fl.company,
flavor_base_snapshot=fl.base,
)
except Exception:
# If flavor not found (shouldn't happen due to FK), fall back without snapshots
RecipeFlavor.create(recipe=recipe_id, flavor=fid, pct=desired[fid])
def delete(self, recipe_id: int) -> None: def delete(self, recipe_id: int) -> None:
r = self.get(recipe_id) r = self.get(recipe_id)

View file

@ -85,8 +85,9 @@ def _migration_modules() -> List[str]:
name = res.name name = res.name
if name.endswith(".py") and not name.startswith("_"): if name.endswith(".py") and not name.startswith("_"):
files.append(name) files.append(name)
# Sort by filename to define application order # Sort by filename, but ensure "*_created*" migrations run before others
files.sort() # so that any later ALTERs for the same table don't fail on fresh DBs.
files.sort(key=lambda n: (0 if "_created" in n else 1, n))
return files return files

View file

@ -0,0 +1,14 @@
from __future__ import annotations
def apply(db):
# Add optional inventory metadata to flavor
db.execute_sql(
"ALTER TABLE flavor ADD COLUMN bottle_size_ml REAL NULL"
)
db.execute_sql(
"ALTER TABLE flavor ADD COLUMN cost REAL NULL"
)
db.execute_sql(
"ALTER TABLE flavor ADD COLUMN purchase_date DATETIME NULL"
)

View file

@ -0,0 +1,24 @@
from __future__ import annotations
def apply(db):
db.execute_sql(
(
"CREATE TABLE IF NOT EXISTS nicotine (\n"
" id INTEGER PRIMARY KEY,\n"
" name TEXT COLLATE NOCASE NOT NULL,\n"
" company TEXT COLLATE NOCASE NOT NULL,\n"
" base TEXT CHECK(base IN ('PG','VG')) NOT NULL,\n"
" strength_mg_per_ml REAL CHECK(strength_mg_per_ml > 0) NOT NULL,\n"
" bottle_size_ml REAL NULL,\n"
" cost REAL NULL,\n"
" purchase_date DATETIME NULL,\n"
" is_deleted INTEGER NOT NULL DEFAULT 0,\n"
" deleted_at DATETIME NULL,\n"
" UNIQUE(company, name, strength_mg_per_ml, base)\n"
")"
)
)
db.execute_sql(
"CREATE INDEX IF NOT EXISTS idx_nicotine_company_name ON nicotine(company, name)"
)

View file

@ -0,0 +1,15 @@
from __future__ import annotations
def apply(db):
# Add snapshot columns for flavor identity at time of recipe creation
# Allow NULL for backward compatibility; application code will populate on insert.
db.execute_sql(
"ALTER TABLE recipe_flavor ADD COLUMN flavor_name_snapshot TEXT NULL"
)
db.execute_sql(
"ALTER TABLE recipe_flavor ADD COLUMN flavor_company_snapshot TEXT COLLATE NOCASE NULL"
)
db.execute_sql(
"ALTER TABLE recipe_flavor ADD COLUMN flavor_base_snapshot TEXT NULL CHECK(flavor_base_snapshot IN ('PG','VG'))"
)

View file

@ -0,0 +1,18 @@
from __future__ import annotations
def apply(db):
# Add columns for nicotine snapshot and optional inventory FK reference
db.execute_sql(
"ALTER TABLE recipe ADD COLUMN nic_target_mg_per_ml REAL NOT NULL DEFAULT 0"
)
db.execute_sql(
"ALTER TABLE recipe ADD COLUMN nic_strength_mg_per_ml REAL NOT NULL DEFAULT 100.0"
)
db.execute_sql(
"ALTER TABLE recipe ADD COLUMN nicotine_id INTEGER NULL"
)
# Optional index to speed up lookups by nicotine_id
db.execute_sql(
"CREATE INDEX IF NOT EXISTS idx_recipe_nicotine_id ON recipe(nicotine_id)"
)

View file

@ -0,0 +1,50 @@
from __future__ import annotations
def apply(db):
"""Remove legacy nic_pct from recipe by rebuilding the table.
This approach is compatible with older SQLite versions that don't support
ALTER TABLE ... DROP COLUMN.
"""
# Create new table without nic_pct, preserving constraints and FK
db.execute_sql(
(
"CREATE TABLE IF NOT EXISTS recipe__new (\n"
" id INTEGER PRIMARY KEY,\n"
" name TEXT NOT NULL UNIQUE,\n"
" size_ml INTEGER NOT NULL,\n"
" base_pg_pct REAL NOT NULL,\n"
" base_vg_pct REAL NOT NULL,\n"
" nic_target_mg_per_ml REAL CHECK(nic_target_mg_per_ml >= 0) NOT NULL,\n"
" nic_strength_mg_per_ml REAL CHECK(nic_strength_mg_per_ml > 0) NOT NULL,\n"
" nic_base TEXT CHECK(nic_base IN ('PG','VG')) NOT NULL,\n"
" nicotine_id INTEGER NULL REFERENCES nicotine(id) ON DELETE SET NULL,\n"
" CHECK (\n"
" base_pg_pct >= 0 AND\n"
" base_vg_pct >= 0 AND\n"
" (base_pg_pct + base_vg_pct) = 100\n"
" )\n"
")"
)
)
# Copy data (exclude nic_pct)
db.execute_sql(
(
"INSERT INTO recipe__new (id, name, size_ml, base_pg_pct, base_vg_pct, "
"nic_target_mg_per_ml, nic_strength_mg_per_ml, nic_base, nicotine_id) "
"SELECT id, name, size_ml, base_pg_pct, base_vg_pct, "
" nic_target_mg_per_ml, nic_strength_mg_per_ml, nic_base, nicotine_id "
"FROM recipe"
)
)
# Drop old table and rename new
db.execute_sql("DROP TABLE recipe")
db.execute_sql("ALTER TABLE recipe__new RENAME TO recipe")
# Recreate any indexes that referenced the table (unique on name is inline; add index on nicotine_id)
db.execute_sql(
"CREATE INDEX IF NOT EXISTS idx_recipe_nicotine_id ON recipe(nicotine_id)"
)

View file

@ -5,6 +5,7 @@ from peewee import (
BooleanField, BooleanField,
DateTimeField, DateTimeField,
Check, Check,
FloatField,
) )
from .base import BaseModel from .base import BaseModel
@ -14,6 +15,10 @@ class Flavor(BaseModel):
name = CharField(collation="NOCASE") name = CharField(collation="NOCASE")
company = CharField(collation="NOCASE") company = CharField(collation="NOCASE")
base = CharField(constraints=[Check("base IN ('PG','VG')")]) base = CharField(constraints=[Check("base IN ('PG','VG')")])
# Optional inventory metadata
bottle_size_ml = FloatField(null=True)
cost = FloatField(null=True)
purchase_date = DateTimeField(null=True)
is_deleted = BooleanField(default=False) is_deleted = BooleanField(default=False)
deleted_at = DateTimeField(null=True) deleted_at = DateTimeField(null=True)

View file

@ -0,0 +1,36 @@
from __future__ import annotations
from peewee import (
CharField,
BooleanField,
DateTimeField,
Check,
FloatField,
)
from .base import BaseModel
class Nicotine(BaseModel):
name = CharField(collation="NOCASE")
company = CharField(collation="NOCASE")
base = CharField(constraints=[Check("base IN ('PG','VG')")])
strength_mg_per_ml = FloatField(constraints=[Check("strength_mg_per_ml > 0")])
# Optional inventory metadata
bottle_size_ml = FloatField(null=True)
cost = FloatField(null=True)
purchase_date = DateTimeField(null=True)
# Soft delete
is_deleted = BooleanField(default=False)
deleted_at = DateTimeField(null=True)
class Meta:
table_name = "nicotine"
indexes = (
(("company", "name", "strength_mg_per_ml", "base"), True),
)
__all__ = ["Nicotine"]

View file

@ -6,9 +6,11 @@ from peewee import (
FloatField, FloatField,
Check, Check,
SQL, SQL,
ForeignKeyField,
) )
from .base import BaseModel from .base import BaseModel
from .nicotine import Nicotine
class Recipe(BaseModel): class Recipe(BaseModel):
@ -16,8 +18,10 @@ class Recipe(BaseModel):
size_ml = IntegerField(constraints=[Check("size_ml > 0")]) size_ml = IntegerField(constraints=[Check("size_ml > 0")])
base_pg_pct = FloatField(constraints=[Check("base_pg_pct >= 0")]) base_pg_pct = FloatField(constraints=[Check("base_pg_pct >= 0")])
base_vg_pct = FloatField(constraints=[Check("base_vg_pct >= 0")]) base_vg_pct = FloatField(constraints=[Check("base_vg_pct >= 0")])
nic_pct = FloatField(constraints=[Check("nic_pct >= 0 AND nic_pct <= 100")]) nic_target_mg_per_ml = FloatField(constraints=[Check("nic_target_mg_per_ml >= 0")])
nic_strength_mg_per_ml = FloatField(constraints=[Check("nic_strength_mg_per_ml > 0")])
nic_base = CharField(constraints=[Check("nic_base IN ('PG','VG')")]) nic_base = CharField(constraints=[Check("nic_base IN ('PG','VG')")])
nicotine = ForeignKeyField(Nicotine, null=True, backref="recipes", on_delete="SET NULL")
class Meta: class Meta:
table_name = "recipe" table_name = "recipe"

View file

@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
from peewee import ForeignKeyField, FloatField, Check from peewee import ForeignKeyField, FloatField, Check, CharField
from .base import BaseModel from .base import BaseModel
from .flavor import Flavor from .flavor import Flavor
@ -11,6 +11,12 @@ class RecipeFlavor(BaseModel):
recipe = ForeignKeyField(Recipe, backref="ingredients", on_delete="CASCADE") recipe = ForeignKeyField(Recipe, backref="ingredients", on_delete="CASCADE")
flavor = ForeignKeyField(Flavor, backref="recipes", on_delete="RESTRICT") flavor = ForeignKeyField(Flavor, backref="recipes", on_delete="RESTRICT")
pct = FloatField(constraints=[Check("pct >= 0 AND pct <= 100")]) pct = FloatField(constraints=[Check("pct >= 0 AND pct <= 100")])
# Snapshots of flavor identity at the time the ingredient entry is created
flavor_name_snapshot = CharField(null=True)
flavor_company_snapshot = CharField(null=True, collation="NOCASE")
flavor_base_snapshot = CharField(
null=True, constraints=[Check("flavor_base_snapshot IN ('PG','VG')")]
)
class Meta: class Meta:
table_name = "recipe_flavor" table_name = "recipe_flavor"

View file

@ -0,0 +1,7 @@
"""Business logic utilities.
This package contains the calculation engine used to compute recipe component
amounts (PG, VG, Nicotine, and per-flavor volumes) from recipe inputs.
"""
__all__ = []

View file

@ -0,0 +1,204 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, List, Mapping, Sequence, Tuple, Union
FlavorInput = Union[
Mapping[str, Union[str, float]],
Tuple[str, str, float], # (name, base, pct)
]
@dataclass(frozen=True)
class FlavorAmount:
name: str
base: str # 'PG' or 'VG'
pct: float
ml: float
@dataclass(frozen=True)
class RecipeBreakdown:
size_ml: int
pg_pct: float
vg_pct: float
nic_pct: float # volume percent of nicotine stock used in final mix
nic_base: str
# Extra nicotine-related fields for clarity
nic_target_mg_per_ml: float # target final strength
nic_strength_mg_per_ml: float # stock/base concentration
pg_ml: float
vg_ml: float
nic_ml: float
flavor_prep_ml: float
flavors: Tuple[FlavorAmount, ...]
EPS = 1e-7
def _norm_flavor(item: FlavorInput) -> Tuple[str, str, float]:
if isinstance(item, tuple) and len(item) == 3:
name, base, pct = item
return str(name), str(base), float(pct)
elif isinstance(item, Mapping):
name = str(item.get("name", ""))
base = str(item.get("base"))
pct = float(item.get("pct"))
return name, base, pct
else:
raise TypeError("Flavor input must be a mapping with keys (name, base, pct) or a tuple (name, base, pct)")
def _validate_inputs(
*,
size_ml: int,
base_pg_pct: float,
base_vg_pct: float,
nic_target_mg_per_ml: float,
nic_strength_mg_per_ml: float,
nic_base: str,
flavors: Sequence[Tuple[str, str, float]],
) -> None:
if size_ml <= 0:
raise ValueError("size_ml must be positive")
if abs((base_pg_pct + base_vg_pct) - 100.0) > 1e-6:
raise ValueError("Base PG% + VG% must equal 100")
if nic_base not in ("PG", "VG"):
raise ValueError("nic_base must be 'PG' or 'VG'")
if nic_strength_mg_per_ml <= 0:
raise ValueError("nic_strength_mg_per_ml must be > 0")
if nic_target_mg_per_ml < 0:
raise ValueError("nic_target_mg_per_ml must be >= 0")
for _, base, pct in flavors:
if base not in ("PG", "VG"):
raise ValueError("Flavor base must be 'PG' or 'VG'")
if not (0.0 <= pct <= 100.0):
raise ValueError("Flavor pct must be between 0 and 100")
def compute_breakdown(
*,
size_ml: int,
base_pg_pct: float,
base_vg_pct: float,
nic_target_mg_per_ml: float,
nic_strength_mg_per_ml: float,
nic_base: str,
flavors: Sequence[FlavorInput],
) -> RecipeBreakdown:
"""Compute recipe component amounts.
Parameters are pure data; this function does not touch the DB or UI.
`flavors` can be a sequence of dicts with keys `name`, `base`, `pct`,
or tuples `(name, base, pct)`.
"""
normalized: List[Tuple[str, str, float]] = [_norm_flavor(f) for f in flavors]
_validate_inputs(
size_ml=size_ml,
base_pg_pct=base_pg_pct,
base_vg_pct=base_vg_pct,
nic_target_mg_per_ml=nic_target_mg_per_ml,
nic_strength_mg_per_ml=nic_strength_mg_per_ml,
nic_base=nic_base,
flavors=normalized,
)
# Compute required nicotine stock volume percent for the target strength
nic_pct = (float(nic_target_mg_per_ml) / float(nic_strength_mg_per_ml)) * 100.0
if nic_pct > 100.0 + 1e-9:
raise ValueError("Target nicotine exceeds stock strength (requires >100% nic volume)")
flavor_pg_pct = sum(pct for _, base, pct in normalized if base == "PG")
flavor_vg_pct = sum(pct for _, base, pct in normalized if base == "VG")
pg_pct = base_pg_pct - flavor_pg_pct - (nic_pct if nic_base == "PG" else 0.0)
vg_pct = base_vg_pct - flavor_vg_pct - (nic_pct if nic_base == "VG" else 0.0)
# Validation: PG/VG must not be negative (within tolerance)
if pg_pct < -EPS or vg_pct < -EPS:
raise ValueError("Computed PG/VG would be negative. Reduce flavor or nicotine percentages.")
# Clamp tiny negatives to zero to account for float rounding
if pg_pct < 0 and pg_pct > -EPS:
pg_pct = 0.0
if vg_pct < 0 and vg_pct > -EPS:
vg_pct = 0.0
# Validate total ~ 100
total = flavor_pg_pct + flavor_vg_pct + nic_pct + pg_pct + vg_pct
if abs(total - 100.0) > 1e-5:
raise ValueError("Component percentages must sum to ~100% within tolerance")
# Convert to mL
def pct_to_ml(p: float) -> float:
return p / 100.0 * float(size_ml)
pg_ml = pct_to_ml(pg_pct)
vg_ml = pct_to_ml(vg_pct)
nic_ml = pct_to_ml(nic_pct)
flavors_out: List[FlavorAmount] = []
for name, base, pct in normalized:
flavors_out.append(FlavorAmount(name=name, base=base, pct=pct, ml=pct_to_ml(pct)))
flavor_prep_ml = pct_to_ml(flavor_pg_pct + flavor_vg_pct)
return RecipeBreakdown(
size_ml=int(size_ml),
pg_pct=float(pg_pct),
vg_pct=float(vg_pct),
nic_pct=float(nic_pct),
nic_base=str(nic_base),
nic_target_mg_per_ml=float(nic_target_mg_per_ml),
nic_strength_mg_per_ml=float(nic_strength_mg_per_ml),
pg_ml=float(pg_ml),
vg_ml=float(vg_ml),
nic_ml=float(nic_ml),
flavor_prep_ml=float(flavor_prep_ml),
flavors=tuple(flavors_out),
)
def round_ml(value: float, decimals: int = 1) -> float:
"""Round a milliliter value for display. Default 1 decimal place.
Uses standard rounding behavior.
"""
return round(float(value), decimals)
def breakdown_for_view(breakdown: RecipeBreakdown, decimals: int = 1) -> RecipeBreakdown:
"""Return a copy of breakdown with mL values rounded for display.
Percentages are left unchanged; only mL fields are rounded.
"""
rounded_flavors = tuple(
FlavorAmount(name=f.name, base=f.base, pct=f.pct, ml=round_ml(f.ml, decimals))
for f in breakdown.flavors
)
return RecipeBreakdown(
size_ml=breakdown.size_ml,
pg_pct=breakdown.pg_pct,
vg_pct=breakdown.vg_pct,
nic_pct=breakdown.nic_pct,
nic_base=breakdown.nic_base,
nic_target_mg_per_ml=breakdown.nic_target_mg_per_ml,
nic_strength_mg_per_ml=breakdown.nic_strength_mg_per_ml,
pg_ml=round_ml(breakdown.pg_ml, decimals),
vg_ml=round_ml(breakdown.vg_ml, decimals),
nic_ml=round_ml(breakdown.nic_ml, decimals),
flavor_prep_ml=round_ml(breakdown.flavor_prep_ml, decimals),
flavors=rounded_flavors,
)
__all__ = [
"FlavorAmount",
"RecipeBreakdown",
"compute_breakdown",
"round_ml",
"breakdown_for_view",
]

158
tests/test_calculations.py Normal file
View file

@ -0,0 +1,158 @@
from __future__ import annotations
import math
import pytest
from funkyjuicerecipes.logic.calculations import (
compute_breakdown,
breakdown_for_view,
)
def test_readme_style_example_120ml_nic_in_pg():
bd = compute_breakdown(
size_ml=120,
base_pg_pct=30.0,
base_vg_pct=70.0,
nic_target_mg_per_ml=3.0,
nic_strength_mg_per_ml=100.0,
nic_base="PG",
flavors=[
{"name": "Vanilla", "base": "PG", "pct": 8.0},
("Strawberry", "VG", 4.0),
],
)
# Percentages
assert bd.pg_pct == pytest.approx(19.0)
assert bd.vg_pct == pytest.approx(66.0)
assert bd.nic_pct == pytest.approx(3.0)
# mL calculations
assert bd.pg_ml == pytest.approx(22.8)
assert bd.vg_ml == pytest.approx(79.2)
assert bd.nic_ml == pytest.approx(3.6)
names = [f.name for f in bd.flavors]
assert names == ["Vanilla", "Strawberry"]
amounts = {f.name: f.ml for f in bd.flavors}
assert amounts["Vanilla"] == pytest.approx(9.6)
assert amounts["Strawberry"] == pytest.approx(4.8)
assert bd.flavor_prep_ml == pytest.approx(14.4)
# Rounded view (1 decimal) should match obvious roundings
v = breakdown_for_view(bd)
assert v.pg_ml == pytest.approx(22.8)
assert v.vg_ml == pytest.approx(79.2)
assert v.nic_ml == pytest.approx(3.6)
assert {f.name: f.ml for f in v.flavors}["Vanilla"] == pytest.approx(9.6)
assert {f.name: f.ml for f in v.flavors}["Strawberry"] == pytest.approx(4.8)
assert v.flavor_prep_ml == pytest.approx(14.4)
def test_nic_in_vg_changes_pg_vg_split():
bd = compute_breakdown(
size_ml=120,
base_pg_pct=30.0,
base_vg_pct=70.0,
nic_target_mg_per_ml=3.0,
nic_strength_mg_per_ml=100.0,
nic_base="VG",
flavors=[
{"name": "Vanilla", "base": "PG", "pct": 8.0},
("Strawberry", "VG", 4.0),
],
)
assert bd.pg_pct == pytest.approx(22.0)
assert bd.vg_pct == pytest.approx(63.0)
assert bd.pg_ml == pytest.approx(26.4)
assert bd.vg_ml == pytest.approx(75.6)
@pytest.mark.parametrize(
"base_pg,base_vg",
[
(70.0, 30.0),
(70.0000001, 29.9999999), # tolerance
],
)
def test_base_pg_vg_sum_to_100_with_tolerance(base_pg, base_vg):
bd = compute_breakdown(
size_ml=60,
base_pg_pct=base_pg,
base_vg_pct=base_vg,
nic_target_mg_per_ml=0.0,
nic_strength_mg_per_ml=100.0,
nic_base="PG",
flavors=[],
)
assert bd.pg_pct + bd.vg_pct + bd.nic_pct + sum(f.pct for f in bd.flavors) == pytest.approx(100.0)
def test_excessive_flavor_or_negative_pg_raises():
# This combination would push PG negative (base PG 10, nic 10 PG, flavor PG 5 => PG becomes -5)
with pytest.raises(ValueError):
compute_breakdown(
size_ml=30,
base_pg_pct=10.0,
base_vg_pct=90.0,
nic_target_mg_per_ml=10.0,
nic_strength_mg_per_ml=100.0,
nic_base="PG",
flavors=[("Whatever", "PG", 5.0)],
)
# Flavors sum > 100 should also fail via negative VG/PG after deduction
with pytest.raises(ValueError):
compute_breakdown(
size_ml=30,
base_pg_pct=50.0,
base_vg_pct=50.0,
nic_target_mg_per_ml=0.0,
nic_strength_mg_per_ml=100.0,
nic_base="PG",
flavors=[("A", "PG", 60.0), ("B", "VG", 50.0)],
)
def test_input_variants_tuple_and_mapping_supported():
bd = compute_breakdown(
size_ml=10,
base_pg_pct=50.0,
base_vg_pct=50.0,
nic_target_mg_per_ml=0.0,
nic_strength_mg_per_ml=100.0,
nic_base="PG",
flavors=[
{"name": "X", "base": "PG", "pct": 5.0},
("Y", "VG", 5.0),
],
)
def test_50mg_stock_requires_more_nic_volume_and_changes_pg_vg_split():
# With 50 mg/mL stock, to reach 3 mg/mL target we need 6% nic volume.
bd = compute_breakdown(
size_ml=120,
base_pg_pct=30.0,
base_vg_pct=70.0,
nic_target_mg_per_ml=3.0,
nic_strength_mg_per_ml=50.0,
nic_base="PG",
flavors=[
{"name": "Vanilla", "base": "PG", "pct": 8.0},
("Strawberry", "VG", 4.0),
],
)
# Now nic_pct (volume) is 6%
assert bd.nic_pct == pytest.approx(6.0)
# PG is further reduced by extra 3% vs the 100 mg/mL case
# Base pg 30 - flavor PG 8 - nic 6 = 16
assert bd.pg_pct == pytest.approx(16.0)
# VG remains 70 - flavor VG 4 = 66
assert bd.vg_pct == pytest.approx(66.0)
# mL
assert bd.nic_ml == pytest.approx(7.2) # 6% of 120
assert bd.pg_ml == pytest.approx(19.2) # 16% of 120
assert bd.vg_ml == pytest.approx(79.2) # 66% of 120
assert {f.name for f in bd.flavors} == {"Vanilla", "Strawberry"}

View file

@ -49,20 +49,23 @@ def test_init_database_creates_file_and_tables(tmp_path: Path):
# Use sqlite3 to introspect tables # Use sqlite3 to introspect tables
with sqlite3.connect(db_path) as conn: with sqlite3.connect(db_path) as conn:
names = _tables(conn) names = _tables(conn)
# migrations + our 3 domain tables # migrations + our domain tables (including nicotine)
assert {"migrations", "flavor", "recipe", "recipe_flavor"}.issubset(names) assert {"migrations", "flavor", "recipe", "recipe_flavor", "nicotine"}.issubset(names)
# migrations table should have entries for all applied migrations # migrations table should have entries for all applied migrations
cur = conn.execute("SELECT filename, batch FROM migrations ORDER BY filename") cur = conn.execute("SELECT filename, batch FROM migrations ORDER BY filename")
rows = cur.fetchall() rows = cur.fetchall()
assert len(rows) == 3 # We now have more migrations over time; ensure at least the base set were applied
assert len(rows) >= 3
filenames = [r[0] for r in rows] filenames = [r[0] for r in rows]
# Ensure expected filenames are present # Ensure expected filenames are present
assert filenames == sorted(filenames) assert filenames == sorted(filenames)
assert any("flavor_table_created" in f for f in filenames) assert any("flavor_table_created" in f for f in filenames)
assert any("recipe_table_created" in f for f in filenames) assert any("recipe_table_created" in f for f in filenames)
assert any("recipe_flavor_table_created" in f for f in filenames) assert any("recipe_flavor_table_created" in f for f in filenames)
# All three should be recorded in the same batch # New nicotine/inventory migrations should also be present
assert any("nicotine_table_created" in f for f in filenames)
# All migrations applied in one run should share the same batch
batches = {r[1] for r in rows} batches = {r[1] for r in rows}
assert len(batches) == 1 assert len(batches) == 1
@ -75,9 +78,9 @@ def test_apply_migrations_is_idempotent(tmp_path: Path):
pending = apply_migrations(db) pending = apply_migrations(db)
assert pending == [] assert pending == []
# migrations count remains 3 # migrations count remains stable after re-run
db_path = Path(app.paths.data) / DB_FILENAME db_path = Path(app.paths.data) / DB_FILENAME
with sqlite3.connect(db_path) as conn: with sqlite3.connect(db_path) as conn:
cur = conn.execute("SELECT COUNT(*) FROM migrations") cur = conn.execute("SELECT COUNT(*) FROM migrations")
(count,) = cur.fetchone() (count,) = cur.fetchone()
assert count == 3 assert count >= 3

View file

@ -12,6 +12,7 @@ from funkyjuicerecipes.data.dao.flavors import FlavorRepository
from funkyjuicerecipes.data.dao.recipes import RecipeRepository from funkyjuicerecipes.data.dao.recipes import RecipeRepository
from funkyjuicerecipes.data.models.flavor import Flavor from funkyjuicerecipes.data.models.flavor import Flavor
from funkyjuicerecipes.data.models.recipe_flavor import RecipeFlavor from funkyjuicerecipes.data.models.recipe_flavor import RecipeFlavor
from funkyjuicerecipes.data.models.flavor import Flavor as FlavorModel
class DummyPaths(SimpleNamespace): class DummyPaths(SimpleNamespace):
@ -84,7 +85,7 @@ def test_flavor_soft_delete_and_listing_filters(tmp_path: Path):
size_ml=30, size_ml=30,
base_pg_pct=50, base_pg_pct=50,
base_vg_pct=50, base_vg_pct=50,
nic_pct=0, nic_target_mg_per_ml=0,
nic_base="PG", nic_base="PG",
ingredients=[(f1.id, 5.0)], ingredients=[(f1.id, 5.0)],
) )
@ -113,7 +114,7 @@ def test_recipe_creation_and_get_with_ingredients_includes_soft_deleted_flavors(
size_ml=60, size_ml=60,
base_pg_pct=70, base_pg_pct=70,
base_vg_pct=30, base_vg_pct=30,
nic_pct=3, nic_target_mg_per_ml=3,
nic_base="PG", nic_base="PG",
ingredients=[(fa.id, 4.0), (fb.id, 2.0)], ingredients=[(fa.id, 4.0), (fb.id, 2.0)],
) )
@ -142,7 +143,7 @@ def test_replace_ingredients_diff_behavior(tmp_path: Path):
size_ml=30, size_ml=30,
base_pg_pct=50, base_pg_pct=50,
base_vg_pct=50, base_vg_pct=50,
nic_pct=0, nic_target_mg_per_ml=0,
nic_base="PG", nic_base="PG",
ingredients=[(f1.id, 3.0), (f2.id, 2.0)], ingredients=[(f1.id, 3.0), (f2.id, 2.0)],
) )
@ -179,3 +180,55 @@ def test_replace_ingredients_diff_behavior(tmp_path: Path):
assert RecipeFlavor.select().where( assert RecipeFlavor.select().where(
(RecipeFlavor.recipe == r.id) & (RecipeFlavor.flavor == f3.id) (RecipeFlavor.recipe == r.id) & (RecipeFlavor.flavor == f3.id)
).exists() ).exists()
def test_flavor_snapshots_populated_and_stable(tmp_path: Path):
app, db = _init_db(tmp_path)
flavors = FlavorRepository()
recipes = RecipeRepository()
# Create a flavor and recipe using it
f = flavors.create(name="Vanilla", company="CAP", base="PG")
r = recipes.create(
name="Vanilla Mix",
size_ml=30,
base_pg_pct=50,
base_vg_pct=50,
nic_target_mg_per_ml=0,
nic_base="PG",
ingredients=[(f.id, 5.0)],
)
rf = RecipeFlavor.get(RecipeFlavor.recipe == r.id)
assert rf.flavor_id == f.id
# Snapshots should be populated at creation time
assert rf.flavor_name_snapshot == "Vanilla"
assert rf.flavor_company_snapshot == "CAP"
assert rf.flavor_base_snapshot == "PG"
# Change the live Flavor fields
flavors.update(f.id, name="Vanilla Bean", company="Capella", base="VG")
# Snapshots must remain unchanged
rf2 = RecipeFlavor.get_by_id(rf.id)
assert rf2.flavor_name_snapshot == "Vanilla"
assert rf2.flavor_company_snapshot == "CAP"
assert rf2.flavor_base_snapshot == "PG"
# Pct-only update should not touch snapshots
recipes.replace_ingredients(r.id, [(f.id, 7.5)])
rf3 = RecipeFlavor.get(RecipeFlavor.recipe == r.id)
assert rf3.pct == 7.5
assert rf3.flavor_name_snapshot == "Vanilla"
assert rf3.flavor_company_snapshot == "CAP"
assert rf3.flavor_base_snapshot == "PG"
# Replacing with a different flavor should capture new snapshots
f2 = flavors.create(name="Cookie", company="FA", base="VG")
recipes.replace_ingredients(r.id, [(f2.id, 2.0)])
rf_after = RecipeFlavor.get(RecipeFlavor.recipe == r.id)
assert rf_after.flavor_id == f2.id
assert rf_after.pct == 2.0
assert rf_after.flavor_name_snapshot == "Cookie"
assert rf_after.flavor_company_snapshot == "FA"
assert rf_after.flavor_base_snapshot == "VG"