finish Unit of Work enhancements. Add Event handling

This commit is contained in:
Funky Waddle 2025-12-11 00:01:53 -06:00
parent cb1251ae14
commit 693e83625d
13 changed files with 946 additions and 37 deletions

1
.phpunit.result.cache Normal file
View file

@ -0,0 +1 @@
{"version":2,"defects":{"Pairity\\Tests\\BelongsToManyMysqlTest::testBelongsToManyEagerAndHelpers":1,"Pairity\\Tests\\BelongsToManySqliteTest::testBelongsToManyEagerAndPivotHelpers":8,"Pairity\\Tests\\CastersAndAccessorsSqliteTest::testCustomCasterAndDtoAccessorsMutators":7,"Pairity\\Tests\\JoinEagerMysqlTest::testJoinEagerHasManyAndBelongsTo":1,"Pairity\\Tests\\JoinEagerSqliteTest::testHasManyJoinEagerWithProjectionAndSoftDeleteScope":8,"Pairity\\Tests\\JoinEagerSqliteTest::testBelongsToJoinEagerSingleLevel":8,"Pairity\\Tests\\MongoAdapterTest::testCrudCycle":1,"Pairity\\Tests\\MongoDaoTest::testCrudViaDao":8,"Pairity\\Tests\\MongoOptimisticLockTest::testVersionIncrementOnUpdate":8,"Pairity\\Tests\\MongoPaginationTest::testPaginateAndSimplePaginateWithScopes":8,"Pairity\\Tests\\MongoRelationsTest::testEagerAndLazyRelations":8,"Pairity\\Tests\\MysqlSmokeTest::testCreateAndDropTable":1,"Pairity\\Tests\\PaginationSqliteTest::testPaginateAndSimplePaginateWithScopesAndRelations":8,"Pairity\\Tests\\RelationsNestedConstraintsSqliteTest::testNestedEagerAndPerRelationFieldsConstraint":8,"Pairity\\Tests\\SchemaBuilderSqliteTest::testCreateAlterDropCycle":8,"Pairity\\Tests\\UnitOfWorkCascadeMongoTest::testDeleteByIdCascadesToChildren":8,"Pairity\\Tests\\UnitOfWorkCascadeSqliteTest::testDeleteByIdCascadesToChildren":7,"Pairity\\Tests\\UnitOfWorkMongoTest::testDeferredUpdateAndDeleteCommit":8,"Pairity\\Tests\\UnitOfWorkMongoTest::testRollbackOnExceptionClearsOps":8},"times":{"Pairity\\Tests\\BelongsToManyMysqlTest::testBelongsToManyEagerAndHelpers":0.001,"Pairity\\Tests\\BelongsToManySqliteTest::testBelongsToManyEagerAndPivotHelpers":0.004,"Pairity\\Tests\\CastersAndAccessorsSqliteTest::testCustomCasterAndDtoAccessorsMutators":0.001,"Pairity\\Tests\\JoinEagerMysqlTest::testJoinEagerHasManyAndBelongsTo":0,"Pairity\\Tests\\JoinEagerSqliteTest::testHasManyJoinEagerWithProjectionAndSoftDeleteScope":0,"Pairity\\Tests\\JoinEagerSqliteTest::testBelongsToJoinEagerSingleLevel":0,"Pairity\\Tests\\MongoAdapterTest::testCrudCycle":0.002,"Pairity\\Tests\\MongoDaoTest::testCrudViaDao":0.001,"Pairity\\Tests\\MongoOptimisticLockTest::testVersionIncrementOnUpdate":0,"Pairity\\Tests\\MongoPaginationTest::testPaginateAndSimplePaginateWithScopes":0,"Pairity\\Tests\\MongoRelationsTest::testEagerAndLazyRelations":0,"Pairity\\Tests\\MysqlSmokeTest::testCreateAndDropTable":0,"Pairity\\Tests\\OptimisticLockSqliteTest::testVersionLockingIncrementsAndBlocksBulkUpdate":0,"Pairity\\Tests\\PaginationSqliteTest::testPaginateAndSimplePaginateWithScopesAndRelations":0.001,"Pairity\\Tests\\RelationsNestedConstraintsSqliteTest::testNestedEagerAndPerRelationFieldsConstraint":0,"Pairity\\Tests\\SchemaBuilderSqliteTest::testCreateAlterDropCycle":0.001,"Pairity\\Tests\\SoftDeletesTimestampsSqliteTest::testTimestampsAndSoftDeletesFlow":0.001,"Pairity\\Tests\\UnitOfWorkCascadeMongoTest::testDeleteByIdCascadesToChildren":0,"Pairity\\Tests\\UnitOfWorkCascadeSqliteTest::testDeleteByIdCascadesToChildren":0,"Pairity\\Tests\\UnitOfWorkMongoTest::testDeferredUpdateAndDeleteCommit":0,"Pairity\\Tests\\UnitOfWorkMongoTest::testRollbackOnExceptionClearsOps":0,"Pairity\\Tests\\UnitOfWorkSqliteTest::testDeferredUpdateAndDeleteCommit":0,"Pairity\\Tests\\UnitOfWorkSqliteTest::testRollbackOnExceptionClearsOps":0}}

141
README.md
View file

