SQL and Mongo relation-aware delete ordering and optional cascade deletes

This commit is contained in:
Funky Waddle 2025-12-10 08:55:00 -06:00
parent 5e007a72dd
commit a5182ae282
3 changed files with 166 additions and 16 deletions

View file

@ -154,7 +154,12 @@ abstract class AbstractDao implements DaoInterface
$toStore = $this->prepareForUpdate($data); $toStore = $this->prepareForUpdate($data);
$self = $this; $self = $this;
$conn = $this->connection; $conn = $this->connection;
$uow->enqueue($conn, function () use ($self, $id, $toStore) { $uow->enqueueWithMeta($conn, [
'type' => 'update',
'mode' => 'byId',
'dao' => $this,
'id' => (string)$id,
], function () use ($self, $id, $toStore) {
UnitOfWork::suspendDuring(function () use ($self, $id, $toStore) { UnitOfWork::suspendDuring(function () use ($self, $id, $toStore) {
// execute real update now // execute real update now
$sets = []; $sets = [];
@ -202,7 +207,12 @@ abstract class AbstractDao implements DaoInterface
$uow = UnitOfWork::current(); $uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) { if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $theId = $id; $self = $this; $conn = $this->connection; $theId = $id;
$uow->enqueue($conn, function () use ($self, $theId) { $uow->enqueueWithMeta($conn, [
'type' => 'delete',
'mode' => 'byId',
'dao' => $this,
'id' => (string)$id,
], function () use ($self, $theId) {
UnitOfWork::suspendDuring(function () use ($self, $theId) { $self->deleteById($theId); }); UnitOfWork::suspendDuring(function () use ($self, $theId) { $self->deleteById($theId); });
}); });
// deferred; immediate affected count unknown // deferred; immediate affected count unknown
@ -225,7 +235,12 @@ abstract class AbstractDao implements DaoInterface
$uow = UnitOfWork::current(); $uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) { if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $crit = $criteria; $self = $this; $conn = $this->connection; $crit = $criteria;
$uow->enqueue($conn, function () use ($self, $crit) { $uow->enqueueWithMeta($conn, [
'type' => 'delete',
'mode' => 'byCriteria',
'dao' => $this,
'criteria' => $criteria,
], function () use ($self, $crit) {
UnitOfWork::suspendDuring(function () use ($self, $crit) { $self->deleteBy($crit); }); UnitOfWork::suspendDuring(function () use ($self, $crit) { $self->deleteBy($crit); });
}); });
return 0; return 0;
@ -258,7 +273,12 @@ abstract class AbstractDao implements DaoInterface
if ($uow && !UnitOfWork::isSuspended()) { if ($uow && !UnitOfWork::isSuspended()) {
if (empty($data)) { return 0; } if (empty($data)) { return 0; }
$self = $this; $conn = $this->connection; $crit = $criteria; $payload = $this->prepareForUpdate($data); $self = $this; $conn = $this->connection; $crit = $criteria; $payload = $this->prepareForUpdate($data);
$uow->enqueue($conn, function () use ($self, $crit, $payload) { $uow->enqueueWithMeta($conn, [
'type' => 'update',
'mode' => 'byCriteria',
'dao' => $this,
'criteria' => $criteria,
], function () use ($self, $crit, $payload) {
UnitOfWork::suspendDuring(function () use ($self, $crit, $payload) { $self->updateBy($crit, $payload); }); UnitOfWork::suspendDuring(function () use ($self, $crit, $payload) { $self->updateBy($crit, $payload); });
}); });
// unknown affected rows until commit // unknown affected rows until commit
@ -284,6 +304,12 @@ abstract class AbstractDao implements DaoInterface
return $this->connection->execute($sql, array_merge($setParams, $whereBindings)); return $this->connection->execute($sql, array_merge($setParams, $whereBindings));
} }
/** Expose relation metadata for UoW ordering/cascades. */
public function relationMap(): array
{
return $this->relations();
}
/** /**
* @param array<string,mixed> $criteria * @param array<string,mixed> $criteria
* @return array{0:string,1:array<string,mixed>} * @return array{0:string,1:array<string,mixed>}

View file

@ -158,7 +158,12 @@ abstract class AbstractMongoDao
$uow = UnitOfWork::current(); $uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) { if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $theId = $id; $payload = $data; $self = $this; $conn = $this->connection; $theId = $id; $payload = $data;
$uow->enqueue($conn, function () use ($self, $theId, $payload) { $uow->enqueueWithMeta($conn, [
'type' => 'update',
'mode' => 'byId',
'dao' => $this,
'id' => (string)$id,
], function () use ($self, $theId, $payload) {
UnitOfWork::suspendDuring(function () use ($self, $theId, $payload) { UnitOfWork::suspendDuring(function () use ($self, $theId, $payload) {
$self->getConnection()->updateOne($self->databaseName(), $self->collection(), ['_id' => $theId], ['$set' => $payload]); $self->getConnection()->updateOne($self->databaseName(), $self->collection(), ['_id' => $theId], ['$set' => $payload]);
}); });
@ -176,7 +181,12 @@ abstract class AbstractMongoDao
$uow = UnitOfWork::current(); $uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) { if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $theId = $id; $self = $this; $conn = $this->connection; $theId = $id;
$uow->enqueue($conn, function () use ($self, $theId) { $uow->enqueueWithMeta($conn, [
'type' => 'delete',
'mode' => 'byId',
'dao' => $this,
'id' => (string)$id,
], function () use ($self, $theId) {
UnitOfWork::suspendDuring(function () use ($self, $theId) { UnitOfWork::suspendDuring(function () use ($self, $theId) {
$self->getConnection()->deleteOne($self->databaseName(), $self->collection(), ['_id' => $theId]); $self->getConnection()->deleteOne($self->databaseName(), $self->collection(), ['_id' => $theId]);
}); });
@ -192,7 +202,12 @@ abstract class AbstractMongoDao
$uow = UnitOfWork::current(); $uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) { if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $flt = $this->normalizeFilterInput($filter); $self = $this; $conn = $this->connection; $flt = $this->normalizeFilterInput($filter);
$uow->enqueue($conn, function () use ($self, $flt) { $uow->enqueueWithMeta($conn, [
'type' => 'delete',
'mode' => 'byCriteria',
'dao' => $this,
'criteria' => $flt,
], function () use ($self, $flt) {
UnitOfWork::suspendDuring(function () use ($self, $flt) { UnitOfWork::suspendDuring(function () use ($self, $flt) {
$self->getConnection()->deleteOne($self->databaseName(), $self->collection(), $flt); $self->getConnection()->deleteOne($self->databaseName(), $self->collection(), $flt);
}); });
@ -432,4 +447,10 @@ abstract class AbstractMongoDao
$this->with = []; $this->with = [];
// keep relationFields for potential subsequent relation loads within same high-level call // keep relationFields for potential subsequent relation loads within same high-level call
} }
/** Expose relation metadata for UoW ordering/cascades. */
public function relationMap(): array
{
return $this->relations();
}
} }

View file

@ -3,6 +3,8 @@
namespace Pairity\Orm; namespace Pairity\Orm;
use Closure; use Closure;
use Pairity\Model\AbstractDao as SqlDao;
use Pairity\NoSql\Mongo\AbstractMongoDao as MongoDao;
/** /**
* Opt-in Unit of Work (MVP): * Opt-in Unit of Work (MVP):
@ -23,9 +25,15 @@ final class UnitOfWork
/** /**
* Queues grouped by a connection hash key. * Queues grouped by a connection hash key.
* Each entry: ['conn' => object, 'ops' => list<Closure>] * Each entry: ['conn' => object, 'ops' => list<array{op:Closure, meta:array<string,mixed>}>]
* meta keys (MVP):
* - type: 'update'|'delete'|'raw'
* - mode: 'byId'|'byCriteria'|'raw'
* - dao: object (DAO instance)
* - id: string (for byId)
* - criteria: array (for byCriteria)
* *
* @var array<string, array{conn:object, ops:array<int,Closure>}> * @var array<string, array{conn:object, ops:array<int,array{op:Closure,meta:array<string,mixed>}>}>
*/ */
private array $queues = []; private array $queues = [];
@ -93,14 +101,24 @@ final class UnitOfWork
// ===== Defer operations ===== // ===== Defer operations =====
/** Enqueue a mutation for the given connection object. */ /** Enqueue a mutation for the given connection object (back-compat, raw op). */
public function enqueue(object $connection, Closure $operation): void public function enqueue(object $connection, Closure $operation): void
{ {
$key = spl_object_hash($connection); $key = spl_object_hash($connection);
if (!isset($this->queues[$key])) { if (!isset($this->queues[$key])) {
$this->queues[$key] = ['conn' => $connection, 'ops' => []]; $this->queues[$key] = ['conn' => $connection, 'ops' => []];
} }
$this->queues[$key]['ops'][] = $operation; $this->queues[$key]['ops'][] = ['op' => $operation, 'meta' => ['type' => 'raw', 'mode' => 'raw']];
}
/** Enqueue a mutation with metadata for relation-aware ordering/cascades. */
public function enqueueWithMeta(object $connection, array $meta, Closure $operation): void
{
$key = spl_object_hash($connection);
if (!isset($this->queues[$key])) {
$this->queues[$key] = ['conn' => $connection, 'ops' => []];
}
$this->queues[$key]['ops'][] = ['op' => $operation, 'meta' => $meta];
} }
/** Execute all queued operations per connection within a transaction/session. */ /** Execute all queued operations per connection within a transaction/session. */
@ -111,28 +129,28 @@ final class UnitOfWork
// Grouped by connection type // Grouped by connection type
foreach ($this->queues as $entry) { foreach ($this->queues as $entry) {
$conn = $entry['conn']; $conn = $entry['conn'];
$ops = $entry['ops']; $ops = $this->expandAndOrder($entry['ops']);
// PDO/SQL path: has transaction(callable) // PDO/SQL path: has transaction(callable)
if (method_exists($conn, 'transaction')) { if (method_exists($conn, 'transaction')) {
$conn->transaction(function () use ($ops) { $conn->transaction(function () use ($ops) {
foreach ($ops as $op) { $op(); } foreach ($ops as $o) { ($o['op'])(); }
return null; return null;
}); });
} }
// Mongo path: try withTransaction first, then withSession, else run directly // Mongo path: try withTransaction first, then withSession, else run directly
elseif (method_exists($conn, 'withTransaction')) { elseif (method_exists($conn, 'withTransaction')) {
$conn->withTransaction(function () use ($ops) { $conn->withTransaction(function () use ($ops) {
foreach ($ops as $op) { $op(); } foreach ($ops as $o) { ($o['op'])(); }
return null; return null;
}); });
} elseif (method_exists($conn, 'withSession')) { } elseif (method_exists($conn, 'withSession')) {
$conn->withSession(function () use ($ops) { $conn->withSession(function () use ($ops) {
foreach ($ops as $op) { $op(); } foreach ($ops as $o) { ($o['op'])(); }
return null; return null;
}); });
} else { } else {
// Fallback: no transaction API; just run // Fallback: no transaction API; just run
foreach ($ops as $op) { $op(); } foreach ($ops as $o) { ($o['op'])(); }
} }
} }
}); });
@ -150,4 +168,89 @@ final class UnitOfWork
$this->queues = []; $this->queues = [];
self::$current = null; self::$current = null;
} }
/**
* Expand cascades and order ops so child deletes run before parent deletes.
* @param array<int,array{op:Closure,meta:array<string,mixed>}> $ops
* @return array<int,array{op:Closure,meta:array<string,mixed>}> ordered ops
*/
private function expandAndOrder(array $ops): array
{
$expanded = [];
foreach ($ops as $o) {
$meta = $o['meta'] ?? [];
// Detect deleteById on a DAO with cascade-enabled relations
if (($meta['type'] ?? '') === 'delete' && ($meta['mode'] ?? '') === 'byId' && isset($meta['dao']) && is_object($meta['dao'])) {
$dao = $meta['dao'];
$parentId = (string)($meta['id'] ?? '');
if ($parentId !== '') {
// Determine relations and cascade flags
$rels = $this->readRelations($dao);
foreach ($rels as $name => $cfg) {
$type = (string)($cfg['type'] ?? '');
$cascade = false;
if (isset($cfg['cascadeDelete'])) {
$cascade = (bool)$cfg['cascadeDelete'];
} elseif (isset($cfg['cascade']['delete'])) {
$cascade = (bool)$cfg['cascade']['delete'];
}
if (!$cascade) { continue; }
if ($type === 'hasMany' || $type === 'hasOne') {
$childDaoClass = $cfg['dao'] ?? null;
$foreignKey = (string)($cfg['foreignKey'] ?? '');
$localKey = (string)($cfg['localKey'] ?? 'id');
if (!is_string($childDaoClass) || $foreignKey === '') { continue; }
// Instantiate child DAO sharing same connection
try {
/** @var object $childDao */
$childDao = new $childDaoClass($dao->getConnection());
} catch (\Throwable) {
continue;
}
// Create a child delete op to run before parent
$childOp = function () use ($childDao, $foreignKey, $parentId) {
self::suspendDuring(function () use ($childDao, $foreignKey, $parentId) {
// delete children by FK
if ($childDao instanceof SqlDao) {
$childDao->deleteBy([$foreignKey => $parentId]);
} elseif ($childDao instanceof MongoDao) {
$childDao->deleteBy([$foreignKey => $parentId]);
}
});
};
$expanded[] = ['op' => $childOp, 'meta' => ['type' => 'delete', 'mode' => 'byCriteria', 'dao' => $childDao]];
}
}
}
}
// Then the original op
$expanded[] = $o;
}
// Basic stable order is fine since cascades were inserted before parent.
return $expanded;
}
/**
* Read relations metadata from DAO instance if available.
* @return array<string,mixed>
*/
private function readRelations(object $dao): array
{
// Prefer a public relationMap() accessor if provided
if (method_exists($dao, 'relationMap')) {
try { $rels = $dao->relationMap(); if (is_array($rels)) return $rels; } catch (\Throwable) {}
}
// Fallback: try calling protected relations() via reflection
try {
$ref = new \ReflectionObject($dao);
if ($ref->hasMethod('relations')) {
$m = $ref->getMethod('relations');
$m->setAccessible(true);
$rels = $m->invoke($dao);
if (is_array($rels)) return $rels;
}
} catch (\Throwable) {}
return [];
}
} }