milena-0.5.2.3: A Kafka client for Haskell.

Safe Haskell: None
Language: Haskell2010

Network.Kafka

Contents

Synopsis

Documentation

data KafkaState #

Constructors

KafkaState 

Fields

Instances
Show KafkaState # 
Instance details

Defined in Network.Kafka

Generic KafkaState # 
Instance details

Defined in Network.Kafka

Associated Types

type Rep KafkaState :: * -> * #

type Rep KafkaState # 
Instance details

Defined in Network.Kafka

data KafkaClientError #

Errors given from the Kafka monad.

Constructors

KafkaNoOffset

A response did not contain an offset.

KafkaDeserializationError String

A value could not be deserialized correctly.

KafkaInvalidBroker Leader

Could not find a cached broker for the found leader.

KafkaFailedToFetchMetadata 
KafkaIOException IOException 
Instances
Eq KafkaClientError # 
Instance details

Defined in Network.Kafka

Show KafkaClientError # 
Instance details

Defined in Network.Kafka

Generic KafkaClientError # 
Instance details

Defined in Network.Kafka

Associated Types

type Rep KafkaClientError :: * -> * #

Exception KafkaClientError # 
Instance details

Defined in Network.Kafka

type Rep KafkaClientError # 
Instance details

Defined in Network.Kafka

type Rep KafkaClientError = D1 (MetaData "KafkaClientError" "Network.Kafka" "milena-0.5.2.3-EPtlCrT9mL2wAhWEIFpL" False) ((C1 (MetaCons "KafkaNoOffset" PrefixI False) (U1 :: * -> *) :+: C1 (MetaCons "KafkaDeserializationError" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 String))) :+: (C1 (MetaCons "KafkaInvalidBroker" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Leader)) :+: (C1 (MetaCons "KafkaFailedToFetchMetadata" PrefixI False) (U1 :: * -> *) :+: C1 (MetaCons "KafkaIOException" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 IOException)))))

data KafkaTime #

An abstract form of Kafka's time. Used for querying offsets.

Constructors

LatestTime

The latest time on the broker.

EarliestTime

The earliest time on the broker.

OtherTime Time

A specific time.

Instances
Eq KafkaTime # 
Instance details

Defined in Network.Kafka

Generic KafkaTime # 
Instance details

Defined in Network.Kafka

Associated Types

type Rep KafkaTime :: * -> * #

type Rep KafkaTime # 
Instance details

Defined in Network.Kafka

type Rep KafkaTime = D1 (MetaData "KafkaTime" "Network.Kafka" "milena-0.5.2.3-EPtlCrT9mL2wAhWEIFpL" False) (C1 (MetaCons "LatestTime" PrefixI False) (U1 :: * -> *) :+: (C1 (MetaCons "EarliestTime" PrefixI False) (U1 :: * -> *) :+: C1 (MetaCons "OtherTime" PrefixI False) (S1 (MetaSel (Nothing :: Maybe Symbol) NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Time))))

data PartitionAndLeader #

Instances
Eq PartitionAndLeader # 
Instance details

Defined in Network.Kafka

Ord PartitionAndLeader # 
Instance details

Defined in Network.Kafka

Show PartitionAndLeader # 
Instance details

Defined in Network.Kafka

Generic PartitionAndLeader # 
Instance details

Defined in Network.Kafka

Associated Types

type Rep PartitionAndLeader :: * -> * #

type Rep PartitionAndLeader # 
Instance details

Defined in Network.Kafka

type Rep PartitionAndLeader = D1 (MetaData "PartitionAndLeader" "Network.Kafka" "milena-0.5.2.3-EPtlCrT9mL2wAhWEIFpL" False) (C1 (MetaCons "PartitionAndLeader" PrefixI True) (S1 (MetaSel (Just "_palTopic") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 TopicName) :*: (S1 (MetaSel (Just "_palPartition") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Partition) :*: S1 (MetaSel (Just "_palLeader") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Leader))))

data TopicAndPartition #

Instances
Eq TopicAndPartition # 
Instance details

Defined in Network.Kafka

Ord TopicAndPartition # 
Instance details

Defined in Network.Kafka

