Hi, I'm building a PoC on reactive backpressure using pgadba, and I get the impression it isn't honoring backpressure. I'm running a simple query against a database using a rowPublisherOperation:
DataSource ds = DataSourceFactory.newFactory("org.postgresql.adba.PgDataSourceFactory")
    .builder()
    .url("jdbc:postgresql://localhost:5432/dvdrental")
    .username("postgres")
    .password("mysecretpassword")
    .build();
CompletableFuture<String> result = new CompletableFuture<>();
Session session = ds.getSession();
// Very basic SubmissionPublisher (and a Flow.Processor), so I can expose the result as a publisher
RowProcessor rp = new RowProcessor();
final CompletableFuture<String> completableFuture = session.<String>rowPublisherOperation("select title from film limit 1000")
    .subscribe(rp, result)
    .submit()
    .getCompletionStage()
    .toCompletableFuture();
// Convert to RxJava2, because I'm most comfortable with that
long count = FlowInterop.fromFlowPublisher(rp)
    .doOnTerminate(() -> session.close())
    .map(e -> (String) e.at("title").get())
    .doOnNext(e -> System.err.println("Title: " + e))
    .count()
    .blockingGet();
System.err.println("Count: "+count);
The RowProcessor looks like this:
public class RowProcessor extends SubmissionPublisher<Result.RowColumn> implements Flow.Processor<Result.RowColumn,Result.RowColumn> {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(RowColumn item) {
        submit(item);
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.err.println("completed");
        close();
    }
}
Now what happens is this: If I limit to about 500 it works fine.
select title from film limit 500
results in:
completed
Count: 500
All good. If I increase to 900 I see this:
completed
Count: 895
Along with 5 IllegalStateExceptions:
java.lang.IllegalStateException: failed to offer item to subscriber
at org.postgresql.adba/org.postgresql.adba.submissions.ProcessorSubmission.lambda$0(ProcessorSubmission.java:82)
at java.base/java.util.concurrent.SubmissionPublisher.retryOffer(SubmissionPublisher.java:445)
at java.base/java.util.concurrent.SubmissionPublisher.doOffer(SubmissionPublisher.java:422)
at java.base/java.util.concurrent.SubmissionPublisher.offer(SubmissionPublisher.java:550)
at org.postgresql.adba/org.postgresql.adba.submissions.ProcessorSubmission.addRow(ProcessorSubmission.java:81)
at org.postgresql.adba/org.postgresql.adba.communication.network.Portal.addDataRow(Portal.java:152)
at org.postgresql.adba/org.postgresql.adba.communication.network.ExecuteResponse.read(ExecuteResponse.java:31)
at org.postgresql.adba/org.postgresql.adba.communication.NetworkConnection.handleRead(NetworkConnection.java:407)
at org.postgresql.adba/org.postgresql.adba.execution.DefaultNioLoop.run(DefaultNioLoop.java:127)
at java.base/java.lang.Thread.run(Thread.java:844)
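For reference, the trace passes through `SubmissionPublisher.offer` and `retryOffer` into a lambda in `ProcessorSubmission`. That suggests (an inference from the trace, not from the driver source) that rows are published with the non-blocking `offer`, and that its `onDrop` handler throws once the subscriber's buffer is full. The JDK-only sketch below reproduces that mechanism in isolation. The class name `OfferDropDemo`, the method `countDrops`, and the buffer size are made up for illustration, and the handler counts drops instead of throwing.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of pgadba.
class OfferDropDemo {

    /** Offers `items` values to a subscriber that never requests any and returns the number of drops. */
    static int countDrops(int items, int bufferCapacity) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger dropped = new AtomicInteger();
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(executor, bufferCapacity)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription subscription) {
                    // Deliberately request nothing, to model a consumer that cannot keep up.
                }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable throwable) { }
                @Override public void onComplete() { }
            });
            for (int i = 0; i < items; i++) {
                // offer() never blocks; when the subscriber's buffer is full it calls onDrop.
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false; // false = do not retry, the item is dropped
                });
            }
        } finally {
            executor.shutdown();
        }
        return dropped.get();
    }

    public static void main(String[] args) {
        System.out.println("Dropped: " + countDrops(100, 8));
    }
}
```

Because the subscriber has no outstanding demand, every item beyond the buffer capacity ends up in the `onDrop` handler. A handler that throws would surface as an exception on the publishing thread, like the one in the trace.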
If I do more with the results (like printing the rows to the console), the problem gets worse, so I get the impression that the pg driver does not slow down reading the result when the consumer can't keep up.
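To illustrate what "slowing down" would look like, here is a minimal JDK-only sketch in which the producer is paced by the consumer: `SubmissionPublisher.submit` blocks while the subscriber's buffer is full, so a slow subscriber that requests one item at a time still receives everything. The class name `BlockingSubmitDemo`, the method `deliverAll`, the buffer size, and the 1 ms delay are all assumptions for the demo. This is not a claim about how pgadba works or should be fixed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of pgadba.
class BlockingSubmitDemo {

    /** Publishes `items` values with blocking submit() and returns how many the slow subscriber received. */
    static int deliverAll(int items, int bufferCapacity) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger received = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(executor, bufferCapacity)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // one item at a time
                }
                @Override public void onNext(Integer item) {
                    try {
                        Thread.sleep(1); // simulate a slow consumer
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    received.incrementAndGet();
                    subscription.request(1);
                }
                @Override public void onError(Throwable throwable) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            for (int i = 0; i < items; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
        } // close() signals onComplete once buffered items have been delivered
        done.await(30, TimeUnit.SECONDS);
        executor.shutdown();
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Received: " + deliverAll(200, 8));
    }
}
```

Whether blocking is acceptable on a driver's network thread is a separate design question. A non-blocking alternative would be to stop reading from the socket until the subscriber signals demand again.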