@ -690,6 +690,66 @@ Behavior:
See the test `tests/CastersAndAccessorsSqliteTest.php` for a complete, runnable example.
## Unit of Work (optin)
Pairity includes an optional Unit of Work (UoW) that can be enabled per block to batch updates/deletes and use an identity map:
```php
use Pairity\Orm\UnitOfWork;
UnitOfWork::run(function(UnitOfWork $uow) use ($userDao, $postDao) {
$user = $userDao->findById(42); // identity map
$userDao->update(42, ['name' => 'New']); // deferred update
$postDao->deleteBy(['user_id' => 42]); // deferred delete
}); // transactional commit
```
Behavior (MVP):
- Outside a UoW, DAO calls execute immediately (todays behavior).
- Inside a UoW, updates/deletes are deferred; inserts remain immediate (to return real IDs). Commit runs within a transaction/session per connection.
- Relationaware cascades: if a relation is marked with `cascadeDelete`, child deletes run before the parent delete at commit time.
### Optimistic locking (MVP)
You can enable optimistic locking to avoid lost updates. Two strategies are supported:
- SQL via DAO `schema()`
```php
protected function schema(): array
{
return [
'primaryKey' => 'id',
'columns' => [
'id' => ['cast' => 'int'],
'name' => ['cast' => 'string'],
'version' => ['cast' => 'int'],
],
'locking' => [
'type' => 'version', // or 'timestamp'
'column' => 'version', // compareandset column
],
];
}
```
When locking is enabled, `update($id, $data)` performs a compareandset on the configured column and increments it for `type = version`. If the row has changed since the read, an `OptimisticLockException` is thrown.
Note: bulk `updateBy(...)` is blocked under optimistic locking to avoid unsafe mass updates.
- MongoDB via DAO `locking()`
```php
protected function locking(): array
{
return [ 'type' => 'version', 'column' => 'version' ];
}
```
Mongo `update($id, $data)` reads the current `version` and issues a conditional `updateOne` with `{$inc: {version: 1}}`. If the compare fails, an `OptimisticLockException` is thrown.
Tests: see `tests/OptimisticLockSqliteTest.php` (SQLite). Mongo tests are guarded and run in CI when `ext-mongodb` and a server are available.
## Pagination
Both SQL and Mongo DAOs provide pagination helpers that return DTOs alongside metadata. They honor the usual query modifiers:
@ -830,6 +890,87 @@ Behavior details:
- Works for both SQL DAOs and Mongo DAOs.
- Current MVP focuses on delete cascades; cascades for updates and more advanced ordering rules can be added later.
## Event system (Milestone F)
Pairity provides a lightweight event system so you can hook into DAO operations and UoW commits for audit logging, validation, normalization, caching hooks, etc.
### Dispatcher and subscribers
- Global access: `Pairity\Events\Events::dispatcher()` returns the singleton dispatcher.
- Register listeners: `listen(string $event, callable $listener, int $priority = 0)`.
- Register subscribers: implement `Pairity\Events\SubscriberInterface` with `getSubscribedEvents(): array` returning `event => callable|[callable, priority]`.
Example listener registration:
```php
use Pairity\Events\Events;
// Normalize a field before insert
Events::dispatcher()->listen('dao.beforeInsert', function (array &$payload) {
// Payload always contains 'dao' and table/collection + input data by reference
if (($payload['table'] ?? '') === 'users') {
$payload['data']['name'] = trim((string)($payload['data']['name'] ?? ''));
}
});
// Audit after update
Events::dispatcher()->listen('dao.afterUpdate', function (array &$payload) {
// $payload['dto'] is the updated DTO (SQL or Mongo)
// write to your log sink here
});
```
### Event names and payloads
DAO events (SQL and Mongo):
- `dao.beforeFind``{ dao, table|collection, criteria|filter& }` (criteria/filter is passed by reference)
- `dao.afterFind``{ dao, table|collection, dto|null }` or `{ dao, table|collection, dtos: DTO[] }`
- `dao.beforeInsert``{ dao, table|collection, data& }` (data by reference)
- `dao.afterInsert``{ dao, table|collection, dto }`
- `dao.beforeUpdate``{ dao, table|collection, id?, criteria?, data& }` (data by reference; `criteria` for bulk SQL updates)
- `dao.afterUpdate``{ dao, table|collection, dto }` (or `{ affected }` for bulk SQL `updateBy`)
- `dao.beforeDelete``{ dao, table|collection, id? , criteria|filter?& }`
- `dao.afterDelete``{ dao, table|collection, id?|criteria|filter?, affected:int }`
Unit of Work events:
- `uow.beforeCommit``{ context: 'uow' }`
- `uow.afterCommit``{ context: 'uow' }`
Notes:
- Listeners run in priority order (higher first). Exceptions thrown inside listeners are swallowed to avoid breaking core flows.
- When no listeners are registered, the overhead is negligible (fast-path checks).
### Example subscriber
```php
use Pairity\Events\SubscriberInterface;
use Pairity\Events\Events;
final class AuditSubscriber implements SubscriberInterface
{
public function getSubscribedEvents(): array
{
return [
'dao.afterInsert' => [[$this, 'onAfterInsert'], 10],
'uow.afterCommit' => [$this, 'onAfterCommit'],
];
}
public function onAfterInsert(array &$payload): void
{
// e.g., push to queue/log sink
}
public function onAfterCommit(array &$payload): void
{
// flush buffered audits
}
}
// Somewhere during bootstrap
Events::dispatcher()->subscribe(new AuditSubscriber());
```
## Roadmap
- Relations enhancements:

View file

@ -0,0 +1,63 @@
<?php
namespace Pairity\Events;
final class EventDispatcher
{
/** @var array<string, array<int, array{priority:int, listener:callable}>> */
private array $listeners = [];
/**
* Register a listener for an event.
* Listener signature: function(array &$payload): void
*/
public function listen(string $event, callable $listener, int $priority = 0): void
{
$this->listeners[$event][] = ['priority' => $priority, 'listener' => $listener];
// Sort by priority desc so higher runs first
usort($this->listeners[$event], function ($a, $b) { return $b['priority'] <=> $a['priority']; });
}
/** Register all listeners from a subscriber. */
public function subscribe(SubscriberInterface $subscriber): void
{
foreach ((array)$subscriber->getSubscribedEvents() as $event => $handler) {
$callable = null; $priority = 0;
if (is_array($handler) && isset($handler[0])) {
$callable = $handler[0];
$priority = (int)($handler[1] ?? 0);
} else {
$callable = $handler;
}
if (is_callable($callable)) {
$this->listen($event, $callable, $priority);
}
}
}
/**
* Dispatch an event with a mutable payload (passed by reference to listeners).
*
* @param string $event
* @param array<string,mixed> $payload
*/
public function dispatch(string $event, array &$payload = []): void
{
$list = $this->listeners[$event] ?? [];
if (!$list) return;
foreach ($list as $entry) {
try {
($entry['listener'])($payload);
} catch (\Throwable) {
// swallow listener exceptions to avoid breaking core flow
}
}
}
/** Remove all listeners for an event or all events. */
public function clear(?string $event = null): void
{
if ($event === null) { $this->listeners = []; return; }
unset($this->listeners[$event]);
}
}

21
src/Events/Events.php Normal file
View file

@ -0,0 +1,21 @@
<?php
namespace Pairity\Events;
final class Events
{
private static ?EventDispatcher $dispatcher = null;
public static function dispatcher(): EventDispatcher
{
if (self::$dispatcher === null) {
self::$dispatcher = new EventDispatcher();
}
return self::$dispatcher;
}
public static function setDispatcher(?EventDispatcher $dispatcher): void
{
self::$dispatcher = $dispatcher;
}
}

View file

@ -0,0 +1,15 @@
<?php
namespace Pairity\Events;
interface SubscriberInterface
{
/**
* Return an array of event => callable|array{0:callable,1:int priority}
* Example: return [
* 'dao.beforeInsert' => [[$this, 'onBeforeInsert'], 10],
* 'uow.afterCommit' => [$this, 'onAfterCommit'],
* ];
*/
public function getSubscribedEvents(): array;
}

View file

