Unit of Work minimal viable product

This commit is contained in:
Funky Waddle 2025-12-10 08:36:11 -06:00
parent 55d256506a
commit 2c3b219d9c
4 changed files with 365 additions and 3 deletions

View file

@ -4,6 +4,7 @@ namespace Pairity\Model;
use Pairity\Contracts\ConnectionInterface;
use Pairity\Contracts\DaoInterface;
use Pairity\Orm\UnitOfWork;
abstract class AbstractDao implements DaoInterface
{
@ -92,6 +93,13 @@ abstract class AbstractDao implements DaoInterface
public function findById(int|string $id): ?AbstractDto
{
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$managed = $uow->get(static::class, (string)$id);
if ($managed instanceof AbstractDto) {
return $managed;
}
}
return $this->findOneBy([$this->getPrimaryKey() => $id]);
}
@ -136,6 +144,36 @@ abstract class AbstractDao implements DaoInterface
/** @param array<string,mixed> $data */
public function update(int|string $id, array $data): AbstractDto
{
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
// Defer execution; return a synthesized DTO
$existing = $this->findById($id);
if (!$existing && empty($data)) {
throw new \InvalidArgumentException('No data provided to update and record not found');
}
$toStore = $this->prepareForUpdate($data);
$self = $this;
$conn = $this->connection;
$uow->enqueue($conn, function () use ($self, $id, $toStore) {
UnitOfWork::suspendDuring(function () use ($self, $id, $toStore) {
// execute real update now
$sets = [];
$params = [];
foreach ($toStore as $col => $val) {
$sets[] = "$col = :set_$col";
$params["set_$col"] = $val;
}
$params['pk'] = $id;
$sql = 'UPDATE ' . $self->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $self->getPrimaryKey() . ' = :pk';
$self->getConnection()->execute($sql, $params);
});
});
$base = $existing ? $existing->toArray(false) : [];
$pk = $this->getPrimaryKey();
$result = array_merge($base, $data, [$pk => $id]);
return $this->hydrate($this->castRowFromStorage($result));
}
if (empty($data)) {
$existing = $this->findById($id);
if ($existing) return $existing;
@ -153,7 +191,6 @@ abstract class AbstractDao implements DaoInterface
$this->connection->execute($sql, $params);
$updated = $this->findById($id);
if ($updated === null) {
// As a fallback, hydrate using provided data + id
$pk = $this->getPrimaryKey();
return $this->hydrate($this->castRowFromStorage(array_merge($data, [$pk => $id])));
}
@ -162,6 +199,15 @@ abstract class AbstractDao implements DaoInterface
public function deleteById(int|string $id): int
{
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $theId = $id;
$uow->enqueue($conn, function () use ($self, $theId) {
UnitOfWork::suspendDuring(function () use ($self, $theId) { $self->deleteById($theId); });
});
// deferred; immediate affected count unknown
return 0;
}
if ($this->hasSoftDeletes()) {
$columns = $this->softDeleteConfig();
$deletedAt = $columns['deletedAt'] ?? 'deleted_at';
@ -176,6 +222,14 @@ abstract class AbstractDao implements DaoInterface
/** @param array<string,mixed> $criteria */
public function deleteBy(array $criteria): int
{
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $crit = $criteria;
$uow->enqueue($conn, function () use ($self, $crit) {
UnitOfWork::suspendDuring(function () use ($self, $crit) { $self->deleteBy($crit); });
});
return 0;
}
if ($this->hasSoftDeletes()) {
[$where, $bindings] = $this->buildWhere($criteria);
if ($where === '') { return 0; }
@ -200,6 +254,16 @@ abstract class AbstractDao implements DaoInterface
*/
public function updateBy(array $criteria, array $data): int
{
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
if (empty($data)) { return 0; }
$self = $this; $conn = $this->connection; $crit = $criteria; $payload = $this->prepareForUpdate($data);
$uow->enqueue($conn, function () use ($self, $crit, $payload) {
UnitOfWork::suspendDuring(function () use ($self, $crit, $payload) { $self->updateBy($crit, $payload); });
});
// unknown affected rows until commit
return 0;
}
if (empty($data)) {
return 0;
}
@ -457,6 +521,14 @@ abstract class AbstractDao implements DaoInterface
$class = $this->dtoClass();
/** @var AbstractDto $dto */
$dto = $class::fromArray($row);
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$pk = $this->getPrimaryKey();
$idVal = $row[$pk] ?? null;
if ($idVal !== null) {
$uow->attach(static::class, (string)$idVal, $dto);
}
}
return $dto;
}

View file

@ -3,6 +3,7 @@
namespace Pairity\NoSql\Mongo;
use Pairity\Model\AbstractDto;
use Pairity\Orm\UnitOfWork;
/**
* Base DAO for MongoDB collections returning DTOs.
@ -131,32 +132,73 @@ abstract class AbstractMongoDao
public function findById(string $id): ?AbstractDto
{
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$managed = $uow->get(static::class, (string)$id);
if ($managed instanceof AbstractDto) {
return $managed;
}
}
return $this->findOneBy(['_id' => $id]);
}
/** @param array<string,mixed> $data */
public function insert(array $data): AbstractDto
{
$id = $this->connection->insertOne($this->databaseName(), $this->collection(), $data);
// fetch back
// Inserts remain immediate to obtain a real id, even under UoW
$id = UnitOfWork::suspendDuring(function () use ($data) {
return $this->connection->insertOne($this->databaseName(), $this->collection(), $data);
});
return $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id]));
}
/** @param array<string,mixed> $data */
public function update(string $id, array $data): AbstractDto
{
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $theId = $id; $payload = $data;
$uow->enqueue($conn, function () use ($self, $theId, $payload) {
UnitOfWork::suspendDuring(function () use ($self, $theId, $payload) {
$self->getConnection()->updateOne($self->databaseName(), $self->collection(), ['_id' => $theId], ['$set' => $payload]);
});
});
$base = $this->findById($id)?->toArray(false) ?? [];
$result = array_merge($base, $data, ['_id' => $id]);
return $this->hydrate($result);
}
$this->connection->updateOne($this->databaseName(), $this->collection(), ['_id' => $id], ['$set' => $data]);
return $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id]));
}
public function deleteById(string $id): int
{
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $theId = $id;
$uow->enqueue($conn, function () use ($self, $theId) {
UnitOfWork::suspendDuring(function () use ($self, $theId) {
$self->getConnection()->deleteOne($self->databaseName(), $self->collection(), ['_id' => $theId]);
});
});
return 0;
}
return $this->connection->deleteOne($this->databaseName(), $this->collection(), ['_id' => $id]);
}
/** @param array<string,mixed>|Filter $filter */
public function deleteBy(array|Filter $filter): int
{
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $flt = $this->normalizeFilterInput($filter);
$uow->enqueue($conn, function () use ($self, $flt) {
UnitOfWork::suspendDuring(function () use ($self, $flt) {
$self->getConnection()->deleteOne($self->databaseName(), $self->collection(), $flt);
});
});
return 0;
}
// For MVP provide deleteOne semantic; bulk deletes could be added later
return $this->connection->deleteOne($this->databaseName(), $this->collection(), $this->normalizeFilterInput($filter));
}
@ -236,6 +278,13 @@ abstract class AbstractMongoDao
$class = $this->dtoClass();
/** @var AbstractDto $dto */
$dto = $class::fromArray($doc);
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$idVal = $doc['_id'] ?? null;
if ($idVal !== null) {
$uow->attach(static::class, (string)$idVal, $dto);
}
}
return $dto;
}

