Skip to content

Commit

Permalink
PHPLIB-1617 Accept a Pipeline instance in aggregate and watch methods (
Browse files Browse the repository at this point in the history
  • Loading branch information
GromNaN authored Feb 10, 2025
1 parent 9411fff commit 0bdbf47
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 28 deletions.
16 changes: 8 additions & 8 deletions src/Collection.php
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,16 @@ public function __toString(): string
* Executes an aggregation framework pipeline on the collection.
*
* @see Aggregate::__construct() for supported options
* @param array $pipeline Aggregation pipeline
* @param array $options Command options
* @param array|Pipeline $pipeline Aggregation pipeline
* @param array $options Command options
* @throws UnexpectedValueException if the command response was malformed
* @throws UnsupportedException if options are not supported by the selected server
* @throws InvalidArgumentException for parameter/option parsing errors
* @throws DriverRuntimeException for other driver errors (e.g. connection errors)
*/
public function aggregate(array $pipeline, array $options = []): CursorInterface
public function aggregate(array|Pipeline $pipeline, array $options = []): CursorInterface
{
if (is_builder_pipeline($pipeline)) {
if (is_array($pipeline) && is_builder_pipeline($pipeline)) {
$pipeline = new Pipeline(...$pipeline);
}

Expand Down Expand Up @@ -1014,13 +1014,13 @@ public function updateSearchIndex(string $name, array|object $definition, array
* Create a change stream for watching changes to the collection.
*
* @see Watch::__construct() for supported options
* @param array $pipeline Aggregation pipeline
* @param array $options Command options
* @param array|Pipeline $pipeline Aggregation pipeline
* @param array $options Command options
* @throws InvalidArgumentException for parameter/option parsing errors
*/
public function watch(array $pipeline = [], array $options = []): ChangeStream
public function watch(array|Pipeline $pipeline = [], array $options = []): ChangeStream
{
if (is_builder_pipeline($pipeline)) {
if (is_array($pipeline) && is_builder_pipeline($pipeline)) {
$pipeline = new Pipeline(...$pipeline);
}

Expand Down
16 changes: 8 additions & 8 deletions src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,16 @@ public function __toString(): string
* and $listLocalSessions. Requires MongoDB >= 3.6
*
* @see Aggregate::__construct() for supported options
* @param array $pipeline Aggregation pipeline
* @param array $options Command options
* @param array|Pipeline $pipeline Aggregation pipeline
* @param array $options Command options
* @throws UnexpectedValueException if the command response was malformed
* @throws UnsupportedException if options are not supported by the selected server
* @throws InvalidArgumentException for parameter/option parsing errors
* @throws DriverRuntimeException for other driver errors (e.g. connection errors)
*/
public function aggregate(array $pipeline, array $options = []): CursorInterface
public function aggregate(array|Pipeline $pipeline, array $options = []): CursorInterface
{
if (is_builder_pipeline($pipeline)) {
if (is_array($pipeline) && is_builder_pipeline($pipeline)) {
$pipeline = new Pipeline(...$pipeline);
}

Expand Down Expand Up @@ -582,13 +582,13 @@ public function selectGridFSBucket(array $options = []): Bucket
* Create a change stream for watching changes to the database.
*
* @see Watch::__construct() for supported options
* @param array $pipeline Aggregation pipeline
* @param array $options Command options
* @param array|Pipeline $pipeline Aggregation pipeline
* @param array $options Command options
* @throws InvalidArgumentException for parameter/option parsing errors
*/
public function watch(array $pipeline = [], array $options = []): ChangeStream
public function watch(array|Pipeline $pipeline = [], array $options = []): ChangeStream
{
if (is_builder_pipeline($pipeline)) {
if (is_array($pipeline) && is_builder_pipeline($pipeline)) {
$pipeline = new Pipeline(...$pipeline);
}

Expand Down
21 changes: 15 additions & 6 deletions tests/Collection/BuilderCollectionFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use MongoDB\Builder\Pipeline;
use MongoDB\Builder\Query;
use MongoDB\Builder\Stage;
use PHPUnit\Framework\Attributes\TestWith;

use function iterator_to_array;

Expand All @@ -18,7 +19,9 @@ public function setUp(): void
$this->collection->insertMany([['x' => 1], ['x' => 2], ['x' => 2]]);
}

public function testAggregate(): void
#[TestWith([true])]
#[TestWith([false])]
public function testAggregate(bool $pipelineAsArray): void
{
$this->collection->insertMany([['x' => 10], ['x' => 10], ['x' => 10]]);
$pipeline = new Pipeline(
Expand All @@ -27,8 +30,10 @@ public function testAggregate(): void
buckets: 2,
),
);
// Extract the list of stages for arg type restriction
$pipeline = iterator_to_array($pipeline);

if ($pipelineAsArray) {
$pipeline = iterator_to_array($pipeline);
}

$results = $this->collection->aggregate($pipeline)->toArray();
$this->assertCount(2, $results);
Expand Down Expand Up @@ -257,7 +262,9 @@ public function testUpdateManyWithPipeline(): void
$this->assertEquals(3, $result[0]->x);
}

public function testWatch(): void
#[TestWith([true])]
#[TestWith([false])]
public function testWatch(bool $pipelineAsArray): void
{
$this->skipIfChangeStreamIsNotSupported();

Expand All @@ -268,8 +275,10 @@ public function testWatch(): void
$pipeline = new Pipeline(
Stage::match(operationType: Query::eq('insert')),
);
// Extract the list of stages for arg type restriction
$pipeline = iterator_to_array($pipeline);

if ($pipelineAsArray) {
$pipeline = iterator_to_array($pipeline);
}

$changeStream = $this->collection->watch($pipeline);
$this->collection->insertOne(['x' => 3]);
Expand Down
21 changes: 15 additions & 6 deletions tests/Database/BuilderDatabaseFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use MongoDB\Builder\Pipeline;
use MongoDB\Builder\Query;
use MongoDB\Builder\Stage;
use PHPUnit\Framework\Attributes\TestWith;

use function iterator_to_array;

Expand All @@ -18,7 +19,9 @@ public function tearDown(): void
parent::tearDown();
}

public function testAggregate(): void
#[TestWith([true])]
#[TestWith([false])]
public function testAggregate(bool $pipelineAsArray): void
{
$this->skipIfServerVersion('<', '6.0.0', '$documents stage is not supported');

Expand All @@ -33,14 +36,18 @@ public function testAggregate(): void
buckets: 2,
),
);
// Extract the list of stages for arg type restriction
$pipeline = iterator_to_array($pipeline);

if ($pipelineAsArray) {
$pipeline = iterator_to_array($pipeline);
}

$results = $this->database->aggregate($pipeline)->toArray();
$this->assertCount(2, $results);
}

public function testWatch(): void
#[TestWith([true])]
#[TestWith([false])]
public function testWatch(bool $pipelineAsArray): void
{
$this->skipIfChangeStreamIsNotSupported();

Expand All @@ -51,8 +58,10 @@ public function testWatch(): void
$pipeline = new Pipeline(
Stage::match(operationType: Query::eq('insert')),
);
// Extract the list of stages for arg type restriction
$pipeline = iterator_to_array($pipeline);

if ($pipelineAsArray) {
$pipeline = iterator_to_array($pipeline);
}

$changeStream = $this->database->watch($pipeline);
$this->database->selectCollection($this->getCollectionName())->insertOne(['x' => 3]);
Expand Down

0 comments on commit 0bdbf47

Please sign in to comment.