| Safe Haskell | None |
|---|---|
| Language | Haskell2010 |
Database.PostgreSQL.Simple.Queue
Description
This module utilizes PostgreSQL to implement a durable queue for efficiently processing arbitrary payloads that can be represented as JSON.
Typically a producer would enqueue a new payload as part of a larger database transaction:
createAccount userRecord = do
  runDBTSerializable $ do
    createUserDB userRecord
    enqueueDB $ makeVerificationEmail userRecord
In another thread or process, the consumer would drain the queue.
forever $ do
  -- Attempt to get a payload or block until one is available
  payload <- lock conn
  -- Perform application-specific parsing of the payload value
  case fromJSON $ pValue payload of
    Success x -> sendEmail x -- Perform application-specific processing
    Error err -> logErr err
  -- Remove the payload from future processing
  dequeue conn $ pId payload
For a more complete example of a consumer, utilizing the provided defaultMain, see EmailQueue.
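The parsing step of the consumer loop can be looked at in isolation. The following is a minimal sketch, assuming the aeson package is available; `VerificationEmail`, its fields, and `handleValue` are hypothetical names used for illustration and are not part of this module.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson (FromJSON (..), Result (..), Value, fromJSON, object, withObject, (.:), (.=))
import Data.Text (Text)

-- Hypothetical payload type; not part of Database.PostgreSQL.Simple.Queue.
data VerificationEmail = VerificationEmail
  { emailTo :: Text
  , emailToken :: Text
  } deriving (Eq, Show)

instance FromJSON VerificationEmail where
  parseJSON = withObject "VerificationEmail" $ \o ->
    VerificationEmail <$> o .: "to" <*> o .: "token"

-- Turn a raw JSON value taken from the queue into either a parse error
-- or a typed message, mirroring the Success/Error branches of the loop.
handleValue :: Value -> Either String VerificationEmail
handleValue v = case fromJSON v of
  Success email -> Right email
  Error err -> Left err

main :: IO ()
main = do
  -- A well-formed value parses into the typed message.
  print $ handleValue (object ["to" .= ("a@example.com" :: Text), "token" .= ("abc" :: Text)])
  -- A value with a missing field yields a Left with aeson's error text.
  print $ handleValue (object ["to" .= ("a@example.com" :: Text)])
```

Returning `Either` keeps the decision about logging or retrying a malformed payload with the caller, which is where the consumer loop makes it.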
This module provides two flavors of functions, a DB API and an IO API.
Most operations are provided in both flavors, with the exception of lock.
lock blocks and would not be that useful as part of a larger transaction
since it would keep the transaction open for a potentially long time. Although
both flavors are provided, in general one version is more useful for typical
use cases.
Synopsis
- newtype PayloadId = PayloadId { unPayloadId :: Int64 }
- data State
- data Payload = Payload {}
- enqueueDB :: String -> Value -> DB PayloadId
- dequeueDB :: String -> DB (Maybe Payload)
- withPayloadDB :: String -> Int -> (Payload -> IO a) -> DB (Either SomeException (Maybe a))
- getCountDB :: String -> DB Int64
- enqueue :: String -> Connection -> Value -> IO PayloadId
- tryDequeue :: String -> Connection -> IO (Maybe Payload)
- dequeue :: String -> Connection -> IO Payload
- withPayload :: String -> Connection -> Int -> (Payload -> IO a) -> IO (Either SomeException a)
- getCount :: String -> Connection -> IO Int64
Types
Constructors
| PayloadId | |
Fields
| unPayloadId :: Int64 | |
Instances
- Eq PayloadId
- Show PayloadId
- FromRow PayloadId (defined in Database.PostgreSQL.Simple.Queue)
- FromField PayloadId (defined in Database.PostgreSQL.Simple.Queue)
- ToRow PayloadId (defined in Database.PostgreSQL.Simple.Queue)
- ToField PayloadId (defined in Database.PostgreSQL.Simple.Queue)
A Payload can exist in two states in the queue: Enqueued
and Dequeued. A Payload starts in the Enqueued state and is locked
so that some process can work on it, usually something in IO.
Once the processing is complete, the Payload is moved to the Dequeued
state, which is the terminal state.
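The life cycle described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `ModelState`, `Item`, and `dequeueOldest` are hypothetical names, the list is assumed to be ordered oldest first, and nothing here touches PostgreSQL or reflects how the library stores payloads.

```haskell
import Data.Int (Int64)

-- Hypothetical in-memory model; the real queue stores payloads in PostgreSQL.
data ModelState = Enqueued | Dequeued
  deriving (Eq, Show)

data Item = Item
  { itemId :: Int64
  , itemState :: ModelState
  , itemBody :: String
  } deriving (Eq, Show)

-- Move the oldest Enqueued item to Dequeued and return it together with
-- the updated list. Dequeued is terminal: such items are never selected again.
dequeueOldest :: [Item] -> (Maybe Item, [Item])
dequeueOldest items =
  case break ((== Enqueued) . itemState) items of
    (_, []) -> (Nothing, items)
    (done, x : rest) ->
      let x' = x { itemState = Dequeued }
      in (Just x', done ++ x' : rest)

main :: IO ()
main = do
  let queue0 = [Item 1 Enqueued "first", Item 2 Enqueued "second"]
      (a, queue1) = dequeueOldest queue0
      (b, queue2) = dequeueOldest queue1
      (c, _) = dequeueOldest queue2
  -- prints (Just 1,Just 2,Nothing)
  print (fmap itemId a, fmap itemId b, fmap itemId c)
```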
Constructors
| Payload | |
DB API
enqueueDB :: String -> Value -> DB PayloadId
Enqueue a new JSON value into the queue. This particular function can be composed as part of a larger database transaction. For instance, a single transaction could create a user and enqueue an email message.
createAccount userRecord = do
  runDBTSerializable $ do
    createUserDB userRecord
    enqueueDB $ makeVerificationEmail userRecord
IO API
enqueue :: String -> Connection -> Value -> IO PayloadId
Enqueue a new JSON value into the queue. See enqueueDB for a version
which can be composed with other queries in a single transaction.
tryDequeue :: String -> Connection -> IO (Maybe Payload)
Return the oldest Payload in the Enqueued state or Nothing
if there are no payloads. For a blocking version utilizing PostgreSQL's
NOTIFY and LISTEN, see dequeue. This function runs dequeueDB as a
ReadCommitted transaction.
See withPayload for an alternative interface that will automatically return
the payload to the Enqueued state if an exception occurs.
dequeue :: String -> Connection -> IO Payload
Transition a Payload to the Dequeued state. This function runs
dequeueDB as a Serializable transaction.
withPayload
Arguments
| :: String | |
| -> Connection | |
| -> Int | retry count |
| -> (Payload -> IO a) | |
| -> IO (Either SomeException a) | |
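The retry count and the `Either SomeException a` result of withPayload suggest a retry-on-exception pattern. The sketch below shows that general pattern using only base; `retrying` is a hypothetical helper, and the assumption that a failed attempt is simply run again is mine, not something this documentation states about withPayload's actual implementation.

```haskell
import Control.Exception (ErrorCall (..), SomeException, throwIO, try)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Hypothetical helper: run an action, retrying up to `retries` more times
-- when it throws. Catching SomeException also catches asynchronous
-- exceptions, which production code may prefer to rethrow.
retrying :: Int -> IO a -> IO (Either SomeException a)
retrying retries action = do
  result <- try action
  case result of
    Right x -> pure (Right x)
    Left err
      | retries > 0 -> retrying (retries - 1) action
      | otherwise -> pure (Left err)

main :: IO ()
main = do
  attempts <- newIORef (0 :: Int)
  -- An action that fails twice and then succeeds on the third attempt.
  let flaky = do
        modifyIORef' attempts (+ 1)
        n <- readIORef attempts
        if n < 3 then throwIO (ErrorCall "transient failure") else pure n
  ok <- retrying 5 flaky
  print (either (const Nothing) Just ok)
  -- An action that always fails exhausts its retries and yields a Left.
  failed <- retrying 1 (throwIO (ErrorCall "always fails") :: IO ())
  putStrLn (either (const "gave up") (const "succeeded") failed)
```

Returning the final exception in a `Left` rather than rethrowing it lets the caller decide whether a payload that keeps failing should be logged, skipped, or escalated.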
getCount :: String -> Connection -> IO Int64
Get the number of rows in the Enqueued state. This function runs
getCountDB in a ReadCommitted transaction.