Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix panic with generic error #21

Merged
merged 7 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hasql-notifications.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ library
exposed-modules: Hasql.Notifications
build-depends: base >= 4.7 && < 5
, bytestring >= 0.10.8.2
, text >= 1.2.3.1 && < 2.2
, text >= 2 && < 2.2
, hasql-pool >= 0.4 && < 0.11
, bytestring >= 0.10
, postgresql-libpq >= 0.9 && < 1.0
Expand Down
257 changes: 134 additions & 123 deletions src/Hasql/Notifications.hs
Original file line number Diff line number Diff line change
@@ -1,52 +1,51 @@
{-# LANGUAGE CPP #-}
{-|
This module has functions to send commands LISTEN and NOTIFY to the database server.
It also has a function to wait for and handle notifications on a database connection.

For more information check the [PostgreSQL documentation](https://www.postgresql.org/docs/current/libpq-notify.html).

-}
-- |
-- This module has functions to send commands LISTEN and NOTIFY to the database server.
-- It also has a function to wait for and handle notifications on a database connection.
--
-- For more information check the [PostgreSQL documentation](https://www.postgresql.org/docs/current/libpq-notify.html).
module Hasql.Notifications
( notifyPool
, notify
, listen
, unlisten
, waitForNotifications
, PgIdentifier
, toPgIdentifier
, fromPgIdentifier
) where

import Hasql.Pool (Pool, UsageError, use)
import Hasql.Session (sql, run, statement)
import qualified Hasql.Session as S
import qualified Hasql.Statement as HST
import Hasql.Connection (Connection, withLibPQConnection)
import qualified Hasql.Decoders as HD
import qualified Hasql.Encoders as HE
import qualified Database.PostgreSQL.LibPQ as PQ
( notifyPool,
notify,
listen,
unlisten,
waitForNotifications,
PgIdentifier,
toPgIdentifier,
fromPgIdentifier,
FatalError (..),
)
where

import Control.Concurrent (threadDelay, threadWaitRead)
import Control.Exception (Exception, throw)
import Control.Monad (forever, unless, void)
import Data.ByteString.Char8 (ByteString)
import Data.Functor.Contravariant (contramap)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.ByteString.Char8 (ByteString)
import Data.Functor.Contravariant (contramap)
import Control.Monad (void, forever)
#if defined(mingw32_HOST_OS)
import Control.Concurrent ( threadDelay )
#else
import Control.Concurrent (threadWaitRead)
#endif
import Control.Exception (Exception, throw)
import qualified Database.PostgreSQL.LibPQ as PQ
import Hasql.Connection (Connection, withLibPQConnection)
import qualified Hasql.Decoders as HD
import qualified Hasql.Encoders as HE
import Hasql.Pool (Pool, UsageError, use)
import Hasql.Session (run, sql, statement)
import qualified Hasql.Session as S
import qualified Hasql.Statement as HST

-- | A wrapped text that represents a properly escaped and quoted PostgreSQL identifier
newtype PgIdentifier = PgIdentifier Text deriving (Show)

-- | Uncatchable exceptions thrown and never caught.
newtype FatalError = FatalError { fatalErrorMessage :: String }
deriving (Show)
newtype FatalError = FatalError {fatalErrorMessage :: String}

instance Exception FatalError

instance Show FatalError where
show = fatalErrorMessage

-- | Given a PgIdentifier returns the wrapped text
fromPgIdentifier :: PgIdentifier -> Text
fromPgIdentifier (PgIdentifier bs) = bs
Expand All @@ -60,101 +59,114 @@ toPgIdentifier x =
strictlyReplaceQuotes = T.replace "\"" ("\"\"" :: Text)

-- | Given a Hasql Pool, a channel and a message sends a notify command to the database
notifyPool :: Pool -- ^ Pool from which the connection will be used to issue a NOTIFY command.
-> Text -- ^ Channel where to send the notification
-> Text -- ^ Payload to be sent with the notification
-> IO (Either UsageError ())
notifyPool ::
-- | Pool from which the connection will be used to issue a NOTIFY command.
Pool ->
-- | Channel where to send the notification
Text ->
-- | Payload to be sent with the notification
Text ->
IO (Either UsageError ())
notifyPool pool channel mesg =
use pool (statement (channel, mesg) callStatement)
where
callStatement = HST.Statement ("SELECT pg_notify" <> "($1, $2)") encoder HD.noResult False
encoder = contramap fst (HE.param $ HE.nonNullable HE.text) <> contramap snd (HE.param $ HE.nonNullable HE.text)
use pool (statement (channel, mesg) callStatement)
where
callStatement = HST.Statement ("SELECT pg_notify" <> "($1, $2)") encoder HD.noResult False
encoder = contramap fst (HE.param $ HE.nonNullable HE.text) <> contramap snd (HE.param $ HE.nonNullable HE.text)

-- | Given a Hasql Connection, a channel and a message sends a notify command to the database
notify :: Connection -- ^ Connection to be used to send the NOTIFY command
-> PgIdentifier -- ^ Channel where to send the notification
-> Text -- ^ Payload to be sent with the notification
-> IO (Either S.QueryError ())
notify ::
-- | Connection to be used to send the NOTIFY command
Connection ->
-- | Channel where to send the notification
PgIdentifier ->
-- | Payload to be sent with the notification
Text ->
IO (Either S.QueryError ())
notify con channel mesg =
run (sql $ T.encodeUtf8 ("NOTIFY " <> fromPgIdentifier channel <> ", '" <> mesg <> "'")) con

{-|
Given a Hasql Connection and a channel sends a listen command to the database.
Once the connection sends the LISTEN command the server register its interest in the channel.
Hence it's important to keep track of which connection was used to open the listen command.

Example of listening and waiting for a notification:

@
import System.Exit (die)

import Hasql.Connection
import Hasql.Notifications

main :: IO ()
main = do
dbOrError <- acquire "postgres://localhost/db_name"
case dbOrError of
Right db -> do
let channelToListen = toPgIdentifier "sample-channel"
listen db channelToListen
waitForNotifications (\channel _ -> print $ "Just got notification on channel " <> channel) db
_ -> die "Could not open database connection"
@
-}
listen :: Connection -- ^ Connection to be used to send the LISTEN command
-> PgIdentifier -- ^ Channel this connection will be registered to listen to
-> IO ()
run (sql $ T.encodeUtf8 ("NOTIFY " <> fromPgIdentifier channel <> ", '" <> mesg <> "'")) con

-- |
-- Given a Hasql Connection and a channel sends a listen command to the database.
-- Once the connection sends the LISTEN command the server register its interest in the channel.
-- Hence it's important to keep track of which connection was used to open the listen command.
--
-- Example of listening and waiting for a notification:
--
-- @
-- import System.Exit (die)
--
-- import Hasql.Connection
-- import Hasql.Notifications
--
-- main :: IO ()
-- main = do
-- dbOrError <- acquire "postgres://localhost/db_name"
-- case dbOrError of
-- Right db -> do
-- let channelToListen = toPgIdentifier "sample-channel"
-- listen db channelToListen
-- waitForNotifications (\channel _ -> print $ "Just got notification on channel " <> channel) db
-- _ -> die "Could not open database connection"
-- @
listen ::
-- | Connection to be used to send the LISTEN command
Connection ->
-- | Channel this connection will be registered to listen to
PgIdentifier ->
IO ()
listen con channel =
void $ withLibPQConnection con execListen
where
execListen pqCon = void $ PQ.exec pqCon $ T.encodeUtf8 $ "LISTEN " <> fromPgIdentifier channel

-- | Given a Hasql Connection and a channel sends a unlisten command to the database
unlisten :: Connection -- ^ Connection currently registerd by a previous 'listen' call
-> PgIdentifier -- ^ Channel this connection will be deregistered from
-> IO ()
unlisten ::
-- | Connection currently registerd by a previous 'listen' call
Connection ->
-- | Channel this connection will be deregistered from
PgIdentifier ->
IO ()
unlisten con channel =
void $ withLibPQConnection con execListen
where
execListen pqCon = void $ PQ.exec pqCon $ T.encodeUtf8 $ "UNLISTEN " <> fromPgIdentifier channel


{-|
Given a function that handles notifications and a Hasql connection it will listen
on the database connection and call the handler everytime a message arrives.

The message handler passed as first argument needs two parameters channel and payload.
See an example of handling notification on a separate thread:

@
import Control.Concurrent.Async (async)
import Control.Monad (void)
import System.Exit (die)

import Hasql.Connection
import Hasql.Notifications

notificationHandler :: ByteString -> ByteString -> IO()
notificationHandler channel payload =
void $ async do
print $ "Handle payload " <> payload <> " in its own thread"

main :: IO ()
main = do
dbOrError <- acquire "postgres://localhost/db_name"
case dbOrError of
Right db -> do
let channelToListen = toPgIdentifier "sample-channel"
listen db channelToListen
waitForNotifications notificationHandler db
_ -> die "Could not open database connection"
@
-}

waitForNotifications :: (ByteString -> ByteString -> IO()) -- ^ Callback function to handle incoming notifications
-> Connection -- ^ Connection where we will listen to
-> IO ()
-- |
-- Given a function that handles notifications and a Hasql connection it will listen
-- on the database connection and call the handler everytime a message arrives.
--
-- The message handler passed as first argument needs two parameters channel and payload.
-- See an example of handling notification on a separate thread:
--
-- @
-- import Control.Concurrent.Async (async)
-- import Control.Monad (void)
-- import System.Exit (die)
--
-- import Hasql.Connection
-- import Hasql.Notifications
--
-- notificationHandler :: ByteString -> ByteString -> IO()
-- notificationHandler channel payload =
-- void $ async do
-- print $ "Handle payload " <> payload <> " in its own thread"
--
-- main :: IO ()
-- main = do
-- dbOrError <- acquire "postgres://localhost/db_name"
-- case dbOrError of
-- Right db -> do
-- let channelToListen = toPgIdentifier "sample-channel"
-- listen db channelToListen
-- waitForNotifications notificationHandler db
-- _ -> die "Could not open database connection"
-- @
waitForNotifications ::
-- | Callback function to handle incoming notifications
(ByteString -> ByteString -> IO ()) ->
-- | Connection where we will listen to
Connection ->
IO ()
waitForNotifications sendNotification con =
withLibPQConnection con $ void . forever . pqFetch
where
Expand All @@ -164,16 +176,15 @@ waitForNotifications sendNotification con =
Nothing -> do
mfd <- PQ.socket pqCon
case mfd of
Nothing -> panic "Error checking for PostgreSQL notifications"
#if defined(mingw32_HOST_OS)
Just _ -> do
void $ threadDelay 1000000
#else
Nothing -> void $ threadDelay 1000000
Just fd -> do
void $ threadWaitRead fd
#endif
void $ PQ.consumeInput pqCon

result <- PQ.consumeInput pqCon
unless result $ do
mError <- PQ.errorMessage pqCon
panic $ maybe "Error checking for PostgreSQL notifications" (T.unpack . T.decodeUtf8Lenient) mError
Just notification ->
sendNotification (PQ.notifyRelname notification) (PQ.notifyExtra notification)
sendNotification (PQ.notifyRelname notification) (PQ.notifyExtra notification)
panic :: String -> a
panic a = throw (FatalError a)
2 changes: 1 addition & 1 deletion stack.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
resolver: lts-19.17
resolver: lts-21.7

packages:
- .
Expand Down
8 changes: 4 additions & 4 deletions stack.yaml.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
packages: []
snapshots:
- completed:
size: 619161
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/19/17.yaml
sha256: 7f47507fd037228a8d23cf830f5844e1f006221acebdd7cb49f2f5fb561e0546
original: lts-19.17
sha256: 23bb9bb355bfdb1635252e120a29b712f0d5e8a6c6a65c5ab5bd6692f46c438e
size: 640457
url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/21/7.yaml
original: lts-21.7
32 changes: 18 additions & 14 deletions test/Hasql/NotificationsSpec.hs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
module Hasql.NotificationsSpec (main, spec) where

import Test.Hspec
import Test.QuickCheck

import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar
import Control.Monad (void)
import System.Exit (die)
import Data.ByteString

import Hasql.Connection
import Hasql.Notifications
import System.Exit (die)
import Test.Hspec
import Test.QuickCheck

-- `main` is here so that this module can be run from GHCi on its own. It is
-- not needed for automatic spec discovery.
Expand All @@ -24,15 +22,21 @@ spec = do
it "should call our notification handler" $ do
dbOrError <- acquire "postgres://postgres:roottoor@localhost/hasql_notifications_test"
case dbOrError of
Right db -> do
let channelToListen = toPgIdentifier "test-channel"
mailbox <- newEmptyMVar :: IO (MVar ByteString)
listen db channelToListen
threadId <- forkIO $ waitForNotifications (\channel payload -> putMVar mailbox $ "Just got notification on channel " <> channel <> ": " <> payload) db
notify db (toPgIdentifier "test-channel") "Payload"
takeMVar mailbox `shouldReturn` "Just got notification on channel test-channel: Payload"
_ -> die "Could not open database connection"
Right db -> do
let channelToListen = toPgIdentifier "test-channel"
mailbox <- newEmptyMVar :: IO (MVar ByteString)
listen db channelToListen
threadId <- forkIO $ waitForNotifications (\channel payload -> putMVar mailbox $ "Just got notification on channel " <> channel <> ": " <> payload) db
notify db (toPgIdentifier "test-channel") "Payload"
takeMVar mailbox `shouldReturn` "Just got notification on channel test-channel: Payload"
_ -> die "Could not open database connection"

describe "FatalError show instance" $
it "extracts message" $
( show $
FatalError {fatalErrorMessage = "some message"}
)
`shouldBe` "some message"
describe "toPgIdenfier" $
it "enclose text in quotes doubling existing ones" $
fromPgIdentifier (toPgIdentifier "some \"identifier\"") `shouldBe` "\"some \"\"identifier\"\"\""
fromPgIdentifier (toPgIdentifier "some \"identifier\"") `shouldBe` "\"some \"\"identifier\"\"\""
Loading