153
src/Orm/UnitOfWork.php Normal file
View file

@ -0,0 +1,153 @@
<?php
namespace Pairity\Orm;
use Closure;
/**
* Opt-in Unit of Work (MVP):
* - Ambient/current context with begin()/run()/current()
* - Identity Map per DAO class + primary key (string)
* - Deferred operation queues grouped by connection object
* - commit() opens transactions/sessions per connection and executes queued ops
*/
final class UnitOfWork
{
/** @var UnitOfWork|null */
private static ?UnitOfWork $current = null;
/** @var bool */
private static bool $suspended = false; // when true, DAOs should execute immediately
/** @var array<string, array<string, object>> map[daoClass][id] = DTO */
private array $identityMap = [];
/**
* Queues grouped by a connection hash key.
* Each entry: ['conn' => object, 'ops' => list<Closure>]
*
* @var array<string, array{conn:object, ops:array<int,Closure>}>
*/
private array $queues = [];
private function __construct() {}
public static function begin(): UnitOfWork
{
if (self::$current !== null) {
return self::$current;
}
self::$current = new UnitOfWork();
return self::$current;
}
/**
* Run a Unit of Work and automatically commit or rollback on exception.
* @template T
* @param Closure(UnitOfWork):T $callback
* @return mixed
*/
public static function run(Closure $callback): mixed
{
$uow = self::begin();
try {
$result = $callback($uow);
$uow->commit();
return $result;
} catch (\Throwable $e) {
$uow->rollback();
throw $e;
}
}
public static function current(): ?UnitOfWork
{
return self::$current;
}
/** Temporarily suspend UoW interception so DAOs execute immediately within the callable. */
public static function suspendDuring(Closure $cb): mixed
{
$prev = self::$suspended;
self::$suspended = true;
try { return $cb(); } finally { self::$suspended = $prev; }
}
public static function isSuspended(): bool
{
return self::$suspended;
}
// ===== Identity Map =====
/** Attach a DTO to identity map. */
public function attach(string $daoClass, string $id, object $dto): void
{
$this->identityMap[$daoClass][$id] = $dto;
}
/** Fetch an attached DTO if present. */
public function get(string $daoClass, string $id): ?object
{
return $this->identityMap[$daoClass][$id] ?? null;
}
// ===== Defer operations =====
/** Enqueue a mutation for the given connection object. */
public function enqueue(object $connection, Closure $operation): void
{
$key = spl_object_hash($connection);
if (!isset($this->queues[$key])) {
$this->queues[$key] = ['conn' => $connection, 'ops' => []];
}
$this->queues[$key]['ops'][] = $operation;
}
/** Execute all queued operations per connection within a transaction/session. */
public function commit(): void
{
// Ensure we run ops with DAO interception suspended to avoid re-enqueue
self::suspendDuring(function () {
// Grouped by connection type
foreach ($this->queues as $entry) {
$conn = $entry['conn'];
$ops = $entry['ops'];
// PDO/SQL path: has transaction(callable)
if (method_exists($conn, 'transaction')) {
$conn->transaction(function () use ($ops) {
foreach ($ops as $op) { $op(); }
return null;
});
}
// Mongo path: try withTransaction first, then withSession, else run directly
elseif (method_exists($conn, 'withTransaction')) {
$conn->withTransaction(function () use ($ops) {
foreach ($ops as $op) { $op(); }
return null;
});
} elseif (method_exists($conn, 'withSession')) {
$conn->withSession(function () use ($ops) {
foreach ($ops as $op) { $op(); }
return null;
});
} else {
// Fallback: no transaction API; just run
foreach ($ops as $op) { $op(); }
}
}
});
// Clear queues after successful commit
$this->queues = [];
// Do not clear identity map by default; keep for the scope
// Close UoW scope
self::$current = null;
}
/** Rollback just clears queues and current context; actual rollback is handled by transactions when run. */
public function rollback(): void
{
$this->queues = [];
self::$current = null;
}
}

