Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

base writeBytesAvailable on credits and emit signal when changed #47743

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/corelib/httprequest.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2012-2016 Fanout, Inc.
* Copyright (C) 2023 Fastly, Inc.
*
* This file is part of Pushpin.
*
Expand Down Expand Up @@ -88,6 +89,7 @@ class HttpRequest : public QObject
void readyRead();
// indicates output data written and/or output finished
void bytesWritten(int count);
void writeBytesChanged();
void paused();
void error();
};
Expand Down
60 changes: 40 additions & 20 deletions src/corelib/zhttprequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class ZhttpRequest::Private : public QObject
bool pendingUpdate;
bool needPause;
bool readableChanged;
bool writableChanged;
bool errored;
ErrorCondition errorCondition;
RTimer *expireTimer;
Expand Down Expand Up @@ -127,6 +128,7 @@ class ZhttpRequest::Private : public QObject
pendingUpdate(false),
needPause(false),
readableChanged(false),
writableChanged(false),
errored(false),
expireTimer(0),
keepAliveTimer(0),
Expand All @@ -153,6 +155,7 @@ class ZhttpRequest::Private : public QObject
{
needPause = false;
readableChanged = false;
writableChanged = false;

if(expireTimer)
{
Expand Down Expand Up @@ -425,7 +428,9 @@ class ZhttpRequest::Private : public QObject
// also send credits if we need to.

QByteArray buf = requestBodyBuf.take(outCredits);

outCredits -= buf.size();
writableChanged = true;

ZhttpRequestPacket p;
p.type = ZhttpRequestPacket::Data;
Expand Down Expand Up @@ -466,7 +471,11 @@ class ZhttpRequest::Private : public QObject
ZhttpResponsePacket packet;
packet.type = ZhttpResponsePacket::Data;
packet.body = responseBodyBuf.take(outCredits);

outCredits -= packet.body.size();
if(!packet.body.isEmpty())
writableChanged = true;

packet.more = (!responseBodyBuf.isEmpty() || !bodyFinished);

writePacket(packet);
Expand Down Expand Up @@ -580,20 +589,24 @@ class ZhttpRequest::Private : public QObject
}

if(packet.credits > 0)
{
outCredits += packet.credits;
writableChanged = true;
}

if(!packet.body.isEmpty() || (!done && haveRequestBody))
{
readableChanged = true;

if(readableChanged || writableChanged)
update();
}
}
else if(packet.type == ZhttpRequestPacket::Credit)
{
if(packet.credits > 0)
{
outCredits += packet.credits;
tryWrite();
writableChanged = true;
update();
}
}
else if(packet.type == ZhttpRequestPacket::KeepAlive)
Expand Down Expand Up @@ -742,26 +755,19 @@ class ZhttpRequest::Private : public QObject

responseBodyBuf += packet.body;

if(!doReq && packet.credits > 0)
if(packet.more)
{
outCredits += packet.credits;
if(outCredits > 0)
if(!doReq && packet.credits > 0)
{
// try to write anything that was waiting on credits
QPointer<QObject> self = this;
tryWrite();
if(!self)
return;
outCredits += packet.credits;
writableChanged = true;
}
}

if(packet.more)
{
if(needToSendHeaders || !packet.body.isEmpty())
{
readableChanged = true;

if(readableChanged || writableChanged)
update();
}
}
else
{
Expand All @@ -776,8 +782,8 @@ class ZhttpRequest::Private : public QObject
if(packet.credits > 0)
{
outCredits += packet.credits;
if(outCredits > 0)
tryWrite();
writableChanged = true;
update();
}
}
else if(packet.type == ZhttpResponsePacket::KeepAlive)
Expand Down Expand Up @@ -1036,6 +1042,12 @@ public slots:
else if(state == ClientRequesting)
{
tryWrite();

if(writableChanged)
{
writableChanged = false;
emit q->writeBytesChanged();
}
}
else if(state == ClientReceiving)
{
Expand Down Expand Up @@ -1127,6 +1139,12 @@ public slots:
else if(state == ServerResponding)
{
tryWrite();

if(writableChanged)
{
writableChanged = false;
emit q->writeBytesChanged();
}
}
}

Expand Down Expand Up @@ -1296,8 +1314,10 @@ int ZhttpRequest::bytesAvailable() const

int ZhttpRequest::writeBytesAvailable() const
{
if(d->responseBodyBuf.size() <= IDEAL_CREDITS)
return (IDEAL_CREDITS - d->responseBodyBuf.size());
if(d->server && d->responseBodyBuf.size() < d->outCredits)
return d->outCredits - d->responseBodyBuf.size();
else if(!d->server && d->requestBodyBuf.size() < d->outCredits)
return d->outCredits - d->requestBodyBuf.size();
else
return 0;
}
Expand Down
4 changes: 4 additions & 0 deletions src/handler/httpsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class HttpSession::Private : public QObject

req->setParent(this);
connect(req, &ZhttpRequest::bytesWritten, this, &Private::req_bytesWritten);
connect(req, &ZhttpRequest::writeBytesChanged, this, &Private::req_writeBytesChanged);
connect(req, &ZhttpRequest::error, this, &Private::req_error);

timer = new RTimer(this);
Expand Down Expand Up @@ -1420,7 +1421,10 @@ private slots:
doFinish();
return;
}
}

void req_writeBytesChanged()
{
if(state == SendingFirstInstructResponse)
{
tryWriteFirstInstructResponse();
Expand Down
Loading