Skip to content

Commit

Permalink
simplifying sync with sender
Browse files Browse the repository at this point in the history
  • Loading branch information
kazu-yamamoto committed Nov 11, 2024
1 parent ad57a9b commit f4b4f53
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 173 deletions.
9 changes: 5 additions & 4 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,15 @@ sendRequest Config{..} ctx@Context{..} strm OutObj{..} =
q <- sendStreaming ctx strm strmbdy
let next = nextForStreaming q
return (Just next, Just q)
((var, sync), out) <-
prepareSync strm (OHeader outObjHeaders mnext outObjTrailers) mtbq
let ot = OHeader outObjHeaders mnext outObjTrailers
(var, out) <- makeOutput strm ot
atomically $ do
sidOK <- readTVar outputQStreamID
check (sidOK == sid)
enqueueOutputSTM outputQ out
writeTVar outputQStreamID (sid + 2)
syncWithSender ctx strm var sync
enqueueOutputSTM outputQ out
lc <- newLoopCheck strm mtbq
syncWithSender' ctx var lc
where
label = "H2 request sender for stream " ++ show (streamNumber strm)

Expand Down
97 changes: 45 additions & 52 deletions Network/HTTP2/H2/Sender.hs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ frameSender
x <- atomically $ dequeue off
case x of
C ctl -> flushN off >> control ctl >> loop 0
O out -> outputOrEnqueueAgain out off >>= flushIfNecessary >>= loop
O out -> outputAndSync out off >>= flushIfNecessary >>= loop
Flush -> flushN off >> loop 0

-- Flush the connection buffer to the socket, where the first 'n' bytes of
Expand Down Expand Up @@ -139,29 +139,31 @@ frameSender
Just siz -> setLimitForEncoding siz encodeDynamicTable

----------------------------------------------------------------
outputOrEnqueueAgain :: Output -> Offset -> IO Offset
outputOrEnqueueAgain out@(Output strm otyp sync) off = E.handle (\e -> resetStream strm InternalError e >> return off) $ do
-- INVARIANT
--
-- Both the stream window and the connection window are open.
----------------------------------------------------------------
outputAndSync :: Output -> Offset -> IO Offset
outputAndSync out@(Output strm otyp sync) off = E.handle (\e -> resetStream strm InternalError e >> return off) $ do
state <- readStreamState strm
if isHalfClosedLocal state
then return off
else case otyp of
OHeader hdr mnext tlrmkr ->
-- Send headers immediately, without waiting for data
-- No need to check the streaming window (applies to DATA frames only)
outputHeader strm hdr mnext tlrmkr sync off
OHeader hdr mnext tlrmkr -> do
(off', mout') <- outputHeader strm hdr mnext tlrmkr sync off
case mout' of
Nothing -> sync Done
Just out' -> sync $ Cont out'
return off'
_ -> do
-- The 'sync' function usage constraints hold here: We
-- just popped off the only 'Output' for this stream,
-- and we only enqueue a new output (in 'output') if
-- 'sync' returns 'True'
ok <- sync $ Just otyp
if ok
then do
sws <- getStreamWindowSize strm
cws <- getConnectionWindowSize ctx -- not 0
let lim = min cws sws
output out off lim
else return off
sws <- getStreamWindowSize strm
cws <- getConnectionWindowSize ctx -- not 0
let lim = min cws sws
(off', mout') <- output out off lim
case mout' of
Nothing -> sync Done
Just out' -> sync $ Cont out'
return off'

