{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.Client.Run where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import qualified Data.ByteString.UTF8 as UTF8
import Data.IORef
import Data.IP (IPv6)
import Network.Control (RxFlow (..), defaultMaxData)
import Network.HTTP.Semantics.Client
import Network.HTTP.Semantics.Client.Internal
import Network.HTTP.Semantics.IO
import Network.Socket (SockAddr)
import qualified System.ThreadManager as T
import Text.Read (readMaybe)
import Imports
import Network.HTTP2.Frame
import Network.HTTP2.H2
data ClientConfig = ClientConfig
{ ClientConfig -> Scheme
scheme :: Scheme
, ClientConfig -> Authority
authority :: Authority
, ClientConfig -> Int
cacheLimit :: Int
, ClientConfig -> Int
connectionWindowSize :: WindowSize
, ClientConfig -> Settings
settings :: Settings
}
deriving (ClientConfig -> ClientConfig -> Bool
(ClientConfig -> ClientConfig -> Bool)
-> (ClientConfig -> ClientConfig -> Bool) -> Eq ClientConfig
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ClientConfig -> ClientConfig -> Bool
== :: ClientConfig -> ClientConfig -> Bool
$c/= :: ClientConfig -> ClientConfig -> Bool
/= :: ClientConfig -> ClientConfig -> Bool
Eq, Int -> ClientConfig -> ShowS
[ClientConfig] -> ShowS
ClientConfig -> Authority
(Int -> ClientConfig -> ShowS)
-> (ClientConfig -> Authority)
-> ([ClientConfig] -> ShowS)
-> Show ClientConfig
forall a.
(Int -> a -> ShowS) -> (a -> Authority) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ClientConfig -> ShowS
showsPrec :: Int -> ClientConfig -> ShowS
$cshow :: ClientConfig -> Authority
show :: ClientConfig -> Authority
$cshowList :: [ClientConfig] -> ShowS
showList :: [ClientConfig] -> ShowS
Show)
defaultClientConfig :: ClientConfig
defaultClientConfig :: ClientConfig
defaultClientConfig =
ClientConfig
{ scheme :: Scheme
scheme = Scheme
"http"
, authority :: Authority
authority = Authority
"localhost"
, cacheLimit :: Int
cacheLimit = Int
64
, connectionWindowSize :: Int
connectionWindowSize = Int
defaultMaxData
, settings :: Settings
settings = Settings
defaultSettings
}
run :: ClientConfig -> Config -> Client a -> IO a
run :: forall a. ClientConfig -> Config -> Client a -> IO a
run cconf :: ClientConfig
cconf@ClientConfig{Int
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> Int
connectionWindowSize :: ClientConfig -> Int
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: Int
connectionWindowSize :: Int
settings :: Settings
..} Config
conf Client a
client = do
ctx <- ClientConfig -> Config -> IO Context
setup ClientConfig
cconf Config
conf
runH2 conf ctx $ runClient ctx
where
serverMaxStreams :: Context -> IO Int
serverMaxStreams Context
ctx = do
mx <- Settings -> Maybe Int
maxConcurrentStreams (Settings -> Maybe Int) -> IO Settings -> IO (Maybe Int)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef Settings -> IO Settings
forall a. IORef a -> IO a
readIORef (Context -> IORef Settings
peerSettings Context
ctx)
case mx of
Maybe Int
Nothing -> Int -> IO Int
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Int
forall a. Bounded a => a
maxBound
Just Int
x -> Int -> IO Int
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Int
x
possibleClientStream :: Context -> IO Int
possibleClientStream Context
ctx = do
x <- Context -> IO Int
serverMaxStreams Context
ctx
n <- oddConc <$> readTVarIO (oddStreamTable ctx)
return (x - n)
aux :: Context -> Aux
aux Context
ctx =
Aux
{ auxPossibleClientStreams :: IO Int
auxPossibleClientStreams = Context -> IO Int
possibleClientStream Context
ctx
}
clientCore :: Context -> Request -> (Response -> IO b) -> IO b
clientCore Context
ctx Request
req Response -> IO b
processResponse = do
(strm, moutobj) <- Context
-> Scheme -> Authority -> Request -> IO (Stream, Maybe OutObj)
makeStream Context
ctx Scheme
scheme Authority
authority Request
req
case moutobj of
Maybe OutObj
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just OutObj
outobj -> Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest Config
conf Context
ctx Stream
strm OutObj
outobj Bool
False
rsp <- getResponse strm
x <- processResponse rsp
adjustRxWindow ctx strm
return x
runClient :: Context -> IO a
runClient Context
ctx = Client a
client (Context -> Request -> (Response -> IO r) -> IO r
forall {b}. Context -> Request -> (Response -> IO b) -> IO b
clientCore Context
ctx) (Aux -> IO a) -> Aux -> IO a
forall a b. (a -> b) -> a -> b
$ Context -> Aux
aux Context
ctx
runIO :: ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO :: forall a. ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO cconf :: ClientConfig
cconf@ClientConfig{Int
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> Int
connectionWindowSize :: ClientConfig -> Int
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: Int
connectionWindowSize :: Int
settings :: Settings
..} conf :: Config
conf@Config{Int
Buffer
SockAddr
Manager
Int -> IO Scheme
PositionReadMaker
Scheme -> IO ()
confWriteBuffer :: Buffer
confBufferSize :: Int
confSendAll :: Scheme -> IO ()
confReadN :: Int -> IO Scheme
confPositionReadMaker :: PositionReadMaker
confTimeoutManager :: Manager
confMySockAddr :: SockAddr
confPeerSockAddr :: SockAddr
confPeerSockAddr :: Config -> SockAddr
confMySockAddr :: Config -> SockAddr
confTimeoutManager :: Config -> Manager
confPositionReadMaker :: Config -> PositionReadMaker
confReadN :: Config -> Int -> IO Scheme
confSendAll :: Config -> Scheme -> IO ()
confBufferSize :: Config -> Int
confWriteBuffer :: Config -> Buffer
..} ClientIO -> IO (IO a)
action = do
ctx@Context{..} <- ClientConfig -> Config -> IO Context
setup ClientConfig
cconf Config
conf
let putB Scheme
bs = TQueue Control -> Control -> IO ()
enqueueControl TQueue Control
controlQ (Control -> IO ()) -> Control -> IO ()
forall a b. (a -> b) -> a -> b
$ Maybe SettingsList -> [Scheme] -> Control
CFrames Maybe SettingsList
forall a. Maybe a
Nothing [Scheme
bs]
putR Request
req = do
(strm, moutobj) <- Context
-> Scheme -> Authority -> Request -> IO (Stream, Maybe OutObj)
makeStream Context
ctx Scheme
scheme Authority
authority Request
req
case moutobj of
Maybe OutObj
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just OutObj
outobj -> Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest Config
conf Context
ctx Stream
strm OutObj
outobj Bool
True
return (streamNumber strm, strm)
get = Stream -> IO Response
getResponse
create = Context -> IO (Int, Stream)
openOddStreamWait Context
ctx
runClient <-
action $ ClientIO confMySockAddr confPeerSockAddr putR get putB create
runH2 conf ctx runClient
getResponse :: Stream -> IO Response
getResponse :: Stream -> IO Response
getResponse Stream
strm = do
mRsp <- MVar (Either SomeException InpObj)
-> IO (Either SomeException InpObj)
forall a. MVar a -> IO a
takeMVar (MVar (Either SomeException InpObj)
-> IO (Either SomeException InpObj))
-> MVar (Either SomeException InpObj)
-> IO (Either SomeException InpObj)
forall a b. (a -> b) -> a -> b
$ Stream -> MVar (Either SomeException InpObj)
streamInput Stream
strm
case mRsp of
Left SomeException
err -> SomeException -> IO Response
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO SomeException
err
Right InpObj
rsp -> Response -> IO Response
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Response -> IO Response) -> Response -> IO Response
forall a b. (a -> b) -> a -> b
$ InpObj -> Response
Response InpObj
rsp
setup :: ClientConfig -> Config -> IO Context
setup :: ClientConfig -> Config -> IO Context
setup ClientConfig{Int
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> Int
connectionWindowSize :: ClientConfig -> Int
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: Int
connectionWindowSize :: Int
settings :: Settings
..} conf :: Config
conf@Config{Int
Buffer
SockAddr
Manager
Int -> IO Scheme
PositionReadMaker
Scheme -> IO ()
confPeerSockAddr :: Config -> SockAddr
confMySockAddr :: Config -> SockAddr
confTimeoutManager :: Config -> Manager
confPositionReadMaker :: Config -> PositionReadMaker
confReadN :: Config -> Int -> IO Scheme
confSendAll :: Config -> Scheme -> IO ()
confBufferSize :: Config -> Int
confWriteBuffer :: Config -> Buffer
confWriteBuffer :: Buffer
confBufferSize :: Int
confSendAll :: Scheme -> IO ()
confReadN :: Int -> IO Scheme
confPositionReadMaker :: PositionReadMaker
confTimeoutManager :: Manager
confMySockAddr :: SockAddr
confPeerSockAddr :: SockAddr
..} = do
let clientInfo :: RoleInfo
clientInfo = Scheme -> Authority -> RoleInfo
newClientInfo Scheme
scheme Authority
authority
ctx <-
RoleInfo
-> Config -> Int -> Int -> Settings -> Manager -> IO Context
newContext
RoleInfo
clientInfo
Config
conf
Int
cacheLimit
Int
connectionWindowSize
Settings
settings
Manager
confTimeoutManager
exchangeSettings ctx
return ctx
runH2 :: Config -> Context -> IO a -> IO a
runH2 :: forall a. Config -> Context -> IO a -> IO a
runH2 Config
conf Context
ctx IO a
runClient = do
ThreadManager -> IO a -> (Maybe SomeException -> IO ()) -> IO a
forall a.
ThreadManager -> IO a -> (Maybe SomeException -> IO ()) -> IO a
T.stopAfter ThreadManager
mgr (IO a -> IO (Either SomeException a)
forall e a. Exception e => IO a -> IO (Either e a)
try IO a
runAll IO (Either SomeException a)
-> (Either SomeException a -> IO a) -> IO a
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Config -> Either SomeException a -> IO a
forall a. Config -> Either SomeException a -> IO a
closureClient Config
conf) ((Maybe SomeException -> IO ()) -> IO a)
-> (Maybe SomeException -> IO ()) -> IO a
forall a b. (a -> b) -> a -> b
$ \Maybe SomeException
res ->
TVar OddStreamTable
-> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams (Context -> TVar OddStreamTable
oddStreamTable Context
ctx) (Context -> TVar EvenStreamTable
evenStreamTable Context
ctx) Maybe SomeException
res
where
mgr :: ThreadManager
mgr = Context -> ThreadManager
threadManager Context
ctx
runReceiver :: IO ()
runReceiver = Context -> Config -> IO ()
frameReceiver Context
ctx Config
conf
runSender :: IO ()
runSender = Context -> Config -> IO ()
frameSender Context
ctx Config
conf
runBackgroundThreads :: IO ()
runBackgroundThreads = do
Authority -> IO ()
labelMe Authority
"H2 runBackgroundThreads"
IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
concurrently_ IO ()
runReceiver IO ()
runSender
runAll :: IO a
runAll = do
er <- IO () -> IO a -> IO (Either () a)
forall a b. IO a -> IO b -> IO (Either a b)
race IO ()
runBackgroundThreads IO a
runClient
case er of
Left () -> IO a
forall a. HasCallStack => a
undefined
Right a
r -> a -> IO a
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
r
makeStream
:: Context
-> Scheme
-> Authority
-> Request
-> IO (Stream, Maybe OutObj)
makeStream :: Context
-> Scheme -> Authority -> Request -> IO (Stream, Maybe OutObj)
makeStream ctx :: Context
ctx@Context{TVar Int
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef Int
IORef (Maybe Int)
IORef RxFlow
IORef Settings
DynamicTable
Rate
Settings
TQueue Control
TQueue Output
SockAddr
ThreadManager
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
threadManager :: Context -> ThreadManager
peerSockAddr :: Context -> SockAddr
mySockAddr :: Context -> SockAddr
rstRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
settingsRate :: Context -> Rate
pingRate :: Context -> Rate
rxFlow :: Context -> IORef RxFlow
txFlow :: Context -> TVar TxFlow
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQStreamID :: Context -> TVar Int
outputQ :: Context -> TQueue Output
outputBufferLimit :: Context -> IORef Int
peerStreamId :: Context -> IORef Int
myStreamId :: Context -> TVar Int
continued :: Context -> IORef (Maybe Int)
evenStreamTable :: Context -> TVar EvenStreamTable
myFirstSettings :: Context -> IORef Bool
mySettings :: Context -> Settings
roleInfo :: Context -> RoleInfo
role :: Context -> Role
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe Int)
myStreamId :: TVar Int
peerStreamId :: IORef Int
outputBufferLimit :: IORef Int
outputQ :: TQueue Output
outputQStreamID :: TVar Int
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
..} Scheme
scheme Authority
auth (Request OutObj
req) = do
let hdr0 :: [Header]
hdr0 = OutObj -> [Header]
outObjHeaders OutObj
req
method :: Scheme
method = Scheme -> Maybe Scheme -> Scheme
forall a. a -> Maybe a -> a
fromMaybe (Authority -> Scheme
forall a. HasCallStack => Authority -> a
error Authority
"makeStream:method") (Maybe Scheme -> Scheme) -> Maybe Scheme -> Scheme
forall a b. (a -> b) -> a -> b
$ HeaderName -> [Header] -> Maybe Scheme
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup HeaderName
":method" [Header]
hdr0
path :: Scheme
path = Scheme -> Maybe Scheme -> Scheme
forall a. a -> Maybe a -> a
fromMaybe (Authority -> Scheme
forall a. HasCallStack => Authority -> a
error Authority
"makeStream:path") (Maybe Scheme -> Scheme) -> Maybe Scheme -> Scheme
forall a b. (a -> b) -> a -> b
$ HeaderName -> [Header] -> Maybe Scheme
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup HeaderName
":path" [Header]
hdr0
mstrm0 <- TVar EvenStreamTable -> Scheme -> Scheme -> IO (Maybe Stream)
lookupEvenCache TVar EvenStreamTable
evenStreamTable Scheme
method Scheme
path
case mstrm0 of
Just Stream
strm0 -> do
TVar EvenStreamTable -> Scheme -> Scheme -> IO ()
deleteEvenCache TVar EvenStreamTable
evenStreamTable Scheme
method Scheme
path
(Stream, Maybe OutObj) -> IO (Stream, Maybe OutObj)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream
strm0, Maybe OutObj
forall a. Maybe a
Nothing)
Maybe Stream
Nothing -> do
let isIPv6 :: Bool
isIPv6 = Maybe IPv6 -> Bool
forall a. Maybe a -> Bool
isJust (Authority -> Maybe IPv6
forall a. Read a => Authority -> Maybe a
readMaybe Authority
auth :: Maybe IPv6)
auth' :: Scheme
auth'
| Bool
isIPv6 = Scheme
"[" Scheme -> Scheme -> Scheme
forall a. Semigroup a => a -> a -> a
<> Authority -> Scheme
UTF8.fromString Authority
auth Scheme -> Scheme -> Scheme
forall a. Semigroup a => a -> a -> a
<> Scheme
"]"
| Bool
otherwise = Authority -> Scheme
UTF8.fromString Authority
auth
let hdr1, hdr2 :: [Header]
hdr1 :: [Header]
hdr1
| Scheme
scheme Scheme -> Scheme -> Bool
forall a. Eq a => a -> a -> Bool
/= Scheme
"" = (HeaderName
":scheme", Scheme
scheme) Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: [Header]
hdr0
| Bool
otherwise = [Header]
hdr0
hdr2 :: [Header]
hdr2
| Authority
auth Authority -> Authority -> Bool
forall a. Eq a => a -> a -> Bool
/= Authority
"" = (HeaderName
":authority", Scheme
auth') Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: [Header]
hdr1
| Bool
otherwise = [Header]
hdr1
req' :: OutObj
req' = OutObj
req{outObjHeaders = hdr2}
(_sid, newstrm) <- Context -> IO (Int, Stream)
openOddStreamWait Context
ctx
return (newstrm, Just req')
sendRequest :: Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest :: Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest Config{Int
Buffer
SockAddr
Manager
Int -> IO Scheme
PositionReadMaker
Scheme -> IO ()
confPeerSockAddr :: Config -> SockAddr
confMySockAddr :: Config -> SockAddr
confTimeoutManager :: Config -> Manager
confPositionReadMaker :: Config -> PositionReadMaker
confReadN :: Config -> Int -> IO Scheme
confSendAll :: Config -> Scheme -> IO ()
confBufferSize :: Config -> Int
confWriteBuffer :: Config -> Buffer
confWriteBuffer :: Buffer
confBufferSize :: Int
confSendAll :: Scheme -> IO ()
confReadN :: Int -> IO Scheme
confPositionReadMaker :: PositionReadMaker
confTimeoutManager :: Manager
confMySockAddr :: SockAddr
confPeerSockAddr :: SockAddr
..} ctx :: Context
ctx@Context{TVar Int
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef Int
IORef (Maybe Int)
IORef RxFlow
IORef Settings
DynamicTable
Rate
Settings
TQueue Control
TQueue Output
SockAddr
ThreadManager
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
threadManager :: Context -> ThreadManager
peerSockAddr :: Context -> SockAddr
mySockAddr :: Context -> SockAddr
rstRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
settingsRate :: Context -> Rate
pingRate :: Context -> Rate
rxFlow :: Context -> IORef RxFlow
txFlow :: Context -> TVar TxFlow
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQStreamID :: Context -> TVar Int
outputQ :: Context -> TQueue Output
outputBufferLimit :: Context -> IORef Int
peerStreamId :: Context -> IORef Int
myStreamId :: Context -> TVar Int
continued :: Context -> IORef (Maybe Int)
evenStreamTable :: Context -> TVar EvenStreamTable
myFirstSettings :: Context -> IORef Bool
mySettings :: Context -> Settings
roleInfo :: Context -> RoleInfo
role :: Context -> Role
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe Int)
myStreamId :: TVar Int
peerStreamId :: IORef Int
outputBufferLimit :: IORef Int
outputQ :: TQueue Output
outputQStreamID :: TVar Int
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
..} Stream
strm OutObj{[Header]
OutBody
TrailersMaker
outObjHeaders :: OutObj -> [Header]
outObjHeaders :: [Header]
outObjBody :: OutBody
outObjTrailers :: TrailersMaker
outObjBody :: OutObj -> OutBody
outObjTrailers :: OutObj -> TrailersMaker
..} Bool
io = do
let sid :: Int
sid = Stream -> Int
streamNumber Stream
strm
(mnext, mtbq) <- case OutBody
outObjBody of
OutBody
OutBodyNone -> (Maybe DynaNext, Maybe (TBQueue StreamingChunk))
-> IO (Maybe DynaNext, Maybe (TBQueue StreamingChunk))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe DynaNext
forall a. Maybe a
Nothing, Maybe (TBQueue StreamingChunk)
forall a. Maybe a
Nothing)
OutBodyFile (FileSpec Authority
path FileOffset
fileoff FileOffset
bytecount) -> do
(pread, sentinel) <- PositionReadMaker
confPositionReadMaker Authority
path
let next = PositionRead -> FileOffset -> FileOffset -> Sentinel -> DynaNext
fillFileBodyGetNext PositionRead
pread FileOffset
fileoff FileOffset
bytecount Sentinel
sentinel
return (Just next, Nothing)
OutBodyBuilder Builder
builder -> do
let next :: DynaNext
next = Builder -> DynaNext
fillBuilderBodyGetNext Builder
builder
(Maybe DynaNext, Maybe (TBQueue StreamingChunk))
-> IO (Maybe DynaNext, Maybe (TBQueue StreamingChunk))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (DynaNext -> Maybe DynaNext
forall a. a -> Maybe a
Just DynaNext
next, Maybe (TBQueue StreamingChunk)
forall a. Maybe a
Nothing)
OutBodyStreaming (Builder -> IO ()) -> IO () -> IO ()
strmbdy -> do
q <- Context
-> Stream -> (OutBodyIface -> IO ()) -> IO (TBQueue StreamingChunk)
sendStreaming Context
ctx Stream
strm ((OutBodyIface -> IO ()) -> IO (TBQueue StreamingChunk))
-> (OutBodyIface -> IO ()) -> IO (TBQueue StreamingChunk)
forall a b. (a -> b) -> a -> b
$ \OutBodyIface
iface ->
OutBodyIface -> forall x. IO x -> IO x
outBodyUnmask OutBodyIface
iface (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (Builder -> IO ()) -> IO () -> IO ()
strmbdy (OutBodyIface -> Builder -> IO ()
outBodyPush OutBodyIface
iface) (OutBodyIface -> IO ()
outBodyFlush OutBodyIface
iface)
let next = TBQueue StreamingChunk -> DynaNext
nextForStreaming TBQueue StreamingChunk
q
return (Just next, Just q)
OutBodyStreamingIface OutBodyIface -> IO ()
strmbdy -> do
q <- Context
-> Stream -> (OutBodyIface -> IO ()) -> IO (TBQueue StreamingChunk)
sendStreaming Context
ctx Stream
strm OutBodyIface -> IO ()
strmbdy
let next = TBQueue StreamingChunk -> DynaNext
nextForStreaming TBQueue StreamingChunk
q
return (Just next, Just q)
let ot = [Header] -> Maybe DynaNext -> TrailersMaker -> OutputType
OHeader [Header]
outObjHeaders Maybe DynaNext
mnext TrailersMaker
outObjTrailers
if io
then do
let out = Context -> Stream -> OutputType -> Output
makeOutputIO Context
ctx Stream
strm OutputType
ot
pushOutput sid out
else do
(pop, out) <- makeOutput strm ot
pushOutput sid out
lc <- newLoopCheck strm mtbq
T.forkManaged threadManager label $ syncWithSender' ctx pop lc
where
label :: Authority
label = Authority
"H2 request sender for stream " Authority -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> Authority
forall a. Show a => a -> Authority
show (Stream -> Int
streamNumber Stream
strm)
pushOutput :: Int -> Output -> IO ()
pushOutput Int
sid Output
out = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
sidOK <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
outputQStreamID
check (sidOK == sid)
writeTVar outputQStreamID (sid + 2)
enqueueOutputSTM outputQ out
sendStreaming
:: Context
-> Stream
-> (OutBodyIface -> IO ())
-> IO (TBQueue StreamingChunk)
sendStreaming :: Context
-> Stream -> (OutBodyIface -> IO ()) -> IO (TBQueue StreamingChunk)
sendStreaming Context{TVar Int
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef Int
IORef (Maybe Int)
IORef RxFlow
IORef Settings
DynamicTable
Rate
Settings
TQueue Control
TQueue Output
SockAddr
ThreadManager
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
threadManager :: Context -> ThreadManager
peerSockAddr :: Context -> SockAddr
mySockAddr :: Context -> SockAddr
rstRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
settingsRate :: Context -> Rate
pingRate :: Context -> Rate
rxFlow :: Context -> IORef RxFlow
txFlow :: Context -> TVar TxFlow
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQStreamID :: Context -> TVar Int
outputQ :: Context -> TQueue Output
outputBufferLimit :: Context -> IORef Int
peerStreamId :: Context -> IORef Int
myStreamId :: Context -> TVar Int
continued :: Context -> IORef (Maybe Int)
evenStreamTable :: Context -> TVar EvenStreamTable
myFirstSettings :: Context -> IORef Bool
mySettings :: Context -> Settings
roleInfo :: Context -> RoleInfo
role :: Context -> Role
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe Int)
myStreamId :: TVar Int
peerStreamId :: IORef Int
outputBufferLimit :: IORef Int
outputQ :: TQueue Output
outputQStreamID :: TVar Int
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
..} Stream
strm OutBodyIface -> IO ()
strmbdy = do
tbq <- Natural -> IO (TBQueue StreamingChunk)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
10
T.forkManagedUnmask threadManager label $ \forall x. IO x -> IO x
unmask ->
TBQueue StreamingChunk
-> (forall x. IO x -> IO x) -> (OutBodyIface -> IO ()) -> IO ()
forall r.
TBQueue StreamingChunk
-> (forall x. IO x -> IO x) -> (OutBodyIface -> IO r) -> IO r
withOutBodyIface TBQueue StreamingChunk
tbq IO a -> IO a
forall x. IO x -> IO x
unmask OutBodyIface -> IO ()
strmbdy
return tbq
where
label :: Authority
label = Authority
"H2 request streaming sender for stream " Authority -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> Authority
forall a. Show a => a -> Authority
show (Stream -> Int
streamNumber Stream
strm)
exchangeSettings :: Context -> IO ()
exchangeSettings :: Context -> IO ()
exchangeSettings Context{TVar Int
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef Int
IORef (Maybe Int)
IORef RxFlow
IORef Settings
DynamicTable
Rate
Settings
TQueue Control
TQueue Output
SockAddr
ThreadManager
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
threadManager :: Context -> ThreadManager
peerSockAddr :: Context -> SockAddr
mySockAddr :: Context -> SockAddr
rstRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
settingsRate :: Context -> Rate
pingRate :: Context -> Rate
rxFlow :: Context -> IORef RxFlow
txFlow :: Context -> TVar TxFlow
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQStreamID :: Context -> TVar Int
outputQ :: Context -> TQueue Output
outputBufferLimit :: Context -> IORef Int
peerStreamId :: Context -> IORef Int
myStreamId :: Context -> TVar Int
continued :: Context -> IORef (Maybe Int)
evenStreamTable :: Context -> TVar EvenStreamTable
myFirstSettings :: Context -> IORef Bool
mySettings :: Context -> Settings
roleInfo :: Context -> RoleInfo
role :: Context -> Role
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe Int)
myStreamId :: TVar Int
peerStreamId :: IORef Int
outputBufferLimit :: IORef Int
outputQ :: TQueue Output
outputQStreamID :: TVar Int
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
..} = do
connRxWS <- RxFlow -> Int
rxfBufSize (RxFlow -> Int) -> IO RxFlow -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef RxFlow -> IO RxFlow
forall a. IORef a -> IO a
readIORef IORef RxFlow
rxFlow
let frames = Settings -> Int -> [Scheme]
makeNegotiationFrames Settings
mySettings Int
connRxWS
setframe = Maybe SettingsList -> [Scheme] -> Control
CFrames Maybe SettingsList
forall a. Maybe a
Nothing (Scheme
connectionPreface Scheme -> [Scheme] -> [Scheme]
forall a. a -> [a] -> [a]
: [Scheme]
frames)
writeIORef myFirstSettings True
enqueueControl controlQ setframe
data ClientIO = ClientIO
{ ClientIO -> SockAddr
cioMySockAddr :: SockAddr
, ClientIO -> SockAddr
cioPeerSockAddr :: SockAddr
, ClientIO -> Request -> IO (Int, Stream)
cioWriteRequest :: Request -> IO (StreamId, Stream)
, ClientIO -> Stream -> IO Response
cioReadResponse :: Stream -> IO Response
, ClientIO -> Scheme -> IO ()
cioWriteBytes :: ByteString -> IO ()
, ClientIO -> IO (Int, Stream)
cioCreateStream :: IO (StreamId, Stream)
}