| Safe Haskell | Safe-Inferred |
|---|---|
| Language | Haskell98 |
Asapo.Producer
Description
To implement an ASAP:O producer, you should only need this interface. It exposes no memory-management functions (like free) or pointers, and is thus safe to use.
Simple Example
Here's some code for a simple producer that connects and sends a message with id "1".
>>>:seti -XOverloadedStrings>>>:{import Asapo.Producer import Control.Applicative (Applicative ((<*>))) import Control.Monad(void) import Data.Either (Either (Left, Right)) import Data.Function (($)) import Data.Functor ((<$>)) import Data.Semigroup (Semigroup ((<>))) import Data.Text (Text, pack) import Data.Text.Encoding (encodeUtf8) import qualified Data.Text.IO as TIO import Data.Time.Clock (secondsToNominalDiffTime) import Data.Word (Word64) import System.IO (IO) import Text.Show (Show (show)) import Prelude () main :: IO () main = withProducer (Endpoint "localhost:8400") (ProcessingThreads 1) TcpHandler ( SourceCredentials { sourceType = RawSource, instanceId = InstanceId "test_instance", pipelineStep = PipelineStep "pipeline_step_1", beamtime = Beamtime "asapo_test", beamline = Beamline "auto", dataSource = DataSource "asapo_source", token = Token "sometoken" } ) (secondsToNominalDiffTime 10) $ \producer -> do TIO.putStrLn "connected, sending data" let responseHandler :: RequestResponse -> IO () responseHandler requestResponse = TIO.putStrLn $ "in response handler, payload " <> responsePayload requestResponse <> ", error " <> pack (show (responseError requestResponse)) send producer (MessageId 1) (FileName "raw/default/1.txt") (Metadata "{\"test\": 3.0}") (DatasetSubstream 0) (DatasetSize 0) NoAutoId (encodeUtf8 "test") DataAndMetadata FilesystemAndDatabase (StreamName "default") responseHandler void $ waitRequestsFinished producer (secondsToNominalDiffTime 10) :}
Synopsis
- newtype ProducerException = ProducerException Text
- newtype Endpoint = Endpoint Text
- newtype ProcessingThreads = ProcessingThreads Int
- data RequestHandlerType
- newtype Error = Error Text
- newtype Metadata = Metadata Text
- data SourceCredentials = SourceCredentials {}
- newtype MessageId = MessageId Word64
- data DeletionFlags
- data Producer
- data LogLevel
- = LogNone
- | LogError
- | LogInfo
- | LogDebug
- | LogWarning
- newtype FileName = FileName Text
- newtype PipelineStep = PipelineStep Text
- newtype Beamtime = Beamtime Text
- newtype DataSource = DataSource Text
- newtype Beamline = Beamline Text
- data StreamInfo = StreamInfo {}
- data SourceType
- newtype StreamName = StreamName Text
- newtype InstanceId = InstanceId Text
- newtype Token = Token Text
- newtype DatasetSubstream = DatasetSubstream Int
- newtype DatasetSize = DatasetSize Int
- data VersionInfo = VersionInfo {}
- data UpsertMode
- data MetadataIngestMode
- data AutoIdFlag
- data TransferFlag
- data StorageFlag
- data RequestResponse = RequestResponse {}
- data Opcode
- data GenericRequestHeader = GenericRequestHeader {}
- withProducer :: forall a. Endpoint -> ProcessingThreads -> RequestHandlerType -> SourceCredentials -> NominalDiffTime -> (Producer -> IO a) -> IO a
- getVersionInfo :: Producer -> IO VersionInfo
- getStreamInfo :: Producer -> StreamName -> NominalDiffTime -> IO StreamInfo
- getLastStream :: Producer -> NominalDiffTime -> IO StreamInfo
- getBeamtimeMeta :: Producer -> NominalDiffTime -> IO (Maybe Text)
- getStreamMeta :: Producer -> StreamName -> NominalDiffTime -> IO (Maybe Text)
- getRequestsQueueSize :: Producer -> IO Int
- getRequestsQueueVolumeMb :: Producer -> IO Int
- send :: Producer -> MessageId -> FileName -> Metadata -> DatasetSubstream -> DatasetSize -> AutoIdFlag -> ByteString -> TransferFlag -> StorageFlag -> StreamName -> (RequestResponse -> IO ()) -> IO Int
- sendFile :: Producer -> MessageId -> FileName -> Metadata -> DatasetSubstream -> DatasetSize -> AutoIdFlag -> Int -> FileName -> TransferFlag -> StorageFlag -> StreamName -> (RequestResponse -> IO ()) -> IO Int
- sendStreamFinishedFlag :: Producer -> StreamName -> MessageId -> StreamName -> (RequestResponse -> IO ()) -> IO Int
- sendBeamtimeMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> (RequestResponse -> IO ()) -> IO Int
- sendStreamMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> StreamName -> (RequestResponse -> IO ()) -> IO Int
- deleteStream :: Producer -> StreamName -> NominalDiffTime -> [DeletionFlags] -> IO Int
- setLogLevel :: Producer -> LogLevel -> IO ()
- enableLocalLog :: Producer -> Bool -> IO ()
- enableRemoteLog :: Producer -> Bool -> IO ()
- setCredentials :: Producer -> SourceCredentials -> IO Int
- setRequestsQueueLimits :: Producer -> Int -> Int -> IO ()
- messageIdFromInt :: Integral a => a -> MessageId
- waitRequestsFinished :: Producer -> NominalDiffTime -> IO Int
Types
newtype ProducerException Source #
Constructors
| ProducerException Text |
Instances
| Exception ProducerException Source # | |
Defined in Asapo.Producer Methods toException :: ProducerException -> SomeException # | |
| Show ProducerException Source # | |
Defined in Asapo.Producer Methods showsPrec :: Int -> ProducerException -> ShowS # show :: ProducerException -> String # showList :: [ProducerException] -> ShowS # | |
Wrapper around an ASAP:O producer endpoint (usually something like "host:port")
newtype ProcessingThreads Source #
Wrapper around the number of ASAP:O processing threads (simply to make call signatures mor readable)
Constructors
| ProcessingThreads Int |
data RequestHandlerType Source #
This has no documentation in ASAP:O yet
Constructors
| TcpHandler | |
| FilesystemHandler |
Wrapper around an ASAP:O producer error. Note that there is only an error "explanation" here, no error code, since the C interface does not expose this.
data SourceCredentials Source #
Constructors
| SourceCredentials | |
Fields
| |
data DeletionFlags Source #
Constructors
| DeleteMeta | Delete metadata also |
| DeleteErrorOnNotExist | Don't throw an error if the data doesn't exist anyways |
Instances
| Eq DeletionFlags Source # | |
Defined in Asapo.Either.Producer Methods (==) :: DeletionFlags -> DeletionFlags -> Bool # (/=) :: DeletionFlags -> DeletionFlags -> Bool # | |
Constructors
| LogNone | |
| LogError | |
| LogInfo | |
| LogDebug | |
| LogWarning |
Wrapper around file name (dubious to use Text here, but fine for now)
newtype PipelineStep Source #
Constructors
| PipelineStep Text |
newtype DataSource Source #
Constructors
| DataSource Text |
data StreamInfo Source #
Constructors
| StreamInfo | |
Instances
| Show StreamInfo Source # | |
Defined in Asapo.Either.Common Methods showsPrec :: Int -> StreamInfo -> ShowS # show :: StreamInfo -> String # showList :: [StreamInfo] -> ShowS # | |
data SourceType Source #
Constructors
| RawSource | |
| ProcessedSource |
newtype StreamName Source #
Constructors
| StreamName Text |
Instances
| Show StreamName Source # | |
Defined in Asapo.Either.Common Methods showsPrec :: Int -> StreamName -> ShowS # show :: StreamName -> String # showList :: [StreamName] -> ShowS # | |
newtype InstanceId Source #
Constructors
| InstanceId Text |
newtype DatasetSubstream Source #
Wrapper around the substream to use
Constructors
| DatasetSubstream Int |
data VersionInfo Source #
Constructors
| VersionInfo | |
Fields
| |
Instances
| Show VersionInfo Source # | |
Defined in Asapo.Either.Producer Methods showsPrec :: Int -> VersionInfo -> ShowS # show :: VersionInfo -> String # showList :: [VersionInfo] -> ShowS # | |
data UpsertMode Source #
data AutoIdFlag Source #
Anti-boolean-blindness for the "auto id" flag in the message header
Instances
| Eq AutoIdFlag Source # | |
Defined in Asapo.Either.Producer | |
data StorageFlag Source #
Where to store the data
Constructors
| Filesystem | |
| Database | |
| FilesystemAndDatabase |
data RequestResponse Source #
Information about the request and its response, to be used in the ASAP:O send callback
Constructors
| RequestResponse | |
data GenericRequestHeader Source #
Information about the send request, to be used in the ASAP:O send callback
Constructors
| GenericRequestHeader | |
Fields | |
Initialization
Arguments
| :: forall a. Endpoint | |
| -> ProcessingThreads | |
| -> RequestHandlerType | |
| -> SourceCredentials | |
| -> NominalDiffTime | timeout |
| -> (Producer -> IO a) | |
| -> IO a |
Create a producer and do something with it. This is the main entrypoint into the producer.
Getters
getVersionInfo :: Producer -> IO VersionInfo Source #
Retrieve producer version info
Arguments
| :: Producer | |
| -> StreamName | |
| -> NominalDiffTime | Timeout |
| -> IO StreamInfo |
Retrieve info for a single stream
Arguments
| :: Producer | |
| -> NominalDiffTime | Timeout |
| -> IO StreamInfo |
Retrieve info for the latest stream
Arguments
| :: Producer | |
| -> NominalDiffTime | timeout |
| -> IO (Maybe Text) |
Retrieve metadata for the given stream (which might be missing, in which case Nothing is returned)
Arguments
| :: Producer | |
| -> StreamName | |
| -> NominalDiffTime | timeout |
| -> IO (Maybe Text) |
Retrieve metadata for the given stream (which might be missing, in which case Nothing is returned)
getRequestsQueueSize :: Producer -> IO Int Source #
Get current size of the requests queue (number of requests pending/being processed)
getRequestsQueueVolumeMb :: Producer -> IO Int Source #
Get current volume of the requests queue (total memory of occupied by pending/being processed requests)
Modifiers
Arguments
| :: Producer | |
| -> MessageId | |
| -> FileName | |
| -> Metadata | |
| -> DatasetSubstream | |
| -> DatasetSize | |
| -> AutoIdFlag | |
| -> ByteString | Actual data to send |
| -> TransferFlag | |
| -> StorageFlag | |
| -> StreamName | |
| -> (RequestResponse -> IO ()) | |
| -> IO Int |
Send a message containing raw data. Due to newtype and enum usage, all parameter should be self-explanatory
Arguments
| :: Producer | |
| -> MessageId | |
| -> FileName | File name to put into the message header |
| -> Metadata | |
| -> DatasetSubstream | |
| -> DatasetSize | |
| -> AutoIdFlag | |
| -> Int | Size |
| -> FileName | File to actually send |
| -> TransferFlag | |
| -> StorageFlag | |
| -> StreamName | |
| -> (RequestResponse -> IO ()) | |
| -> IO Int |
Send a message containing a file. Due to newtype and enum usage, all parameter should be self-explanatory
sendStreamFinishedFlag :: Producer -> StreamName -> MessageId -> StreamName -> (RequestResponse -> IO ()) -> IO Int Source #
As the title says, send the "stream finished" flag
sendBeamtimeMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> (RequestResponse -> IO ()) -> IO Int Source #
Send or extend beamtime metadata
sendStreamMetadata :: Producer -> Metadata -> MetadataIngestMode -> UpsertMode -> StreamName -> (RequestResponse -> IO ()) -> IO Int Source #
Send or extend stream metadata
Arguments
| :: Producer | |
| -> StreamName | |
| -> NominalDiffTime | timeout |
| -> [DeletionFlags] | |
| -> IO Int |
setCredentials :: Producer -> SourceCredentials -> IO Int Source #
Set a different set of credentials
setRequestsQueueLimits Source #
Set maximum size of the requests queue
messageIdFromInt :: Integral a => a -> MessageId Source #
waitRequestsFinished :: Producer -> NominalDiffTime -> IO Int Source #
Wait for all outstanding requests to finish