From 693e83625db59073b6416ae317a48614ebeaca00 Mon Sep 17 00:00:00 2001 From: Funky Waddle Date: Thu, 11 Dec 2025 00:01:53 -0600 Subject: [PATCH] finish Unit of Work enhancements. Add Event handling --- .phpunit.result.cache | 1 + README.md | 141 +++++++++++++++++++ src/Events/EventDispatcher.php | 63 +++++++++ src/Events/Events.php | 21 +++ src/Events/SubscriberInterface.php | 15 +++ src/Model/AbstractDao.php | 156 +++++++++++++++++----- src/NoSql/Mongo/AbstractMongoDao.php | 78 ++++++++++- src/Orm/OptimisticLockException.php | 7 + src/Orm/UnitOfWork.php | 193 +++++++++++++++++++++++++++ tests/EventSystemSqliteTest.php | 117 ++++++++++++++++ tests/MongoEventSystemTest.php | 72 ++++++++++ tests/MongoOptimisticLockTest.php | 56 ++++++++ tests/OptimisticLockSqliteTest.php | 63 +++++++++ 13 files changed, 946 insertions(+), 37 deletions(-) create mode 100644 .phpunit.result.cache create mode 100644 src/Events/EventDispatcher.php create mode 100644 src/Events/Events.php create mode 100644 src/Events/SubscriberInterface.php create mode 100644 src/Orm/OptimisticLockException.php create mode 100644 tests/EventSystemSqliteTest.php create mode 100644 tests/MongoEventSystemTest.php create mode 100644 tests/MongoOptimisticLockTest.php create mode 100644 tests/OptimisticLockSqliteTest.php diff --git a/.phpunit.result.cache b/.phpunit.result.cache new file mode 100644 index 0000000..8cc3a55 --- /dev/null +++ b/.phpunit.result.cache @@ -0,0 +1 @@ +{"version":2,"defects":{"Pairity\\Tests\\BelongsToManyMysqlTest::testBelongsToManyEagerAndHelpers":1,"Pairity\\Tests\\BelongsToManySqliteTest::testBelongsToManyEagerAndPivotHelpers":8,"Pairity\\Tests\\CastersAndAccessorsSqliteTest::testCustomCasterAndDtoAccessorsMutators":7,"Pairity\\Tests\\JoinEagerMysqlTest::testJoinEagerHasManyAndBelongsTo":1,"Pairity\\Tests\\JoinEagerSqliteTest::testHasManyJoinEagerWithProjectionAndSoftDeleteScope":8,"Pairity\\Tests\\JoinEagerSqliteTest::testBelongsToJoinEagerSingleLevel":8,"Pairity\\Tests\\MongoAdapterTest::testCrudCycle":1,"Pairity\\Tests\\MongoDaoTest::testCrudViaDao":8,"Pairity\\Tests\\MongoOptimisticLockTest::testVersionIncrementOnUpdate":8,"Pairity\\Tests\\MongoPaginationTest::testPaginateAndSimplePaginateWithScopes":8,"Pairity\\Tests\\MongoRelationsTest::testEagerAndLazyRelations":8,"Pairity\\Tests\\MysqlSmokeTest::testCreateAndDropTable":1,"Pairity\\Tests\\PaginationSqliteTest::testPaginateAndSimplePaginateWithScopesAndRelations":8,"Pairity\\Tests\\RelationsNestedConstraintsSqliteTest::testNestedEagerAndPerRelationFieldsConstraint":8,"Pairity\\Tests\\SchemaBuilderSqliteTest::testCreateAlterDropCycle":8,"Pairity\\Tests\\UnitOfWorkCascadeMongoTest::testDeleteByIdCascadesToChildren":8,"Pairity\\Tests\\UnitOfWorkCascadeSqliteTest::testDeleteByIdCascadesToChildren":7,"Pairity\\Tests\\UnitOfWorkMongoTest::testDeferredUpdateAndDeleteCommit":8,"Pairity\\Tests\\UnitOfWorkMongoTest::testRollbackOnExceptionClearsOps":8},"times":{"Pairity\\Tests\\BelongsToManyMysqlTest::testBelongsToManyEagerAndHelpers":0.001,"Pairity\\Tests\\BelongsToManySqliteTest::testBelongsToManyEagerAndPivotHelpers":0.004,"Pairity\\Tests\\CastersAndAccessorsSqliteTest::testCustomCasterAndDtoAccessorsMutators":0.001,"Pairity\\Tests\\JoinEagerMysqlTest::testJoinEagerHasManyAndBelongsTo":0,"Pairity\\Tests\\JoinEagerSqliteTest::testHasManyJoinEagerWithProjectionAndSoftDeleteScope":0,"Pairity\\Tests\\JoinEagerSqliteTest::testBelongsToJoinEagerSingleLevel":0,"Pairity\\Tests\\MongoAdapterTest::testCrudCycle":0.002,"Pairity\\Tests\\MongoDaoTest::testCrudViaDao":0.001,"Pairity\\Tests\\MongoOptimisticLockTest::testVersionIncrementOnUpdate":0,"Pairity\\Tests\\MongoPaginationTest::testPaginateAndSimplePaginateWithScopes":0,"Pairity\\Tests\\MongoRelationsTest::testEagerAndLazyRelations":0,"Pairity\\Tests\\MysqlSmokeTest::testCreateAndDropTable":0,"Pairity\\Tests\\OptimisticLockSqliteTest::testVersionLockingIncrementsAndBlocksBulkUpdate":0,"Pairity\\Tests\\PaginationSqliteTest::testPaginateAndSimplePaginateWithScopesAndRelations":0.001,"Pairity\\Tests\\RelationsNestedConstraintsSqliteTest::testNestedEagerAndPerRelationFieldsConstraint":0,"Pairity\\Tests\\SchemaBuilderSqliteTest::testCreateAlterDropCycle":0.001,"Pairity\\Tests\\SoftDeletesTimestampsSqliteTest::testTimestampsAndSoftDeletesFlow":0.001,"Pairity\\Tests\\UnitOfWorkCascadeMongoTest::testDeleteByIdCascadesToChildren":0,"Pairity\\Tests\\UnitOfWorkCascadeSqliteTest::testDeleteByIdCascadesToChildren":0,"Pairity\\Tests\\UnitOfWorkMongoTest::testDeferredUpdateAndDeleteCommit":0,"Pairity\\Tests\\UnitOfWorkMongoTest::testRollbackOnExceptionClearsOps":0,"Pairity\\Tests\\UnitOfWorkSqliteTest::testDeferredUpdateAndDeleteCommit":0,"Pairity\\Tests\\UnitOfWorkSqliteTest::testRollbackOnExceptionClearsOps":0}} \ No newline at end of file diff --git a/README.md b/README.md index f836be5..ecf2a6f 100644 --- a/README.md +++ b/README.md @@ -690,6 +690,66 @@ Behavior: See the test `tests/CastersAndAccessorsSqliteTest.php` for a complete, runnable example. +## Unit of Work (opt‑in) + +Pairity includes an optional Unit of Work (UoW) that can be enabled per block to batch updates/deletes and use an identity map: + +```php +use Pairity\Orm\UnitOfWork; + +UnitOfWork::run(function(UnitOfWork $uow) use ($userDao, $postDao) { + $user = $userDao->findById(42); // identity map + $userDao->update(42, ['name' => 'New']); // deferred update + $postDao->deleteBy(['user_id' => 42]); // deferred delete +}); // transactional commit +``` + +Behavior (MVP): +- Outside a UoW, DAO calls execute immediately (today’s behavior). +- Inside a UoW, updates/deletes are deferred; inserts remain immediate (to return real IDs). Commit runs within a transaction/session per connection. +- Relation‑aware cascades: if a relation is marked with `cascadeDelete`, child deletes run before the parent delete at commit time. + +### Optimistic locking (MVP) + +You can enable optimistic locking to avoid lost updates. Two strategies are supported: + +- SQL via DAO `schema()` + +```php +protected function schema(): array +{ + return [ + 'primaryKey' => 'id', + 'columns' => [ + 'id' => ['cast' => 'int'], + 'name' => ['cast' => 'string'], + 'version' => ['cast' => 'int'], + ], + 'locking' => [ + 'type' => 'version', // or 'timestamp' + 'column' => 'version', // compare‑and‑set column + ], + ]; +} +``` + +When locking is enabled, `update($id, $data)` performs a compare‑and‑set on the configured column and increments it for `type = version`. If the row has changed since the read, an `OptimisticLockException` is thrown. + +Note: bulk `updateBy(...)` is blocked under optimistic locking to avoid unsafe mass updates. + +- MongoDB via DAO `locking()` + +```php +protected function locking(): array +{ + return [ 'type' => 'version', 'column' => 'version' ]; +} +``` + +Mongo `update($id, $data)` reads the current `version` and issues a conditional `updateOne` with `{$inc: {version: 1}}`. If the compare fails, an `OptimisticLockException` is thrown. + +Tests: see `tests/OptimisticLockSqliteTest.php` (SQLite). Mongo tests are guarded and run in CI when `ext-mongodb` and a server are available. + ## Pagination Both SQL and Mongo DAOs provide pagination helpers that return DTOs alongside metadata. They honor the usual query modifiers: @@ -830,6 +890,87 @@ Behavior details: - Works for both SQL DAOs and Mongo DAOs. - Current MVP focuses on delete cascades; cascades for updates and more advanced ordering rules can be added later. +## Event system (Milestone F) + +Pairity provides a lightweight event system so you can hook into DAO operations and UoW commits for audit logging, validation, normalization, caching hooks, etc. + +### Dispatcher and subscribers + +- Global access: `Pairity\Events\Events::dispatcher()` returns the singleton dispatcher. +- Register listeners: `listen(string $event, callable $listener, int $priority = 0)`. +- Register subscribers: implement `Pairity\Events\SubscriberInterface` with `getSubscribedEvents(): array` returning `event => callable|[callable, priority]`. + +Example listener registration: + +```php +use Pairity\Events\Events; + +// Normalize a field before insert +Events::dispatcher()->listen('dao.beforeInsert', function (array &$payload) { + // Payload always contains 'dao' and table/collection + input data by reference + if (($payload['table'] ?? '') === 'users') { + $payload['data']['name'] = trim((string)($payload['data']['name'] ?? '')); + } +}); + +// Audit after update +Events::dispatcher()->listen('dao.afterUpdate', function (array &$payload) { + // $payload['dto'] is the updated DTO (SQL or Mongo) + // write to your log sink here +}); +``` + +### Event names and payloads + +DAO events (SQL and Mongo): +- `dao.beforeFind` → `{ dao, table|collection, criteria|filter& }` (criteria/filter is passed by reference) +- `dao.afterFind` → `{ dao, table|collection, dto|null }` or `{ dao, table|collection, dtos: DTO[] }` +- `dao.beforeInsert`→ `{ dao, table|collection, data& }` (data by reference) +- `dao.afterInsert` → `{ dao, table|collection, dto }` +- `dao.beforeUpdate`→ `{ dao, table|collection, id?, criteria?, data& }` (data by reference; `criteria` for bulk SQL updates) +- `dao.afterUpdate` → `{ dao, table|collection, dto }` (or `{ affected }` for bulk SQL `updateBy`) +- `dao.beforeDelete`→ `{ dao, table|collection, id? , criteria|filter?& }` +- `dao.afterDelete` → `{ dao, table|collection, id?|criteria|filter?, affected:int }` + +Unit of Work events: +- `uow.beforeCommit` → `{ context: 'uow' }` +- `uow.afterCommit` → `{ context: 'uow' }` + +Notes: +- Listeners run in priority order (higher first). Exceptions thrown inside listeners are swallowed to avoid breaking core flows. +- When no listeners are registered, the overhead is negligible (fast-path checks). + +### Example subscriber + +```php +use Pairity\Events\SubscriberInterface; +use Pairity\Events\Events; + +final class AuditSubscriber implements SubscriberInterface +{ + public function getSubscribedEvents(): array + { + return [ + 'dao.afterInsert' => [[$this, 'onAfterInsert'], 10], + 'uow.afterCommit' => [$this, 'onAfterCommit'], + ]; + } + + public function onAfterInsert(array &$payload): void + { + // e.g., push to queue/log sink + } + + public function onAfterCommit(array &$payload): void + { + // flush buffered audits + } +} + +// Somewhere during bootstrap +Events::dispatcher()->subscribe(new AuditSubscriber()); +``` + ## Roadmap - Relations enhancements: diff --git a/src/Events/EventDispatcher.php b/src/Events/EventDispatcher.php new file mode 100644 index 0000000..d2485a0 --- /dev/null +++ b/src/Events/EventDispatcher.php @@ -0,0 +1,63 @@ +> */ + private array $listeners = []; + + /** + * Register a listener for an event. + * Listener signature: function(array &$payload): void + */ + public function listen(string $event, callable $listener, int $priority = 0): void + { + $this->listeners[$event][] = ['priority' => $priority, 'listener' => $listener]; + // Sort by priority desc so higher runs first + usort($this->listeners[$event], function ($a, $b) { return $b['priority'] <=> $a['priority']; }); + } + + /** Register all listeners from a subscriber. */ + public function subscribe(SubscriberInterface $subscriber): void + { + foreach ((array)$subscriber->getSubscribedEvents() as $event => $handler) { + $callable = null; $priority = 0; + if (is_array($handler) && isset($handler[0])) { + $callable = $handler[0]; + $priority = (int)($handler[1] ?? 0); + } else { + $callable = $handler; + } + if (is_callable($callable)) { + $this->listen($event, $callable, $priority); + } + } + } + + /** + * Dispatch an event with a mutable payload (passed by reference to listeners). + * + * @param string $event + * @param array $payload + */ + public function dispatch(string $event, array &$payload = []): void + { + $list = $this->listeners[$event] ?? []; + if (!$list) return; + foreach ($list as $entry) { + try { + ($entry['listener'])($payload); + } catch (\Throwable) { + // swallow listener exceptions to avoid breaking core flow + } + } + } + + /** Remove all listeners for an event or all events. */ + public function clear(?string $event = null): void + { + if ($event === null) { $this->listeners = []; return; } + unset($this->listeners[$event]); + } +} diff --git a/src/Events/Events.php b/src/Events/Events.php new file mode 100644 index 0000000..0f46e0b --- /dev/null +++ b/src/Events/Events.php @@ -0,0 +1,21 @@ + callable|array{0:callable,1:int priority} + * Example: return [ + * 'dao.beforeInsert' => [[$this, 'onBeforeInsert'], 10], + * 'uow.afterCommit' => [$this, 'onAfterCommit'], + * ]; + */ + public function getSubscribedEvents(): array; +} diff --git a/src/Model/AbstractDao.php b/src/Model/AbstractDao.php index 008bd25..4136ba2 100644 --- a/src/Model/AbstractDao.php +++ b/src/Model/AbstractDao.php @@ -6,6 +6,7 @@ use Pairity\Contracts\ConnectionInterface; use Pairity\Contracts\DaoInterface; use Pairity\Orm\UnitOfWork; use Pairity\Model\Casting\CasterInterface; +use Pairity\Events\Events; abstract class AbstractDao implements DaoInterface { @@ -129,6 +130,8 @@ abstract class AbstractDao implements DaoInterface /** @param array $criteria */ public function findOneBy(array $criteria): ?AbstractDto { + // Events: dao.beforeFind (criteria may be mutated) + try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => &$criteria]; Events::dispatcher()->dispatch('dao.beforeFind', $ev); } catch (\Throwable) {} $criteria = $this->applyDefaultScopes($criteria); $this->applyRuntimeScopesToCriteria($criteria); [$where, $bindings] = $this->buildWhere($criteria); @@ -151,6 +154,8 @@ abstract class AbstractDao implements DaoInterface $this->resetRuntimeScopes(); $this->eagerStrategy = null; // reset $this->withStrategies = []; + // Events: dao.afterFind + try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterFind', $payload); } catch (\Throwable) {} return $dto; } @@ -172,6 +177,8 @@ abstract class AbstractDao implements DaoInterface */ public function findAllBy(array $criteria = []): array { + // Events: dao.beforeFind (criteria may be mutated) + try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => &$criteria]; Events::dispatcher()->dispatch('dao.beforeFind', $ev); } catch (\Throwable) {} $criteria = $this->applyDefaultScopes($criteria); $this->applyRuntimeScopesToCriteria($criteria); [$where, $bindings] = $this->buildWhere($criteria); @@ -192,6 +199,8 @@ abstract class AbstractDao implements DaoInterface $this->resetRuntimeScopes(); $this->eagerStrategy = null; // reset $this->withStrategies = []; + // Events: dao.afterFind (list) + try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dtos' => $dtos]; Events::dispatcher()->dispatch('dao.afterFind', $payload); } catch (\Throwable) {} return $dtos; } @@ -278,6 +287,8 @@ abstract class AbstractDao implements DaoInterface if (empty($data)) { throw new \InvalidArgumentException('insert() requires non-empty data'); } + // Events: dao.beforeInsert (allow mutation of $data) + try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeInsert', $ev); } catch (\Throwable) {} $data = $this->prepareForInsert($data); $cols = array_keys($data); $placeholders = array_map(fn($c) => ':' . $c, $cols); @@ -286,10 +297,14 @@ abstract class AbstractDao implements DaoInterface $id = $this->connection->lastInsertId(); $pk = $this->getPrimaryKey(); if ($id !== null) { - return $this->findById($id) ?? $this->hydrate(array_merge($data, [$pk => $id])); + $dto = $this->findById($id) ?? $this->hydrate(array_merge($data, [$pk => $id])); + try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterInsert', $payload); } catch (\Throwable) {} + return $dto; } // Fallback when lastInsertId is unavailable: return hydrated DTO from provided data - return $this->hydrate($this->castRowFromStorage($data)); + $dto = $this->hydrate($this->castRowFromStorage($data)); + try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterInsert', $payload); } catch (\Throwable) {} + return $dto; } /** @param array $data */ @@ -302,6 +317,8 @@ abstract class AbstractDao implements DaoInterface if (!$existing && empty($data)) { throw new \InvalidArgumentException('No data provided to update and record not found'); } + // Events: dao.beforeUpdate (mutate $data) + try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'id' => $id, 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeUpdate', $ev); } catch (\Throwable) {} $toStore = $this->prepareForUpdate($data); $self = $this; $conn = $this->connection; @@ -310,24 +327,18 @@ abstract class AbstractDao implements DaoInterface 'mode' => 'byId', 'dao' => $this, 'id' => (string)$id, + 'payload' => $toStore, ], function () use ($self, $id, $toStore) { UnitOfWork::suspendDuring(function () use ($self, $id, $toStore) { - // execute real update now - $sets = []; - $params = []; - foreach ($toStore as $col => $val) { - $sets[] = "$col = :set_$col"; - $params["set_$col"] = $val; - } - $params['pk'] = $id; - $sql = 'UPDATE ' . $self->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $self->getPrimaryKey() . ' = :pk'; - $self->getConnection()->execute($sql, $params); + $self->doImmediateUpdateWithLock($id, $toStore); }); }); $base = $existing ? $existing->toArray(false) : []; $pk = $this->getPrimaryKey(); $result = array_merge($base, $data, [$pk => $id]); - return $this->hydrate($this->castRowFromStorage($result)); + $dto = $this->hydrate($this->castRowFromStorage($result)); + try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterUpdate', $payload); } catch (\Throwable) {} + return $dto; } if (empty($data)) { @@ -335,21 +346,18 @@ abstract class AbstractDao implements DaoInterface if ($existing) return $existing; throw new \InvalidArgumentException('No data provided to update and record not found'); } + // Events: dao.beforeUpdate + try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'id' => $id, 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeUpdate', $ev); } catch (\Throwable) {} $data = $this->prepareForUpdate($data); - $sets = []; - $params = []; - foreach ($data as $col => $val) { - $sets[] = "$col = :set_$col"; - $params["set_$col"] = $val; - } - $params['pk'] = $id; - $sql = 'UPDATE ' . $this->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $this->getPrimaryKey() . ' = :pk'; - $this->connection->execute($sql, $params); + $this->doImmediateUpdateWithLock($id, $data); $updated = $this->findById($id); if ($updated === null) { $pk = $this->getPrimaryKey(); - return $this->hydrate($this->castRowFromStorage(array_merge($data, [$pk => $id]))); + $dto = $this->hydrate($this->castRowFromStorage(array_merge($data, [$pk => $id]))); + try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterUpdate', $payload); } catch (\Throwable) {} + return $dto; } + try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $updated]; Events::dispatcher()->dispatch('dao.afterUpdate', $payload); } catch (\Throwable) {} return $updated; } @@ -358,6 +366,7 @@ abstract class AbstractDao implements DaoInterface $uow = UnitOfWork::current(); if ($uow && !UnitOfWork::isSuspended()) { $self = $this; $conn = $this->connection; $theId = $id; + try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'id' => $id]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {} $uow->enqueueWithMeta($conn, [ 'type' => 'delete', 'mode' => 'byId', @@ -369,6 +378,7 @@ abstract class AbstractDao implements DaoInterface // deferred; immediate affected count unknown return 0; } + try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'id' => $id]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {} if ($this->hasSoftDeletes()) { $columns = $this->softDeleteConfig(); $deletedAt = $columns['deletedAt'] ?? 'deleted_at'; @@ -377,7 +387,9 @@ abstract class AbstractDao implements DaoInterface return $this->connection->execute($sql, ['ts' => $now, 'pk' => $id]); } $sql = 'DELETE FROM ' . $this->getTable() . ' WHERE ' . $this->getPrimaryKey() . ' = :pk'; - return $this->connection->execute($sql, ['pk' => $id]); + $affected = $this->connection->execute($sql, ['pk' => $id]); + try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'id' => $id, 'affected' => $affected]; Events::dispatcher()->dispatch('dao.afterDelete', $payload); } catch (\Throwable) {} + return $affected; } /** @param array $criteria */ @@ -386,6 +398,7 @@ abstract class AbstractDao implements DaoInterface $uow = UnitOfWork::current(); if ($uow && !UnitOfWork::isSuspended()) { $self = $this; $conn = $this->connection; $crit = $criteria; + try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => &$criteria]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {} $uow->enqueueWithMeta($conn, [ 'type' => 'delete', 'mode' => 'byCriteria', @@ -396,6 +409,7 @@ abstract class AbstractDao implements DaoInterface }); return 0; } + try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => &$criteria]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {} if ($this->hasSoftDeletes()) { [$where, $bindings] = $this->buildWhere($criteria); if ($where === '') { return 0; } @@ -409,7 +423,9 @@ abstract class AbstractDao implements DaoInterface [$where, $bindings] = $this->buildWhere($criteria); if ($where === '') { return 0; } $sql = 'DELETE FROM ' . $this->getTable() . ' WHERE ' . $where; - return $this->connection->execute($sql, $bindings); + $affected = $this->connection->execute($sql, $bindings); + try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => $criteria, 'affected' => $affected]; Events::dispatcher()->dispatch('dao.afterDelete', $payload); } catch (\Throwable) {} + return $affected; } /** @@ -423,6 +439,8 @@ abstract class AbstractDao implements DaoInterface $uow = UnitOfWork::current(); if ($uow && !UnitOfWork::isSuspended()) { if (empty($data)) { return 0; } + // Events: dao.beforeUpdate (bulk) + try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => &$criteria, 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeUpdate', $ev); } catch (\Throwable) {} $self = $this; $conn = $this->connection; $crit = $criteria; $payload = $this->prepareForUpdate($data); $uow->enqueueWithMeta($conn, [ 'type' => 'update', @@ -438,6 +456,10 @@ abstract class AbstractDao implements DaoInterface if (empty($data)) { return 0; } + // Optimistic locking note: bulk updates under optimistic locking are not supported + if ($this->hasOptimisticLocking()) { + throw new \Pairity\Orm\OptimisticLockException('Optimistic locking enabled: use update(id, ...) instead of bulk updateBy(...)'); + } [$where, $whereBindings] = $this->buildWhere($criteria); if ($where === '') { return 0; @@ -452,7 +474,9 @@ abstract class AbstractDao implements DaoInterface } $sql = 'UPDATE ' . $this->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $where; - return $this->connection->execute($sql, array_merge($setParams, $whereBindings)); + $affected = $this->connection->execute($sql, array_merge($setParams, $whereBindings)); + try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => $criteria, 'affected' => $affected]; Events::dispatcher()->dispatch('dao.afterUpdate', $payload); } catch (\Throwable) {} + return $affected; } /** Expose relation metadata for UoW ordering/cascades. */ @@ -645,10 +669,17 @@ abstract class AbstractDao implements DaoInterface $type = (string)($config['type'] ?? ''); $daoClass = $config['dao'] ?? null; $dtoClass = $config['dto'] ?? null; // kept for docs compatibility - if (!is_string($daoClass)) { continue; } - - /** @var class-string $daoClass */ - $relatedDao = new $daoClass($this->getConnection()); + // Resolve related DAO: allow daoInstance or factory callable, else class-string + $relatedDao = null; + if (isset($config['daoInstance']) && is_object($config['daoInstance'])) { + $relatedDao = $config['daoInstance']; + } elseif (isset($config['factory']) && is_callable($config['factory'])) { + try { $relatedDao = ($config['factory'])($this); } catch (\Throwable) { $relatedDao = null; } + } elseif (is_string($daoClass)) { + /** @var class-string $daoClass */ + try { $relatedDao = new $daoClass($this->getConnection()); } catch (\Throwable) { $relatedDao = null; } + } + if (!$relatedDao instanceof AbstractDao) { continue; } // Apply per-relation constraint, if any $constraint = $this->constraintForPath($name); if (is_callable($constraint)) { @@ -1147,6 +1178,8 @@ abstract class AbstractDao implements DaoInterface $idVal = $row[$pk] ?? null; if ($idVal !== null) { $uow->attach(static::class, (string)$idVal, $dto); + // Bind this DAO to allow snapshot diffing to emit updates + $uow->bindDao(static::class, (string)$idVal, $this); } } return $dto; @@ -1421,6 +1454,69 @@ abstract class AbstractDao implements DaoInterface return gmdate('Y-m-d H:i:s'); } + // ===== Optimistic locking (MVP) ===== + + protected function hasOptimisticLocking(): bool + { + $lock = $this->getSchema()['locking'] ?? []; + return is_array($lock) && isset($lock['type'], $lock['column']) && in_array($lock['type'], ['version','timestamp'], true); + } + + /** @return array{type:string,column:string}|array{} */ + protected function lockingConfig(): array + { + $lock = $this->getSchema()['locking'] ?? []; + return is_array($lock) ? $lock : []; + } + + /** Execute an immediate update with optimistic locking when configured. */ + private function doImmediateUpdateWithLock(int|string $id, array $toStore): void + { + if (!$this->hasOptimisticLocking()) { + // default path + $sets = []; + $params = []; + foreach ($toStore as $col => $val) { $sets[] = "$col = :set_$col"; $params["set_$col"] = $val; } + $params['pk'] = $id; + $sql = 'UPDATE ' . $this->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $this->getPrimaryKey() . ' = :pk'; + $this->connection->execute($sql, $params); + return; + } + + $cfg = $this->lockingConfig(); + $col = (string)$cfg['column']; + $type = (string)$cfg['type']; + + // Fetch current lock value + $pk = $this->getPrimaryKey(); + $row = $this->connection->query('SELECT ' . $col . ' AS c FROM ' . $this->getTable() . ' WHERE ' . $pk . ' = :pk LIMIT 1', ['pk' => $id]); + $current = $row[0]['c'] ?? null; + + // Build SETs + $sets = []; + $params = []; + foreach ($toStore as $c => $v) { $sets[] = "$c = :set_$c"; $params["set_$c"] = $v; } + + if ($type === 'version') { + // bump version + $sets[] = $col . ' = ' . $col . ' + 1'; + } + + // WHERE with lock compare + $params['pk'] = $id; + $where = $pk . ' = :pk'; + if ($current !== null) { + $params['exp_lock'] = $current; + $where .= ' AND ' . $col . ' = :exp_lock'; + } + + $sql = 'UPDATE ' . $this->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $where; + $affected = $this->connection->execute($sql, $params); + if ($current !== null && $affected === 0) { + throw new \Pairity\Orm\OptimisticLockException('Optimistic lock failed for ' . static::class . ' id=' . (string)$id); + } + } + // ===== Soft delete toggles ===== public function withTrashed(): static diff --git a/src/NoSql/Mongo/AbstractMongoDao.php b/src/NoSql/Mongo/AbstractMongoDao.php index ad0dd67..be7e825 100644 --- a/src/NoSql/Mongo/AbstractMongoDao.php +++ b/src/NoSql/Mongo/AbstractMongoDao.php @@ -4,6 +4,7 @@ namespace Pairity\NoSql\Mongo; use Pairity\Model\AbstractDto; use Pairity\Orm\UnitOfWork; +use Pairity\Events\Events; /** * Base DAO for MongoDB collections returning DTOs. @@ -121,6 +122,8 @@ abstract class AbstractMongoDao public function findOneBy(array|Filter $filter): ?AbstractDto { $filterArr = $this->normalizeFilterInput($filter); + // Events: dao.beforeFind (Mongo) — allow filter mutation + try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'filter' => &$filterArr]; Events::dispatcher()->dispatch('dao.beforeFind', $ev); } catch (\Throwable) {} $this->applyRuntimeScopesToFilter($filterArr); $opts = $this->buildOptions(); $opts['limit'] = 1; @@ -128,7 +131,9 @@ abstract class AbstractMongoDao $this->resetModifiers(); $this->resetRuntimeScopes(); $row = $docs[0] ?? null; - return $row ? $this->hydrate($row) : null; + $dto = $row ? $this->hydrate($row) : null; + try { $payload = ['dao' => $this, 'collection' => $this->collection(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterFind', $payload); } catch (\Throwable) {} + return $dto; } /** @@ -139,6 +144,8 @@ abstract class AbstractMongoDao public function findAllBy(array|Filter $filter = [], array $options = []): array { $filterArr = $this->normalizeFilterInput($filter); + // Events: dao.beforeFind (Mongo) + try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'filter' => &$filterArr]; Events::dispatcher()->dispatch('dao.beforeFind', $ev); } catch (\Throwable) {} $this->applyRuntimeScopesToFilter($filterArr); $opts = $this->buildOptions(); // external override/merge @@ -150,6 +157,7 @@ abstract class AbstractMongoDao } $this->resetModifiers(); $this->resetRuntimeScopes(); + try { $payload = ['dao' => $this, 'collection' => $this->collection(), 'dtos' => $dtos]; Events::dispatcher()->dispatch('dao.afterFind', $payload); } catch (\Throwable) {} return $dtos; } @@ -169,10 +177,14 @@ abstract class AbstractMongoDao public function insert(array $data): AbstractDto { // Inserts remain immediate to obtain a real id, even under UoW + // Events: dao.beforeInsert (Mongo) — allow mutation of $data + try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeInsert', $ev); } catch (\Throwable) {} $id = UnitOfWork::suspendDuring(function () use ($data) { return $this->connection->insertOne($this->databaseName(), $this->collection(), $data); }); - return $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id])); + $dto = $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id])); + try { $payload = ['dao' => $this, 'collection' => $this->collection(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterInsert', $payload); } catch (\Throwable) {} + return $dto; } /** @param array $data */ @@ -181,6 +193,8 @@ abstract class AbstractMongoDao $uow = UnitOfWork::current(); if ($uow && !UnitOfWork::isSuspended()) { $self = $this; $conn = $this->connection; $theId = $id; $payload = $data; + // Events: dao.beforeUpdate (Mongo) + try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'id' => $id, 'data' => &$payload]; Events::dispatcher()->dispatch('dao.beforeUpdate', $ev); } catch (\Throwable) {} $uow->enqueueWithMeta($conn, [ 'type' => 'update', 'mode' => 'byId', @@ -188,15 +202,21 @@ abstract class AbstractMongoDao 'id' => (string)$id, ], function () use ($self, $theId, $payload) { UnitOfWork::suspendDuring(function () use ($self, $theId, $payload) { - $self->getConnection()->updateOne($self->databaseName(), $self->collection(), ['_id' => $theId], ['$set' => $payload]); + $self->doImmediateUpdateWithLock($theId, $payload); }); }); $base = $this->findById($id)?->toArray(false) ?? []; $result = array_merge($base, $data, ['_id' => $id]); - return $this->hydrate($result); + $dto = $this->hydrate($result); + try { $p = ['dao' => $this, 'collection' => $this->collection(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterUpdate', $p); } catch (\Throwable) {} + return $dto; } - $this->connection->updateOne($this->databaseName(), $this->collection(), ['_id' => $id], ['$set' => $data]); - return $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id])); + // Events: dao.beforeUpdate (Mongo) + try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'id' => $id, 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeUpdate', $ev); } catch (\Throwable) {} + $this->doImmediateUpdateWithLock($id, $data); + $dto = $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id])); + try { $p = ['dao' => $this, 'collection' => $this->collection(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterUpdate', $p); } catch (\Throwable) {} + return $dto; } public function deleteById(string $id): int @@ -204,6 +224,7 @@ abstract class AbstractMongoDao $uow = UnitOfWork::current(); if ($uow && !UnitOfWork::isSuspended()) { $self = $this; $conn = $this->connection; $theId = $id; + try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'id' => $id]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {} $uow->enqueueWithMeta($conn, [ 'type' => 'delete', 'mode' => 'byId', @@ -216,7 +237,10 @@ abstract class AbstractMongoDao }); return 0; } - return $this->connection->deleteOne($this->databaseName(), $this->collection(), ['_id' => $id]); + try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'id' => $id]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {} + $affected = $this->connection->deleteOne($this->databaseName(), $this->collection(), ['_id' => $id]); + try { $p = ['dao' => $this, 'collection' => $this->collection(), 'id' => $id, 'affected' => $affected]; Events::dispatcher()->dispatch('dao.afterDelete', $p); } catch (\Throwable) {} + return $affected; } /** @param array|Filter $filter */ @@ -225,6 +249,7 @@ abstract class AbstractMongoDao $uow = UnitOfWork::current(); if ($uow && !UnitOfWork::isSuspended()) { $self = $this; $conn = $this->connection; $flt = $this->normalizeFilterInput($filter); + try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'filter' => &$flt]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {} $uow->enqueueWithMeta($conn, [ 'type' => 'delete', 'mode' => 'byCriteria', @@ -239,8 +264,10 @@ abstract class AbstractMongoDao } // For MVP provide deleteOne semantic; bulk deletes could be added later $flt = $this->normalizeFilterInput($filter); + try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'filter' => &$flt]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {} $this->applyRuntimeScopesToFilter($flt); $res = $this->connection->deleteOne($this->databaseName(), $this->collection(), $flt); + try { $p = ['dao' => $this, 'collection' => $this->collection(), 'filter' => $flt, 'affected' => $res]; Events::dispatcher()->dispatch('dao.afterDelete', $p); } catch (\Throwable) {} $this->resetRuntimeScopes(); return $res; } @@ -335,6 +362,8 @@ abstract class AbstractMongoDao $idVal = $doc['_id'] ?? null; if ($idVal !== null) { $uow->attach(static::class, (string)$idVal, $dto); + // Bind this DAO to allow snapshot diffing to emit updates + $uow->bindDao(static::class, (string)$idVal, $this); } } return $dto; @@ -656,4 +685,39 @@ abstract class AbstractMongoDao { $this->runtimeScopes = []; } + + // ===== Optimistic locking (MVP) for Mongo ===== + /** + * Override to enable locking. Example return: + * ['type' => 'version', 'column' => '_v'] + * Currently only 'version' (numeric increment) is supported for Mongo. + * @return array{type:string,column:string}|array{} + */ + protected function locking(): array { return []; } + + private function hasOptimisticLocking(): bool + { + $cfg = $this->locking(); + return is_array($cfg) && isset($cfg['type'], $cfg['column']) && $cfg['type'] === 'version' && is_string($cfg['column']) && $cfg['column'] !== ''; + } + + private function doImmediateUpdateWithLock(string $id, array $payload): void + { + if (!$this->hasOptimisticLocking()) { + $this->connection->updateOne($this->databaseName(), $this->collection(), ['_id' => $id], ['$set' => $payload]); + return; + } + $cfg = $this->locking(); + $col = (string)$cfg['column']; + // Fetch current version + $docs = $this->connection->find($this->databaseName(), $this->collection(), ['_id' => $id], ['limit' => 1, 'projection' => [$col => 1]]); + $cur = $docs[0][$col] ?? null; + $filter = ['_id' => $id]; + if ($cur !== null) { $filter[$col] = $cur; } + $update = ['$set' => $payload, '$inc' => [$col => 1]]; + $modified = $this->connection->updateOne($this->databaseName(), $this->collection(), $filter, $update); + if ($cur !== null && $modified === 0) { + throw new \Pairity\Orm\OptimisticLockException('Optimistic lock failed for ' . static::class . ' id=' . $id); + } + } } diff --git a/src/Orm/OptimisticLockException.php b/src/Orm/OptimisticLockException.php new file mode 100644 index 0000000..44d768f --- /dev/null +++ b/src/Orm/OptimisticLockException.php @@ -0,0 +1,7 @@ +> map[daoClass][id] = DTO */ private array $identityMap = []; + /** @var array>> snapshots[daoClass][id] = array representation */ + private array $snapshots = []; + /** @var array> daoBind[daoClass][id] = DAO instance for updates */ + private array $daoBind = []; + /** Enable snapshot diffing */ + private bool $snapshotsEnabled = false; /** * Queues grouped by a connection hash key. @@ -91,6 +98,17 @@ final class UnitOfWork public function attach(string $daoClass, string $id, object $dto): void { $this->identityMap[$daoClass][$id] = $dto; + if ($this->snapshotsEnabled) { + // store shallow snapshot + $arr = []; + try { + if (method_exists($dto, 'toArray')) { + /** @var array $arr */ + $arr = $dto->toArray(false); + } + } catch (\Throwable) { $arr = []; } + $this->snapshots[$daoClass][$id] = is_array($arr) ? $arr : []; + } } /** Fetch an attached DTO if present. */ @@ -99,6 +117,36 @@ final class UnitOfWork return $this->identityMap[$daoClass][$id] ?? null; } + /** Bind a DAO instance to a managed entity for potential snapshot diff updates. */ + public function bindDao(string $daoClass, string $id, object $dao): void + { + $this->daoBind[$daoClass][$id] = $dao; + } + + /** Enable/disable snapshot diffing. */ + public function enableSnapshots(bool $flag = true): static + { + $this->snapshotsEnabled = $flag; + return $this; + } + + public function isManaged(string $daoClass, string $id): bool + { + return isset($this->identityMap[$daoClass][$id]); + } + + public function detach(string $daoClass, string $id): void + { + unset($this->identityMap[$daoClass][$id], $this->snapshots[$daoClass][$id], $this->daoBind[$daoClass][$id]); + } + + public function clear(): void + { + $this->identityMap = []; + $this->snapshots = []; + $this->daoBind = []; + } + // ===== Defer operations ===== /** Enqueue a mutation for the given connection object (back-compat, raw op). */ @@ -126,10 +174,16 @@ final class UnitOfWork { // Ensure we run ops with DAO interception suspended to avoid re-enqueue self::suspendDuring(function () { + // uow.beforeCommit + try { $payload = ['context' => 'uow']; Events::dispatcher()->dispatch('uow.beforeCommit', $payload); } catch (\Throwable) {} // Grouped by connection type foreach ($this->queues as $entry) { $conn = $entry['conn']; $ops = $this->expandAndOrder($entry['ops']); + // Inject snapshot-based updates for managed entities with diffs + $ops = $this->injectSnapshotDiffUpdates($ops); + // Coalesce multiple updates for the same entity and order update-before-delete + $ops = $this->coalesceAndOrderPerEntity($ops); // PDO/SQL path: has transaction(callable) if (method_exists($conn, 'transaction')) { $conn->transaction(function () use ($ops) { @@ -153,6 +207,8 @@ final class UnitOfWork foreach ($ops as $o) { ($o['op'])(); } } } + // uow.afterCommit + try { $payload2 = ['context' => 'uow']; Events::dispatcher()->dispatch('uow.afterCommit', $payload2); } catch (\Throwable) {} }); // Clear queues after successful commit @@ -231,6 +287,143 @@ final class UnitOfWork return $expanded; } + /** + * Inject snapshot-based updates for managed entities that have changed but were not explicitly updated. + * Only when snapshots are enabled. + * @param array}> $ops + * @return array}> $ops + */ + private function injectSnapshotDiffUpdates(array $ops): array + { + if (!$this->snapshotsEnabled) { + return $ops; + } + // Build a set of entities already scheduled for update/delete + $scheduled = []; + foreach ($ops as $o) { + $m = $o['meta'] ?? []; + if (!isset($m['dao']) || !is_object($m['dao'])) continue; + $daoClass = get_class($m['dao']); + $id = (string)($m['id'] ?? ''); + if ($id !== '') { + $scheduled[$daoClass][$id] = true; + } + } + // For each managed entity, if changed and not scheduled, enqueue update + foreach ($this->identityMap as $daoClass => $entities) { + foreach ($entities as $id => $dto) { + if (!$this->snapshotsEnabled) break; + $snap = $this->snapshots[$daoClass][$id] ?? null; + if ($snap === null) continue; + $now = []; + try { if (method_exists($dto, 'toArray')) { $now = $dto->toArray(false); } } catch (\Throwable) { $now = []; } + if (!is_array($now)) $now = []; + $diff = $this->diffAssoc($snap, $now); + if (!$diff) continue; + if (isset($scheduled[$daoClass][$id])) continue; + $dao = $this->daoBind[$daoClass][$id] ?? null; + if (!$dao) continue; + // Build op that performs immediate update under suspension + $op = function () use ($dao, $id, $diff) { + self::suspendDuring(function () use ($dao, $id, $diff) { + if (method_exists($dao, 'update')) { $dao->update($id, $diff); } + }); + }; + $ops[] = ['op' => $op, 'meta' => ['type' => 'update', 'mode' => 'byId', 'dao' => $dao, 'id' => (string)$id, 'payload' => $diff]]; + } + } + return $ops; + } + + /** + * Merge multiple updates for the same entity and ensure update happens before delete for that entity. + * @param array}> $ops + * @return array}> $ops + */ + private function coalesceAndOrderPerEntity(array $ops): array + { + // Group updates by daoClass+id + $updateMap = []; + $deleteSet = []; + foreach ($ops as $o) { + $m = $o['meta'] ?? []; + if (!isset($m['dao']) || !is_object($m['dao'])) continue; + $dao = $m['dao']; + $daoClass = get_class($dao); + $id = (string)($m['id'] ?? ''); + if (($m['type'] ?? '') === 'update' && ($m['mode'] ?? '') === 'byId' && $id !== '') { + $key = $daoClass . '#' . $id; + $payload = (array)($m['payload'] ?? []); + if (!isset($updateMap[$key])) { $updateMap[$key] = ['dao' => $dao, 'id' => $id, 'payload' => []]; } + // merge (last write wins) + $updateMap[$key]['payload'] = array_merge($updateMap[$key]['payload'], $payload); + } + if (($m['type'] ?? '') === 'delete' && ($m['mode'] ?? '') === 'byId' && $id !== '') { + $deleteSet[$daoClass . '#' . $id] = ['dao' => $dao, 'id' => $id]; + } + } + + if (!$updateMap) { return $ops; } + + // Rebuild ops: for each original op, skip individual updates; add one merged update before a delete for the same entity + $result = []; + $emittedUpdate = []; + foreach ($ops as $o) { + $m = $o['meta'] ?? []; + $emit = true; + $dao = $m['dao'] ?? null; + $daoClass = is_object($dao) ? get_class($dao) : null; + $id = (string)($m['id'] ?? ''); + $key = ($daoClass && $id !== '') ? ($daoClass . '#' . $id) : null; + if (($m['type'] ?? '') === 'update' && ($m['mode'] ?? '') === 'byId' && $key && isset($updateMap[$key])) { + // skip individual update; we'll emit merged one once + $emit = false; + } + if (($m['type'] ?? '') === 'delete' && ($m['mode'] ?? '') === 'byId' && $key && isset($updateMap[$key]) && !isset($emittedUpdate[$key])) { + // emit merged update before delete + $merged = $updateMap[$key]; + $updOp = function () use ($merged) { + self::suspendDuring(function () use ($merged) { + $dao = $merged['dao']; $id = $merged['id']; $payload = $merged['payload']; + if (method_exists($dao, 'update')) { $dao->update($id, $payload); } + }); + }; + $result[] = ['op' => $updOp, 'meta' => ['type' => 'update', 'mode' => 'byId', 'dao' => $merged['dao'], 'id' => (string)$merged['id'], 'payload' => $merged['payload']]]; + $emittedUpdate[$key] = true; + } + if ($emit) { + $result[] = $o; + } + } + // For any remaining merged updates not emitted (no delete op), append them at the end + foreach ($updateMap as $k => $merged) { + if (isset($emittedUpdate[$k])) continue; + $updOp = function () use ($merged) { + self::suspendDuring(function () use ($merged) { + $dao = $merged['dao']; $id = $merged['id']; $payload = $merged['payload']; + if (method_exists($dao, 'update')) { $dao->update($id, $payload); } + }); + }; + $result[] = ['op' => $updOp, 'meta' => ['type' => 'update', 'mode' => 'byId', 'dao' => $merged['dao'], 'id' => (string)$merged['id'], 'payload' => $merged['payload']]]; + } + return $result; + } + + /** @param array $a @param array $b */ + private function diffAssoc(array $a, array $b): array + { + $diff = []; + foreach ($b as $k => $v) { + // only simple scalar/array comparisons; nested DTOs are out of scope + $av = $a[$k] ?? null; + if ($av !== $v) { + $diff[$k] = $v; + } + } + // Skip keys present in $a but removed in $b to avoid unintended nulling + return $diff; + } + /** * Read relations metadata from DAO instance if available. * @return array diff --git a/tests/EventSystemSqliteTest.php b/tests/EventSystemSqliteTest.php new file mode 100644 index 0000000..2258524 --- /dev/null +++ b/tests/EventSystemSqliteTest.php @@ -0,0 +1,117 @@ + 'sqlite', 'path' => ':memory:']); + } + + public function testDaoEventsForInsertUpdateDeleteAndFind(): void + { + $conn = $this->conn(); + $conn->execute('CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, status TEXT)'); + + $Dto = new class([]) extends AbstractDto {}; + $dtoClass = get_class($Dto); + $Dao = new class($conn, $dtoClass) extends AbstractDao { + private string $dto; public function __construct($c,string $d){ parent::__construct($c); $this->dto=$d; } + public function getTable(): string { return 'users'; } + protected function dtoClass(): string { return $this->dto; } + protected function schema(): array { return ['primaryKey'=>'id','columns'=>['id'=>['cast'=>'int'],'name'=>['cast'=>'string'],'status'=>['cast'=>'string']]]; } + }; + $dao = new $Dao($conn, $dtoClass); + + $beforeInsertData = null; $afterInsertName = null; $afterUpdateName = null; $afterDeleteAffected = null; $afterFindCount = null; + + Events::dispatcher()->clear(); + Events::dispatcher()->listen('dao.beforeInsert', function(array &$p) use (&$beforeInsertData) { + if (($p['table'] ?? '') === 'users') { + // mutate data + $p['data']['status'] = 'mutated'; + $beforeInsertData = $p['data']; + } + }); + Events::dispatcher()->listen('dao.afterInsert', function(array &$p) use (&$afterInsertName) { + if (($p['table'] ?? '') === 'users' && $p['dto'] instanceof AbstractDto) { + $afterInsertName = $p['dto']->toArray(false)['name'] ?? null; + } + }); + Events::dispatcher()->listen('dao.afterUpdate', function(array &$p) use (&$afterUpdateName) { + if (($p['table'] ?? '') === 'users' && $p['dto'] instanceof AbstractDto) { + $afterUpdateName = $p['dto']->toArray(false)['name'] ?? null; + } + }); + Events::dispatcher()->listen('dao.afterDelete', function(array &$p) use (&$afterDeleteAffected) { + if (($p['table'] ?? '') === 'users') { $afterDeleteAffected = (int)($p['affected'] ?? 0); } + }); + Events::dispatcher()->listen('dao.afterFind', function(array &$p) use (&$afterFindCount) { + if (($p['table'] ?? '') === 'users') { + if (isset($p['dto'])) { $afterFindCount = ($p['dto'] ? 1 : 0); } + if (isset($p['dtos'])) { $afterFindCount = is_array($p['dtos']) ? count($p['dtos']) : 0; } + } + }); + + // Insert (beforeInsert should set status) + $created = $dao->insert(['name' => 'Alice']); + $arr = $created->toArray(false); + $this->assertSame('mutated', $arr['status'] ?? null); + $this->assertSame('Alice', $afterInsertName); + + // Update + $id = (int)$arr['id']; + $updated = $dao->update($id, ['name' => 'Alice2']); + $this->assertSame('Alice2', $afterUpdateName); + + // Find + $one = $dao->findById($id); + $this->assertSame(1, $afterFindCount); + + // Delete + $aff = $dao->deleteById($id); + $this->assertSame($aff, $afterDeleteAffected); + } + + public function testUowBeforeAfterCommitEvents(): void + { + $conn = $this->conn(); + $conn->execute('CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)'); + + $Dto = new class([]) extends AbstractDto {}; + $dtoClass = get_class($Dto); + $Dao = new class($conn, $dtoClass) extends AbstractDao { + private string $dto; public function __construct($c,string $d){ parent::__construct($c); $this->dto=$d; } + public function getTable(): string { return 'users'; } + protected function dtoClass(): string { return $this->dto; } + protected function schema(): array { return ['primaryKey'=>'id','columns'=>['id'=>['cast'=>'int'],'name'=>['cast'=>'string']]]; } + }; + $dao = new $Dao($conn, $dtoClass); + + $before = 0; $after = 0; + Events::dispatcher()->clear(); + Events::dispatcher()->listen('uow.beforeCommit', function(array &$p) use (&$before) { $before++; }); + Events::dispatcher()->listen('uow.afterCommit', function(array &$p) use (&$after) { $after++; }); + + $row = $dao->insert(['name' => 'X']); + $id = (int)($row->toArray(false)['id'] ?? 0); + + UnitOfWork::run(function(UnitOfWork $uow) use ($dao, $id) { + $dao->update($id, ['name' => 'Y']); + $dao->deleteBy(['id' => $id]); + }); + + $this->assertSame(1, $before); + $this->assertSame(1, $after); + } +} diff --git a/tests/MongoEventSystemTest.php b/tests/MongoEventSystemTest.php new file mode 100644 index 0000000..d3b0d53 --- /dev/null +++ b/tests/MongoEventSystemTest.php @@ -0,0 +1,72 @@ +hasMongoExt()) { $this->markTestSkipped('ext-mongodb not loaded'); } + + // Connect (skip if server unavailable) + try { + $conn = MongoConnectionManager::make([ + 'host' => \getenv('MONGO_HOST') ?: '127.0.0.1', + 'port' => (int)(\getenv('MONGO_PORT') ?: 27017), + ]); + } catch (\Throwable $e) { + $this->markTestSkipped('Mongo not available: ' . $e->getMessage()); + } + + $Dto = new class([]) extends AbstractDto {}; + $dtoClass = \get_class($Dto); + + $Dao = new class($conn, $dtoClass) extends AbstractMongoDao { + private string $dto; public function __construct($c,string $d){ parent::__construct($c); $this->dto=$d; } + protected function collection(): string { return 'pairity_test.events_users'; } + protected function dtoClass(): string { return $this->dto; } + }; + + $dao = new $Dao($conn, $dtoClass); + + // Clean + foreach ($dao->findAllBy([]) as $doc) { $id = (string)($doc->toArray(false)['_id'] ?? ''); if ($id) { $dao->deleteById($id); } } + + $beforeInsert = null; $afterInsert = false; $afterUpdate = false; $afterDelete = 0; $afterFind = 0; + Events::dispatcher()->clear(); + Events::dispatcher()->listen('dao.beforeInsert', function(array &$p) use (&$beforeInsert){ if (($p['collection'] ?? '') === 'pairity_test.events_users'){ $p['data']['tag'] = 'x'; $beforeInsert = $p['data']; }}); + Events::dispatcher()->listen('dao.afterInsert', function(array &$p) use (&$afterInsert){ if (($p['collection'] ?? '') === 'pairity_test.events_users'){ $afterInsert = true; }}); + Events::dispatcher()->listen('dao.afterUpdate', function(array &$p) use (&$afterUpdate){ if (($p['collection'] ?? '') === 'pairity_test.events_users'){ $afterUpdate = true; }}); + Events::dispatcher()->listen('dao.afterDelete', function(array &$p) use (&$afterDelete){ if (($p['collection'] ?? '') === 'pairity_test.events_users'){ $afterDelete += (int)($p['affected'] ?? 0); }}); + Events::dispatcher()->listen('dao.afterFind', function(array &$p) use (&$afterFind){ if (($p['collection'] ?? '') === 'pairity_test.events_users'){ $afterFind += isset($p['dto']) ? (int)!!$p['dto'] : (is_array($p['dtos'] ?? null) ? count($p['dtos']) : 0); }}); + + // Insert + $created = $dao->insert(['email' => 'e@x.com']); + $this->assertTrue($afterInsert); + $this->assertSame('x', $created->toArray(false)['tag'] ?? null); + + // Update + $id = (string)($created->toArray(false)['_id'] ?? ''); + $dao->update($id, ['email' => 'e2@x.com']); + $this->assertTrue($afterUpdate); + + // Find + $one = $dao->findById($id); + $this->assertNotNull($one); + $this->assertGreaterThanOrEqual(1, $afterFind); + + // Delete + $aff = $dao->deleteById($id); + $this->assertSame($aff, $afterDelete); + } +} diff --git a/tests/MongoOptimisticLockTest.php b/tests/MongoOptimisticLockTest.php new file mode 100644 index 0000000..1059070 --- /dev/null +++ b/tests/MongoOptimisticLockTest.php @@ -0,0 +1,56 @@ +hasMongoExt()) { $this->markTestSkipped('ext-mongodb not loaded'); } + + // Connect (skip if server unavailable) + try { + $conn = MongoConnectionManager::make([ + 'host' => \getenv('MONGO_HOST') ?: '127.0.0.1', + 'port' => (int)(\getenv('MONGO_PORT') ?: 27017), + ]); + } catch (\Throwable $e) { + $this->markTestSkipped('Mongo not available: ' . $e->getMessage()); + } + + $dto = new class([]) extends AbstractDto {}; + $dtoClass = \get_class($dto); + + $Dao = new class($conn, $dtoClass) extends AbstractMongoDao { + private string $dto; public function __construct($c, string $d){ parent::__construct($c); $this->dto=$d; } + protected function collection(): string { return 'pairity_test.lock_users'; } + protected function dtoClass(): string { return $this->dto; } + protected function locking(): array { return ['type' => 'version', 'column' => 'version']; } + }; + + $dao = new $Dao($conn, $dtoClass); + + // Clean + foreach ($dao->findAllBy([]) as $doc) { $id = (string)($doc->toArray(false)['_id'] ?? ''); if ($id) { $dao->deleteById($id); } } + + // Insert with initial version 0 + $created = $dao->insert(['email' => 'lock@example.com', 'version' => 0]); + $id = (string)($created->toArray(false)['_id'] ?? ''); + $this->assertNotEmpty($id); + + // Update should bump version to 1 + $dao->update($id, ['email' => 'lock2@example.com']); + $after = $dao->findById($id); + $this->assertNotNull($after); + $this->assertSame(1, (int)($after->toArray(false)['version'] ?? -1)); + } +} diff --git a/tests/OptimisticLockSqliteTest.php b/tests/OptimisticLockSqliteTest.php new file mode 100644 index 0000000..a2b869d --- /dev/null +++ b/tests/OptimisticLockSqliteTest.php @@ -0,0 +1,63 @@ + 'sqlite', 'path' => ':memory:']); + } + + public function testVersionLockingIncrementsAndBlocksBulkUpdate(): void + { + $conn = $this->conn(); + // schema with version column + $conn->execute('CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT, + version INTEGER NOT NULL DEFAULT 0 + )'); + + $UserDto = new class([]) extends AbstractDto {}; + $dtoClass = get_class($UserDto); + + $UserDao = new class($conn, $dtoClass) extends AbstractDao { + private string $dto; public function __construct($c,string $dto){ parent::__construct($c); $this->dto=$dto; } + public function getTable(): string { return 'users'; } + protected function dtoClass(): string { return $this->dto; } + protected function schema(): array { + return [ + 'primaryKey' => 'id', + 'columns' => [ 'id'=>['cast'=>'int'], 'name'=>['cast'=>'string'], 'version'=>['cast'=>'int'] ], + 'locking' => ['type' => 'version', 'column' => 'version'], + ]; + } + }; + + $dao = new $UserDao($conn, $dtoClass); + + // Insert + $created = $dao->insert(['name' => 'A']); + $arr = $created->toArray(false); + $id = (int)$arr['id']; + + // First update: should succeed and bump version to 1 + $dao->update($id, ['name' => 'A1']); + $row = $conn->query('SELECT name, version FROM users WHERE id = :id', ['id' => $id])[0] ?? []; + $this->assertSame('A1', (string)($row['name'] ?? '')); + $this->assertSame(1, (int)($row['version'] ?? 0)); + + // Bulk update should throw while locking enabled + $this->expectException(OptimisticLockException::class); + $dao->updateBy(['id' => $id], ['name' => 'A2']); + } +}