@ -6,6 +6,7 @@ use Pairity\Contracts\ConnectionInterface;
use Pairity\Contracts\DaoInterface;
use Pairity\Orm\UnitOfWork;
use Pairity\Model\Casting\CasterInterface;
use Pairity\Events\Events;
abstract class AbstractDao implements DaoInterface
{
@ -129,6 +130,8 @@ abstract class AbstractDao implements DaoInterface
/** @param array<string,mixed> $criteria */
public function findOneBy(array $criteria): ?AbstractDto
{
// Events: dao.beforeFind (criteria may be mutated)
try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => &$criteria]; Events::dispatcher()->dispatch('dao.beforeFind', $ev); } catch (\Throwable) {}
$criteria = $this->applyDefaultScopes($criteria);
$this->applyRuntimeScopesToCriteria($criteria);
[$where, $bindings] = $this->buildWhere($criteria);
@ -151,6 +154,8 @@ abstract class AbstractDao implements DaoInterface
$this->resetRuntimeScopes();
$this->eagerStrategy = null; // reset
$this->withStrategies = [];
// Events: dao.afterFind
try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterFind', $payload); } catch (\Throwable) {}
return $dto;
}
@ -172,6 +177,8 @@ abstract class AbstractDao implements DaoInterface
*/
public function findAllBy(array $criteria = []): array
{
// Events: dao.beforeFind (criteria may be mutated)
try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => &$criteria]; Events::dispatcher()->dispatch('dao.beforeFind', $ev); } catch (\Throwable) {}
$criteria = $this->applyDefaultScopes($criteria);
$this->applyRuntimeScopesToCriteria($criteria);
[$where, $bindings] = $this->buildWhere($criteria);
@ -192,6 +199,8 @@ abstract class AbstractDao implements DaoInterface
$this->resetRuntimeScopes();
$this->eagerStrategy = null; // reset
$this->withStrategies = [];
// Events: dao.afterFind (list)
try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dtos' => $dtos]; Events::dispatcher()->dispatch('dao.afterFind', $payload); } catch (\Throwable) {}
return $dtos;
}
@ -278,6 +287,8 @@ abstract class AbstractDao implements DaoInterface
if (empty($data)) {
throw new \InvalidArgumentException('insert() requires non-empty data');
}
// Events: dao.beforeInsert (allow mutation of $data)
try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeInsert', $ev); } catch (\Throwable) {}
$data = $this->prepareForInsert($data);
$cols = array_keys($data);
$placeholders = array_map(fn($c) => ':' . $c, $cols);
@ -286,10 +297,14 @@ abstract class AbstractDao implements DaoInterface
$id = $this->connection->lastInsertId();
$pk = $this->getPrimaryKey();
if ($id !== null) {
return $this->findById($id) ?? $this->hydrate(array_merge($data, [$pk => $id]));
$dto = $this->findById($id) ?? $this->hydrate(array_merge($data, [$pk => $id]));
try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterInsert', $payload); } catch (\Throwable) {}
return $dto;
}
// Fallback when lastInsertId is unavailable: return hydrated DTO from provided data
return $this->hydrate($this->castRowFromStorage($data));
$dto = $this->hydrate($this->castRowFromStorage($data));
try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterInsert', $payload); } catch (\Throwable) {}
return $dto;
}
/** @param array<string,mixed> $data */
@ -302,6 +317,8 @@ abstract class AbstractDao implements DaoInterface
if (!$existing && empty($data)) {
throw new \InvalidArgumentException('No data provided to update and record not found');
}
// Events: dao.beforeUpdate (mutate $data)
try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'id' => $id, 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeUpdate', $ev); } catch (\Throwable) {}
$toStore = $this->prepareForUpdate($data);
$self = $this;
$conn = $this->connection;
@ -310,24 +327,18 @@ abstract class AbstractDao implements DaoInterface
'mode' => 'byId',
'dao' => $this,
'id' => (string)$id,
'payload' => $toStore,
], function () use ($self, $id, $toStore) {
UnitOfWork::suspendDuring(function () use ($self, $id, $toStore) {
// execute real update now
$sets = [];
$params = [];
foreach ($toStore as $col => $val) {
$sets[] = "$col = :set_$col";
$params["set_$col"] = $val;
}
$params['pk'] = $id;
$sql = 'UPDATE ' . $self->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $self->getPrimaryKey() . ' = :pk';
$self->getConnection()->execute($sql, $params);
$self->doImmediateUpdateWithLock($id, $toStore);
});
});
$base = $existing ? $existing->toArray(false) : [];
$pk = $this->getPrimaryKey();
$result = array_merge($base, $data, [$pk => $id]);
return $this->hydrate($this->castRowFromStorage($result));
$dto = $this->hydrate($this->castRowFromStorage($result));
try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterUpdate', $payload); } catch (\Throwable) {}
return $dto;
}
if (empty($data)) {
@ -335,21 +346,18 @@ abstract class AbstractDao implements DaoInterface
if ($existing) return $existing;
throw new \InvalidArgumentException('No data provided to update and record not found');
}
// Events: dao.beforeUpdate
try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'id' => $id, 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeUpdate', $ev); } catch (\Throwable) {}
$data = $this->prepareForUpdate($data);
$sets = [];
$params = [];
foreach ($data as $col => $val) {
$sets[] = "$col = :set_$col";
$params["set_$col"] = $val;
}
$params['pk'] = $id;
$sql = 'UPDATE ' . $this->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $this->getPrimaryKey() . ' = :pk';
$this->connection->execute($sql, $params);
$this->doImmediateUpdateWithLock($id, $data);
$updated = $this->findById($id);
if ($updated === null) {
$pk = $this->getPrimaryKey();
return $this->hydrate($this->castRowFromStorage(array_merge($data, [$pk => $id])));
$dto = $this->hydrate($this->castRowFromStorage(array_merge($data, [$pk => $id])));
try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterUpdate', $payload); } catch (\Throwable) {}
return $dto;
}
try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'dto' => $updated]; Events::dispatcher()->dispatch('dao.afterUpdate', $payload); } catch (\Throwable) {}
return $updated;
}
@ -358,6 +366,7 @@ abstract class AbstractDao implements DaoInterface
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $theId = $id;
try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'id' => $id]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {}
$uow->enqueueWithMeta($conn, [
'type' => 'delete',
'mode' => 'byId',
@ -369,6 +378,7 @@ abstract class AbstractDao implements DaoInterface
// deferred; immediate affected count unknown
return 0;
}
try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'id' => $id]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {}
if ($this->hasSoftDeletes()) {
$columns = $this->softDeleteConfig();
$deletedAt = $columns['deletedAt'] ?? 'deleted_at';
@ -377,7 +387,9 @@ abstract class AbstractDao implements DaoInterface
return $this->connection->execute($sql, ['ts' => $now, 'pk' => $id]);
}
$sql = 'DELETE FROM ' . $this->getTable() . ' WHERE ' . $this->getPrimaryKey() . ' = :pk';
return $this->connection->execute($sql, ['pk' => $id]);
$affected = $this->connection->execute($sql, ['pk' => $id]);
try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'id' => $id, 'affected' => $affected]; Events::dispatcher()->dispatch('dao.afterDelete', $payload); } catch (\Throwable) {}
return $affected;
}
/** @param array<string,mixed> $criteria */
@ -386,6 +398,7 @@ abstract class AbstractDao implements DaoInterface
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $crit = $criteria;
try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => &$criteria]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {}
$uow->enqueueWithMeta($conn, [
'type' => 'delete',
'mode' => 'byCriteria',
@ -396,6 +409,7 @@ abstract class AbstractDao implements DaoInterface
});
return 0;
}
try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => &$criteria]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {}
if ($this->hasSoftDeletes()) {
[$where, $bindings] = $this->buildWhere($criteria);
if ($where === '') { return 0; }
@ -409,7 +423,9 @@ abstract class AbstractDao implements DaoInterface
[$where, $bindings] = $this->buildWhere($criteria);
if ($where === '') { return 0; }
$sql = 'DELETE FROM ' . $this->getTable() . ' WHERE ' . $where;
return $this->connection->execute($sql, $bindings);
$affected = $this->connection->execute($sql, $bindings);
try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => $criteria, 'affected' => $affected]; Events::dispatcher()->dispatch('dao.afterDelete', $payload); } catch (\Throwable) {}
return $affected;
}
/**
@ -423,6 +439,8 @@ abstract class AbstractDao implements DaoInterface
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
if (empty($data)) { return 0; }
// Events: dao.beforeUpdate (bulk)
try { $ev = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => &$criteria, 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeUpdate', $ev); } catch (\Throwable) {}
$self = $this; $conn = $this->connection; $crit = $criteria; $payload = $this->prepareForUpdate($data);
$uow->enqueueWithMeta($conn, [
'type' => 'update',
@ -438,6 +456,10 @@ abstract class AbstractDao implements DaoInterface
if (empty($data)) {
return 0;
}
// Optimistic locking note: bulk updates under optimistic locking are not supported
if ($this->hasOptimisticLocking()) {
throw new \Pairity\Orm\OptimisticLockException('Optimistic locking enabled: use update(id, ...) instead of bulk updateBy(...)');
}
[$where, $whereBindings] = $this->buildWhere($criteria);
if ($where === '') {
return 0;
@ -452,7 +474,9 @@ abstract class AbstractDao implements DaoInterface
}
$sql = 'UPDATE ' . $this->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $where;
return $this->connection->execute($sql, array_merge($setParams, $whereBindings));
$affected = $this->connection->execute($sql, array_merge($setParams, $whereBindings));
try { $payload = ['dao' => $this, 'table' => $this->getTable(), 'criteria' => $criteria, 'affected' => $affected]; Events::dispatcher()->dispatch('dao.afterUpdate', $payload); } catch (\Throwable) {}
return $affected;
}
/** Expose relation metadata for UoW ordering/cascades. */
@ -645,10 +669,17 @@ abstract class AbstractDao implements DaoInterface
$type = (string)($config['type'] ?? '');
$daoClass = $config['dao'] ?? null;
$dtoClass = $config['dto'] ?? null; // kept for docs compatibility
if (!is_string($daoClass)) { continue; }
/** @var class-string<AbstractDao> $daoClass */
$relatedDao = new $daoClass($this->getConnection());
// Resolve related DAO: allow daoInstance or factory callable, else class-string
$relatedDao = null;
if (isset($config['daoInstance']) && is_object($config['daoInstance'])) {
$relatedDao = $config['daoInstance'];
} elseif (isset($config['factory']) && is_callable($config['factory'])) {
try { $relatedDao = ($config['factory'])($this); } catch (\Throwable) { $relatedDao = null; }
} elseif (is_string($daoClass)) {
/** @var class-string<AbstractDao> $daoClass */
try { $relatedDao = new $daoClass($this->getConnection()); } catch (\Throwable) { $relatedDao = null; }
}
if (!$relatedDao instanceof AbstractDao) { continue; }
// Apply per-relation constraint, if any
$constraint = $this->constraintForPath($name);
if (is_callable($constraint)) {
@ -1147,6 +1178,8 @@ abstract class AbstractDao implements DaoInterface
$idVal = $row[$pk] ?? null;
if ($idVal !== null) {
$uow->attach(static::class, (string)$idVal, $dto);
// Bind this DAO to allow snapshot diffing to emit updates
$uow->bindDao(static::class, (string)$idVal, $this);
}
}
return $dto;
@ -1421,6 +1454,69 @@ abstract class AbstractDao implements DaoInterface
return gmdate('Y-m-d H:i:s');
}
// ===== Optimistic locking (MVP) =====
protected function hasOptimisticLocking(): bool
{
$lock = $this->getSchema()['locking'] ?? [];
return is_array($lock) && isset($lock['type'], $lock['column']) && in_array($lock['type'], ['version','timestamp'], true);
}
/** @return array{type:string,column:string}|array{} */
protected function lockingConfig(): array
{
$lock = $this->getSchema()['locking'] ?? [];
return is_array($lock) ? $lock : [];
}
/** Execute an immediate update with optimistic locking when configured. */
private function doImmediateUpdateWithLock(int|string $id, array $toStore): void
{
if (!$this->hasOptimisticLocking()) {
// default path
$sets = [];
$params = [];
foreach ($toStore as $col => $val) { $sets[] = "$col = :set_$col"; $params["set_$col"] = $val; }
$params['pk'] = $id;
$sql = 'UPDATE ' . $this->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $this->getPrimaryKey() . ' = :pk';
$this->connection->execute($sql, $params);
return;
}
$cfg = $this->lockingConfig();
$col = (string)$cfg['column'];
$type = (string)$cfg['type'];
// Fetch current lock value
$pk = $this->getPrimaryKey();
$row = $this->connection->query('SELECT ' . $col . ' AS c FROM ' . $this->getTable() . ' WHERE ' . $pk . ' = :pk LIMIT 1', ['pk' => $id]);
$current = $row[0]['c'] ?? null;
// Build SETs
$sets = [];
$params = [];
foreach ($toStore as $c => $v) { $sets[] = "$c = :set_$c"; $params["set_$c"] = $v; }
if ($type === 'version') {
// bump version
$sets[] = $col . ' = ' . $col . ' + 1';
}
// WHERE with lock compare
$params['pk'] = $id;
$where = $pk . ' = :pk';
if ($current !== null) {
$params['exp_lock'] = $current;
$where .= ' AND ' . $col . ' = :exp_lock';
}
$sql = 'UPDATE ' . $this->getTable() . ' SET ' . implode(', ', $sets) . ' WHERE ' . $where;
$affected = $this->connection->execute($sql, $params);
if ($current !== null && $affected === 0) {
throw new \Pairity\Orm\OptimisticLockException('Optimistic lock failed for ' . static::class . ' id=' . (string)$id);
}
}
// ===== Soft delete toggles =====
public function withTrashed(): static