Show TopicAndPartition # 
Instance details

Defined in Network.Kafka

Generic TopicAndPartition # 
Instance details

Defined in Network.Kafka

Associated Types

type Rep TopicAndPartition :: * -> * #

type Rep TopicAndPartition # 
Instance details

Defined in Network.Kafka

type Rep TopicAndPartition = D1 (MetaData "TopicAndPartition" "Network.Kafka" "milena-0.5.2.3-EPtlCrT9mL2wAhWEIFpL" False) (C1 (MetaCons "TopicAndPartition" PrefixI True) (S1 (MetaSel (Just "_tapTopic") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 TopicName) :*: S1 (MetaSel (Just "_tapPartition") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Partition)))

data TopicAndMessage #

A topic with a serializable message.

Instances
Eq TopicAndMessage # 
Instance details

Defined in Network.Kafka

Show TopicAndMessage # 
Instance details

Defined in Network.Kafka

Generic TopicAndMessage # 
Instance details

Defined in Network.Kafka

Associated Types

type Rep TopicAndMessage :: * -> * #

type Rep TopicAndMessage # 
Instance details

Defined in Network.Kafka

type Rep TopicAndMessage = D1 (MetaData "TopicAndMessage" "Network.Kafka" "milena-0.5.2.3-EPtlCrT9mL2wAhWEIFpL" False) (C1 (MetaCons "TopicAndMessage" PrefixI True) (S1 (MetaSel (Just "_tamTopic") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 TopicName) :*: S1 (MetaSel (Just "_tamMessage") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Message)))

tamPayload :: TopicAndMessage -> ByteString #

Get the bytes from the Kafka message, ignoring the topic.

Configuration

defaultMaxBytes :: MaxBytes #

Default: 1024 * 1024

mkKafkaState :: KafkaClientId -> KafkaAddress -> KafkaState #

Create an initial KafkaState for the given client ID and Kafka address, using default values for everything else.

runKafka :: KafkaState -> StateT KafkaState (ExceptT KafkaClientError IO) a -> IO (Either KafkaClientError a) #

Run the underlying Kafka monad.

tryKafka :: Kafka m => m a -> m a #

Catch IOExceptions and wrap them in KafkaIOExceptions.

makeRequest :: Kafka m => Handle -> ReqResp (m a) -> m a #

Make a request, incrementing the _stateCorrelationId.

metadata :: Kafka m => MetadataRequest -> m MetadataResponse #

Send a metadata request to any broker.

metadata' :: Kafka m => Handle -> MetadataRequest -> m MetadataResponse #

Send a metadata request using the given Handle.

expect :: Kafka m => KafkaClientError -> (a -> Maybe b) -> a -> m b #

brokerPartitionInfo :: Kafka m => TopicName -> m (Set PartitionAndLeader) #

Find the set of partitions, each paired with its leader, for the topic.

protocolTime :: KafkaTime -> Time #

Convert an abstract time to a serializable protocol value.

updateMetadatas :: Kafka m => [TopicName] -> m () #

withBrokerHandle :: Kafka m => Broker -> (Handle -> m a) -> m a #

Execute a Kafka action with a Handle for the given Broker, updating the connections cache if needed.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.

withAddressHandle :: Kafka m => KafkaAddress -> (Handle -> m a) -> m a #

Execute a Kafka action with a Handle for the given KafkaAddress, updating the connections cache if needed.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.

withAnyHandle :: Kafka m => (Handle -> m a) -> m a #

Like withAddressHandle, but round-robins the addresses in the KafkaState.

When the action throws an IOException, it is caught and returned as a KafkaIOException in the Kafka monad.

Note that when the given action throws an exception, any state changes will be discarded. This includes both IOExceptions and exceptions thrown by throwError from Except.

Offsets

data PartitionOffsetRequestInfo #

Fields to construct an offset request, per topic and partition.

Constructors

PartitionOffsetRequestInfo 

Fields

getLastOffset :: Kafka m => KafkaTime -> Partition -> TopicName -> m Offset #

Get the first found offset.

getLastOffset' :: Kafka m => Handle -> KafkaTime -> Partition -> TopicName -> m Offset #

Get the first found offset, using the given Handle.