plunder

Unnamed repository; edit this file 'description' to name the repository.
Log | Files | Refs | README | LICENSE

Machine.hs (62457B)


      1 -- Copyright 2023 The Plunder Authors
      2 -- Use of this source code is governed by a BSD-style license that can be
      3 -- found in the LICENSE file.
      4 
      5 {-
      6     TOPLEVEL TODOS:
      7 
      8     - TODO: `shutdownMachine` works for now, but what we really want is a way
      9             to hit ctrl-c once to begin shutdown and then have a second ctrl-c
     10             which aborts snapshotting.
     11 -}
     12 
     13 {-# LANGUAGE NoFieldSelectors #-}
     14 {-# LANGUAGE Strict           #-}
     15 {-# LANGUAGE StrictData       #-}
     16 {-# OPTIONS_GHC -Wall   #-}
     17 {-# OPTIONS_GHC -Werror #-}
     18 {-# OPTIONS_GHC -Wno-name-shadowing #-}
     19 
     20 module Server.Machine
     21     ( Machine(..)
     22     , MachineContext(..)
     23     , Moment(..)
     24     , performReplay
     25     , replayAndCrankMachine
     26     , shutdownMachine
     27     )
     28 where
     29 
     30 import PlunderPrelude
     31 
     32 import Control.Monad.State   (StateT, execStateT, modify', runStateT)
     33 import Fan                   (Fan(..), PrimopCrash(..), fanIdx, (%%))
     34 import GHC.Conc              (unsafeIOToSTM)
     35 import GHC.Prim              (reallyUnsafePtrEquality#)
     36 import Optics                (set)
     37 import Server.Convert        ()
     38 import System.Random         (randomIO)
     39 import System.Random.Shuffle (shuffleM)
     40 
     41 import qualified GHC.Natural as GHC
     42 
     43 import Data.Sorted
     44 import Fan.Convert
     45 import Fan.Prof
     46 import Server.Common
     47 import Server.Debug
     48 import Server.Evaluator
     49 import Server.Hardware.Types
     50 import Server.LmdbStore
     51 import Server.Time
     52 import Server.Types.Logging
     53 
     54 import qualified Data.IntMap as IM
     55 import qualified Data.Map    as M
     56 import qualified Data.Set    as S
     57 import qualified Data.Vector as V
     58 import qualified Server.AgentMachine as AgentMachine
     59 
     60 --------------------------------------------------------------------------------
     61 
     62 receiptQueueMax :: GHC.Natural
     63 receiptQueueMax = 65536
     64 
     65 
     66 --------------------------------------------------------------------------------
     67 
     68 data Moment = MOMENT {
     69   val  :: Map CogId CogState,
     70   work :: NanoTime
     71   }
     72 
     73 data MachineContext = MACHINE_CONTEXT
     74     { lmdb        :: LmdbStore
     75     , hw          :: DeviceTable
     76     , eval        :: Evaluator
     77     , enableSnaps :: Bool
     78     , agentMach   :: AgentMachine.Machine
     79     }
     80 
     81 -- | Flow report for an onCommit call.
     82 data OnCommitFlow = OnCommitFlow
     83     { step   :: Flow     -- ^ This flow caused this commit.
     84     , starts :: [Flow]   -- ^ This commit started these flows.
     85     }
     86 
     87 unzipOnCommitFlows :: [OnCommitFlow] -> ([Flow], [Flow])
     88 unzipOnCommitFlows onCommits = ( map (.step) onCommits
     89                                , join $ map (.starts) onCommits)
     90 
     91 -- | Toplevel handle for a Machine, a group of individual Cog fan evaluation
     92 -- that makes requests and receives responses and which share an event log.
     93 --
     94 -- Internally, machine execution has three long lived asyncs: a `Runner` async,
     95 -- a `Logger` async, and a `Snapshotter` async. These communicate and are
     96 -- controlled via the following STM variables:
     97 data Machine = MACHINE {
     98   ctx              :: MachineContext,
     99 
    100   -- The three phases:
    101   runnerAsync      :: Async (),
    102   loggerAsync      :: Async (),
    103   snapshotAsync    :: Async (),
    104 
    105   -- Signal to shut down all the asyncs. After setting this, you should wait on
    106   -- all three in order.
    107   shutdownLogger   :: TVar Bool,
    108   shutdownSnapshot :: TVar Bool,
    109 
    110   -- Starts empty, but is filled once the machine has shutdown and the
    111   shutdownComplete :: TMVar (),
    112 
    113   -- Noun state and time of the last commit at each of the three phases.
    114   liveVar          :: TVar Moment,
    115   writVar          :: TVar (BatchNum, Moment),
    116 
    117   -- | Command to log immediately (because of a pending add)
    118   logImmediately   :: TVar Bool,
    119   logReceiptQueue  :: TBQueue (Receipt, [STM OnCommitFlow])
    120   }
    121 
    122 oneSecondInNs :: NanoTime
    123 oneSecondInNs = NanoTime $ 10 ^ (9::Int)
    124 
    125 -- Configuration for
    126 logbatchWorkIntervalInNs, snapshotWorkIntervalInNs :: NanoTime
    127 logbatchWorkIntervalInNs = oneSecondInNs
    128 snapshotWorkIntervalInNs = oneSecondInNs * 45
    129 
    130 -- twoSecondsInMicroseconds :: Nat
    131 -- twoSecondsInMicroseconds = 2 * 10 ^ (6::Int)
    132 
    133 thirtySecondsInMicroseconds :: Nat
    134 thirtySecondsInMicroseconds = 30 * 10 ^ (6::Int)
    135 
    136 data TellOutcome
    137     = OutcomeOK Fan TellId
    138     | OutcomeCrash
    139   deriving (Show)
    140 
    141 data Response
    142     = RespEval EvalOutcome
    143     | RespCall Fan SysCall
    144     | RespWhat (Set Nat)
    145     | RespTell TellPayload
    146     | RespAsk TellOutcome
    147     | RespSpin CogId Fan
    148     | RespReap CogId (Maybe CogState)
    149     | RespStop CogId (Maybe CogState)
    150     | RespWait CogId
    151     | RespWho CogId
    152   deriving (Show)
    153 
    154 responseToVal :: Response -> Fan
    155 responseToVal (RespCall f _) = f
    156 responseToVal (RespWhat w) = toNoun w
    157 responseToVal (RespTell TELL_PAYLOAD{..}) = fanIdx 1 ret
    158 responseToVal (RespAsk (OutcomeOK val _)) = (NAT 0) %% val
    159 responseToVal (RespAsk OutcomeCrash) = (NAT 0)
    160 responseToVal (RespSpin (COG_ID id) _) = fromIntegral id
    161 responseToVal (RespReap _ f) = toNoun f
    162 responseToVal (RespStop _ f) = toNoun f
    163 responseToVal (RespWait _) = (NAT 0)
    164 responseToVal (RespWho (COG_ID id)) = fromIntegral id
    165 responseToVal (RespEval e)   =
    166     ROW case e of
    167         TIMEOUT   -> mempty                      -- []
    168         OKAY _ r  -> rowSingleton r              -- [f]
    169         CRASH n e -> arrayFromListN 2 [NAT n, e] -- [n e]
    170 
    171 responseToReceiptItem :: (Int, ResponseTuple) -> (Int, ReceiptItem)
    172 responseToReceiptItem (idx, tup) = case tup.resp of
    173     RespEval OKAY{}              -> (idx, ReceiptEvalOK)
    174     RespSpin cog _               -> (idx, ReceiptSpun cog)
    175     RespReap cog _               -> (idx, ReceiptReap cog)
    176     RespStop cog _               -> (idx, ReceiptStop cog)
    177     RespTell TELL_PAYLOAD{..}    -> (idx, ReceiptTell{..})
    178     RespAsk (OutcomeOK _ tellid) -> (idx, ReceiptAsk tellid)
    179     resp                         -> (idx, ReceiptVal (responseToVal resp))
    180 
    181 makeOKReceipt :: CogId -> [(Int, ResponseTuple)] -> Receipt
    182 makeOKReceipt cogId =
    183     RECEIPT_OK cogId . mapFromList . fmap responseToReceiptItem
    184 
    185 data CallRequest = CR
    186     { durable :: Bool
    187     , device  :: DeviceName
    188     , params  :: Vector Fan
    189     }
    190   deriving (Show)
    191 
    192 data SpinRequest = SR
    193     { cogFun  :: Fan
    194     }
    195   deriving (Show)
    196 
    197 data AskRequest = ASKR
    198     { cogDst  :: CogId
    199     , channel :: Word64
    200     , msg     :: Fan
    201     }
    202   deriving (Show)
    203 
    204 data Request
    205     = ReqEval EvalRequest
    206     | ReqCall CallRequest
    207     | ReqWhat (Set Nat)
    208     | ReqTell Word64 Fan
    209     | ReqAsk  AskRequest
    210     | ReqSpin SpinRequest
    211     | ReqReap CogId
    212     | ReqStop CogId
    213     | ReqWait CogId
    214     | ReqWho
    215     | UNKNOWN Fan
    216   deriving (Show)
    217 
    218 {-
    219   [%eval timeout/@ fun/fan arg/Fan ...]
    220   [%cog %spin fun/Fan]
    221   [%cog %ask dst/Nat chan/Nat param/Fan]
    222   [%cog %tell chan/Nat fun/Fan]
    223   [%cog %wait dst/Nat]
    224   [$call synced/? param/Fan ...]
    225 -}
    226 valToRequest :: CogId -> Fan -> Request
    227 valToRequest cogId top = fromMaybe (UNKNOWN top) do
    228     row <- getRowVec top
    229     tag <- fromNoun @DeviceName (row V.! 0)
    230 
    231     if tag == "eval" then do
    232         nat <- fromNoun @Nat (row V.! 1)
    233         let timeoutSecs = nat
    234         let timeoutMs   = timeoutSecs * 1000
    235         guard (length row >= 4)
    236         let func = row V.! 2
    237         let args = V.drop 3 row
    238         let flow = FlowDisabled -- TODO: What?
    239         let er = EVAL_REQUEST{func,args,cogId,flow,timeoutMs}
    240         pure (ReqEval er)
    241         -- e (ReqEval $ error "_" timeoutSecs (row V.! 2) (V.drop 3 row))
    242     else if tag == "what" then do
    243         what <- fromNoun @(Set Nat) (row V.! 1)
    244         pure $ ReqWhat what
    245     else if tag == "cog" then do
    246         nat <- fromNoun @Nat (row V.! 1)
    247         case nat of
    248             "spin" | length row == 3 -> pure (ReqSpin $ SR $ row V.! 2)
    249             "reap" | length row == 3 -> ReqReap <$> fromNoun @CogId (row V.! 2)
    250             "stop" | length row == 3 -> ReqStop <$> fromNoun @CogId (row V.! 2)
    251             "wait" | length row == 3 -> ReqWait <$> fromNoun @CogId (row V.! 2)
    252             "tell" | length row == 4 -> do
    253                 channel <- fromNoun (row V.! 2)
    254                 pure (ReqTell channel (row V.! 3))
    255             "ask"  | length row == 5 -> do
    256                 dst <- fromNoun @CogId (row V.! 2)
    257                 channel <- fromNoun (row V.! 3)
    258                 pure (ReqAsk (ASKR dst channel (row V.! 4)))
    259             "who"  | length row == 2 -> pure ReqWho
    260             _ -> Nothing
    261     else do
    262         _   <- guard (length row >= 3)
    263         nat <- fromNoun @Nat (row V.! 1)
    264         case nat of
    265             0 -> pure (ReqCall $ CR False tag $ V.drop 2 row)
    266             1 -> pure (ReqCall $ CR True  tag $ V.drop 2 row)
    267             _ -> Nothing
    268 
    269 getCurrentReqNoun :: Fan -> Vector Fan
    270 getCurrentReqNoun s = do
    271     case s of
    272         KLO _ xs -> do
    273             let len = sizeofSmallArray xs
    274             case (xs .! (len-1)) of
    275                 ROW x -> V.fromArray x
    276                 _     -> mempty
    277         _ -> mempty
    278 
    279 -- A request noun is empty if it is a row with a nonzero value.
    280 hasNonzeroReqs :: Fan -> Bool
    281 hasNonzeroReqs = any (/= NAT 0) . getCurrentReqNoun
    282 
    283 data EvalCancelledError = EVAL_CANCELLED
    284   deriving (Exception, Show)
    285 
    286 -- | A list of parsed out valid requests from `noun`. For every cog, for every
    287 -- index in that cog's requests table, there is a raw fan value and a
    288 -- `LiveRequest` which contains STM variables to listen
    289 type CogSysCalls = IntMap (Fan, LiveRequest)
    290 type MachineSysCalls = Map CogId CogSysCalls
    291 
    292 -- | The PendingAsk contains a pointer to an %asking request, along with its
    293 -- value. It exists so we can quickly look up its %msg at execution time from a
    294 -- pool of open asks.
    295 data PendingAsk = PENDING_ASK
    296     { requestor :: CogId
    297     , reqIdx    :: RequestIdx
    298     , msg       :: Fan
    299     }
    300   deriving (Show)
    301 
    302 -- | All the information needed for both
    303 data TellPayload = TELL_PAYLOAD
    304     { asker  :: CogId
    305     , reqIdx :: RequestIdx
    306 
    307     -- The locally unique tellId for this apply. Written to the event log.
    308     , tellId :: TellId
    309 
    310     -- The value produced by running the tell function against its input. It is
    311     -- deliberately left lazy since they have to be created inside an STM
    312     -- action in `receiveResponse`, but must be evaluate/forced and credited
    313     -- against the tell's timeout. (We previously also separated the value here
    314     -- into its head and tail with `fanIdx` instead of just storing the full
    315     -- result, but that causes some partial evaluation and can crash the
    316     -- interpreter if the first statement is a `trk`.)
    317     , ret    :: ~Fan
    318     }
    319   deriving (Show)
    320 
    321 -- This is a collection of all {PendingAsk}s within all the cogs of a
    322 -- machine on all channels.  This is part of the top-level Machine STM state,
    323 -- because these IPC interactions need happen transactionally.
    324 type CogChannelPool a = Map CogId (TVar (ChannelPool a))
    325 
    326 type ChannelPool a = Map Word64 (TVar (Pool a))
    327 
    328 channelPoolRegister :: Word64 -> TVar (ChannelPool a) -> a
    329                     -> STM Int
    330 channelPoolRegister channel vChannels psr = do
    331     channels <- readTVar vChannels
    332     pool <- case M.lookup channel channels of
    333         Just pool -> pure pool
    334         Nothing -> do
    335             pool <- newTVar emptyPool
    336             modifyTVar' vChannels $ insertMap channel pool
    337             pure pool
    338     poolRegister pool psr
    339 
    340 -- Given a channel number, takes an item from that pool, cleaning up empty
    341 -- channel pools.
    342 --
    343 -- Unlike `readPool`, this never retries and returns a Maybe instead because
    344 -- retrying during `receiveResponse` can cause more widespread blockage.
    345 channelPoolTake :: Word64 -> TVar (ChannelPool a) -> STM (Maybe a)
    346 channelPoolTake channel vChannels = do
    347     channels <- readTVar vChannels
    348     case lookup channel channels of
    349         Nothing -> pure Nothing
    350         Just vPool -> do
    351             pool <- readTVar vPool
    352             case IM.minView pool.tab of
    353                 Nothing -> error "Pool didn't get cleaned when empty?"
    354                 Just (x, xs) -> do
    355                     case null xs of
    356                         True  -> modifyTVar' vChannels $ M.delete channel
    357                         False -> writeTVar vPool (set #tab xs $ pool)
    358                     pure $ Just x
    359 
    360 channelPoolUnregister :: Word64 -> TVar (ChannelPool a) -> Int -> STM ()
    361 channelPoolUnregister channel vChannels poolId =
    362   do
    363     channels <- readTVar vChannels
    364     case lookup channel channels of
    365         Nothing -> pure ()
    366         Just vPool -> do
    367             empty <- stateTVar vPool $ \pool ->
    368                 let newPool = over #tab (deleteMap poolId) pool
    369                 in (null newPool.tab, newPool)
    370 
    371             when empty $ do
    372                 modifyTVar' vChannels $ deleteMap channel
    373 
    374 -- | Data used only by the Runner async. This is all the data needed to
    375 -- run the main thread of Fan evaluation and start Requests that it made.
    376 data Runner = RUNNER
    377     { ctx       :: MachineContext
    378     , vMoment   :: TVar Moment          -- ^ Current value + cumulative CPU time
    379     , vRequests :: TVar MachineSysCalls -- ^ Current requests table
    380 
    381     , vTellId   :: TVar Int
    382 
    383     , vAsks     :: TVar (CogChannelPool PendingAsk)
    384     }
    385 
    386 -- -----------------------------------------------------------------------
    387 
    388 -- An effect that happens atomically after a Response is processed by a cog and
    389 -- didn't crash.
    390 data CogReplayEffect
    391     = CSpin
    392       { reCogId :: CogId
    393       , reFun   :: Fan
    394       }
    395     | CStop
    396       { reCogId :: CogId }
    397     | CTell
    398       { tellId :: TellId
    399       , tell   :: Fan
    400       }
    401     | CAsk
    402       { tellId :: TellId
    403       }
    404     deriving (Show)
    405 
    406 data ResponseTuple = RTUP
    407     { key  :: RequestIdx
    408     , resp :: Response
    409     , work :: NanoTime
    410     , flow :: Flow
    411     }
    412   deriving (Show)
    413 
    414 -- | A `LiveRequest` is a handle that could produce a Response to an open
    415 -- Request.
    416 data LiveRequest
    417   = LiveEval {
    418     leIdx    :: RequestIdx,
    419     leRecord :: Evaluation
    420     }
    421   | LiveCall {
    422     lcIdx     :: RequestIdx,
    423     lcCancel  :: Cancel,
    424     lcSysCall :: SysCall
    425     }
    426   | LiveWhat {
    427     lwhIdx     :: RequestIdx,
    428     lwhCog     :: Set Nat,
    429     lwhRuntime :: Set Nat
    430     }
    431   | LiveTell {
    432     ltCogId    :: CogId,
    433     ltIdx      :: RequestIdx,
    434     ltChannel  :: Word64,
    435     ltChannels :: TVar (ChannelPool PendingAsk),
    436     ltFun      :: Fan
    437     }
    438   | LiveAsk {
    439     lcrChannel  :: Word64,
    440     lcrPoolId   :: Int,
    441     lcrChannels :: TVar (ChannelPool PendingAsk)
    442     }
    443   | LiveSpin {
    444     lsIdx :: RequestIdx,
    445     lsFun :: Fan
    446     }
    447   | LiveReap {
    448     lrIdx   :: RequestIdx,
    449     lrCogId :: CogId
    450     }
    451   | LiveStop {
    452     lstIdx   :: RequestIdx,
    453     lstCogId :: CogId
    454     }
    455   | LiveWait {
    456     lwaIdx   :: RequestIdx,
    457     lwaCogId :: CogId
    458     }
    459   | LiveWho {
    460     lwIdx   :: RequestIdx,
    461     lwCogId :: CogId
    462     }
    463   | LiveUnknown
    464 
    465 instance Show LiveRequest where
    466     show = \case
    467         LiveEval{}  -> "EVAL"
    468         LiveCall{}  -> "CALL"
    469         LiveWhat{}  -> "WHAT"
    470         LiveTell{}  -> "TELL"
    471         LiveAsk{}   -> "ASK"
    472         LiveSpin{}  -> "SPIN"
    473         LiveReap{}  -> "REAP"
    474         LiveStop{}  -> "STOP"
    475         LiveWait{}  -> "WAIT"
    476         LiveWho{}   -> "WHO"
    477         LiveUnknown -> "UNKNOWN"
    478 
    479 validLiveRequest :: LiveRequest -> Bool
    480 validLiveRequest LiveUnknown = False
    481 validLiveRequest _           = True
    482 
    483 data ParseRequestsState = PRS
    484     { syscalls  :: CogSysCalls
    485     , flows     :: [Flow]
    486     , onPersist :: [STM OnCommitFlow]
    487     }
    488 
    489 makeFieldLabelsNoPrefix ''MachineContext
    490 makeFieldLabelsNoPrefix ''Moment
    491 makeFieldLabelsNoPrefix ''Runner
    492 makeFieldLabelsNoPrefix ''ParseRequestsState
    493 
    494 -- -----------------------------------------------------------------------
    495 -- No template haskell beyond this point because optics.
    496 -- -----------------------------------------------------------------------
    497 
    498 tripleToPair :: (a, b, c) -> (a, b)
    499 tripleToPair (a, b, _) = (a, b)
    500 
    501 third :: (a, b, c) -> c
    502 third (_, _, c) = c
    503 
    504 type ReconstructedEvals = [(Fan, Fan, Maybe CogReplayEffect)]
    505 
    506 type Tells = Map TellId Fan
    507 
    508 recomputeEvals
    509     :: Moment
    510     -> Tells
    511     -> CogId
    512     -> IntMap ReceiptItem
    513     -> StateT NanoTime IO ReconstructedEvals
    514 recomputeEvals m tells cogId tab =
    515     for (mapToList tab) \(idx, rVal) -> do
    516         let k = toNoun (fromIntegral idx :: Word)
    517         case rVal of
    518             ReceiptVal val -> pure (k, val, Nothing)
    519             ReceiptEvalOK  -> do
    520                 -- We performed an eval which succeeded the
    521                 -- first time.
    522                 case getEvalFunAt m cogId (RequestIdx idx) of
    523                     Nothing ->
    524                         throwIO INVALID_OK_RECEIPT_IN_LOGBATCH
    525                     Just (fun, args)  -> do
    526                         (runtime, res) <- withCalcRuntime do
    527                             evaluate $ force (foldl' (%%) fun args)
    528                         modify' (+ runtime)
    529                         pure (k, ROW (rowSingleton res), Nothing)
    530             ReceiptSpun newCogId -> do
    531                 case getSpinFunAt m cogId (RequestIdx idx) of
    532                     Nothing ->
    533                         throwIO $ INVALID_SPUN_RECEIPT_IN_LOGBATCH newCogId
    534                     Just fun -> do
    535                         let ef = CSpin newCogId fun
    536                         pure (k, NAT $ fromIntegral newCogId.int, Just ef)
    537             ReceiptTell{..} -> do
    538                 -- We have to retrieve data from both the ask and the tell
    539                 case (getAskFunAt m asker cogId reqIdx,
    540                       getTellFunAt m cogId (RequestIdx idx)) of
    541                     (Just ask, Just tellFun) -> do
    542                         let ret = tellFun %% (toNoun asker) %% ask
    543                             askResp = fanIdx 0 ret
    544                             tellResp = fanIdx 1 ret
    545                             ef = CTell tellId askResp
    546                         pure (k, tellResp, Just ef)
    547                     _ ->
    548                         throwIO $ INVALID_TELL_RECEIPT_IN_LOGBATCH
    549             ReceiptAsk{..} -> do
    550                 case lookup tellId tells of
    551                     Nothing ->
    552                         throwIO $ INVALID_ASK_RECEIPT_IN_LOGBATCH
    553                     Just askResponse ->
    554                         pure (k, NAT 0 %% askResponse, Just $ CAsk tellId)
    555             ReceiptReap{..} -> do
    556                 case M.lookup cogNum m.val of
    557                     Nothing ->
    558                         throwIO INVALID_REAP_RECEIPT_IN_LOGBATCH
    559                     Just CG_SPINNING{} ->
    560                         throwIO INVALID_REAP_RECEIPT_IN_LOGBATCH
    561                     Just val -> do
    562                         let ef = CStop cogNum
    563                         pure (k, (NAT 0) %% toNoun val, Just ef)
    564             ReceiptStop{..} -> do
    565                 case M.lookup cogNum m.val of
    566                     Nothing ->
    567                         pure (k, NAT 0, Nothing)
    568                     Just val -> do
    569                         let ef = CStop cogNum
    570                         pure (k, (NAT 0) %% toNoun val, Just ef)
    571   where
    572     getEvalFunAt :: Moment -> CogId -> RequestIdx -> Maybe (Fan, Vector Fan)
    573     getEvalFunAt m cogId idx = withRequestAt m cogId idx $ \case
    574         ReqEval er -> Just (er.func, er.args)
    575         _          -> Nothing
    576 
    577 
    578     getSpinFunAt :: Moment -> CogId -> RequestIdx -> Maybe Fan
    579     getSpinFunAt m cogId idx = withRequestAt m cogId idx $ \case
    580         ReqSpin (SR fun) -> Just fun
    581         _                -> Nothing
    582 
    583     getAskFunAt :: Moment -> CogId -> CogId -> RequestIdx -> Maybe Fan
    584     getAskFunAt m sender receiver idx = withRequestAt m sender idx $ \case
    585         ReqAsk (ASKR cogDst _channel msg)
    586             | cogDst == receiver     -> Just msg
    587             | otherwise              -> Nothing
    588         _                            -> Nothing
    589 
    590     getTellFunAt :: Moment -> CogId -> RequestIdx -> Maybe Fan
    591     getTellFunAt m sender idx = withRequestAt m sender idx $ \case
    592         ReqTell _channel fun -> Just fun
    593         _                    -> Nothing
    594 
    595     withRequestAt :: Moment -> CogId -> RequestIdx -> (Request -> Maybe a)
    596                   -> Maybe a
    597     withRequestAt m cogId (RequestIdx idx) fun = do
    598         case lookup cogId m.val of
    599             Nothing -> Nothing
    600             Just CG_CRASHED{} -> Nothing
    601             Just CG_TIMEOUT{} -> Nothing
    602             Just CG_FINISHED{} -> Nothing
    603             Just (CG_SPINNING cog) -> do
    604                 let row = getCurrentReqNoun cog
    605                 case row V.!? idx of
    606                     Nothing  -> Nothing
    607                     Just val -> fun $ valToRequest cogId val
    608 
    609 -- | Loads and the last snapshot from disk, and loops over the event loop
    610 -- (starting from that event) and processes each Receipt from each LogBatch.
    611 --
    612 -- At the end, we return the current batch number and the computed Moment
    613 -- (the current state of a running cog).
    614 performReplay
    615     :: Debug
    616     => Cushion
    617     -> MachineContext
    618     -> ReplayFrom
    619     -> IO (BatchNum, Moment)
    620 performReplay cache ctx replayFrom = do
    621     let mkInitial (a, b) = (b, (MOMENT a 0, mempty))
    622     (bn, (m, _)) <- loadLogBatches replayFrom mkInitial doBatch ctx.lmdb cache
    623     pure (bn, m)
    624   where
    625     doBatch :: (BatchNum, (Moment, Tells)) -> LogBatch
    626             -> IO (BatchNum, (Moment, Tells))
    627     doBatch (_, momentTells) LogBatch{batchNum,executed} =
    628         loop batchNum momentTells executed
    629 
    630     loop :: BatchNum -> (Moment, Tells) -> [Receipt]
    631          -> IO (BatchNum, (Moment, Tells))
    632     loop bn mt []     = pure (bn, mt)
    633     loop bn (m, t) (x:xs) =
    634         let mybCogFun = case M.lookup x.cogNum m.val of
    635                 Nothing -> Nothing
    636                 Just x  -> cogSpinningFun x
    637         in case x of
    638             RECEIPT_TIME_OUT{..} -> do
    639                 -- Evaluating this bundle of responses timed out the first time
    640                 -- we ran it, so just don't run it and just recreate the
    641                 -- timed-out state from the recorded timeout amount.
    642                 case mybCogFun of
    643                     Nothing -> throwIO INVALID_TIMEOUT_IN_LOGBATCH
    644                     Just fun -> do
    645                         let cog = CG_TIMEOUT timeoutAmount fun
    646                         pure (bn, (m { val = M.insert x.cogNum cog m.val}, t))
    647 
    648             RECEIPT_CRASHED{..} -> do
    649                 -- While it's deterministic whether a plunder computation
    650                 -- crashes or not, its crash value is not deterministic so we
    651                 -- must reconstitute the crash from the recorded result.
    652                 case mybCogFun of
    653                     Nothing -> throwIO INVALID_CRASHED_IN_LOGBATCH
    654                     Just final -> do
    655                         let cog = CG_CRASHED{op,arg,final}
    656                         pure (bn, (m { val = M.insert x.cogNum cog m.val}, t))
    657 
    658             RECEIPT_OK{..} -> do
    659                 -- The original run succeeded, rebuild the value from the
    660                 -- receipt items.
    661                 (eRes, eWork) <- runStateT (recomputeEvals m t cogNum inputs) 0
    662 
    663                 let arg = TAb $ mapFromList $ map tripleToPair eRes
    664 
    665                 let runEffect :: (Moment, Tells) -> Maybe CogReplayEffect
    666                               -> IO (Moment, Tells)
    667                     runEffect (m, t) = \case
    668                         Just CSpin{..} -> pure $ (m {
    669                             val = M.insert reCogId (CG_SPINNING reFun) m.val },
    670                                                   t)
    671                         Just CStop{..} -> pure $ (m {
    672                             val = M.delete reCogId m.val }, t)
    673                         Just CTell{..} -> pure $ (m, M.insert tellId tell t)
    674                         Just CAsk{..}  -> pure $ (m, M.delete tellId t)
    675                         Nothing        -> pure (m, t)
    676 
    677                 (m, t) <- foldlM runEffect (m, t) (map third eRes)
    678 
    679                 fun <- case mybCogFun of
    680                     Nothing  -> throwIO INVALID_COGID_IN_LOGBATCH
    681                     Just fun -> pure fun
    682 
    683                 (iWork, outcome) <- evalCheckingCrash fun arg
    684 
    685                 newVal <- case outcome of
    686                     TIMEOUT ->
    687                         error "performReplay: impossible timeout on replay"
    688                     CRASH o e -> pure $ CG_CRASHED o e fun
    689                     OKAY _ result -> pure $ CG_SPINNING result
    690 
    691                 let newMoment = MOMENT
    692                         { work = m.work + eWork + iWork
    693                         , val = insertMap x.cogNum newVal m.val
    694                         }
    695 
    696                 loop bn (newMoment, t) xs
    697 
    698 -- | Main entry point for starting running a Cog.
    699 replayAndCrankMachine
    700     :: Debug
    701     => Cushion
    702     -> MachineContext
    703     -> ReplayFrom
    704     -> IO Machine
    705 replayAndCrankMachine cache ctx replayFrom = do
    706   let threadName  = encodeUtf8 "Main"
    707 
    708   withProcessName processName (wrapReplay threadName)
    709   where
    710     processName = "Machine"
    711 
    712     wrapReplay :: ByteString -> IO Machine
    713     wrapReplay threadName = do
    714       withThreadName threadName $ do
    715         setThreadSortIndex (-3)
    716         withTracingFlow "Replay" "machine" mempty [] $ do
    717           (batchNum, moment) <- performReplay cache ctx replayFrom
    718 
    719           debugText $ "REPLAY TIME: " <> tshow moment.work <> " ns"
    720 
    721           (machine, flows) <- buildMachine threadName batchNum moment
    722           pure (flows, [], machine)
    723 
    724     buildMachine
    725         :: ByteString
    726         -> BatchNum
    727         -> Moment
    728         -> IO (Machine, [Flow])
    729     buildMachine threadName lastBatch moment = do
    730       shutdownLogger   <- newTVarIO False
    731       shutdownSnapshot <- newTVarIO False
    732       shutdownComplete <- newEmptyTMVarIO
    733       liveVar          <- newTVarIO moment
    734       writVar          <- newTVarIO (lastBatch, moment)
    735       logImmediately   <- newTVarIO False
    736       logReceiptQueue  <- newTBQueueIO receiptQueueMax
    737 
    738       runner <- atomically do
    739           vMoment   <- newTVar moment
    740           vRequests <- newTVar mempty
    741 
    742           reqChannels <- forM (keys moment.val) $ \k -> do
    743               channelPools <- newTVar mempty
    744               pure (k, channelPools)
    745           vAsks     <- newTVar $ mapFromList reqChannels
    746 
    747           vTellId <- newTVar 0
    748 
    749           pure RUNNER{vMoment, ctx, vRequests, vTellId, vAsks}
    750 
    751       -- TODO: Hack because I don't understand how this is supposed
    752       -- to work.
    753       initialFlows <- newEmptyMVar
    754 
    755       machine <- mdo
    756         snapshotAsync <- asyncOnCurProcess $ withThreadName "Snapshot" $ do
    757           setThreadSortIndex (-1)
    758           handle (onErr "snapshot") $ snapshotFun machine
    759 
    760         loggerAsync <- asyncOnCurProcess $ withThreadName "Log" $ do
    761           setThreadSortIndex (-2)
    762           handle (onErr "log") $ logFun machine
    763 
    764         runnerAsync <- asyncOnCurProcess $ withThreadName threadName $ do
    765           handle (onErr "runner") $
    766             runnerFun initialFlows machine processName runner
    767 
    768         let machine = MACHINE{..}
    769         pure machine
    770 
    771       flows <- takeMVar initialFlows
    772       pure (machine, flows)
    773 
    774     onErr name e = do
    775       debugText $ name <> " thread was killed by: " <> pack (displayException e)
    776       throwIO (e :: SomeException)
    777 
    778 -- | Synchronously shuts down a machine and wait for it to exit.
    779 shutdownMachine :: Machine -> IO ()
    780 shutdownMachine machine@MACHINE{ctx,runnerAsync} = do
    781   -- Kill the runner to immediately cancel any computation being done.
    782   cancel runnerAsync
    783   -- Kill agentMach's runner, too
    784   cancel ctx.agentMach.runnerAsync
    785 
    786   waitForLogAsyncShutdown machine
    787 
    788 waitForLogAsyncShutdown :: Machine -> IO ()
    789 waitForLogAsyncShutdown MACHINE{..} = do
    790   -- Wait for the logging system to finish any logging task it's already
    791   -- started on.
    792   atomically $ writeTVar shutdownLogger True
    793   wait loggerAsync
    794   wait snapshotAsync
    795   atomically $ putTMVar shutdownComplete ()
    796 
    797 -- | Given a Request parsed from the cog, turn it into a LiveRequest that can
    798 -- produce a value and that we can listen to.
    799 buildLiveRequest
    800     :: Debug
    801     => Flow
    802     -> Runner
    803     -> MachineContext
    804     -> CogId
    805     -> RequestIdx
    806     -> Request
    807     -> STM ([Flow], LiveRequest, Maybe (STM OnCommitFlow))
    808 buildLiveRequest causeFlow runner ctx cogId reqIdx = \case
    809     ReqEval EVAL_REQUEST{..} -> do -- timeoutMs func args -> do
    810         let req = EVAL_REQUEST{flow=causeFlow, timeoutMs, func, args, cogId}
    811         leRecord <- pleaseEvaluate ctx.eval req
    812         pure ([], LiveEval{leIdx=reqIdx, leRecord}, Nothing)
    813 
    814     ReqCall cr -> do
    815         callSt <- STVAR <$> newTVar LIVE
    816 
    817         let lcSysCall = SYSCALL cogId cr.device cr.params callSt causeFlow
    818         -- If the call requires durability, pass the STM action to start
    819         -- it back to the caller. Otherwise, just run the action and
    820         -- pass back nothing.
    821         (startedFlows, logCallback, lcCancel) <-
    822             if cr.durable then do
    823                 canceled <- newTVar False
    824                 vCancel  <- newTVar (CANCEL $ writeTVar canceled True)
    825 
    826                 -- If the cancel happens before the disk sync, then we
    827                 -- never submit the request to hardware.  If the cancel
    828                 -- happens after `onCommit`, then we run the actual cancel
    829                 -- action.
    830 
    831                 let onCancel = do
    832                         cancel <- readTVar vCancel
    833                         cancel.action
    834 
    835                 let onCommit = do
    836                         isCanceled <- readTVar canceled
    837                         startedFlows <- case isCanceled of
    838                           True -> pure []
    839                           False -> do
    840                             (cancel, startedFlows) <-
    841                               callHardware ctx.hw cr.device lcSysCall
    842                             writeTVar vCancel cancel
    843                             pure startedFlows
    844                         pure $ OnCommitFlow causeFlow startedFlows
    845 
    846                 pure ([], Just onCommit, CANCEL onCancel)
    847             else do
    848                 (cancel, flows) <- callHardware ctx.hw cr.device lcSysCall
    849                 pure (flows, Nothing, cancel)
    850 
    851         pure (startedFlows, LiveCall{lcIdx=reqIdx, lcSysCall, lcCancel},
    852               logCallback)
    853 
    854     ReqWhat what -> do
    855         -- NOTE: If we ever make the hardware set runtime dynamic, the
    856         -- DeviceTable's data should be a `TVar (Map ...)` to let us detect
    857         -- changes to the variable.
    858         let runtime = S.fromList $ keys $ table ctx.hw
    859         pure ([], LiveWhat{lwhIdx=reqIdx,lwhCog=what,lwhRuntime=runtime},
    860               Nothing)
    861 
    862     ReqAsk ASKR{cogDst,channel,msg} -> do
    863         asks <- readTVar runner.vAsks
    864         case M.lookup cogDst asks of
    865             -- TODO: Pretty sure using `error` here is wrong. Figure out what
    866             -- the real error handling should be when a cog requests from a cog
    867             -- that doesn't exist.
    868             Nothing -> error $ "Bad cogDst " <> show cogDst
    869             Just vChannels -> do
    870                 poolId <-
    871                     channelPoolRegister channel
    872                                         vChannels
    873                                         (PENDING_ASK cogId reqIdx msg)
    874                 pure ([], LiveAsk{lcrChannel=channel,
    875                                   lcrPoolId=poolId,
    876                                   lcrChannels=vChannels}, Nothing)
    877 
    878     ReqTell channel function -> do
    879         asks <- readTVar runner.vAsks
    880         channels <- case M.lookup cogId asks of
    881             Nothing   -> error $ "Listening on a nonexistent cog " <> show cogId
    882             Just pool -> pure pool
    883         pure ([], LiveTell{ltCogId=cogId,
    884                            ltIdx=reqIdx,
    885                            ltChannel=channel,
    886                            ltChannels=channels,
    887                            ltFun=function}, Nothing)
    888 
    889     ReqSpin (SR func) -> do
    890         pure ([], LiveSpin{lsIdx=reqIdx, lsFun=func}, Nothing)
    891 
    892     ReqReap cogid -> do
    893         pure ([], LiveReap{lrIdx=reqIdx, lrCogId=cogid}, Nothing)
    894 
    895     ReqStop cogid -> do
    896         pure ([], LiveStop{lstIdx=reqIdx, lstCogId=cogid}, Nothing)
    897 
    898     ReqWait cogid -> do
    899         pure ([], LiveWait{lwaIdx=reqIdx, lwaCogId=cogid}, Nothing)
    900 
    901     ReqWho -> do
    902         pure ([], LiveWho{lwIdx=reqIdx, lwCogId=cogId}, Nothing)
    903 
    904     UNKNOWN _ -> do
    905         pure ([], LiveUnknown, Nothing)
    906 
    907 cancelRequest :: LiveRequest -> STM ()
    908 cancelRequest LiveEval{..} = leRecord.cancel.action
    909 cancelRequest LiveCall{..} = lcCancel.action
    910 cancelRequest LiveWhat{}   = pure ()
    911 cancelRequest LiveTell{}   = pure ()
    912 cancelRequest LiveAsk{..} =
    913     channelPoolUnregister lcrChannel lcrChannels lcrPoolId
    914 cancelRequest LiveSpin{}   = pure () -- Not asynchronous
    915 cancelRequest LiveReap{}   = pure ()
    916 cancelRequest LiveStop{}   = pure ()
    917 cancelRequest LiveWait{}   = pure ()
    918 cancelRequest LiveWho{}    = pure ()
    919 cancelRequest LiveUnknown  = pure ()
    920 
    921 {-
    922     An eval that was timed out or crashed requires zero work to replay,
    923     since we persist the result instead of re-computing it during replay.
    924 -}
    925 workToReplayEval :: EvalOutcome -> NanoTime
    926 workToReplayEval = \case
    927     OKAY w _ -> w
    928     CRASH{}  -> 0
    929     TIMEOUT  -> 0
    930 
    931 receiveResponse
    932   :: Runner
    933   -> (Fan, LiveRequest)
    934   -> STM (Maybe ResponseTuple)
    935 receiveResponse st = \case
    936     (_, LiveEval{..}) -> do
    937         getEvalOutcome leRecord >>= \case
    938             Nothing -> pure Nothing
    939             Just (outcome, flow) -> do
    940                 let work = workToReplayEval outcome
    941                 pure (Just RTUP{key=leIdx, resp=RespEval outcome, work, flow})
    942 
    943     (_, LiveCall{..}) -> do
    944         getCallResponse lcSysCall >>= \case
    945             Nothing -> pure Nothing
    946             Just (fan, flow) -> do
    947                 let resp = RespCall fan lcSysCall
    948                 -- TODO: Handle runtimeNs here, if we spent seconds on
    949                 -- an http transfer, we want to commit that as work so
    950                 -- we wouldn't fetch the same file again when restarting.
    951                 --
    952                 -- TODO: Reconsider the above TODO?  Replay does not
    953                 -- run effects again.
    954                 pure (Just RTUP{key=lcIdx, resp, work=0, flow})
    955 
    956     (_, LiveWhat{..}) -> do
    957         guard (lwhCog /= lwhRuntime)
    958         let resp = RespWhat lwhRuntime
    959         pure (Just RTUP{key=lwhIdx,resp,work=0,flow=FlowDisabled})
    960 
    961     (_, LiveAsk{}) ->
    962         -- Asks are never responded to normally, they're responded manually as
    963         -- a side effect of a tell so that we maintain atomicity of the
    964         -- request/serve pair in the log.
    965         retry
    966 
    967     (_, LiveTell{..}) -> do
    968         channelPoolTake ltChannel ltChannels >>= \case
    969             Nothing -> pure Nothing
    970             Just PENDING_ASK{..} -> do
    971                 tellId <- stateTVar st.vTellId \s -> (TellId s, s + 1)
    972 
    973                 let pa = TELL_PAYLOAD {
    974                       asker=requestor,
    975                       reqIdx,
    976                       tellId,
    977                       ret = ltFun %% (toNoun requestor) %% msg
    978                       }
    979 
    980                 pure (Just RTUP{key=ltIdx,
    981                                 resp=RespTell pa,
    982                                 work=0,
    983                                 -- TODO: Hook up send flows here.
    984                                 flow=FlowDisabled})
    985 
    986     (_, LiveSpin{..}) ->
    987       do
    988         cogid <- getRandomNonConflictingId =<< readTVar st.vMoment
    989         -- TODO: How should the flows be represented when spinning a new cog!?
    990         pure (Just RTUP{key=lsIdx, resp=RespSpin cogid lsFun, work=0,
    991                         flow=FlowDisabled})
    992       where
    993         getRandomNonConflictingId :: Moment -> STM CogId
    994         getRandomNonConflictingId moment = do
    995           r :: Word64 <- unsafeIOToSTM $ randomIO
    996           case lookup (COG_ID r) moment.val of
    997             Nothing -> pure $ COG_ID r
    998             Just _  -> getRandomNonConflictingId moment
    999 
   1000     (_, LiveReap{..}) -> do
   1001         do
   1002           myb <- (lookup lrCogId . (.val)) <$> readTVar st.vMoment
   1003           case myb of
   1004             Nothing -> do
   1005               pure (Just RTUP{key=lrIdx, resp=RespReap lrCogId Nothing,
   1006                               work=0, flow=FlowDisabled})
   1007             Just cogval -> case cogval of
   1008               CG_SPINNING{} -> pure Nothing
   1009               _             -> do
   1010                 performStoplike RespReap lrIdx lrCogId cogval
   1011 
   1012     (_, LiveStop{..}) -> do
   1013         do
   1014           myb <- (lookup lstCogId . (.val)) <$> readTVar st.vMoment
   1015           case myb of
   1016             Nothing ->
   1017               pure (Just RTUP{key=lstIdx, resp=RespStop lstCogId Nothing,
   1018                               work=0, flow=FlowDisabled})
   1019             Just cogval -> performStoplike RespStop lstIdx lstCogId cogval
   1020 
   1021     (_, LiveWait{..}) -> do
   1022         myb <- (lookup lwaCogId . (.val)) <$> readTVar st.vMoment
   1023         case myb of
   1024           Nothing ->
   1025             pure (Just RTUP{key=lwaIdx, resp=RespWait lwaCogId,
   1026                             work=0, flow=FlowDisabled})
   1027           Just _ -> pure Nothing
   1028 
   1029     (_, LiveWho{..}) -> do
   1030         pure (Just RTUP{key=lwIdx, resp=RespWho lwCogId, work=0,
   1031                         flow=FlowDisabled})
   1032 
   1033     (_, LiveUnknown) -> do
   1034         pure Nothing
   1035 
   1036   where
   1037     -- Common implementation of %stop and %reap, building a response while also
   1038     -- removing the current cog's state and canceling all open requests.
   1039     performStoplike :: (CogId -> Maybe CogState -> Response)
   1040                     -> RequestIdx
   1041                     -> CogId
   1042                     -> CogState
   1043                     -> STM (Maybe ResponseTuple)
   1044     performStoplike mkResponse reqIdx cogId cogVal = do
   1045         modifyTVar' st.vAsks $ M.delete cogId
   1046         modifyTVar' st.vMoment
   1047                     \m -> m { val=deleteMap cogId m.val }
   1048 
   1049         reqs <- stateTVar st.vRequests getAndRemoveReqs
   1050         mapM_ cancelRequest reqs
   1051 
   1052         pure (Just RTUP{key=reqIdx, resp=mkResponse cogId (Just cogVal),
   1053                         work=0, flow=FlowDisabled})
   1054       where
   1055         getAndRemoveReqs s =
   1056             ( getLiveReqs $ fromMaybe mempty $ lookup cogId s
   1057             , deleteMap cogId s
   1058             )
   1059         getLiveReqs csc = map (\(_,(_,lr)) -> lr) $ mapToList csc
   1060 
   1061 
-- Design point: Why not use optics and StateT in Runner? Because StateT over
-- IO doesn't have a MonadUnliftIO instance, which means that we can't bracket
-- our calls to the profiling system, which you really want to do to keep
-- things exception safe. We thus only use it in the one super stateful
-- function, parseRequests.
   1067 
   1068 -- The Cog Runner --------------------------------------------------------------
   1069 
   1070 runnerFun :: Debug => MVar [Flow] -> Machine -> ByteString -> Runner -> IO ()
   1071 runnerFun initialFlows machine processName st =
   1072     bracket_ registerCogsWithHardware stopCogsWithHardware $ do
   1073         -- Process the initial syscall vector
   1074         (flows, onCommit) <- atomically (parseAllRequests st)
   1075 
   1076         -- We got here via replay, so any synchronous requests are
   1077         -- immediately safe to execute.
   1078         for_ onCommit atomically
   1079             -- TODO: do this all in one atomically?
   1080 
   1081         -- The code that calls us wants these.
   1082         putMVar initialFlows flows
   1083 
   1084         -- Run the event loop until we're forced to stop.
   1085         machineTick
   1086 
   1087         -- Force a sync to shutdown.
   1088         waitForLogAsyncShutdown machine
   1089   where
   1090     registerCogsWithHardware :: IO ()
   1091     registerCogsWithHardware = do
   1092       m <- readTVarIO st.vMoment
   1093       let k :: [CogId] = keys m.val
   1094       for_ k $ \cogid ->
   1095         for_ machine.ctx.hw.table $ \d -> d.spin cogid
   1096 
   1097     stopCogsWithHardware :: IO ()
   1098     stopCogsWithHardware = do
   1099       m <- readTVarIO st.vMoment
   1100       for_ (keys m.val) $ \cogid ->
   1101         for_ machine.ctx.hw.table $ \d -> d.stop cogid
   1102 
   1103     -- We've completed a unit of work. Time to tell the logger about
   1104     -- the new state of the world.
   1105     exportState :: (Receipt, [STM OnCommitFlow]) -> STM ()
   1106     exportState (receipt, onCommit) = do
   1107         when (length onCommit > 0) do
   1108             writeTVar machine.logImmediately True
   1109         writeTBQueue machine.logReceiptQueue (receipt, onCommit)
   1110 
   1111     -- When a "tell" gets a response, it must be the only response in the
   1112     -- response bundle, so make sure they're handled separately.
   1113     separateTells :: [(CogId, CogSysCalls)] -> [(CogId, CogSysCalls)]
   1114     separateTells = loop
   1115       where
   1116         loop [] = []
   1117         loop ((id,calls):xs) = tellList ++ (id, rest) : loop xs
   1118           where
   1119             (tells, rest) = IM.partition isTell calls
   1120 
   1121             isTell (_, LiveTell{}) = True
   1122             isTell (_, _)          = False
   1123 
   1124             tellList = map mkEachTell $ IM.toList tells
   1125             mkEachTell (k, v) = (id, IM.singleton k v)
   1126 
   1127     -- TODO: Optimize
   1128     collectResponses :: IntMap (Maybe a) -> [(Int, a)]
   1129     collectResponses imap =
   1130         go [] (mapToList imap)
   1131       where
   1132         go !acc []                  = acc
   1133         go !acc ((k,Just v) : kvs)  = go ((k,v):acc) kvs
   1134         go !acc ((_,Nothing) : kvs) = go acc         kvs
   1135 
   1136     mkCogResponses :: [(CogId, CogSysCalls)]
   1137                    -> [STM (CogId, [(Int, ResponseTuple)])]
   1138     mkCogResponses = map build
   1139       where
   1140         build :: (CogId, CogSysCalls) -> STM (CogId, [(Int, ResponseTuple)])
   1141         build (k, sysCalls) = do
   1142            returns <- fmap collectResponses
   1143                     $ traverse (receiveResponse st)
   1144                     $ sysCalls
   1145 
   1146            when (null returns) retry
   1147 
   1148            pure (k, returns)
   1149 
   1150     -- Collects all responses that are ready for one cog.
   1151     takeReturns :: IO (Maybe (CogId, [(Int, ResponseTuple)]))
   1152     takeReturns = do
   1153         withAlwaysTrace "WaitForResponse" "cog" do
   1154             -- This is in a separate atomically block because it doesn't
   1155             -- change and we want to minimize contention.
   1156             machineSysCalls <-
   1157               (separateTells . mapToList) <$> atomically (readTVar st.vRequests)
   1158 
   1159             -- We now have a mapping from cog ids to the CogSysCalls map. We
   1160             -- need to verify if waiting on this will ever complete.
   1161             --
   1162             -- Note: This is a separate mechanism than CG_FINISHED; the
   1163             -- interpreter not understanding a request should trigger an
   1164             -- interpreter shutdown but should not trigger a %reap or %stop
   1165             -- visible finished state.
   1166             case machineHasLiveRequests machineSysCalls of
   1167                 False -> pure Nothing
   1168                 True -> do
   1169                   reordered <- shuffleM machineSysCalls
   1170                   -- debugText $ "takeReturns: " <> tshow reordered
   1171                   Just <$> (atomically $ asum $ mkCogResponses reordered)
   1172 
   1173     -- Every machineTick, we try to pick off as much work as possible for cogs
   1174     -- to do.
   1175     --
   1176     -- TODO: This is the simple way of doing things, there's a ton of runtime
   1177     -- gains that could be had at the cost of complexity by having a persistent
   1178     -- async for each Cog, but that adds some sync subtlety while this means
   1179     -- the machine is as fast as only the slowest cog, but is obviously correct.
   1180     machineTick :: IO ()
   1181     machineTick = takeReturns >>= \case
   1182         Nothing -> do
   1183             putStrLn "Shutting down..."
   1184             -- TODO: Check the state of the machine and print it here. Print if
   1185             -- there are timed out cogs. Print if there are cogs which we can't
   1186             -- run because of UNKNOWN requests. Print if all cogs exited
   1187             -- cleanly with no requests (or 0 requests).
   1188             pure ()
   1189         Just (cogId, valTuples) -> do
   1190             results <- cogTick (cogId, valTuples)
   1191             atomically $ do
   1192                 mapM_ exportState results
   1193                 readTVar st.vMoment >>= writeTVar machine.liveVar
   1194             machineTick
   1195 
   1196     -- Returns true if there are any valid, open LiveRequests that aren't
   1197     -- UNKNOWN. This is used to end the machine when we will never be able to
   1198     -- provide it with work.
   1199     machineHasLiveRequests :: [(CogId, CogSysCalls)] -> Bool
   1200     machineHasLiveRequests = any \(_, csc) -> cogHasLiveRequests csc
   1201 
   1202     cogHasLiveRequests :: CogSysCalls -> Bool
   1203     cogHasLiveRequests = any \(_, req) -> validLiveRequest req
   1204 
   1205     --
   1206     cogTick
   1207       :: (CogId, [(Int, ResponseTuple)])
   1208       -> IO [(Receipt, [STM OnCommitFlow])]
   1209     cogTick (cogId, valTuples) =
   1210       withProcessName processName $
   1211         withThreadName ("Cog: " <> (encodeUtf8 $ tshow cogId)) $ do
   1212           let endFlows = (.flow) . snd <$> valTuples
   1213           let traceArg = M.singleton "syscall indicies"
   1214                        $ Right
   1215                        $ tshow
   1216                        $ fmap fst valTuples
   1217 
   1218           withTracingFlow "Response" "cog" traceArg endFlows $ do
   1219             (flows, receipts) <- runResponse st cogId valTuples
   1220             pure (flows, [], receipts)
   1221 
   1222 {-
   1223     Given a set of responses to syscalls in a cogs SysCall table, create
   1224     a new event value and pass that into the cog, to get the new cog state.
   1225 
   1226     Side Effects:
   1227 
   1228     -   The PLAN value for the cog is replaced.
   1229 
   1230     -   The cog's requests row is updated to reflect the new set of
   1231         requests.
   1232 
   1233     -   If we received a COG_TELL message, the corresponding COG_ASK request
   1234         is also processed.
   1235 
   1236     -   If we spawned a cog or stopped a cog, we inform every hardware
   1237         device that this happened.
   1238 
   1239     Results:
   1240 
   1241     -   The set of profiling events triggered by this change.
   1242 
   1243     -   A list of event-log receipts (and the corresponding actions to
   1244         be triggered when those receipts have been committed to disk.
   1245         (This is for disk-synchronized syscalls).
   1246 -}
   1247 runResponse
   1248     :: Debug
   1249     => Runner
   1250     -> CogId
   1251     -> [(Int, ResponseTuple)]
   1252     -> IO ([Flow], [(Receipt, [STM OnCommitFlow])])
   1253 runResponse st cogNum rets = do
   1254   let arg = TAb
   1255           $ mapFromList
   1256           $ flip fmap rets
   1257           $ \(k,v) -> (NAT (fromIntegral k), responseToVal v.resp)
   1258   moment <- atomically (readTVar st.vMoment)
   1259   let fun = fromMaybe (error "Trying to run a stopped cog")
   1260           $ cogSpinningFun
   1261           $ fromMaybe (error "Invalid cogid")
   1262           $ lookup cogNum moment.val
   1263   (runtimeUs, result) <- do
   1264       withAlwaysTrace "Eval" "cog" do
   1265           let preEvaluate = map (ensureResponseEvaluated . (.resp) . snd) rets
   1266           evalWithTimeout thirtySecondsInMicroseconds preEvaluate fun arg
   1267 
   1268   let resultReceipt = case result of
   1269         TIMEOUT      -> RECEIPT_TIME_OUT{cogNum,timeoutAmount=runtimeUs}
   1270         CRASH op arg -> RECEIPT_CRASHED{..}
   1271         OKAY{}       -> makeOKReceipt cogNum rets
   1272 
   1273   let newState = case result of
   1274         OKAY _ resultFan -> case hasNonzeroReqs resultFan of
   1275                                 True  -> CG_SPINNING resultFan
   1276                                 False -> CG_FINISHED resultFan
   1277         CRASH op arg     -> CG_CRASHED op arg fun
   1278         TIMEOUT          -> CG_TIMEOUT runtimeUs fun
   1279 
   1280   withAlwaysTrace "Tick" "cog" do
   1281       let responses = ((.resp) . snd) <$> rets
   1282 
   1283       -- 1) Notify hardware about spins which started a cog. We have to do this
   1284       -- before we record the cog state or do parseRequests because the initial
   1285       -- state of a newly created cog might try to communicate with the
   1286       -- hardware.
   1287       traverse_ performAlertHardwareOnSpin responses
   1288 
   1289       -- 2) Perform parseRequest and handle all changes that have to be handled
   1290       -- attomically. This has to happen after we notify the hardware about
   1291       -- cogs being spun.
   1292       (sideEffects, startedFlows, onPersists) <- atomically $
   1293           performStateUpdate newState responses runtimeUs resultReceipt
   1294 
   1295       -- 3) If this cog shut down for any reason, we have to alert the hardware
   1296       -- that it has shut down. This has to be done after parsing requests
   1297       -- because parsing requests will cancel ongoing hardware events.
   1298       case newState of
   1299         CG_SPINNING _ -> pure ()
   1300         _             -> for_ st.ctx.hw.table $ \d -> d.stop cogNum
   1301 
   1302       -- 4) %cog %ask/%tell must execute in pairs. We finally synchronously
   1303       -- trigger any %ask callback now that the %tell has processed.
   1304       afterResults <- mapM (performAskAck resultReceipt) responses
   1305 
   1306       let (sideEffectFlows, sideEffectPersists) = concatUnzip sideEffects
   1307           (afterFlows, afterReceipts) = concatUnzip afterResults
   1308           receiptPairs = [ ( resultReceipt
   1309                            , onPersists ++ sideEffectPersists
   1310                            )
   1311                          ]
   1312                       ++ afterReceipts
   1313 
   1314       pure ( startedFlows ++ afterFlows ++ sideEffectFlows
   1315            , receiptPairs )
   1316 
   1317   where
   1318     delReq :: CogSysCalls -> Int -> CogSysCalls
   1319     delReq acc k = IM.delete k acc
   1320 
   1321     ensureResponseEvaluated :: Response -> IO ()
   1322     ensureResponseEvaluated (RespTell TELL_PAYLOAD{..}) = do
   1323       -- We must force evaluate the response thunk so that we make sure it
   1324       -- doesn't have a crash, but must do this while crediting the runtime
   1325       -- (and possible crash) to the %tell since the %tell provides the
   1326       -- function.
   1327       evaluate $ force ret
   1328       pure ()
   1329 
   1330     ensureResponseEvaluated _ = pure ()
   1331 
   1332     performAlertHardwareOnSpin :: Response -> IO ()
   1333     performAlertHardwareOnSpin (RespSpin newCogId _) = do
   1334         -- If we just spun up another cog, tell all the hardware
   1335         -- devices that it exists now.
   1336         for_ st.ctx.hw.table $ \d -> d.spin newCogId
   1337     performAlertHardwareOnSpin _ = pure ()
   1338 
   1339     performStateUpdate newState responses runtimeUs resultReceipt = do
   1340         modifyTVar' st.vMoment \m ->
   1341             MOMENT (insertMap cogNum newState m.val) (m.work + runtimeUs)
   1342 
   1343         sideEffects <- case resultReceipt of
   1344             RECEIPT_OK{..} -> do
   1345               -- Delete the consumed `LiveRequest`s without formally
   1346               -- canceling it because it completed.
   1347               modifyTVar' st.vRequests \tab ->
   1348                   adjustMap (\kals -> foldl' delReq kals (fst<$>rets))
   1349                             cogNum tab
   1350               mapM performSpinEffects responses
   1351             _ -> do
   1352               -- If we crashed, do nothing. We'll formally cancel all
   1353               -- requests when we reparse in the next step, and we have no
   1354               -- valid responses to process side effects from.
   1355               pure mempty
   1356 
   1357         (startedFlows, onPersists) <- parseRequests st cogNum
   1358 
   1359         pure (sideEffects, startedFlows, onPersists)
   1360 
   1361     performSpinEffects :: Response
   1362                        -> STM ([Flow], [STM OnCommitFlow])
   1363     performSpinEffects (RespSpin newCogId fun) = do
   1364         reqChannelPools <- newTVar mempty
   1365         modifyTVar' st.vAsks \reqs ->
   1366             M.insert newCogId reqChannelPools reqs
   1367         modifyTVar' st.vMoment \m ->
   1368             m { val=insertMap newCogId (CG_SPINNING fun) m.val }
   1369         parseRequests st newCogId
   1370 
   1371     performSpinEffects _ = pure mempty
   1372 
   1373     -- Performed outside the Big Atomically Block
   1374     performAskAck :: Receipt
   1375                    -> Response
   1376                    -> IO ([Flow], [(Receipt, [STM OnCommitFlow])])
   1377     performAskAck result (RespTell TELL_PAYLOAD{..}) = do
   1378         -- When an IPC message is received, the corresponding
   1379         -- ask syscall on the asking cog must also be processed
   1380         -- within the same transaction.
   1381         --
   1382         -- We do this recursively invoking runResponse for that
   1383         -- syscall as well.
   1384         --
   1385         -- We are sure that the `reqIdx` still corresponds to
   1386         -- an active COG_ASK syscall because of reasons.
   1387         let outcome = case result of
   1388               RECEIPT_OK{} -> OutcomeOK (fanIdx 0 ret) tellId
   1389               _            -> OutcomeCrash
   1390 
   1391         -- TODO: Enable profiling flows; how do we connect
   1392         -- the recv completing to this?
   1393         let rtup = RTUP { key  = reqIdx
   1394                         , resp = RespAsk outcome
   1395                         , work = 0
   1396                         , flow = FlowDisabled
   1397                         }
   1398         runResponse st asker [(reqIdx.int, rtup)]
   1399 
   1400     performAskAck _ _ = pure ([], [])
   1401 
   1402 
   1403 {-
   1404     Hack to avoid comparing SERV requests.
   1405 
   1406     It's not expensive to replace the SERV request, but it is expensive
   1407     to check them for equality.
   1408 
   1409     This causes SERV requests to just always be considered non-matching,
   1410     unless the two requests happen to be pointer-equals.
   1411 
   1412     TODO: Find a better solution
   1413 -}
   1414 keep :: Fan -> Fan -> Bool
   1415 keep x y =
   1416     case reallyUnsafePtrEquality# x y of
   1417         1# -> True
   1418         _  ->
   1419            case x of
   1420                ROW rs ->
   1421                    if length rs == 4 && rs!0 == "http" && rs!2 == "serv"
   1422                    then False
   1423                    else x==y
   1424                _ -> x==y
   1425 
   1426 
   1427 concatUnzip :: [([a], [b])] -> ([a], [b])
   1428 concatUnzip a = let (f, s) = unzip a
   1429                 in (concat f, concat s)
   1430 
   1431 parseAllRequests :: Debug => Runner -> STM ([Flow], [STM OnCommitFlow])
   1432 parseAllRequests runner = do
   1433   cogs <- (keys . (.val)) <$> readTVar runner.vMoment
   1434   actions <- forM cogs $ parseRequests runner
   1435   pure $ concatUnzip actions
   1436 
   1437 -- | Given a noun in a runner, update the `requests`
   1438 parseRequests :: Debug => Runner -> CogId -> STM ([Flow], [STM OnCommitFlow])
   1439 parseRequests runner cogId = do
   1440     cogState <- (fromMaybe (error "no cogid m") . lookup cogId . (.val)) <$>
   1441                 readTVar runner.vMoment
   1442     syscalls <- (fromMaybe mempty . lookup cogId) <$>
   1443                 readTVar runner.vRequests
   1444 
   1445     let init = PRS{syscalls, flows=[], onPersist=[]}
   1446 
   1447     st <- flip execStateT init do
   1448               let expected = syscalls
   1449               let actual   = case cogState of
   1450                     CG_CRASHED{}       -> mempty
   1451                     CG_TIMEOUT{}       -> mempty
   1452                     CG_FINISHED{}      -> mempty
   1453                     CG_SPINNING cogFun -> getCurrentReqNoun cogFun
   1454 
   1455               for_ (mapToList expected) \(i,(v,_)) ->
   1456                   case actual V.!? i of
   1457                       Nothing           -> cancelReq i
   1458                       Just w | keep v w -> pure ()
   1459                       Just w            -> cancelReq i >> createReq i w
   1460 
   1461               for_ (zip [0..] $ toList actual) \(i,v) ->
   1462                   unless (member i expected) do
   1463                       createReq i v
   1464 
   1465     modifyTVar' runner.vRequests $ insertMap cogId st.syscalls
   1466 
   1467     pure (st.flows, st.onPersist)
   1468 
   1469   where
   1470 
   1471     cancelReq :: Int -> StateT ParseRequestsState STM ()
   1472     cancelReq key = do
   1473         -- traceM ("cancelReq:" <> show key)
   1474         slot <- use (#syscalls % at key)
   1475         case slot of
   1476             Nothing -> error "cancelReq: impossible"
   1477             Just (_, liveReq) -> do
   1478                 modifying' #syscalls (deleteMap key)
   1479                 lift (cancelRequest liveReq)
   1480 
   1481     reqDebugSummary :: Request -> ByteString
   1482     reqDebugSummary = \case
   1483         ReqEval{}  -> "%eval"
   1484         ReqCall rc -> "%call " <> (encodeUtf8 $
   1485                           describeSyscall runner.ctx.hw rc.device rc.params)
   1486         ReqWhat{}  -> "%what"
   1487         ReqTell{}  -> "%tell"
   1488         ReqAsk{}   -> "%ask"
   1489         ReqSpin{}  -> "%spin"
   1490         ReqReap{}  -> "%reap"
   1491         ReqStop{}  -> "%stop"
   1492         ReqWait{}  -> "%wait"
   1493         ReqWho{}   -> "%who"
   1494         UNKNOWN{}  -> "UNKNOWN"
   1495 
   1496     flowCategory :: Request -> ByteString
   1497     flowCategory = \case
   1498         ReqEval{}  -> "%eval"
   1499         ReqCall rc -> "%call " <> (encodeUtf8 $
   1500                           syscallCategory runner.ctx.hw rc.device rc.params)
   1501         ReqWhat{}  -> "%what"
   1502         ReqTell{}  -> "%tell"
   1503         ReqAsk{}   -> "%ask"
   1504         ReqSpin{}  -> "%cog"
   1505         ReqReap{}  -> "%reap"
   1506         ReqStop{}  -> "%cog"
   1507         ReqWait{}  -> "%wait"
   1508         ReqWho{}   -> "%cog"
   1509         UNKNOWN{}  -> "UNKNOWN"
   1510 
   1511     createReq :: Int -> Fan -> StateT ParseRequestsState STM ()
   1512     createReq i v = do
   1513         let request = valToRequest cogId v
   1514             reqData = reqDebugSummary request
   1515             reqName = "Cog " <> (encodeUtf8 $ tshow cogId.int)
   1516                    <> " idx "
   1517                    <> encodeUtf8 (tshow i) <> ": " <> reqData
   1518             catData = flowCategory request
   1519         f <- lift $ allocateRequestFlow reqName (decodeUtf8 catData)
   1520 
   1521         let rIdx = RequestIdx i
   1522 
   1523         (startedFlows, live, onPersist) <- lift $
   1524           buildLiveRequest f runner runner.ctx cogId rIdx request
   1525 
   1526         case onPersist of
   1527             Nothing -> pure ()
   1528             Just cb -> modifying' #onPersist (cb:)
   1529 
   1530         modifying' #syscalls (insertMap i (v, live))
   1531         modifying' #flows  (++ (f:startedFlows))
   1532 
   1533 evalWithTimeout
   1534     :: Debug
   1535     => Nat
   1536     -> [IO ()]
   1537     -> Fan
   1538     -> Fan
   1539     -> IO (NanoTime, EvalOutcome)
   1540 evalWithTimeout msTimeout preActions fun arg = do
   1541   (runtime, raw) <- withCalcRuntime $ timeout (fromIntegral msTimeout) $ do
   1542     try doAllEvals >>= \case
   1543       Left (PRIMOP_CRASH op val) -> do debug ("crash"::Text, NAT op, val)
   1544                                        pure (Left (op,val))
   1545       Right f                    -> pure (Right f)
   1546   case raw of
   1547     Nothing          -> pure (runtime, TIMEOUT)
   1548     Just (Left(o,e)) -> pure (runtime, CRASH o e)
   1549     Just (Right v)   -> pure (runtime, OKAY runtime v)
   1550   where
   1551     doAllEvals = do
   1552       sequence_ preActions
   1553       evaluate $ force (fun %% arg)
   1554 
   1555 
   1556 evalCheckingCrash
   1557     :: Debug
   1558     => Fan
   1559     -> Fan
   1560     -> IO (NanoTime, EvalOutcome)
   1561 evalCheckingCrash fun arg = do
   1562   (runtime, raw) <- withCalcRuntime $ do
   1563     try (evaluate $ force (fun %% arg)) >>= \case
   1564       Left (PRIMOP_CRASH op val) -> do debug ("crash"::Text, NAT op, val)
   1565                                        pure (Left (op,val))
   1566       Right f                    -> pure (Right f)
   1567   case raw of
   1568     Left (o,e) -> pure (runtime, CRASH o e)
   1569     Right v    -> pure (runtime, OKAY runtime v)
   1570 
   1571 -- The EventLog Routine --------------------------------------------------------
   1572 
-- | One unit of work for the event-log routine (`logFun`), as assembled by
-- its `getNextBatch` transaction.
data LogNext = LOG_NEXT
    { shutdown :: Bool
        -- Value of the logger-shutdown flag when the batch was taken;
        -- when set, `logFun` stops looping after handling this batch.
    , batchNum :: BatchNum
        -- Number for this batch: one more than the last written batch.
    , receipts :: [(Receipt, [STM OnCommitFlow])]
        -- Everything flushed from the receipt queue; each receipt comes
        -- with the STM actions to run once the batch has been written.
        -- May be empty.
    , moment   :: Moment
        -- The live moment read in the same transaction; recorded as the
        -- written moment after the batch has been written.
    }
   1579 
{-
    This is the per-cog event-log persistence logic that runs in
    `loggerAsync`.

    Event logging is almost entirely asynchronous w.r.t. ship execution;
    the only exception is that some syscalls are "synchronous" and they
    need to block until the state that triggered them has been committed
    to disk.

    Like snapshotting, the decision of whether or not to write a log
    batch depends on the amount of cpu-work that has been performed since
    the last batch.

    We also write a final log batch right before shutdown, when the daemon
    receives a termination signal (this note used to say SIGKILL, but that
    signal cannot be caught; presumably SIGINT/SIGTERM is meant -- TODO
    confirm).  This isn't synchronized with the snapshot routine, so the
    final disk state will often be a snapshot with a single log batch that
    needs to be replayed, but this should be cheap in practice.

    There is also the `logImmediately` flag, which will cause the log
    to be written right away.  This is important for "synchronous" events,
    since we want to cause those to block for as little time as possible.
-}
   1602 logFun :: Debug => Machine -> IO ()
   1603 logFun machine =
   1604     loop
   1605   where
   1606     loop :: IO ()
   1607     loop = do
   1608         next <- withAlwaysTrace "Block" "log" (atomically getNextBatch)
   1609 
   1610         -- It's possible for next.receipts to be empty if a shutdown
   1611         -- is requested and no events have happened since the last batch
   1612         -- was written.
   1613         withTracingFlow "Log" "log" mempty [] $ do
   1614             if null next.receipts then do
   1615                 pure ([], [], ())
   1616             else do
   1617                 (startFlows, stepFlows) <- doBatch next
   1618                 pure (startFlows, stepFlows, ())
   1619 
   1620         case next.shutdown of
   1621           True  -> atomically $ writeTVar machine.shutdownSnapshot True
   1622           False -> loop
   1623 
   1624     -- Either reads a shutdown command or the next batch of work to log.
   1625     getNextBatch :: STM LogNext
   1626     getNextBatch = do
   1627         shutdown             <- readTVar machine.shutdownLogger
   1628         live                 <- readTVar machine.liveVar
   1629         (lastBatchNum, writ) <- readTVar machine.writVar
   1630         forcedLog            <- readTVar machine.logImmediately
   1631         fullQ                <- isFullTBQueue machine.logReceiptQueue
   1632 
   1633         -- TODO: IIUC "workNs" is currently actually just the runtime.
   1634         -- The idea was that we would use "total execution time" to
   1635         -- trigger snapshots, in order to bound the amount of time needed
   1636         -- to replay.  This makes no sense for log batches, and the
   1637         -- total-runtime makes no sense for snapshots.  Shit is working,
   1638         -- however, so there's no need to changes this atm.
   1639 
   1640         let timedLog  = live.work > (writ.work + logbatchWorkIntervalInNs)
   1641         let shouldLog = shutdown || forcedLog || fullQ || timedLog
   1642 
   1643         unless shouldLog retry
   1644 
   1645         when forcedLog (writeTVar machine.logImmediately False)
   1646 
   1647         receipts <- flushTBQueue machine.logReceiptQueue
   1648         moment   <- readTVar machine.liveVar
   1649 
   1650         let batchNum = lastBatchNum + 1
   1651         pure LOG_NEXT{shutdown, receipts, moment, batchNum}
   1652 
   1653     doBatch :: LogNext -> IO ([Flow], [Flow])
   1654     doBatch next = do
   1655         let (receipts, onCommit) = unzip next.receipts
   1656 
   1657         now <- getNanoTime
   1658 
   1659         writeLogBatch machine.ctx.lmdb $
   1660             LogBatch
   1661                 { batchNum  = next.batchNum
   1662                 , writeTime = now
   1663                 , executed  = receipts
   1664                 }
   1665 
   1666         atomically $ writeTVar machine.writVar (next.batchNum, next.moment)
   1667 
   1668         -- Perform the associated STM actions that were supposed to be
   1669         -- run once committed.  This is used for synchronous syscalls
   1670         -- which need to block until we have committed to the state that
   1671         -- triggered them.
   1672         (starts, steps) <- unzipOnCommitFlows <$>
   1673             traverse atomically (join onCommit)
   1674         pure (starts, steps)
   1675 
   1676 
   1677 -- The Snapshotting Routine ----------------------------------------------------
   1678 
{-
    This is the per-machine snapshotting logic that runs in `snapshotAsync`.

    Snapshotting is fully asynchronous.  It doesn't block normal
    execution, so we allow it to lag behind the current state.

    When we replay, we load the latest snapshot, and then we find all
    of the log batches that come after that point.

    Here, we track the amount of CPU-work that will be required to replay
    those batches, and we wait until that reaches a certain threshold before
    taking a snapshot.

    We also take a snapshot right before shutdown, when the daemon receives
    a termination signal (this note used to say SIGKILL, but that signal
    cannot be caught; presumably SIGINT/SIGTERM is meant -- TODO confirm).
-}
   1695 snapshotFun :: Debug => Machine -> IO ()
   1696 snapshotFun machine =
   1697     when machine.ctx.enableSnaps do
   1698         loop 0
   1699   where
   1700     loop lastSnapWork = do
   1701         let nextSnapWork = lastSnapWork + snapshotWorkIntervalInNs
   1702 
   1703         (shutdown, bn, moment) <-
   1704             withAlwaysTrace "Block" "log" $
   1705             atomically do
   1706                 shutdown           <- readTVar machine.shutdownSnapshot
   1707                 (logBN, logMoment) <- readTVar machine.writVar
   1708 
   1709                 let shouldSnap = shutdown || logMoment.work > nextSnapWork
   1710 
   1711                 unless shouldSnap retry
   1712 
   1713                 pure (shutdown, logBN, logMoment)
   1714 
   1715         withAlwaysTrace "Snapshot" "log" do
   1716             writeMachineSnapshot machine.ctx.lmdb bn moment.val
   1717 
   1718         unless shutdown do
   1719             loop moment.work