From 10c4dcabe0fff424fad50925f18077d65c83c8ae Mon Sep 17 00:00:00 2001 From: Edsko de Vries Date: Wed, 23 Oct 2024 16:23:23 +0200 Subject: [PATCH] WIP: Update for `http2#148` --- cabal.project | 14 ++++- .../src/Network/GRPC/Util/Session/Server.hs | 60 ++++++++++++++++++- .../Test/Sanity/BrokenDeployments.hs | 1 + grapesy/test-grapesy/Test/Sanity/Interop.hs | 1 + 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/cabal.project b/cabal.project index 7123e82b..adbd5701 100644 --- a/cabal.project +++ b/cabal.project @@ -1,4 +1,4 @@ -packages: +packages: ./grpc-spec , ./grapesy , ./tutorials/quickstart @@ -16,7 +16,17 @@ package grpc-spec package grapesy tests: True benchmarks: True - flags: +build-demo +build-stress-test + flags: +build-demo +build-stress-test + +source-repository-package + type: git + location: https://github.com/kazu-yamamoto/http2 + tag: 620687bea826b76436db9475c8566775592b4f18 + +source-repository-package + type: git + location: https://github.com/kazu-yamamoto/http2-tls + tag: 2d4725c16f7dc887c5792d46fae5b67a0907ca6d -- -- ghc 9.10 diff --git a/grapesy/src/Network/GRPC/Util/Session/Server.hs b/grapesy/src/Network/GRPC/Util/Session/Server.hs index 39e24ef0..4bc6891b 100644 --- a/grapesy/src/Network/GRPC/Util/Session/Server.hs +++ b/grapesy/src/Network/GRPC/Util/Session/Server.hs @@ -6,6 +6,11 @@ module Network.GRPC.Util.Session.Server ( import Network.HTTP2.Server qualified as Server +import Control.Concurrent +import Control.Exception +import Control.Monad +import Data.IORef + import Network.GRPC.Util.HTTP2.Stream import Network.GRPC.Util.Session.API import Network.GRPC.Util.Session.Channel @@ -76,15 +81,17 @@ setupResponseChannel sess FlowStartRegular headers -> do regular <- initFlowStateRegular headers markReady $ FlowStateRegular regular + (waitForWorker, wrapWorker) <- joinWithWorker let resp :: Server.Response resp = setResponseTrailers sess channel regular $ Server.responseStreamingIface (responseStatus responseInfo) (responseHeaders responseInfo) - $ \iface -> do + $ \iface -> wrapWorker $ do stream <- serverOutputStream iface sendMessageLoop sess regular stream respond conn resp + waitForWorker FlowStartNoMessages trailers -> do markReady $ FlowStateNoMessages trailers let resp :: Server.Response @@ -95,6 +102,57 @@ setupResponseChannel sess return channel +{------------------------------------------------------------------------------- + Auxiliary: join two threads +-------------------------------------------------------------------------------} + +data WorkerStatus = + -- | The thread ID of the worker thread is not yet known + WorkerUnknown + + -- | The worker thread is known, and has installed an exception handler + | WorkerReady ThreadId + + -- | We don't need the worker anymore, but its identity is not yet known + | WorkerPreventStart + +-- | Join current thread with worker thread +-- +-- The result of the parent thread will be the result of the worker thread; +-- any exception thrown to the parent will also interrupt the worker, and +-- any exception raised in the worker will be also be raised in the parent. +joinWithWorker :: forall a. IO (IO a, IO a -> IO ()) +joinWithWorker = do + sync :: MVar (Either SomeException a) <- newEmptyMVar + cancel :: IORef WorkerStatus <- newIORef WorkerUnknown + + let waitForWorker :: IO a + waitForWorker = do + handle cancelWorker $ + either throwIO return =<< readMVar sync + where + cancelWorker :: SomeException -> IO a + cancelWorker e = do + mWorker <- atomicModifyIORef cancel $ \case + WorkerUnknown -> (WorkerPreventStart, Nothing) + WorkerReady tid -> (undefined, Just tid) + WorkerPreventStart -> error "impossible" + forM_ mWorker $ \worker -> throwTo worker e + throwIO e + + wrapWorker :: IO a -> IO () + wrapWorker k = do + workerId <- myThreadId + handle (putMVar sync . Left) $ do + preventStart <- atomicModifyIORef cancel $ \case + WorkerUnknown -> (WorkerReady workerId, False) + WorkerPreventStart -> (undefined, True) + WorkerReady _tid -> error "impossible" + unless preventStart $ + putMVar sync =<< (Right <$> k) + + return (waitForWorker, wrapWorker) + {------------------------------------------------------------------------------- Auxiliary http2 -------------------------------------------------------------------------------} diff --git a/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs b/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs index d5ff176f..58b63157 100644 --- a/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs +++ b/grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs @@ -272,6 +272,7 @@ test_invalidRequestMetadata = respondWith response $ \addr -> do (Client.ResponseHeaders' HandledSynthesized) <- Client.withConnection connParams' (Client.ServerInsecure addr) $ \conn -> Client.withRPC conn def (Proxy @Ping) $ \call -> do + Client.sendEndOfInput call Client.recvInitialResponse call case mResp of Right headers diff --git a/grapesy/test-grapesy/Test/Sanity/Interop.hs b/grapesy/test-grapesy/Test/Sanity/Interop.hs index 17949426..d296e042 100644 --- a/grapesy/test-grapesy/Test/Sanity/Interop.hs +++ b/grapesy/test-grapesy/Test/Sanity/Interop.hs @@ -217,6 +217,7 @@ test_cancellation_client = testClientServer ClientServerTest { config = def { expectEarlyClientTermination = True + , serverPort = 50051 } , client = simpleTestClient $ \conn -> do -- We wait for the first input, but then cancel the request