diff --git a/hasql-notifications.cabal b/hasql-notifications.cabal index 1d41899..8a3e5a3 100644 --- a/hasql-notifications.cabal +++ b/hasql-notifications.cabal @@ -18,7 +18,7 @@ library exposed-modules: Hasql.Notifications build-depends: base >= 4.7 && < 5 , bytestring >= 0.10.8.2 - , text >= 1.2.3.1 && < 2.2 + , text >= 2 && < 2.2 , hasql-pool >= 0.4 && < 0.11 , bytestring >= 0.10 , postgresql-libpq >= 0.9 && < 1.0 diff --git a/src/Hasql/Notifications.hs b/src/Hasql/Notifications.hs index eb04c1b..d11c605 100644 --- a/src/Hasql/Notifications.hs +++ b/src/Hasql/Notifications.hs @@ -1,52 +1,51 @@ {-# LANGUAGE CPP #-} -{-| - This module has functions to send commands LISTEN and NOTIFY to the database server. - It also has a function to wait for and handle notifications on a database connection. - For more information check the [PostgreSQL documentation](https://www.postgresql.org/docs/current/libpq-notify.html). - --} +-- | +-- This module has functions to send commands LISTEN and NOTIFY to the database server. +-- It also has a function to wait for and handle notifications on a database connection. +-- +-- For more information check the [PostgreSQL documentation](https://www.postgresql.org/docs/current/libpq-notify.html). module Hasql.Notifications - ( notifyPool - , notify - , listen - , unlisten - , waitForNotifications - , PgIdentifier - , toPgIdentifier - , fromPgIdentifier - ) where - -import Hasql.Pool (Pool, UsageError, use) -import Hasql.Session (sql, run, statement) -import qualified Hasql.Session as S -import qualified Hasql.Statement as HST -import Hasql.Connection (Connection, withLibPQConnection) -import qualified Hasql.Decoders as HD -import qualified Hasql.Encoders as HE -import qualified Database.PostgreSQL.LibPQ as PQ + ( notifyPool, + notify, + listen, + unlisten, + waitForNotifications, + PgIdentifier, + toPgIdentifier, + fromPgIdentifier, + FatalError (..), + ) +where + +import Control.Concurrent (threadDelay, threadWaitRead) +import Control.Exception (Exception, throw) +import Control.Monad (forever, unless, void) +import Data.ByteString.Char8 (ByteString) +import Data.Functor.Contravariant (contramap) import Data.Text (Text) import qualified Data.Text as T import qualified Data.Text.Encoding as T -import Data.ByteString.Char8 (ByteString) -import Data.Functor.Contravariant (contramap) -import Control.Monad (void, forever) -#if defined(mingw32_HOST_OS) -import Control.Concurrent ( threadDelay ) -#else -import Control.Concurrent (threadWaitRead) -#endif -import Control.Exception (Exception, throw) +import qualified Database.PostgreSQL.LibPQ as PQ +import Hasql.Connection (Connection, withLibPQConnection) +import qualified Hasql.Decoders as HD +import qualified Hasql.Encoders as HE +import Hasql.Pool (Pool, UsageError, use) +import Hasql.Session (run, sql, statement) +import qualified Hasql.Session as S +import qualified Hasql.Statement as HST -- | A wrapped text that represents a properly escaped and quoted PostgreSQL identifier newtype PgIdentifier = PgIdentifier Text deriving (Show) -- | Uncatchable exceptions thrown and never caught. -newtype FatalError = FatalError { fatalErrorMessage :: String } - deriving (Show) +newtype FatalError = FatalError {fatalErrorMessage :: String} instance Exception FatalError +instance Show FatalError where + show = fatalErrorMessage + -- | Given a PgIdentifier returns the wrapped text fromPgIdentifier :: PgIdentifier -> Text fromPgIdentifier (PgIdentifier bs) = bs @@ -60,101 +59,114 @@ toPgIdentifier x = strictlyReplaceQuotes = T.replace "\"" ("\"\"" :: Text) -- | Given a Hasql Pool, a channel and a message sends a notify command to the database -notifyPool :: Pool -- ^ Pool from which the connection will be used to issue a NOTIFY command. - -> Text -- ^ Channel where to send the notification - -> Text -- ^ Payload to be sent with the notification - -> IO (Either UsageError ()) +notifyPool :: + -- | Pool from which the connection will be used to issue a NOTIFY command. + Pool -> + -- | Channel where to send the notification + Text -> + -- | Payload to be sent with the notification + Text -> + IO (Either UsageError ()) notifyPool pool channel mesg = - use pool (statement (channel, mesg) callStatement) - where - callStatement = HST.Statement ("SELECT pg_notify" <> "($1, $2)") encoder HD.noResult False - encoder = contramap fst (HE.param $ HE.nonNullable HE.text) <> contramap snd (HE.param $ HE.nonNullable HE.text) + use pool (statement (channel, mesg) callStatement) + where + callStatement = HST.Statement ("SELECT pg_notify" <> "($1, $2)") encoder HD.noResult False + encoder = contramap fst (HE.param $ HE.nonNullable HE.text) <> contramap snd (HE.param $ HE.nonNullable HE.text) -- | Given a Hasql Connection, a channel and a message sends a notify command to the database -notify :: Connection -- ^ Connection to be used to send the NOTIFY command - -> PgIdentifier -- ^ Channel where to send the notification - -> Text -- ^ Payload to be sent with the notification - -> IO (Either S.QueryError ()) +notify :: + -- | Connection to be used to send the NOTIFY command + Connection -> + -- | Channel where to send the notification + PgIdentifier -> + -- | Payload to be sent with the notification + Text -> + IO (Either S.QueryError ()) notify con channel mesg = - run (sql $ T.encodeUtf8 ("NOTIFY " <> fromPgIdentifier channel <> ", '" <> mesg <> "'")) con - -{-| - Given a Hasql Connection and a channel sends a listen command to the database. - Once the connection sends the LISTEN command the server register its interest in the channel. - Hence it's important to keep track of which connection was used to open the listen command. - - Example of listening and waiting for a notification: - - @ - import System.Exit (die) - - import Hasql.Connection - import Hasql.Notifications - - main :: IO () - main = do - dbOrError <- acquire "postgres://localhost/db_name" - case dbOrError of - Right db -> do - let channelToListen = toPgIdentifier "sample-channel" - listen db channelToListen - waitForNotifications (\channel _ -> print $ "Just got notification on channel " <> channel) db - _ -> die "Could not open database connection" - @ --} -listen :: Connection -- ^ Connection to be used to send the LISTEN command - -> PgIdentifier -- ^ Channel this connection will be registered to listen to - -> IO () + run (sql $ T.encodeUtf8 ("NOTIFY " <> fromPgIdentifier channel <> ", '" <> mesg <> "'")) con + +-- | +-- Given a Hasql Connection and a channel sends a listen command to the database. +-- Once the connection sends the LISTEN command the server register its interest in the channel. +-- Hence it's important to keep track of which connection was used to open the listen command. +-- +-- Example of listening and waiting for a notification: +-- +-- @ +-- import System.Exit (die) +-- +-- import Hasql.Connection +-- import Hasql.Notifications +-- +-- main :: IO () +-- main = do +-- dbOrError <- acquire "postgres://localhost/db_name" +-- case dbOrError of +-- Right db -> do +-- let channelToListen = toPgIdentifier "sample-channel" +-- listen db channelToListen +-- waitForNotifications (\channel _ -> print $ "Just got notification on channel " <> channel) db +-- _ -> die "Could not open database connection" +-- @ +listen :: + -- | Connection to be used to send the LISTEN command + Connection -> + -- | Channel this connection will be registered to listen to + PgIdentifier -> + IO () listen con channel = void $ withLibPQConnection con execListen where execListen pqCon = void $ PQ.exec pqCon $ T.encodeUtf8 $ "LISTEN " <> fromPgIdentifier channel -- | Given a Hasql Connection and a channel sends a unlisten command to the database -unlisten :: Connection -- ^ Connection currently registerd by a previous 'listen' call - -> PgIdentifier -- ^ Channel this connection will be deregistered from - -> IO () +unlisten :: + -- | Connection currently registerd by a previous 'listen' call + Connection -> + -- | Channel this connection will be deregistered from + PgIdentifier -> + IO () unlisten con channel = void $ withLibPQConnection con execListen where execListen pqCon = void $ PQ.exec pqCon $ T.encodeUtf8 $ "UNLISTEN " <> fromPgIdentifier channel - -{-| - Given a function that handles notifications and a Hasql connection it will listen - on the database connection and call the handler everytime a message arrives. - - The message handler passed as first argument needs two parameters channel and payload. - See an example of handling notification on a separate thread: - - @ - import Control.Concurrent.Async (async) - import Control.Monad (void) - import System.Exit (die) - - import Hasql.Connection - import Hasql.Notifications - - notificationHandler :: ByteString -> ByteString -> IO() - notificationHandler channel payload = - void $ async do - print $ "Handle payload " <> payload <> " in its own thread" - - main :: IO () - main = do - dbOrError <- acquire "postgres://localhost/db_name" - case dbOrError of - Right db -> do - let channelToListen = toPgIdentifier "sample-channel" - listen db channelToListen - waitForNotifications notificationHandler db - _ -> die "Could not open database connection" - @ --} - -waitForNotifications :: (ByteString -> ByteString -> IO()) -- ^ Callback function to handle incoming notifications - -> Connection -- ^ Connection where we will listen to - -> IO () +-- | +-- Given a function that handles notifications and a Hasql connection it will listen +-- on the database connection and call the handler everytime a message arrives. +-- +-- The message handler passed as first argument needs two parameters channel and payload. +-- See an example of handling notification on a separate thread: +-- +-- @ +-- import Control.Concurrent.Async (async) +-- import Control.Monad (void) +-- import System.Exit (die) +-- +-- import Hasql.Connection +-- import Hasql.Notifications +-- +-- notificationHandler :: ByteString -> ByteString -> IO() +-- notificationHandler channel payload = +-- void $ async do +-- print $ "Handle payload " <> payload <> " in its own thread" +-- +-- main :: IO () +-- main = do +-- dbOrError <- acquire "postgres://localhost/db_name" +-- case dbOrError of +-- Right db -> do +-- let channelToListen = toPgIdentifier "sample-channel" +-- listen db channelToListen +-- waitForNotifications notificationHandler db +-- _ -> die "Could not open database connection" +-- @ +waitForNotifications :: + -- | Callback function to handle incoming notifications + (ByteString -> ByteString -> IO ()) -> + -- | Connection where we will listen to + Connection -> + IO () waitForNotifications sendNotification con = withLibPQConnection con $ void . forever . pqFetch where @@ -164,16 +176,15 @@ waitForNotifications sendNotification con = Nothing -> do mfd <- PQ.socket pqCon case mfd of - Nothing -> panic "Error checking for PostgreSQL notifications" -#if defined(mingw32_HOST_OS) - Just _ -> do - void $ threadDelay 1000000 -#else + Nothing -> void $ threadDelay 1000000 Just fd -> do void $ threadWaitRead fd -#endif - void $ PQ.consumeInput pqCon + + result <- PQ.consumeInput pqCon + unless result $ do + mError <- PQ.errorMessage pqCon + panic $ maybe "Error checking for PostgreSQL notifications" (T.unpack . T.decodeUtf8Lenient) mError Just notification -> - sendNotification (PQ.notifyRelname notification) (PQ.notifyExtra notification) + sendNotification (PQ.notifyRelname notification) (PQ.notifyExtra notification) panic :: String -> a panic a = throw (FatalError a) diff --git a/stack.yaml b/stack.yaml index 9e202fb..68caf3c 100644 --- a/stack.yaml +++ b/stack.yaml @@ -1,4 +1,4 @@ -resolver: lts-19.17 +resolver: lts-21.7 packages: - . diff --git a/stack.yaml.lock b/stack.yaml.lock index 9648c3f..0161ee7 100644 --- a/stack.yaml.lock +++ b/stack.yaml.lock @@ -6,7 +6,7 @@ packages: [] snapshots: - completed: - size: 619161 - url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/19/17.yaml - sha256: 7f47507fd037228a8d23cf830f5844e1f006221acebdd7cb49f2f5fb561e0546 - original: lts-19.17 + sha256: 23bb9bb355bfdb1635252e120a29b712f0d5e8a6c6a65c5ab5bd6692f46c438e + size: 640457 + url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/21/7.yaml + original: lts-21.7 diff --git a/test/Hasql/NotificationsSpec.hs b/test/Hasql/NotificationsSpec.hs index 113fe32..d944e6e 100644 --- a/test/Hasql/NotificationsSpec.hs +++ b/test/Hasql/NotificationsSpec.hs @@ -1,16 +1,14 @@ module Hasql.NotificationsSpec (main, spec) where -import Test.Hspec -import Test.QuickCheck - import Control.Concurrent (forkIO, killThread) import Control.Concurrent.MVar import Control.Monad (void) -import System.Exit (die) import Data.ByteString - import Hasql.Connection import Hasql.Notifications +import System.Exit (die) +import Test.Hspec +import Test.QuickCheck -- `main` is here so that this module can be run from GHCi on its own. It is -- not needed for automatic spec discovery. @@ -24,15 +22,21 @@ spec = do it "should call our notification handler" $ do dbOrError <- acquire "postgres://postgres:roottoor@localhost/hasql_notifications_test" case dbOrError of - Right db -> do - let channelToListen = toPgIdentifier "test-channel" - mailbox <- newEmptyMVar :: IO (MVar ByteString) - listen db channelToListen - threadId <- forkIO $ waitForNotifications (\channel payload -> putMVar mailbox $ "Just got notification on channel " <> channel <> ": " <> payload) db - notify db (toPgIdentifier "test-channel") "Payload" - takeMVar mailbox `shouldReturn` "Just got notification on channel test-channel: Payload" - _ -> die "Could not open database connection" + Right db -> do + let channelToListen = toPgIdentifier "test-channel" + mailbox <- newEmptyMVar :: IO (MVar ByteString) + listen db channelToListen + threadId <- forkIO $ waitForNotifications (\channel payload -> putMVar mailbox $ "Just got notification on channel " <> channel <> ": " <> payload) db + notify db (toPgIdentifier "test-channel") "Payload" + takeMVar mailbox `shouldReturn` "Just got notification on channel test-channel: Payload" + _ -> die "Could not open database connection" + describe "FatalError show instance" $ + it "extracts message" $ + ( show $ + FatalError {fatalErrorMessage = "some message"} + ) + `shouldBe` "some message" describe "toPgIdenfier" $ it "enclose text in quotes doubling existing ones" $ - fromPgIdentifier (toPgIdentifier "some \"identifier\"") `shouldBe` "\"some \"\"identifier\"\"\"" \ No newline at end of file + fromPgIdentifier (toPgIdentifier "some \"identifier\"") `shouldBe` "\"some \"\"identifier\"\"\""