Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update for http2#148 #256

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions cabal.project
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
packages:
packages:
./grpc-spec
, ./grapesy
, ./tutorials/quickstart
Expand All @@ -16,7 +16,17 @@ package grpc-spec
package grapesy
tests: True
benchmarks: True
flags: +build-demo +build-stress-test
flags: +build-demo +build-stress-test

source-repository-package
type: git
location: https://github.com/kazu-yamamoto/http2
tag: 620687bea826b76436db9475c8566775592b4f18

source-repository-package
type: git
location: https://github.com/kazu-yamamoto/http2-tls
tag: 2d4725c16f7dc887c5792d46fae5b67a0907ca6d

--
-- ghc 9.10
Expand Down
60 changes: 59 additions & 1 deletion grapesy/src/Network/GRPC/Util/Session/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ module Network.GRPC.Util.Session.Server (

import Network.HTTP2.Server qualified as Server

import Control.Concurrent
import Control.Exception
import Control.Monad
import Data.IORef

import Network.GRPC.Util.HTTP2.Stream
import Network.GRPC.Util.Session.API
import Network.GRPC.Util.Session.Channel
Expand Down Expand Up @@ -76,15 +81,17 @@ setupResponseChannel sess
FlowStartRegular headers -> do
regular <- initFlowStateRegular headers
markReady $ FlowStateRegular regular
(waitForWorker, wrapWorker) <- joinWithWorker
let resp :: Server.Response
resp = setResponseTrailers sess channel regular
$ Server.responseStreamingIface
(responseStatus responseInfo)
(responseHeaders responseInfo)
$ \iface -> do
$ \iface -> wrapWorker $ do
stream <- serverOutputStream iface
sendMessageLoop sess regular stream
respond conn resp
waitForWorker
FlowStartNoMessages trailers -> do
markReady $ FlowStateNoMessages trailers
let resp :: Server.Response
Expand All @@ -95,6 +102,57 @@ setupResponseChannel sess

return channel

{-------------------------------------------------------------------------------
Auxiliary: join two threads
-------------------------------------------------------------------------------}

data WorkerStatus =
-- | The thread ID of the worker thread is not yet known
WorkerUnknown

-- | The worker thread is known, and has installed an exception handler
| WorkerReady ThreadId

-- | We don't need the worker anymore, but its identity is not yet known
| WorkerPreventStart

-- | Join current thread with worker thread
--
-- The result of the parent thread will be the result of the worker thread;
-- any exception thrown to the parent will also interrupt the worker, and
-- any exception raised in the worker will be also be raised in the parent.
joinWithWorker :: forall a. IO (IO a, IO a -> IO ())
joinWithWorker = do
sync :: MVar (Either SomeException a) <- newEmptyMVar
cancel :: IORef WorkerStatus <- newIORef WorkerUnknown

let waitForWorker :: IO a
waitForWorker = do
handle cancelWorker $
either throwIO return =<< readMVar sync
where
cancelWorker :: SomeException -> IO a
cancelWorker e = do
mWorker <- atomicModifyIORef cancel $ \case
WorkerUnknown -> (WorkerPreventStart, Nothing)
WorkerReady tid -> (undefined, Just tid)
WorkerPreventStart -> error "impossible"
forM_ mWorker $ \worker -> throwTo worker e
throwIO e

wrapWorker :: IO a -> IO ()
wrapWorker k = do
workerId <- myThreadId
handle (putMVar sync . Left) $ do
preventStart <- atomicModifyIORef cancel $ \case
WorkerUnknown -> (WorkerReady workerId, False)
WorkerPreventStart -> (undefined, True)
WorkerReady _tid -> error "impossible"
unless preventStart $
putMVar sync =<< (Right <$> k)

return (waitForWorker, wrapWorker)

{-------------------------------------------------------------------------------
Auxiliary http2
-------------------------------------------------------------------------------}
Expand Down
1 change: 1 addition & 0 deletions grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ test_invalidRequestMetadata = respondWith response $ \addr -> do
(Client.ResponseHeaders' HandledSynthesized) <-
Client.withConnection connParams' (Client.ServerInsecure addr) $ \conn ->
Client.withRPC conn def (Proxy @Ping) $ \call -> do
Client.sendEndOfInput call
Client.recvInitialResponse call
case mResp of
Right headers
Expand Down
1 change: 1 addition & 0 deletions grapesy/test-grapesy/Test/Sanity/Interop.hs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ test_cancellation_client =
testClientServer ClientServerTest {
config = def {
expectEarlyClientTermination = True
, serverPort = 50051
}
, client = simpleTestClient $ \conn -> do
-- We wait for the first input, but then cancel the request
Expand Down