diff --git a/src/Model/AbstractDao.php b/src/Model/AbstractDao.php index 4c99cfd..a7fe1d1 100644 --- a/src/Model/AbstractDao.php +++ b/src/Model/AbstractDao.php @@ -4,6 +4,7 @@ namespace Pairity\Model; use Pairity\Contracts\ConnectionInterface; use Pairity\Contracts\DaoInterface; +use Pairity\Orm\UnitOfWork; abstract class AbstractDao implements DaoInterface { @@ -92,6 +93,13 @@ abstract class AbstractDao implements DaoInterface public function findById(int|string $id): ?AbstractDto { + $uow = UnitOfWork::current(); + if ($uow && !UnitOfWork::isSuspended()) { + $managed = $uow->get(static::class, (string)$id); + if ($managed instanceof AbstractDto) { + return $managed; + } + } return $this->findOneBy([$this->getPrimaryKey() => $id]); } @@ -136,6 +144,36 @@ abstract class AbstractDao implements DaoInterface /** @param array $data */ public function update(int|string $id, array $data): AbstractDto { + $uow = UnitOfWork::current(); + if ($uow && !UnitOfWork::isSuspended()) { + // Defer execution; return a synthesized DTO + $existing = $this->findById($id); + if (!$existing && empty($data)) { + throw new \InvalidArgumentException('No data provided to update and record not found'); + } + $toStore = $this->prepareForUpdate($data); + $self = $this; + $conn = $this->connection; + $uow->enqueue($conn, function () use ($self, $id, $toStore) { + UnitOfWork::suspendDuring(function () use ($self, $id, $toStore) { + // execute real update now + $sets = []; + $params = []; + foreach ($toStore as $col => $val) { + $sets[] = "$col = :set_$col"; + $params["set_$col"] = $val; + } + $params['pk'] = $id; + $sql = 'UPDATE ' . $self->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $self->getPrimaryKey() . ' = :pk'; + $self->getConnection()->execute($sql, $params); + }); + }); + $base = $existing ? $existing->toArray(false) : []; + $pk = $this->getPrimaryKey(); + $result = array_merge($base, $data, [$pk => $id]); + return $this->hydrate($this->castRowFromStorage($result)); + } + if (empty($data)) { $existing = $this->findById($id); if ($existing) return $existing; @@ -153,7 +191,6 @@ abstract class AbstractDao implements DaoInterface $this->connection->execute($sql, $params); $updated = $this->findById($id); if ($updated === null) { - // As a fallback, hydrate using provided data + id $pk = $this->getPrimaryKey(); return $this->hydrate($this->castRowFromStorage(array_merge($data, [$pk => $id]))); } @@ -162,6 +199,15 @@ abstract class AbstractDao implements DaoInterface public function deleteById(int|string $id): int { + $uow = UnitOfWork::current(); + if ($uow && !UnitOfWork::isSuspended()) { + $self = $this; $conn = $this->connection; $theId = $id; + $uow->enqueue($conn, function () use ($self, $theId) { + UnitOfWork::suspendDuring(function () use ($self, $theId) { $self->deleteById($theId); }); + }); + // deferred; immediate affected count unknown + return 0; + } if ($this->hasSoftDeletes()) { $columns = $this->softDeleteConfig(); $deletedAt = $columns['deletedAt'] ?? 'deleted_at'; @@ -176,6 +222,14 @@ abstract class AbstractDao implements DaoInterface /** @param array $criteria */ public function deleteBy(array $criteria): int { + $uow = UnitOfWork::current(); + if ($uow && !UnitOfWork::isSuspended()) { + $self = $this; $conn = $this->connection; $crit = $criteria; + $uow->enqueue($conn, function () use ($self, $crit) { + UnitOfWork::suspendDuring(function () use ($self, $crit) { $self->deleteBy($crit); }); + }); + return 0; + } if ($this->hasSoftDeletes()) { [$where, $bindings] = $this->buildWhere($criteria); if ($where === '') { return 0; } @@ -200,6 +254,16 @@ abstract class AbstractDao implements DaoInterface */ public function updateBy(array $criteria, array $data): int { + $uow = UnitOfWork::current(); + if ($uow && !UnitOfWork::isSuspended()) { + if (empty($data)) { return 0; } + $self = $this; $conn = $this->connection; $crit = $criteria; $payload = $this->prepareForUpdate($data); + $uow->enqueue($conn, function () use ($self, $crit, $payload) { + UnitOfWork::suspendDuring(function () use ($self, $crit, $payload) { $self->updateBy($crit, $payload); }); + }); + // unknown affected rows until commit + return 0; + } if (empty($data)) { return 0; } @@ -457,6 +521,14 @@ abstract class AbstractDao implements DaoInterface $class = $this->dtoClass(); /** @var AbstractDto $dto */ $dto = $class::fromArray($row); + $uow = UnitOfWork::current(); + if ($uow && !UnitOfWork::isSuspended()) { + $pk = $this->getPrimaryKey(); + $idVal = $row[$pk] ?? null; + if ($idVal !== null) { + $uow->attach(static::class, (string)$idVal, $dto); + } + } return $dto; } diff --git a/src/NoSql/Mongo/AbstractMongoDao.php b/src/NoSql/Mongo/AbstractMongoDao.php index 111f559..0f48426 100644 --- a/src/NoSql/Mongo/AbstractMongoDao.php +++ b/src/NoSql/Mongo/AbstractMongoDao.php @@ -3,6 +3,7 @@ namespace Pairity\NoSql\Mongo; use Pairity\Model\AbstractDto; +use Pairity\Orm\UnitOfWork; /** * Base DAO for MongoDB collections returning DTOs. @@ -131,32 +132,73 @@ abstract class AbstractMongoDao public function findById(string $id): ?AbstractDto { + $uow = UnitOfWork::current(); + if ($uow && !UnitOfWork::isSuspended()) { + $managed = $uow->get(static::class, (string)$id); + if ($managed instanceof AbstractDto) { + return $managed; + } + } return $this->findOneBy(['_id' => $id]); } /** @param array $data */ public function insert(array $data): AbstractDto { - $id = $this->connection->insertOne($this->databaseName(), $this->collection(), $data); - // fetch back + // Inserts remain immediate to obtain a real id, even under UoW + $id = UnitOfWork::suspendDuring(function () use ($data) { + return $this->connection->insertOne($this->databaseName(), $this->collection(), $data); + }); return $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id])); } /** @param array $data */ public function update(string $id, array $data): AbstractDto { + $uow = UnitOfWork::current(); + if ($uow && !UnitOfWork::isSuspended()) { + $self = $this; $conn = $this->connection; $theId = $id; $payload = $data; + $uow->enqueue($conn, function () use ($self, $theId, $payload) { + UnitOfWork::suspendDuring(function () use ($self, $theId, $payload) { + $self->getConnection()->updateOne($self->databaseName(), $self->collection(), ['_id' => $theId], ['$set' => $payload]); + }); + }); + $base = $this->findById($id)?->toArray(false) ?? []; + $result = array_merge($base, $data, ['_id' => $id]); + return $this->hydrate($result); + } $this->connection->updateOne($this->databaseName(), $this->collection(), ['_id' => $id], ['$set' => $data]); return $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id])); } public function deleteById(string $id): int { + $uow = UnitOfWork::current(); + if ($uow && !UnitOfWork::isSuspended()) { + $self = $this; $conn = $this->connection; $theId = $id; + $uow->enqueue($conn, function () use ($self, $theId) { + UnitOfWork::suspendDuring(function () use ($self, $theId) { + $self->getConnection()->deleteOne($self->databaseName(), $self->collection(), ['_id' => $theId]); + }); + }); + return 0; + } return $this->connection->deleteOne($this->databaseName(), $this->collection(), ['_id' => $id]); } /** @param array|Filter $filter */ public function deleteBy(array|Filter $filter): int { + $uow = UnitOfWork::current(); + if ($uow && !UnitOfWork::isSuspended()) { + $self = $this; $conn = $this->connection; $flt = $this->normalizeFilterInput($filter); + $uow->enqueue($conn, function () use ($self, $flt) { + UnitOfWork::suspendDuring(function () use ($self, $flt) { + $self->getConnection()->deleteOne($self->databaseName(), $self->collection(), $flt); + }); + }); + return 0; + } // For MVP provide deleteOne semantic; bulk deletes could be added later return $this->connection->deleteOne($this->databaseName(), $this->collection(), $this->normalizeFilterInput($filter)); } @@ -236,6 +278,13 @@ abstract class AbstractMongoDao $class = $this->dtoClass(); /** @var AbstractDto $dto */ $dto = $class::fromArray($doc); + $uow = UnitOfWork::current(); + if ($uow && !UnitOfWork::isSuspended()) { + $idVal = $doc['_id'] ?? null; + if ($idVal !== null) { + $uow->attach(static::class, (string)$idVal, $dto); + } + } return $dto; } diff --git a/src/Orm/UnitOfWork.php b/src/Orm/UnitOfWork.php new file mode 100644 index 0000000..72570e4 --- /dev/null +++ b/src/Orm/UnitOfWork.php @@ -0,0 +1,153 @@ +> map[daoClass][id] = DTO */ + private array $identityMap = []; + + /** + * Queues grouped by a connection hash key. + * Each entry: ['conn' => object, 'ops' => list] + * + * @var array}> + */ + private array $queues = []; + + private function __construct() {} + + public static function begin(): UnitOfWork + { + if (self::$current !== null) { + return self::$current; + } + self::$current = new UnitOfWork(); + return self::$current; + } + + /** + * Run a Unit of Work and automatically commit or rollback on exception. + * @template T + * @param Closure(UnitOfWork):T $callback + * @return mixed + */ + public static function run(Closure $callback): mixed + { + $uow = self::begin(); + try { + $result = $callback($uow); + $uow->commit(); + return $result; + } catch (\Throwable $e) { + $uow->rollback(); + throw $e; + } + } + + public static function current(): ?UnitOfWork + { + return self::$current; + } + + /** Temporarily suspend UoW interception so DAOs execute immediately within the callable. */ + public static function suspendDuring(Closure $cb): mixed + { + $prev = self::$suspended; + self::$suspended = true; + try { return $cb(); } finally { self::$suspended = $prev; } + } + + public static function isSuspended(): bool + { + return self::$suspended; + } + + // ===== Identity Map ===== + + /** Attach a DTO to identity map. */ + public function attach(string $daoClass, string $id, object $dto): void + { + $this->identityMap[$daoClass][$id] = $dto; + } + + /** Fetch an attached DTO if present. */ + public function get(string $daoClass, string $id): ?object + { + return $this->identityMap[$daoClass][$id] ?? null; + } + + // ===== Defer operations ===== + + /** Enqueue a mutation for the given connection object. */ + public function enqueue(object $connection, Closure $operation): void + { + $key = spl_object_hash($connection); + if (!isset($this->queues[$key])) { + $this->queues[$key] = ['conn' => $connection, 'ops' => []]; + } + $this->queues[$key]['ops'][] = $operation; + } + + /** Execute all queued operations per connection within a transaction/session. */ + public function commit(): void + { + // Ensure we run ops with DAO interception suspended to avoid re-enqueue + self::suspendDuring(function () { + // Grouped by connection type + foreach ($this->queues as $entry) { + $conn = $entry['conn']; + $ops = $entry['ops']; + // PDO/SQL path: has transaction(callable) + if (method_exists($conn, 'transaction')) { + $conn->transaction(function () use ($ops) { + foreach ($ops as $op) { $op(); } + return null; + }); + } + // Mongo path: try withTransaction first, then withSession, else run directly + elseif (method_exists($conn, 'withTransaction')) { + $conn->withTransaction(function () use ($ops) { + foreach ($ops as $op) { $op(); } + return null; + }); + } elseif (method_exists($conn, 'withSession')) { + $conn->withSession(function () use ($ops) { + foreach ($ops as $op) { $op(); } + return null; + }); + } else { + // Fallback: no transaction API; just run + foreach ($ops as $op) { $op(); } + } + } + }); + + // Clear queues after successful commit + $this->queues = []; + // Do not clear identity map by default; keep for the scope + // Close UoW scope + self::$current = null; + } + + /** Rollback just clears queues and current context; actual rollback is handled by transactions when run. */ + public function rollback(): void + { + $this->queues = []; + self::$current = null; + } +} diff --git a/tests/UnitOfWorkSqliteTest.php b/tests/UnitOfWorkSqliteTest.php new file mode 100644 index 0000000..36ed6a5 --- /dev/null +++ b/tests/UnitOfWorkSqliteTest.php @@ -0,0 +1,88 @@ + 'sqlite', 'path' => ':memory:']); + } + + public function testDeferredUpdateAndDeleteCommit(): void + { + $conn = $this->conn(); + // schema + $conn->execute('CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT, name TEXT)'); + + // DAO/DTO inline + $dto = new class([]) extends AbstractDto {}; + $dtoClass = get_class($dto); + $dao = new class($conn, $dtoClass) extends AbstractDao { + private string $dto; + public function __construct($c, string $dto) { parent::__construct($c); $this->dto = $dto; } + public function getTable(): string { return 'users'; } + protected function dtoClass(): string { return $this->dto; } + protected function schema(): array { return ['primaryKey' => 'id', 'columns' => ['id'=>['cast'=>'int'],'email'=>['cast'=>'string'],'name'=>['cast'=>'string']]]; } + }; + + // Insert immediate + $created = $dao->insert(['email' => 'u@example.com', 'name' => 'User']); + $id = (int)($created->toArray(false)['id'] ?? 0); + $this->assertGreaterThan(0, $id); + + // Run UoW with deferred update and delete + UnitOfWork::run(function(UnitOfWork $uow) use ($dao, $id) { + $one = $dao->findById($id); // attaches to identity map + $this->assertNotNull($one); + // defer update + $dao->update($id, ['name' => 'Changed']); + // defer deleteBy criteria (will be executed after update) + $dao->deleteBy(['id' => $id]); + // commit done by run() + }); + + // After commit, record should be deleted + $this->assertNull($dao->findById($id)); + } + + public function testRollbackOnExceptionClearsOps(): void + { + $conn = $this->conn(); + $conn->execute('CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT, name TEXT)'); + + $dto = new class([]) extends AbstractDto {}; + $dtoClass = get_class($dto); + $dao = new class($conn, $dtoClass) extends AbstractDao { + private string $dto; public function __construct($c, string $dto) { parent::__construct($c); $this->dto = $dto; } + public function getTable(): string { return 'users'; } + protected function dtoClass(): string { return $this->dto; } + protected function schema(): array { return ['primaryKey'=>'id','columns'=>['id'=>['cast'=>'int'],'email'=>['cast'=>'string'],'name'=>['cast'=>'string']]]; } + }; + + $created = $dao->insert(['email' => 'x@example.com', 'name' => 'X']); + $id = (int)($created->toArray(false)['id'] ?? 0); + + try { + UnitOfWork::run(function(UnitOfWork $uow) use ($dao, $id) { + $dao->update($id, ['name' => 'Won\'t Persist']); + throw new \RuntimeException('boom'); + }); + $this->fail('Exception expected'); + } catch (\RuntimeException $e) { + // ok + } + + // Update should not be applied due to rollback + $after = $dao->findById($id); + $this->assertSame('X', $after?->toArray(false)['name'] ?? null); + } +}