View file

@ -4,6 +4,7 @@ namespace Pairity\NoSql\Mongo;
use Pairity\Model\AbstractDto;
use Pairity\Orm\UnitOfWork;
use Pairity\Events\Events;
/**
* Base DAO for MongoDB collections returning DTOs.
@ -121,6 +122,8 @@ abstract class AbstractMongoDao
public function findOneBy(array|Filter $filter): ?AbstractDto
{
$filterArr = $this->normalizeFilterInput($filter);
// Events: dao.beforeFind (Mongo) — allow filter mutation
try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'filter' => &$filterArr]; Events::dispatcher()->dispatch('dao.beforeFind', $ev); } catch (\Throwable) {}
$this->applyRuntimeScopesToFilter($filterArr);
$opts = $this->buildOptions();
$opts['limit'] = 1;
@ -128,7 +131,9 @@ abstract class AbstractMongoDao
$this->resetModifiers();
$this->resetRuntimeScopes();
$row = $docs[0] ?? null;
return $row ? $this->hydrate($row) : null;
$dto = $row ? $this->hydrate($row) : null;
try { $payload = ['dao' => $this, 'collection' => $this->collection(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterFind', $payload); } catch (\Throwable) {}
return $dto;
}
/**
@ -139,6 +144,8 @@ abstract class AbstractMongoDao
public function findAllBy(array|Filter $filter = [], array $options = []): array
{
$filterArr = $this->normalizeFilterInput($filter);
// Events: dao.beforeFind (Mongo)
try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'filter' => &$filterArr]; Events::dispatcher()->dispatch('dao.beforeFind', $ev); } catch (\Throwable) {}
$this->applyRuntimeScopesToFilter($filterArr);
$opts = $this->buildOptions();
// external override/merge
@ -150,6 +157,7 @@ abstract class AbstractMongoDao
}
$this->resetModifiers();
$this->resetRuntimeScopes();
try { $payload = ['dao' => $this, 'collection' => $this->collection(), 'dtos' => $dtos]; Events::dispatcher()->dispatch('dao.afterFind', $payload); } catch (\Throwable) {}
return $dtos;
}
@ -169,10 +177,14 @@ abstract class AbstractMongoDao
public function insert(array $data): AbstractDto
{
// Inserts remain immediate to obtain a real id, even under UoW
// Events: dao.beforeInsert (Mongo) — allow mutation of $data
try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeInsert', $ev); } catch (\Throwable) {}
$id = UnitOfWork::suspendDuring(function () use ($data) {
return $this->connection->insertOne($this->databaseName(), $this->collection(), $data);
});
return $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id]));
$dto = $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id]));
try { $payload = ['dao' => $this, 'collection' => $this->collection(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterInsert', $payload); } catch (\Throwable) {}
return $dto;
}
/** @param array<string,mixed> $data */
@ -181,6 +193,8 @@ abstract class AbstractMongoDao
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $theId = $id; $payload = $data;
// Events: dao.beforeUpdate (Mongo)
try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'id' => $id, 'data' => &$payload]; Events::dispatcher()->dispatch('dao.beforeUpdate', $ev); } catch (\Throwable) {}
$uow->enqueueWithMeta($conn, [
'type' => 'update',
'mode' => 'byId',
@ -188,15 +202,21 @@ abstract class AbstractMongoDao
'id' => (string)$id,
], function () use ($self, $theId, $payload) {
UnitOfWork::suspendDuring(function () use ($self, $theId, $payload) {
$self->getConnection()->updateOne($self->databaseName(), $self->collection(), ['_id' => $theId], ['$set' => $payload]);
$self->doImmediateUpdateWithLock($theId, $payload);
});
});
$base = $this->findById($id)?->toArray(false) ?? [];
$result = array_merge($base, $data, ['_id' => $id]);
return $this->hydrate($result);
$dto = $this->hydrate($result);
try { $p = ['dao' => $this, 'collection' => $this->collection(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterUpdate', $p); } catch (\Throwable) {}
return $dto;
}
$this->connection->updateOne($this->databaseName(), $this->collection(), ['_id' => $id], ['$set' => $data]);
return $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id]));
// Events: dao.beforeUpdate (Mongo)
try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'id' => $id, 'data' => &$data]; Events::dispatcher()->dispatch('dao.beforeUpdate', $ev); } catch (\Throwable) {}
$this->doImmediateUpdateWithLock($id, $data);
$dto = $this->findById($id) ?? $this->hydrate(array_merge($data, ['_id' => $id]));
try { $p = ['dao' => $this, 'collection' => $this->collection(), 'dto' => $dto]; Events::dispatcher()->dispatch('dao.afterUpdate', $p); } catch (\Throwable) {}
return $dto;
}
public function deleteById(string $id): int
@ -204,6 +224,7 @@ abstract class AbstractMongoDao
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $theId = $id;
try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'id' => $id]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {}
$uow->enqueueWithMeta($conn, [
'type' => 'delete',
'mode' => 'byId',
@ -216,7 +237,10 @@ abstract class AbstractMongoDao
});
return 0;
}
return $this->connection->deleteOne($this->databaseName(), $this->collection(), ['_id' => $id]);
try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'id' => $id]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {}
$affected = $this->connection->deleteOne($this->databaseName(), $this->collection(), ['_id' => $id]);
try { $p = ['dao' => $this, 'collection' => $this->collection(), 'id' => $id, 'affected' => $affected]; Events::dispatcher()->dispatch('dao.afterDelete', $p); } catch (\Throwable) {}
return $affected;
}
/** @param array<string,mixed>|Filter $filter */
@ -225,6 +249,7 @@ abstract class AbstractMongoDao
$uow = UnitOfWork::current();
if ($uow && !UnitOfWork::isSuspended()) {
$self = $this; $conn = $this->connection; $flt = $this->normalizeFilterInput($filter);
try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'filter' => &$flt]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {}
$uow->enqueueWithMeta($conn, [
'type' => 'delete',
'mode' => 'byCriteria',
@ -239,8 +264,10 @@ abstract class AbstractMongoDao
}
// For MVP provide deleteOne semantic; bulk deletes could be added later
$flt = $this->normalizeFilterInput($filter);
try { $ev = ['dao' => $this, 'collection' => $this->collection(), 'filter' => &$flt]; Events::dispatcher()->dispatch('dao.beforeDelete', $ev); } catch (\Throwable) {}
$this->applyRuntimeScopesToFilter($flt);
$res = $this->connection->deleteOne($this->databaseName(), $this->collection(), $flt);
try { $p = ['dao' => $this, 'collection' => $this->collection(), 'filter' => $flt, 'affected' => $res]; Events::dispatcher()->dispatch('dao.afterDelete', $p); } catch (\Throwable) {}
$this->resetRuntimeScopes();
return $res;
}
@ -335,6 +362,8 @@ abstract class AbstractMongoDao
$idVal = $doc['_id'] ?? null;
if ($idVal !== null) {
$uow->attach(static::class, (string)$idVal, $dto);
// Bind this DAO to allow snapshot diffing to emit updates
$uow->bindDao(static::class, (string)$idVal, $this);
}
}
return $dto;
@ -656,4 +685,39 @@ abstract class AbstractMongoDao
{
$this->runtimeScopes = [];
}
// ===== Optimistic locking (MVP) for Mongo =====
/**
* Override to enable locking. Example return:
* ['type' => 'version', 'column' => '_v']
* Currently only 'version' (numeric increment) is supported for Mongo.
* @return array{type:string,column:string}|array{}
*/
protected function locking(): array { return []; }
private function hasOptimisticLocking(): bool
{
$cfg = $this->locking();
return is_array($cfg) && isset($cfg['type'], $cfg['column']) && $cfg['type'] === 'version' && is_string($cfg['column']) && $cfg['column'] !== '';
}
private function doImmediateUpdateWithLock(string $id, array $payload): void
{
if (!$this->hasOptimisticLocking()) {
$this->connection->updateOne($this->databaseName(), $this->collection(), ['_id' => $id], ['$set' => $payload]);
return;
}
$cfg = $this->locking();
$col = (string)$cfg['column'];
// Fetch current version
$docs = $this->connection->find($this->databaseName(), $this->collection(), ['_id' => $id], ['limit' => 1, 'projection' => [$col => 1]]);
$cur = $docs[0][$col] ?? null;
$filter = ['_id' => $id];
if ($cur !== null) { $filter[$col] = $cur; }
$update = ['$set' => $payload, '$inc' => [$col => 1]];
$modified = $this->connection->updateOne($this->databaseName(), $this->collection(), $filter, $update);
if ($cur !== null && $modified === 0) {
throw new \Pairity\Orm\OptimisticLockException('Optimistic lock failed for ' . static::class . ' id=' . $id);
}
}
}