resetStream :: Stream -> ErrorCode -> E.SomeException -> IO ()
resetStream strm err e = do
Expand All @@ -175,9 +177,9 @@ frameSender
-> [Header]
-> Maybe DynaNext
-> TrailersMaker
-> (Maybe OutputType -> IO Bool)
-> (Sync -> IO ())
-> Offset
-> IO Offset
-> IO (Offset, Maybe Output)
outputHeader strm hdr mnext tlrmkr sync off0 = do
-- Header frame and Continuation frame
let sid = streamNumber strm
Expand All @@ -186,19 +188,19 @@ frameSender
off' <- headerContinue sid ths endOfStream off0
-- halfClosedLocal calls closed which removes
-- the stream from stream table.
when endOfStream $ do
halfClosedLocal ctx strm Finished
void $ sync Nothing
off <- flushIfNecessary off'
case mnext of
Nothing -> return off
Nothing -> do
-- endOfStream
halfClosedLocal ctx strm Finished
return (off, Nothing)
Just next -> do
let out' = Output strm (ONext next tlrmkr) sync
outputOrEnqueueAgain out' off
return (off, Just out')

----------------------------------------------------------------
output :: Output -> Offset -> WindowSize -> IO Offset
output out@(Output strm (ONext curr tlrmkr) sync) off0 lim = do
output :: Output -> Offset -> WindowSize -> IO (Offset, Maybe Output)
output out@(Output strm (ONext curr tlrmkr) _) off0 lim = do
-- Data frame payload
buflim <- readIORef outputBufferLimit
let payloadOff = off0 + frameHeaderLength
Expand All @@ -208,13 +210,12 @@ frameSender
case next of
Next datPayloadLen reqflush mnext -> do
NextTrailersMaker tlrmkr' <- runTrailersMaker tlrmkr datBuf datPayloadLen
fillDataHeaderEnqueueNext
fillDataHeader
strm
off0
datPayloadLen
mnext
tlrmkr'
sync
out
reqflush
CancelNext mErr -> do
Expand All @@ -233,15 +234,14 @@ frameSender
resetStream strm InternalError err
Nothing ->
resetStream strm Cancel (E.toException CancelledStream)
return off0
output (Output strm (OPush ths pid) sync) off0 _lim = do
return (off0, Nothing)
output (Output strm (OPush ths pid) _) off0 _lim = do
-- Creating a push promise header
-- Frame id should be associated stream id from the client.
let sid = streamNumber strm
len <- pushPromise pid sid ths off0
off <- flushIfNecessary $ off0 + frameHeaderLength + len
_ <- sync Nothing
return off
return (off, Nothing)
output _ _ _ = undefined -- never reached

----------------------------------------------------------------
Expand Down Expand Up @@ -285,23 +285,21 @@ frameSender
continue off' ths' FrameContinuation

----------------------------------------------------------------
fillDataHeaderEnqueueNext
fillDataHeader
:: Stream
-> Offset
-> Int
-> Maybe DynaNext
-> (Maybe ByteString -> IO NextTrailersMaker)
-> (Maybe OutputType -> IO Bool)
-> Output
-> Bool
-> IO Offset
fillDataHeaderEnqueueNext
-> IO (Offset, Maybe Output)
fillDataHeader
strm@Stream{streamNumber}
off
datPayloadLen
Nothing
tlrmkr
sync
_
reqflush = do
let buf = confWriteBuffer `plusPtr` off
Expand All @@ -321,41 +319,37 @@ frameSender
else
return off
off'' <- handleTrailers mtrailers off'
_ <- sync Nothing
halfClosedLocal ctx strm Finished
if reqflush
then do
flushN off''
return 0
else return off''
return (0, Nothing)
else return (off'', Nothing)
where
handleTrailers Nothing off0 = return off0
handleTrailers (Just trailers) off0 = do
(ths, _) <- toTokenHeaderTable trailers
headerContinue streamNumber ths True {- endOfStream -} off0
fillDataHeaderEnqueueNext
fillDataHeader
_
off
0
(Just next)
tlrmkr
_
out
reqflush = do
let out' = out{outputType = ONext next tlrmkr}
enqueueOutput outputQ out'
if reqflush
then do
flushN off
return 0
else return off
fillDataHeaderEnqueueNext
return (0, Just out')
else return (off, Just out')
fillDataHeader
strm@Stream{streamNumber}
off
datPayloadLen
(Just next)
tlrmkr
_
out
reqflush = do
let buf = confWriteBuffer `plusPtr` off
Expand All @@ -364,12 +358,11 @@ frameSender
fillFrameHeader FrameData datPayloadLen streamNumber flag buf
decreaseWindowSize ctx strm datPayloadLen
let out' = out{outputType = ONext next tlrmkr}
enqueueOutput outputQ out'
if reqflush
then do
flushN off'
return 0
else return off'
return (0, Just out')
else return (off', Just out')

----------------------------------------------------------------
pushPromise :: StreamId -> StreamId -> TokenHeaderList -> Offset -> IO Int
Expand Down
Loading

0 comments on commit f4b4f53

Please sign in to comment.