Skip to content

Commit

Permalink
spawning a thread for sendRequest of client.
Browse files Browse the repository at this point in the history
  • Loading branch information
kazu-yamamoto committed Nov 8, 2024
1 parent 97b1fc1 commit cb2ce0f
Showing 1 changed file with 49 additions and 41 deletions.
90 changes: 49 additions & 41 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ run cconf@ClientConfig{..} conf client = do
{ auxPossibleClientStreams = possibleClientStream ctx
}
clientCore ctx req processResponse = do
strm <- sendRequest conf ctx scheme authority req
(strm, moutobj) <- makeStream ctx scheme authority req
case moutobj of
Nothing -> return ()
Just outobj -> sendRequest conf ctx strm outobj
rsp <- getResponse strm
x <- processResponse rsp
adjustRxWindow ctx strm
Expand All @@ -109,7 +112,10 @@ runIO cconf@ClientConfig{..} conf@Config{..} action = do
ctx@Context{..} <- setup cconf conf
let putB bs = enqueueControl controlQ $ CFrames Nothing [bs]
putR req = do
strm <- sendRequest conf ctx scheme authority req
(strm, moutobj) <- makeStream ctx scheme authority req
case moutobj of
Nothing -> return ()
Just outobj -> sendRequest conf ctx strm outobj
return (streamNumber strm, strm)
get = getResponse
create = openOddStreamWait ctx
Expand Down Expand Up @@ -165,23 +171,22 @@ runH2 conf ctx runClient = do
Left () -> do
wait runningClient

sendRequest
:: Config
-> Context
makeStream
:: Context
-> Scheme
-> Authority
-> Request
-> IO Stream
sendRequest conf ctx@Context{..} scheme auth (Request req) = do
-> IO (Stream, Maybe OutObj)
makeStream ctx@Context{..} scheme auth (Request req) = do
-- Checking push promises
let hdr0 = outObjHeaders req
method = fromMaybe (error "sendRequest:method") $ lookup ":method" hdr0
path = fromMaybe (error "sendRequest:path") $ lookup ":path" hdr0
method = fromMaybe (error "makeStream:method") $ lookup ":method" hdr0
path = fromMaybe (error "makeStream:path") $ lookup ":path" hdr0
mstrm0 <- lookupEvenCache evenStreamTable method path
case mstrm0 of
Just strm0 -> do
deleteEvenCache evenStreamTable method path
return strm0
return (strm0, Nothing)
Nothing -> do
-- Arch/Sender is originally implemented for servers where
-- the ordering of responses can be out-of-order.
Expand All @@ -200,38 +205,41 @@ sendRequest conf ctx@Context{..} scheme auth (Request req) = do
| otherwise = hdr1
req' = req{outObjHeaders = hdr2}
-- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
(sid, newstrm) <- openOddStreamWait ctx
sendHeaderBody conf ctx sid newstrm req'
return newstrm
(_sid, newstrm) <- openOddStreamWait ctx
return (newstrm, Just req')

sendHeaderBody :: Config -> Context -> StreamId -> Stream -> OutObj -> IO ()
sendHeaderBody Config{..} ctx@Context{..} sid newstrm OutObj{..} = do
(mnext, mtbq) <- case outObjBody of
OutBodyNone -> return (Nothing, Nothing)
OutBodyFile (FileSpec path fileoff bytecount) -> do
(pread, sentinel) <- confPositionReadMaker path
let next = fillFileBodyGetNext pread fileoff bytecount sentinel
return (Just next, Nothing)
OutBodyBuilder builder -> do
let next = fillBuilderBodyGetNext builder
return (Just next, Nothing)
OutBodyStreaming strmbdy -> do
q <- sendStreaming ctx newstrm $ \iface ->
outBodyUnmask iface $ strmbdy (outBodyPush iface) (outBodyFlush iface)
let next = nextForStreaming q
return (Just next, Just q)
OutBodyStreamingIface strmbdy -> do
q <- sendStreaming ctx newstrm strmbdy
let next = nextForStreaming q
return (Just next, Just q)
((var, sync), out) <-
prepareSync newstrm (OHeader outObjHeaders mnext outObjTrailers) mtbq
atomically $ do
sidOK <- readTVar outputQStreamID
check (sidOK == sid)
enqueueOutputSTM outputQ out
writeTVar outputQStreamID (sid + 2)
forkManaged threadManager "H2 worker" $ syncWithSender ctx newstrm var sync
sendRequest :: Config -> Context -> Stream -> OutObj -> IO ()
sendRequest Config{..} ctx@Context{..} strm OutObj{..} =
forkManaged threadManager label $ do
let sid = streamNumber strm
(mnext, mtbq) <- case outObjBody of
OutBodyNone -> return (Nothing, Nothing)
OutBodyFile (FileSpec path fileoff bytecount) -> do
(pread, sentinel) <- confPositionReadMaker path
let next = fillFileBodyGetNext pread fileoff bytecount sentinel
return (Just next, Nothing)
OutBodyBuilder builder -> do
let next = fillBuilderBodyGetNext builder
return (Just next, Nothing)
OutBodyStreaming strmbdy -> do
q <- sendStreaming ctx strm $ \iface ->
outBodyUnmask iface $ strmbdy (outBodyPush iface) (outBodyFlush iface)
let next = nextForStreaming q
return (Just next, Just q)
OutBodyStreamingIface strmbdy -> do
q <- sendStreaming ctx strm strmbdy
let next = nextForStreaming q
return (Just next, Just q)
((var, sync), out) <-
prepareSync strm (OHeader outObjHeaders mnext outObjTrailers) mtbq
atomically $ do
sidOK <- readTVar outputQStreamID
check (sidOK == sid)
enqueueOutputSTM outputQ out
writeTVar outputQStreamID (sid + 2)
syncWithSender ctx strm var sync
where
label = "H2 request sender for stream " ++ show (streamNumber strm)

sendStreaming
:: Context
Expand Down

0 comments on commit cb2ce0f

Please sign in to comment.