View file

@ -0,0 +1,7 @@
<?php
namespace Pairity\Orm;
class OptimisticLockException extends \RuntimeException
{
}

View file

@ -5,6 +5,7 @@ namespace Pairity\Orm;
use Closure;
use Pairity\Model\AbstractDao as SqlDao;
use Pairity\NoSql\Mongo\AbstractMongoDao as MongoDao;
use Pairity\Events\Events;
/**
* Opt-in Unit of Work (MVP):
@ -22,6 +23,12 @@ final class UnitOfWork
/** @var array<string, array<string, object>> map[daoClass][id] = DTO */
private array $identityMap = [];
/** @var array<string, array<string, array<string,mixed>>> snapshots[daoClass][id] = array representation */
private array $snapshots = [];
/** @var array<string, array<string, object>> daoBind[daoClass][id] = DAO instance for updates */
private array $daoBind = [];
/** Enable snapshot diffing */
private bool $snapshotsEnabled = false;
/**
* Queues grouped by a connection hash key.
@ -91,6 +98,17 @@ final class UnitOfWork
public function attach(string $daoClass, string $id, object $dto): void
{
$this->identityMap[$daoClass][$id] = $dto;
if ($this->snapshotsEnabled) {
// store shallow snapshot
$arr = [];
try {
if (method_exists($dto, 'toArray')) {
/** @var array<string,mixed> $arr */
$arr = $dto->toArray(false);
}
} catch (\Throwable) { $arr = []; }
$this->snapshots[$daoClass][$id] = is_array($arr) ? $arr : [];
}
}
/** Fetch an attached DTO if present. */
@ -99,6 +117,36 @@ final class UnitOfWork
return $this->identityMap[$daoClass][$id] ?? null;
}
/** Bind a DAO instance to a managed entity for potential snapshot diff updates. */
public function bindDao(string $daoClass, string $id, object $dao): void
{
$this->daoBind[$daoClass][$id] = $dao;
}
/** Enable/disable snapshot diffing. */
public function enableSnapshots(bool $flag = true): static
{
$this->snapshotsEnabled = $flag;
return $this;
}
public function isManaged(string $daoClass, string $id): bool
{
return isset($this->identityMap[$daoClass][$id]);
}
public function detach(string $daoClass, string $id): void
{
unset($this->identityMap[$daoClass][$id], $this->snapshots[$daoClass][$id], $this->daoBind[$daoClass][$id]);
}
public function clear(): void
{
$this->identityMap = [];
$this->snapshots = [];
$this->daoBind = [];
}
// ===== Defer operations =====
/** Enqueue a mutation for the given connection object (back-compat, raw op). */
@ -126,10 +174,16 @@ final class UnitOfWork
{
// Ensure we run ops with DAO interception suspended to avoid re-enqueue
self::suspendDuring(function () {
// uow.beforeCommit
try { $payload = ['context' => 'uow']; Events::dispatcher()->dispatch('uow.beforeCommit', $payload); } catch (\Throwable) {}
// Grouped by connection type
foreach ($this->queues as $entry) {
$conn = $entry['conn'];
$ops = $this->expandAndOrder($entry['ops']);
// Inject snapshot-based updates for managed entities with diffs
$ops = $this->injectSnapshotDiffUpdates($ops);
// Coalesce multiple updates for the same entity and order update-before-delete
$ops = $this->coalesceAndOrderPerEntity($ops);
// PDO/SQL path: has transaction(callable)
if (method_exists($conn, 'transaction')) {
$conn->transaction(function () use ($ops) {
@ -153,6 +207,8 @@ final class UnitOfWork
foreach ($ops as $o) { ($o['op'])(); }
}
}
// uow.afterCommit
try { $payload2 = ['context' => 'uow']; Events::dispatcher()->dispatch('uow.afterCommit', $payload2); } catch (\Throwable) {}
});
// Clear queues after successful commit
@ -231,6 +287,143 @@ final class UnitOfWork
return $expanded;
}
/**
* Inject snapshot-based updates for managed entities that have changed but were not explicitly updated.
* Only when snapshots are enabled.
* @param array<int,array{op:Closure,meta:array<string,mixed>}> $ops
* @return array<int,array{op:Closure,meta:array<string,mixed>}> $ops
*/
private function injectSnapshotDiffUpdates(array $ops): array
{
if (!$this->snapshotsEnabled) {
return $ops;
}
// Build a set of entities already scheduled for update/delete
$scheduled = [];
foreach ($ops as $o) {
$m = $o['meta'] ?? [];
if (!isset($m['dao']) || !is_object($m['dao'])) continue;
$daoClass = get_class($m['dao']);
$id = (string)($m['id'] ?? '');
if ($id !== '') {
$scheduled[$daoClass][$id] = true;
}
}
// For each managed entity, if changed and not scheduled, enqueue update
foreach ($this->identityMap as $daoClass => $entities) {
foreach ($entities as $id => $dto) {
if (!$this->snapshotsEnabled) break;
$snap = $this->snapshots[$daoClass][$id] ?? null;
if ($snap === null) continue;
$now = [];
try { if (method_exists($dto, 'toArray')) { $now = $dto->toArray(false); } } catch (\Throwable) { $now = []; }
if (!is_array($now)) $now = [];
$diff = $this->diffAssoc($snap, $now);
if (!$diff) continue;
if (isset($scheduled[$daoClass][$id])) continue;
$dao = $this->daoBind[$daoClass][$id] ?? null;
if (!$dao) continue;
// Build op that performs immediate update under suspension
$op = function () use ($dao, $id, $diff) {
self::suspendDuring(function () use ($dao, $id, $diff) {
if (method_exists($dao, 'update')) { $dao->update($id, $diff); }
});
};
$ops[] = ['op' => $op, 'meta' => ['type' => 'update', 'mode' => 'byId', 'dao' => $dao, 'id' => (string)$id, 'payload' => $diff]];
}
}
return $ops;
}
/**
* Merge multiple updates for the same entity and ensure update happens before delete for that entity.
* @param array<int,array{op:Closure,meta:array<string,mixed>}> $ops
* @return array<int,array{op:Closure,meta:array<string,mixed>}> $ops
*/
private function coalesceAndOrderPerEntity(array $ops): array
{
// Group updates by daoClass+id
$updateMap = [];
$deleteSet = [];
foreach ($ops as $o) {
$m = $o['meta'] ?? [];
if (!isset($m['dao']) || !is_object($m['dao'])) continue;
$dao = $m['dao'];
$daoClass = get_class($dao);
$id = (string)($m['id'] ?? '');
if (($m['type'] ?? '') === 'update' && ($m['mode'] ?? '') === 'byId' && $id !== '') {
$key = $daoClass . '#' . $id;
$payload = (array)($m['payload'] ?? []);
if (!isset($updateMap[$key])) { $updateMap[$key] = ['dao' => $dao, 'id' => $id, 'payload' => []]; }
// merge (last write wins)
$updateMap[$key]['payload'] = array_merge($updateMap[$key]['payload'], $payload);
}
if (($m['type'] ?? '') === 'delete' && ($m['mode'] ?? '') === 'byId' && $id !== '') {
$deleteSet[$daoClass . '#' . $id] = ['dao' => $dao, 'id' => $id];
}
}
if (!$updateMap) { return $ops; }
// Rebuild ops: for each original op, skip individual updates; add one merged update before a delete for the same entity
$result = [];
$emittedUpdate = [];
foreach ($ops as $o) {
$m = $o['meta'] ?? [];
$emit = true;
$dao = $m['dao'] ?? null;
$daoClass = is_object($dao) ? get_class($dao) : null;
$id = (string)($m['id'] ?? '');
$key = ($daoClass && $id !== '') ? ($daoClass . '#' . $id) : null;
if (($m['type'] ?? '') === 'update' && ($m['mode'] ?? '') === 'byId' && $key && isset($updateMap[$key])) {
// skip individual update; we'll emit merged one once
$emit = false;
}
if (($m['type'] ?? '') === 'delete' && ($m['mode'] ?? '') === 'byId' && $key && isset($updateMap[$key]) && !isset($emittedUpdate[$key])) {
// emit merged update before delete
$merged = $updateMap[$key];
$updOp = function () use ($merged) {
self::suspendDuring(function () use ($merged) {
$dao = $merged['dao']; $id = $merged['id']; $payload = $merged['payload'];
if (method_exists($dao, 'update')) { $dao->update($id, $payload); }
});
};
$result[] = ['op' => $updOp, 'meta' => ['type' => 'update', 'mode' => 'byId', 'dao' => $merged['dao'], 'id' => (string)$merged['id'], 'payload' => $merged['payload']]];
$emittedUpdate[$key] = true;
}
if ($emit) {
$result[] = $o;
}
}
// For any remaining merged updates not emitted (no delete op), append them at the end
foreach ($updateMap as $k => $merged) {
if (isset($emittedUpdate[$k])) continue;
$updOp = function () use ($merged) {
self::suspendDuring(function () use ($merged) {
$dao = $merged['dao']; $id = $merged['id']; $payload = $merged['payload'];
if (method_exists($dao, 'update')) { $dao->update($id, $payload); }
});
};
$result[] = ['op' => $updOp, 'meta' => ['type' => 'update', 'mode' => 'byId', 'dao' => $merged['dao'], 'id' => (string)$merged['id'], 'payload' => $merged['payload']]];
}
return $result;
}
/** @param array<string,mixed> $a @param array<string,mixed> $b */
private function diffAssoc(array $a, array $b): array
{
$diff = [];
foreach ($b as $k => $v) {
// only simple scalar/array comparisons; nested DTOs are out of scope
$av = $a[$k] ?? null;
if ($av !== $v) {
$diff[$k] = $v;
}
}
// Skip keys present in $a but removed in $b to avoid unintended nulling
return $diff;
}
/**
* Read relations metadata from DAO instance if available.
* @return array<string,mixed>

View file

@ -0,0 +1,117 @@
<?php
declare(strict_types=1);
namespace Pairity\Tests;
use PHPUnit\Framework\TestCase;
use Pairity\Database\ConnectionManager;
use Pairity\Model\AbstractDao;
use Pairity\Model\AbstractDto;
use Pairity\Events\Events;
use Pairity\Orm\UnitOfWork;
final class EventSystemSqliteTest extends TestCase
{
private function conn()
{
return ConnectionManager::make(['driver' => 'sqlite', 'path' => ':memory:']);
}
public function testDaoEventsForInsertUpdateDeleteAndFind(): void
{
$conn = $this->conn();
$conn->execute('CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, status TEXT)');
$Dto = new class([]) extends AbstractDto {};
$dtoClass = get_class($Dto);
$Dao = new class($conn, $dtoClass) extends AbstractDao {
private string $dto; public function __construct($c,string $d){ parent::__construct($c); $this->dto=$d; }
public function getTable(): string { return 'users'; }
protected function dtoClass(): string { return $this->dto; }
protected function schema(): array { return ['primaryKey'=>'id','columns'=>['id'=>['cast'=>'int'],'name'=>['cast'=>'string'],'status'=>['cast'=>'string']]]; }
};
$dao = new $Dao($conn, $dtoClass);
$beforeInsertData = null; $afterInsertName = null; $afterUpdateName = null; $afterDeleteAffected = null; $afterFindCount = null;
Events::dispatcher()->clear();
Events::dispatcher()->listen('dao.beforeInsert', function(array &$p) use (&$beforeInsertData) {
if (($p['table'] ?? '') === 'users') {
// mutate data
$p['data']['status'] = 'mutated';
$beforeInsertData = $p['data'];
}
});
Events::dispatcher()->listen('dao.afterInsert', function(array &$p) use (&$afterInsertName) {
if (($p['table'] ?? '') === 'users' && $p['dto'] instanceof AbstractDto) {
$afterInsertName = $p['dto']->toArray(false)['name'] ?? null;
}
});
Events::dispatcher()->listen('dao.afterUpdate', function(array &$p) use (&$afterUpdateName) {
if (($p['table'] ?? '') === 'users' && $p['dto'] instanceof AbstractDto) {
$afterUpdateName = $p['dto']->toArray(false)['name'] ?? null;
}
});
Events::dispatcher()->listen('dao.afterDelete', function(array &$p) use (&$afterDeleteAffected) {
if (($p['table'] ?? '') === 'users') { $afterDeleteAffected = (int)($p['affected'] ?? 0); }
});
Events::dispatcher()->listen('dao.afterFind', function(array &$p) use (&$afterFindCount) {
if (($p['table'] ?? '') === 'users') {
if (isset($p['dto'])) { $afterFindCount = ($p['dto'] ? 1 : 0); }
if (isset($p['dtos'])) { $afterFindCount = is_array($p['dtos']) ? count($p['dtos']) : 0; }
}
});
// Insert (beforeInsert should set status)
$created = $dao->insert(['name' => 'Alice']);
$arr = $created->toArray(false);
$this->assertSame('mutated', $arr['status'] ?? null);
$this->assertSame('Alice', $afterInsertName);
// Update
$id = (int)$arr['id'];
$updated = $dao->update($id, ['name' => 'Alice2']);
$this->assertSame('Alice2', $afterUpdateName);
// Find
$one = $dao->findById($id);
$this->assertSame(1, $afterFindCount);
// Delete
$aff = $dao->deleteById($id);
$this->assertSame($aff, $afterDeleteAffected);
}
public function testUowBeforeAfterCommitEvents(): void
{
$conn = $this->conn();
$conn->execute('CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)');
$Dto = new class([]) extends AbstractDto {};
$dtoClass = get_class($Dto);
$Dao = new class($conn, $dtoClass) extends AbstractDao {
private string $dto; public function __construct($c,string $d){ parent::__construct($c); $this->dto=$d; }
public function getTable(): string { return 'users'; }
protected function dtoClass(): string { return $this->dto; }
protected function schema(): array { return ['primaryKey'=>'id','columns'=>['id'=>['cast'=>'int'],'name'=>['cast'=>'string']]]; }
};
$dao = new $Dao($conn, $dtoClass);
$before = 0; $after = 0;
Events::dispatcher()->clear();
Events::dispatcher()->listen('uow.beforeCommit', function(array &$p) use (&$before) { $before++; });
Events::dispatcher()->listen('uow.afterCommit', function(array &$p) use (&$after) { $after++; });
$row = $dao->insert(['name' => 'X']);
$id = (int)($row->toArray(false)['id'] ?? 0);
UnitOfWork::run(function(UnitOfWork $uow) use ($dao, $id) {
$dao->update($id, ['name' => 'Y']);
$dao->deleteBy(['id' => $id]);
});
$this->assertSame(1, $before);
$this->assertSame(1, $after);
}
}

View file

@ -0,0 +1,72 @@
<?php
declare(strict_types=1);
namespace Pairity\Tests;
use PHPUnit\Framework\TestCase;
use Pairity\NoSql\Mongo\MongoConnectionManager;
use Pairity\NoSql\Mongo\AbstractMongoDao;
use Pairity\Model\AbstractDto;
use Pairity\Events\Events;
final class MongoEventSystemTest extends TestCase
{
private function hasMongoExt(): bool { return \extension_loaded('mongodb'); }
public function testDaoEventsFireOnCrud(): void
{
if (!$this->hasMongoExt()) { $this->markTestSkipped('ext-mongodb not loaded'); }
// Connect (skip if server unavailable)
try {
$conn = MongoConnectionManager::make([
'host' => \getenv('MONGO_HOST') ?: '127.0.0.1',
'port' => (int)(\getenv('MONGO_PORT') ?: 27017),
]);
} catch (\Throwable $e) {
$this->markTestSkipped('Mongo not available: ' . $e->getMessage());
}
$Dto = new class([]) extends AbstractDto {};
$dtoClass = \get_class($Dto);
$Dao = new class($conn, $dtoClass) extends AbstractMongoDao {
private string $dto; public function __construct($c,string $d){ parent::__construct($c); $this->dto=$d; }
protected function collection(): string { return 'pairity_test.events_users'; }
protected function dtoClass(): string { return $this->dto; }
};
$dao = new $Dao($conn, $dtoClass);
// Clean
foreach ($dao->findAllBy([]) as $doc) { $id = (string)($doc->toArray(false)['_id'] ?? ''); if ($id) { $dao->deleteById($id); } }
$beforeInsert = null; $afterInsert = false; $afterUpdate = false; $afterDelete = 0; $afterFind = 0;
Events::dispatcher()->clear();
Events::dispatcher()->listen('dao.beforeInsert', function(array &$p) use (&$beforeInsert){ if (($p['collection'] ?? '') === 'pairity_test.events_users'){ $p['data']['tag'] = 'x'; $beforeInsert = $p['data']; }});
Events::dispatcher()->listen('dao.afterInsert', function(array &$p) use (&$afterInsert){ if (($p['collection'] ?? '') === 'pairity_test.events_users'){ $afterInsert = true; }});
Events::dispatcher()->listen('dao.afterUpdate', function(array &$p) use (&$afterUpdate){ if (($p['collection'] ?? '') === 'pairity_test.events_users'){ $afterUpdate = true; }});
Events::dispatcher()->listen('dao.afterDelete', function(array &$p) use (&$afterDelete){ if (($p['collection'] ?? '') === 'pairity_test.events_users'){ $afterDelete += (int)($p['affected'] ?? 0); }});
Events::dispatcher()->listen('dao.afterFind', function(array &$p) use (&$afterFind){ if (($p['collection'] ?? '') === 'pairity_test.events_users'){ $afterFind += isset($p['dto']) ? (int)!!$p['dto'] : (is_array($p['dtos'] ?? null) ? count($p['dtos']) : 0); }});
// Insert
$created = $dao->insert(['email' => 'e@x.com']);
$this->assertTrue($afterInsert);
$this->assertSame('x', $created->toArray(false)['tag'] ?? null);
// Update
$id = (string)($created->toArray(false)['_id'] ?? '');
$dao->update($id, ['email' => 'e2@x.com']);
$this->assertTrue($afterUpdate);
// Find
$one = $dao->findById($id);
$this->assertNotNull($one);
$this->assertGreaterThanOrEqual(1, $afterFind);
// Delete
$aff = $dao->deleteById($id);
$this->assertSame($aff, $afterDelete);
}
}

View file

@ -0,0 +1,56 @@
<?php
declare(strict_types=1);
namespace Pairity\Tests;
use PHPUnit\Framework\TestCase;
use Pairity\NoSql\Mongo\MongoConnectionManager;
use Pairity\NoSql\Mongo\AbstractMongoDao;
use Pairity\Model\AbstractDto;
final class MongoOptimisticLockTest extends TestCase
{
private function hasMongoExt(): bool { return \extension_loaded('mongodb'); }
public function testVersionIncrementOnUpdate(): void
{
if (!$this->hasMongoExt()) { $this->markTestSkipped('ext-mongodb not loaded'); }
// Connect (skip if server unavailable)
try {
$conn = MongoConnectionManager::make([
'host' => \getenv('MONGO_HOST') ?: '127.0.0.1',
'port' => (int)(\getenv('MONGO_PORT') ?: 27017),
]);
} catch (\Throwable $e) {
$this->markTestSkipped('Mongo not available: ' . $e->getMessage());
}
$dto = new class([]) extends AbstractDto {};
$dtoClass = \get_class($dto);
$Dao = new class($conn, $dtoClass) extends AbstractMongoDao {
private string $dto; public function __construct($c, string $d){ parent::__construct($c); $this->dto=$d; }
protected function collection(): string { return 'pairity_test.lock_users'; }
protected function dtoClass(): string { return $this->dto; }
protected function locking(): array { return ['type' => 'version', 'column' => 'version']; }
};
$dao = new $Dao($conn, $dtoClass);
// Clean
foreach ($dao->findAllBy([]) as $doc) { $id = (string)($doc->toArray(false)['_id'] ?? ''); if ($id) { $dao->deleteById($id); } }
// Insert with initial version 0
$created = $dao->insert(['email' => 'lock@example.com', 'version' => 0]);
$id = (string)($created->toArray(false)['_id'] ?? '');
$this->assertNotEmpty($id);
// Update should bump version to 1
$dao->update($id, ['email' => 'lock2@example.com']);
$after = $dao->findById($id);
$this->assertNotNull($after);
$this->assertSame(1, (int)($after->toArray(false)['version'] ?? -1));
}
}

View file

@ -0,0 +1,63 @@
<?php
declare(strict_types=1);
namespace Pairity\Tests;
use PHPUnit\Framework\TestCase;
use Pairity\Database\ConnectionManager;
use Pairity\Model\AbstractDao;
use Pairity\Model\AbstractDto;
use Pairity\Orm\OptimisticLockException;
final class OptimisticLockSqliteTest extends TestCase
{
private function conn()
{
return ConnectionManager::make(['driver' => 'sqlite', 'path' => ':memory:']);
}
public function testVersionLockingIncrementsAndBlocksBulkUpdate(): void
{
$conn = $this->conn();
// schema with version column
$conn->execute('CREATE TABLE users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
version INTEGER NOT NULL DEFAULT 0
)');
$UserDto = new class([]) extends AbstractDto {};
$dtoClass = get_class($UserDto);
$UserDao = new class($conn, $dtoClass) extends AbstractDao {
private string $dto; public function __construct($c,string $dto){ parent::__construct($c); $this->dto=$dto; }
public function getTable(): string { return 'users'; }
protected function dtoClass(): string { return $this->dto; }
protected function schema(): array {
return [
'primaryKey' => 'id',
'columns' => [ 'id'=>['cast'=>'int'], 'name'=>['cast'=>'string'], 'version'=>['cast'=>'int'] ],
'locking' => ['type' => 'version', 'column' => 'version'],
];
}
};
$dao = new $UserDao($conn, $dtoClass);
// Insert
$created = $dao->insert(['name' => 'A']);
$arr = $created->toArray(false);
$id = (int)$arr['id'];
// First update: should succeed and bump version to 1
$dao->update($id, ['name' => 'A1']);
$row = $conn->query('SELECT name, version FROM users WHERE id = :id', ['id' => $id])[0] ?? [];
$this->assertSame('A1', (string)($row['name'] ?? ''));
$this->assertSame(1, (int)($row['version'] ?? 0));
// Bulk update should throw while locking enabled
$this->expectException(OptimisticLockException::class);
$dao->updateBy(['id' => $id], ['name' => 'A2']);
}
}