postgresql-simple-queue-1.0.1: A PostgreSQL backed queue

Safe HaskellNone
LanguageHaskell2010

Database.PostgreSQL.Simple.Queue

Contents

Description

This module utilize PostgreSQL to implement a durable queue for efficently processing arbitrary payloads which can be represented as JSON.

Typically a producer would enqueue a new payload as part of larger database transaction

  createAccount userRecord = do
     runDBTSerializable $ do
        createUserDB userRecord
        enqueueDB $ makeVerificationEmail userRecord

In another thread or process, the consumer would drain the queue.

   forever $ do
     -- Attempt get a payload or block until one is available
     payload <- lock conn

     -- Perform application specifc parsing of the payload value
     case fromJSON $ pValue payload of
       Success x -> sendEmail x -- Perform application specific processing
       Error err -> logErr err

     -- Remove the payload from future processing
     dequeue conn $ pId payload

For a more complete example or a consumer, utilizing the provided defaultMain, see EmailQueue.

This modules provides two flavors of functions, a DB API and an IO API. Most operations are provided in both flavors, with the exception of lock. lock blocks and would not be that useful as part of a larger transaction since it would keep the transaction open for a potentially long time. Although both flavors are provided, in general one versions is more useful for typical use cases.

Synopsis

Types

newtype PayloadId #

Constructors

PayloadId 

Fields

Instances
Eq PayloadId # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

Show PayloadId # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

FromRow PayloadId # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

FromField PayloadId # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

ToRow PayloadId # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

Methods

toRow :: PayloadId -> [Action] #

ToField PayloadId # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

Methods

toField :: PayloadId -> Action #

data State #

A Payload can exist in three states in the queue, Enqueued, and Dequeued. A Payload starts in the Enqueued state and is locked so some sort of process can occur with it, usually something in IO. Once the processing is complete, the Payload is moved the Dequeued state, which is the terminal state.

Constructors

Enqueued 
Dequeued 
Instances
Bounded State # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

Enum State # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

Eq State # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

Methods

(==) :: State -> State -> Bool #

(/=) :: State -> State -> Bool #

Ord State # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

Methods

compare :: State -> State -> Ordering #

(<) :: State -> State -> Bool #

(<=) :: State -> State -> Bool #

(>) :: State -> State -> Bool #

(>=) :: State -> State -> Bool #

max :: State -> State -> State #

min :: State -> State -> State #

Show State # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

Methods

showsPrec :: Int -> State -> ShowS #

show :: State -> String #

showList :: [State] -> ShowS #

FromField State # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

ToField State # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

Methods

toField :: State -> Action #

data Payload #

Constructors

Payload 

Fields

Instances
Eq Payload # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

Methods

(==) :: Payload -> Payload -> Bool #

(/=) :: Payload -> Payload -> Bool #

Show Payload # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

FromRow Payload # 
Instance details

Defined in Database.PostgreSQL.Simple.Queue

DB API

enqueueDB :: String -> Value -> DB PayloadId #

Enqueue a new JSON value into the queue. This particularly function can be composed as part of a larger database transaction. For instance, a single transaction could create a user and enqueue a email message.

  createAccount userRecord = do
     runDBTSerializable $ do
        createUserDB userRecord
        enqueueDB $ makeVerificationEmail userRecord

dequeueDB :: String -> DB (Maybe Payload) #

Transition a Payload to the Dequeued state.

withPayloadDB #

Arguments

:: String

schema

-> Int

retry count

-> (Payload -> IO a)

payload processing function

-> DB (Either SomeException (Maybe a)) 

Attempt to get a payload and process it. If the function passed in throws an exception return it on the left side of the Either. Re-add the payload up to some passed in maximum. Return Nothing is the payloads table is empty otherwise the result is an a from the payload ingesting function.

getCountDB :: String -> DB Int64 #

Get the number of rows in the Enqueued state.

IO API

enqueue :: String -> Connection -> Value -> IO PayloadId #

Enqueue a new JSON value into the queue. See enqueueDB for a version which can be composed with other queries in a single transaction.

tryDequeue :: String -> Connection -> IO (Maybe Payload) #

Return a the oldest Payload in the Enqueued state or Nothing if there are no payloads. For a blocking version utilizing PostgreSQL's NOTIFY and LISTEN, see dequeue. This functions runs dequeueDb as a ReadCommitted transaction.

See withPayload for an alternative interface that will automatically return the payload to the Enqueued state if an exception occurs.

dequeue :: String -> Connection -> IO Payload #

Transition a Payload to the Dequeued state. his functions runs dequeueDB as a Serializable transaction.

withPayload #

Arguments

:: String 
-> Connection 
-> Int

retry count

-> (Payload -> IO a) 
-> IO (Either SomeException a) 

Return the oldest Payload in the Enqueued state or block until a payload arrives. This function utilizes PostgreSQL's LISTEN and NOTIFY functionality to avoid excessively polling of the DB while waiting for new payloads, without scarficing promptness.

getCount :: String -> Connection -> IO Int64 #

Get the number of rows in the Enqueued state. This function runs getCountDB in a ReadCommitted transaction.