View file

@ -0,0 +1,88 @@
<?php
declare(strict_types=1);
namespace Pairity\Tests;
use PHPUnit\Framework\TestCase;
use Pairity\Database\ConnectionManager;
use Pairity\Model\AbstractDto;
use Pairity\Model\AbstractDao;
use Pairity\Orm\UnitOfWork;
final class UnitOfWorkSqliteTest extends TestCase
{
private function conn()
{
return ConnectionManager::make(['driver' => 'sqlite', 'path' => ':memory:']);
}
public function testDeferredUpdateAndDeleteCommit(): void
{
$conn = $this->conn();
// schema
$conn->execute('CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT, name TEXT)');
// DAO/DTO inline
$dto = new class([]) extends AbstractDto {};
$dtoClass = get_class($dto);
$dao = new class($conn, $dtoClass) extends AbstractDao {
private string $dto;
public function __construct($c, string $dto) { parent::__construct($c); $this->dto = $dto; }
public function getTable(): string { return 'users'; }
protected function dtoClass(): string { return $this->dto; }
protected function schema(): array { return ['primaryKey' => 'id', 'columns' => ['id'=>['cast'=>'int'],'email'=>['cast'=>'string'],'name'=>['cast'=>'string']]]; }
};
// Insert immediate
$created = $dao->insert(['email' => 'u@example.com', 'name' => 'User']);
$id = (int)($created->toArray(false)['id'] ?? 0);
$this->assertGreaterThan(0, $id);
// Run UoW with deferred update and delete
UnitOfWork::run(function(UnitOfWork $uow) use ($dao, $id) {
$one = $dao->findById($id); // attaches to identity map
$this->assertNotNull($one);
// defer update
$dao->update($id, ['name' => 'Changed']);
// defer deleteBy criteria (will be executed after update)
$dao->deleteBy(['id' => $id]);
// commit done by run()
});
// After commit, record should be deleted
$this->assertNull($dao->findById($id));
}
public function testRollbackOnExceptionClearsOps(): void
{
$conn = $this->conn();
$conn->execute('CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT, name TEXT)');
$dto = new class([]) extends AbstractDto {};
$dtoClass = get_class($dto);
$dao = new class($conn, $dtoClass) extends AbstractDao {
private string $dto; public function __construct($c, string $dto) { parent::__construct($c); $this->dto = $dto; }
public function getTable(): string { return 'users'; }
protected function dtoClass(): string { return $this->dto; }
protected function schema(): array { return ['primaryKey'=>'id','columns'=>['id'=>['cast'=>'int'],'email'=>['cast'=>'string'],'name'=>['cast'=>'string']]]; }
};
$created = $dao->insert(['email' => 'x@example.com', 'name' => 'X']);
$id = (int)($created->toArray(false)['id'] ?? 0);
try {
UnitOfWork::run(function(UnitOfWork $uow) use ($dao, $id) {
$dao->update($id, ['name' => 'Won\'t Persist']);
throw new \RuntimeException('boom');
});
$this->fail('Exception expected');
} catch (\RuntimeException $e) {
// ok
}
// Update should not be applied due to rollback
$after = $dao->findById($id);
$this->assertSame('X', $after?->toArray(false)['name'] ?? null);
}
}