Skip to content

Trying to get reactive backpressure to work #36

Open
@flyaruu

Description

Hi, I'm trying to do a PoC on reactive backpressure using pgadba, and I kind of get the impression it isn't honoring backpressure. I'm doing a simple query from a database using a rowpublisheroperation:

DataSource ds = DataSourceFactory.newFactory("org.postgresql.adba.PgDataSourceFactory")
    .builder()
    .url("jdbc:postgresql://localhost:5432/dvdrental")
    .username("postgres")
    .password("mysecretpassword")
    .build();

CompletableFuture<String> result = new CompletableFuture<>();
Session session = ds.getSession();
// Very basic SubmissionPublisher (and a Flow.Processor), so I can expose the result as a publisher
RowProcessor rp = new RowProcessor();
final CompletableFuture<String> completableFuture = session.<String>rowPublisherOperation("select title from film limit 1000")
			.subscribe(rp, result)
			.submit()
			.getCompletionStage()
			.toCompletableFuture();

// Convert to RxJava2, because I'm most confortable with that
long count = FlowInterop.fromFlowPublisher(rp)
	.doOnTerminate(()->session.close())
	.map(e->(String)e.at("title").get())
	.doOnNext(e->System.err.println("Title: "+e))
	.count()
	.blockingGet();
System.err.println("Count: "+count);

Where the rowprocessor is this:

public class RowProcessor extends SubmissionPublisher<Result.RowColumn> implements Flow.Processor<Result.RowColumn,Result.RowColumn> {

	private Subscription subscription;
	@Override
	public void onSubscribe(Subscription subscription) {
		this.subscription = subscription;
		this.subscription.request(1);
	}
	@Override
	public void onNext(RowColumn item) {
		submit(item);
		this.subscription.request(1);
	}
	@Override
	public void onError(Throwable throwable) {
		throwable.printStackTrace();
	}
	@Override
	public void onComplete() {
		System.err.println("completed");
		close();
	}
}

Now what happens is this: If I limit to about 500 it works fine.

select title from film limit 500

results in:

completed
Count: 500

All good. If I increase to 900 I see this:

completed
Count: 895

Along with 5 IllegalStateExceptions:

java.lang.IllegalStateException: failed to offer item to subscriber
	at org.postgresql.adba/org.postgresql.adba.submissions.ProcessorSubmission.lambda$0(ProcessorSubmission.java:82)
	at java.base/java.util.concurrent.SubmissionPublisher.retryOffer(SubmissionPublisher.java:445)
	at java.base/java.util.concurrent.SubmissionPublisher.doOffer(SubmissionPublisher.java:422)
	at java.base/java.util.concurrent.SubmissionPublisher.offer(SubmissionPublisher.java:550)
	at org.postgresql.adba/org.postgresql.adba.submissions.ProcessorSubmission.addRow(ProcessorSubmission.java:81)
	at org.postgresql.adba/org.postgresql.adba.communication.network.Portal.addDataRow(Portal.java:152)
	at org.postgresql.adba/org.postgresql.adba.communication.network.ExecuteResponse.read(ExecuteResponse.java:31)
	at org.postgresql.adba/org.postgresql.adba.communication.NetworkConnection.handleRead(NetworkConnection.java:407)
	at org.postgresql.adba/org.postgresql.adba.execution.DefaultNioLoop.run(DefaultNioLoop.java:127)
	at java.base/java.lang.Thread.run(Thread.java:844)

If I do more with the results (like printing the rows to the console) the problem gets worse, so I get the impression that if the pg driver does not slow down reading the result when the consumer can't keep up.

Activity

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions