Machine.hs (62457B)
1 -- Copyright 2023 The Plunder Authors 2 -- Use of this source code is governed by a BSD-style license that can be 3 -- found in the LICENSE file. 4 5 {- 6 TOPLEVEL TODOS: 7 8 - TODO: `shutdownMachine` works for now, but what we really want is a way 9 to hit ctrl-c once to begin shutdown and then have a second ctrl-c 10 which aborts snapshotting. 11 -} 12 13 {-# LANGUAGE NoFieldSelectors #-} 14 {-# LANGUAGE Strict #-} 15 {-# LANGUAGE StrictData #-} 16 {-# OPTIONS_GHC -Wall #-} 17 {-# OPTIONS_GHC -Werror #-} 18 {-# OPTIONS_GHC -Wno-name-shadowing #-} 19 20 module Server.Machine 21 ( Machine(..) 22 , MachineContext(..) 23 , Moment(..) 24 , performReplay 25 , replayAndCrankMachine 26 , shutdownMachine 27 ) 28 where 29 30 import PlunderPrelude 31 32 import Control.Monad.State (StateT, execStateT, modify', runStateT) 33 import Fan (Fan(..), PrimopCrash(..), fanIdx, (%%)) 34 import GHC.Conc (unsafeIOToSTM) 35 import GHC.Prim (reallyUnsafePtrEquality#) 36 import Optics (set) 37 import Server.Convert () 38 import System.Random (randomIO) 39 import System.Random.Shuffle (shuffleM) 40 41 import qualified GHC.Natural as GHC 42 43 import Data.Sorted 44 import Fan.Convert 45 import Fan.Prof 46 import Server.Common 47 import Server.Debug 48 import Server.Evaluator 49 import Server.Hardware.Types 50 import Server.LmdbStore 51 import Server.Time 52 import Server.Types.Logging 53 54 import qualified Data.IntMap as IM 55 import qualified Data.Map as M 56 import qualified Data.Set as S 57 import qualified Data.Vector as V 58 import qualified Server.AgentMachine as AgentMachine 59 60 -------------------------------------------------------------------------------- 61 62 receiptQueueMax :: GHC.Natural 63 receiptQueueMax = 65536 64 65 66 -------------------------------------------------------------------------------- 67 68 data Moment = MOMENT { 69 val :: Map CogId CogState, 70 work :: NanoTime 71 } 72 73 data MachineContext = MACHINE_CONTEXT 74 { lmdb :: LmdbStore 75 , hw :: DeviceTable 76 , eval :: Evaluator 77 , enableSnaps :: Bool 78 , agentMach :: AgentMachine.Machine 79 } 80 81 -- | Flow report for an onCommit call. 82 data OnCommitFlow = OnCommitFlow 83 { step :: Flow -- ^ This flow caused this commit. 84 , starts :: [Flow] -- ^ This commit started these flows. 85 } 86 87 unzipOnCommitFlows :: [OnCommitFlow] -> ([Flow], [Flow]) 88 unzipOnCommitFlows onCommits = ( map (.step) onCommits 89 , join $ map (.starts) onCommits) 90 91 -- | Toplevel handle for a Machine, a group of individual Cog fan evaluation 92 -- that makes requests and receives responses and which share an event log. 93 -- 94 -- Internally, machine execution has three long lived asyncs: a `Runner` async, 95 -- a `Logger` async, and a `Snapshotter` async. These communicate and are 96 -- controlled via the following STM variables: 97 data Machine = MACHINE { 98 ctx :: MachineContext, 99 100 -- The three phases: 101 runnerAsync :: Async (), 102 loggerAsync :: Async (), 103 snapshotAsync :: Async (), 104 105 -- Signal to shut down all the asyncs. After setting this, you should wait on 106 -- all three in order. 107 shutdownLogger :: TVar Bool, 108 shutdownSnapshot :: TVar Bool, 109 110 -- Starts empty, but is filled once the machine has shutdown and the 111 shutdownComplete :: TMVar (), 112 113 -- Noun state and time of the last commit at each of the three phases. 114 liveVar :: TVar Moment, 115 writVar :: TVar (BatchNum, Moment), 116 117 -- | Command to log immediately (because of a pending add) 118 logImmediately :: TVar Bool, 119 logReceiptQueue :: TBQueue (Receipt, [STM OnCommitFlow]) 120 } 121 122 oneSecondInNs :: NanoTime 123 oneSecondInNs = NanoTime $ 10 ^ (9::Int) 124 125 -- Configuration for 126 logbatchWorkIntervalInNs, snapshotWorkIntervalInNs :: NanoTime 127 logbatchWorkIntervalInNs = oneSecondInNs 128 snapshotWorkIntervalInNs = oneSecondInNs * 45 129 130 -- twoSecondsInMicroseconds :: Nat 131 -- twoSecondsInMicroseconds = 2 * 10 ^ (6::Int) 132 133 thirtySecondsInMicroseconds :: Nat 134 thirtySecondsInMicroseconds = 30 * 10 ^ (6::Int) 135 136 data TellOutcome 137 = OutcomeOK Fan TellId 138 | OutcomeCrash 139 deriving (Show) 140 141 data Response 142 = RespEval EvalOutcome 143 | RespCall Fan SysCall 144 | RespWhat (Set Nat) 145 | RespTell TellPayload 146 | RespAsk TellOutcome 147 | RespSpin CogId Fan 148 | RespReap CogId (Maybe CogState) 149 | RespStop CogId (Maybe CogState) 150 | RespWait CogId 151 | RespWho CogId 152 deriving (Show) 153 154 responseToVal :: Response -> Fan 155 responseToVal (RespCall f _) = f 156 responseToVal (RespWhat w) = toNoun w 157 responseToVal (RespTell TELL_PAYLOAD{..}) = fanIdx 1 ret 158 responseToVal (RespAsk (OutcomeOK val _)) = (NAT 0) %% val 159 responseToVal (RespAsk OutcomeCrash) = (NAT 0) 160 responseToVal (RespSpin (COG_ID id) _) = fromIntegral id 161 responseToVal (RespReap _ f) = toNoun f 162 responseToVal (RespStop _ f) = toNoun f 163 responseToVal (RespWait _) = (NAT 0) 164 responseToVal (RespWho (COG_ID id)) = fromIntegral id 165 responseToVal (RespEval e) = 166 ROW case e of 167 TIMEOUT -> mempty -- [] 168 OKAY _ r -> rowSingleton r -- [f] 169 CRASH n e -> arrayFromListN 2 [NAT n, e] -- [n e] 170 171 responseToReceiptItem :: (Int, ResponseTuple) -> (Int, ReceiptItem) 172 responseToReceiptItem (idx, tup) = case tup.resp of 173 RespEval OKAY{} -> (idx, ReceiptEvalOK) 174 RespSpin cog _ -> (idx, ReceiptSpun cog) 175 RespReap cog _ -> (idx, ReceiptReap cog) 176 RespStop cog _ -> (idx, ReceiptStop cog) 177 RespTell TELL_PAYLOAD{..} -> (idx, ReceiptTell{..}) 178 RespAsk (OutcomeOK _ tellid) -> (idx, ReceiptAsk tellid) 179 resp -> (idx, ReceiptVal (responseToVal resp)) 180 181 makeOKReceipt :: CogId -> [(Int, ResponseTuple)] -> Receipt 182 makeOKReceipt cogId = 183 RECEIPT_OK cogId . mapFromList . fmap responseToReceiptItem 184 185 data CallRequest = CR 186 { durable :: Bool 187 , device :: DeviceName 188 , params :: Vector Fan 189 } 190 deriving (Show) 191 192 data SpinRequest = SR 193 { cogFun :: Fan 194 } 195 deriving (Show) 196 197 data AskRequest = ASKR 198 { cogDst :: CogId 199 , channel :: Word64 200 , msg :: Fan 201 } 202 deriving (Show) 203 204 data Request 205 = ReqEval EvalRequest 206 | ReqCall CallRequest 207 | ReqWhat (Set Nat) 208 | ReqTell Word64 Fan 209 | ReqAsk AskRequest 210 | ReqSpin SpinRequest 211 | ReqReap CogId 212 | ReqStop CogId 213 | ReqWait CogId 214 | ReqWho 215 | UNKNOWN Fan 216 deriving (Show) 217 218 {- 219 [%eval timeout/@ fun/fan arg/Fan ...] 220 [%cog %spin fun/Fan] 221 [%cog %ask dst/Nat chan/Nat param/Fan] 222 [%cog %tell chan/Nat fun/Fan] 223 [%cog %wait dst/Nat] 224 [$call synced/? param/Fan ...] 225 -} 226 valToRequest :: CogId -> Fan -> Request 227 valToRequest cogId top = fromMaybe (UNKNOWN top) do 228 row <- getRowVec top 229 tag <- fromNoun @DeviceName (row V.! 0) 230 231 if tag == "eval" then do 232 nat <- fromNoun @Nat (row V.! 1) 233 let timeoutSecs = nat 234 let timeoutMs = timeoutSecs * 1000 235 guard (length row >= 4) 236 let func = row V.! 2 237 let args = V.drop 3 row 238 let flow = FlowDisabled -- TODO: What? 239 let er = EVAL_REQUEST{func,args,cogId,flow,timeoutMs} 240 pure (ReqEval er) 241 -- e (ReqEval $ error "_" timeoutSecs (row V.! 2) (V.drop 3 row)) 242 else if tag == "what" then do 243 what <- fromNoun @(Set Nat) (row V.! 1) 244 pure $ ReqWhat what 245 else if tag == "cog" then do 246 nat <- fromNoun @Nat (row V.! 1) 247 case nat of 248 "spin" | length row == 3 -> pure (ReqSpin $ SR $ row V.! 2) 249 "reap" | length row == 3 -> ReqReap <$> fromNoun @CogId (row V.! 2) 250 "stop" | length row == 3 -> ReqStop <$> fromNoun @CogId (row V.! 2) 251 "wait" | length row == 3 -> ReqWait <$> fromNoun @CogId (row V.! 2) 252 "tell" | length row == 4 -> do 253 channel <- fromNoun (row V.! 2) 254 pure (ReqTell channel (row V.! 3)) 255 "ask" | length row == 5 -> do 256 dst <- fromNoun @CogId (row V.! 2) 257 channel <- fromNoun (row V.! 3) 258 pure (ReqAsk (ASKR dst channel (row V.! 4))) 259 "who" | length row == 2 -> pure ReqWho 260 _ -> Nothing 261 else do 262 _ <- guard (length row >= 3) 263 nat <- fromNoun @Nat (row V.! 1) 264 case nat of 265 0 -> pure (ReqCall $ CR False tag $ V.drop 2 row) 266 1 -> pure (ReqCall $ CR True tag $ V.drop 2 row) 267 _ -> Nothing 268 269 getCurrentReqNoun :: Fan -> Vector Fan 270 getCurrentReqNoun s = do 271 case s of 272 KLO _ xs -> do 273 let len = sizeofSmallArray xs 274 case (xs .! (len-1)) of 275 ROW x -> V.fromArray x 276 _ -> mempty 277 _ -> mempty 278 279 -- A request noun is empty if it is a row with a nonzero value. 280 hasNonzeroReqs :: Fan -> Bool 281 hasNonzeroReqs = any (/= NAT 0) . getCurrentReqNoun 282 283 data EvalCancelledError = EVAL_CANCELLED 284 deriving (Exception, Show) 285 286 -- | A list of parsed out valid requests from `noun`. For every cog, for every 287 -- index in that cog's requests table, there is a raw fan value and a 288 -- `LiveRequest` which contains STM variables to listen 289 type CogSysCalls = IntMap (Fan, LiveRequest) 290 type MachineSysCalls = Map CogId CogSysCalls 291 292 -- | The PendingAsk contains a pointer to an %asking request, along with its 293 -- value. It exists so we can quickly look up its %msg at execution time from a 294 -- pool of open asks. 295 data PendingAsk = PENDING_ASK 296 { requestor :: CogId 297 , reqIdx :: RequestIdx 298 , msg :: Fan 299 } 300 deriving (Show) 301 302 -- | All the information needed for both 303 data TellPayload = TELL_PAYLOAD 304 { asker :: CogId 305 , reqIdx :: RequestIdx 306 307 -- The locally unique tellId for this apply. Written to the event log. 308 , tellId :: TellId 309 310 -- The value produced by running the tell function against its input. It is 311 -- deliberately left lazy since they have to be created inside an STM 312 -- action in `receiveResponse`, but must be evaluate/forced and credited 313 -- against the tell's timeout. (We previously also separated the value here 314 -- into its head and tail with `fanIdx` instead of just storing the full 315 -- result, but that causes some partial evaluation and can crash the 316 -- interpreter if the first statement is a `trk`.) 317 , ret :: ~Fan 318 } 319 deriving (Show) 320 321 -- This is a collection of all {PendingAsk}s within all the cogs of a 322 -- machine on all channels. This is part of the top-level Machine STM state, 323 -- because these IPC interactions need happen transactionally. 324 type CogChannelPool a = Map CogId (TVar (ChannelPool a)) 325 326 type ChannelPool a = Map Word64 (TVar (Pool a)) 327 328 channelPoolRegister :: Word64 -> TVar (ChannelPool a) -> a 329 -> STM Int 330 channelPoolRegister channel vChannels psr = do 331 channels <- readTVar vChannels 332 pool <- case M.lookup channel channels of 333 Just pool -> pure pool 334 Nothing -> do 335 pool <- newTVar emptyPool 336 modifyTVar' vChannels $ insertMap channel pool 337 pure pool 338 poolRegister pool psr 339 340 -- Given a channel number, takes an item from that pool, cleaning up empty 341 -- channel pools. 342 -- 343 -- Unlike `readPool`, this never retries and returns a Maybe instead because 344 -- retrying during `receiveResponse` can cause more widespread blockage. 345 channelPoolTake :: Word64 -> TVar (ChannelPool a) -> STM (Maybe a) 346 channelPoolTake channel vChannels = do 347 channels <- readTVar vChannels 348 case lookup channel channels of 349 Nothing -> pure Nothing 350 Just vPool -> do 351 pool <- readTVar vPool 352 case IM.minView pool.tab of 353 Nothing -> error "Pool didn't get cleaned when empty?" 354 Just (x, xs) -> do 355 case null xs of 356 True -> modifyTVar' vChannels $ M.delete channel 357 False -> writeTVar vPool (set #tab xs $ pool) 358 pure $ Just x 359 360 channelPoolUnregister :: Word64 -> TVar (ChannelPool a) -> Int -> STM () 361 channelPoolUnregister channel vChannels poolId = 362 do 363 channels <- readTVar vChannels 364 case lookup channel channels of 365 Nothing -> pure () 366 Just vPool -> do 367 empty <- stateTVar vPool $ \pool -> 368 let newPool = over #tab (deleteMap poolId) pool 369 in (null newPool.tab, newPool) 370 371 when empty $ do 372 modifyTVar' vChannels $ deleteMap channel 373 374 -- | Data used only by the Runner async. This is all the data needed to 375 -- run the main thread of Fan evaluation and start Requests that it made. 376 data Runner = RUNNER 377 { ctx :: MachineContext 378 , vMoment :: TVar Moment -- ^ Current value + cumulative CPU time 379 , vRequests :: TVar MachineSysCalls -- ^ Current requests table 380 381 , vTellId :: TVar Int 382 383 , vAsks :: TVar (CogChannelPool PendingAsk) 384 } 385 386 -- ----------------------------------------------------------------------- 387 388 -- An effect that happens atomically after a Response is processed by a cog and 389 -- didn't crash. 390 data CogReplayEffect 391 = CSpin 392 { reCogId :: CogId 393 , reFun :: Fan 394 } 395 | CStop 396 { reCogId :: CogId } 397 | CTell 398 { tellId :: TellId 399 , tell :: Fan 400 } 401 | CAsk 402 { tellId :: TellId 403 } 404 deriving (Show) 405 406 data ResponseTuple = RTUP 407 { key :: RequestIdx 408 , resp :: Response 409 , work :: NanoTime 410 , flow :: Flow 411 } 412 deriving (Show) 413 414 -- | A `LiveRequest` is a handle that could produce a Response to an open 415 -- Request. 416 data LiveRequest 417 = LiveEval { 418 leIdx :: RequestIdx, 419 leRecord :: Evaluation 420 } 421 | LiveCall { 422 lcIdx :: RequestIdx, 423 lcCancel :: Cancel, 424 lcSysCall :: SysCall 425 } 426 | LiveWhat { 427 lwhIdx :: RequestIdx, 428 lwhCog :: Set Nat, 429 lwhRuntime :: Set Nat 430 } 431 | LiveTell { 432 ltCogId :: CogId, 433 ltIdx :: RequestIdx, 434 ltChannel :: Word64, 435 ltChannels :: TVar (ChannelPool PendingAsk), 436 ltFun :: Fan 437 } 438 | LiveAsk { 439 lcrChannel :: Word64, 440 lcrPoolId :: Int, 441 lcrChannels :: TVar (ChannelPool PendingAsk) 442 } 443 | LiveSpin { 444 lsIdx :: RequestIdx, 445 lsFun :: Fan 446 } 447 | LiveReap { 448 lrIdx :: RequestIdx, 449 lrCogId :: CogId 450 } 451 | LiveStop { 452 lstIdx :: RequestIdx, 453 lstCogId :: CogId 454 } 455 | LiveWait { 456 lwaIdx :: RequestIdx, 457 lwaCogId :: CogId 458 } 459 | LiveWho { 460 lwIdx :: RequestIdx, 461 lwCogId :: CogId 462 } 463 | LiveUnknown 464 465 instance Show LiveRequest where 466 show = \case 467 LiveEval{} -> "EVAL" 468 LiveCall{} -> "CALL" 469 LiveWhat{} -> "WHAT" 470 LiveTell{} -> "TELL" 471 LiveAsk{} -> "ASK" 472 LiveSpin{} -> "SPIN" 473 LiveReap{} -> "REAP" 474 LiveStop{} -> "STOP" 475 LiveWait{} -> "WAIT" 476 LiveWho{} -> "WHO" 477 LiveUnknown -> "UNKNOWN" 478 479 validLiveRequest :: LiveRequest -> Bool 480 validLiveRequest LiveUnknown = False 481 validLiveRequest _ = True 482 483 data ParseRequestsState = PRS 484 { syscalls :: CogSysCalls 485 , flows :: [Flow] 486 , onPersist :: [STM OnCommitFlow] 487 } 488 489 makeFieldLabelsNoPrefix ''MachineContext 490 makeFieldLabelsNoPrefix ''Moment 491 makeFieldLabelsNoPrefix ''Runner 492 makeFieldLabelsNoPrefix ''ParseRequestsState 493 494 -- ----------------------------------------------------------------------- 495 -- No template haskell beyond this point because optics. 496 -- ----------------------------------------------------------------------- 497 498 tripleToPair :: (a, b, c) -> (a, b) 499 tripleToPair (a, b, _) = (a, b) 500 501 third :: (a, b, c) -> c 502 third (_, _, c) = c 503 504 type ReconstructedEvals = [(Fan, Fan, Maybe CogReplayEffect)] 505 506 type Tells = Map TellId Fan 507 508 recomputeEvals 509 :: Moment 510 -> Tells 511 -> CogId 512 -> IntMap ReceiptItem 513 -> StateT NanoTime IO ReconstructedEvals 514 recomputeEvals m tells cogId tab = 515 for (mapToList tab) \(idx, rVal) -> do 516 let k = toNoun (fromIntegral idx :: Word) 517 case rVal of 518 ReceiptVal val -> pure (k, val, Nothing) 519 ReceiptEvalOK -> do 520 -- We performed an eval which succeeded the 521 -- first time. 522 case getEvalFunAt m cogId (RequestIdx idx) of 523 Nothing -> 524 throwIO INVALID_OK_RECEIPT_IN_LOGBATCH 525 Just (fun, args) -> do 526 (runtime, res) <- withCalcRuntime do 527 evaluate $ force (foldl' (%%) fun args) 528 modify' (+ runtime) 529 pure (k, ROW (rowSingleton res), Nothing) 530 ReceiptSpun newCogId -> do 531 case getSpinFunAt m cogId (RequestIdx idx) of 532 Nothing -> 533 throwIO $ INVALID_SPUN_RECEIPT_IN_LOGBATCH newCogId 534 Just fun -> do 535 let ef = CSpin newCogId fun 536 pure (k, NAT $ fromIntegral newCogId.int, Just ef) 537 ReceiptTell{..} -> do 538 -- We have to retrieve data from both the ask and the tell 539 case (getAskFunAt m asker cogId reqIdx, 540 getTellFunAt m cogId (RequestIdx idx)) of 541 (Just ask, Just tellFun) -> do 542 let ret = tellFun %% (toNoun asker) %% ask 543 askResp = fanIdx 0 ret 544 tellResp = fanIdx 1 ret 545 ef = CTell tellId askResp 546 pure (k, tellResp, Just ef) 547 _ -> 548 throwIO $ INVALID_TELL_RECEIPT_IN_LOGBATCH 549 ReceiptAsk{..} -> do 550 case lookup tellId tells of 551 Nothing -> 552 throwIO $ INVALID_ASK_RECEIPT_IN_LOGBATCH 553 Just askResponse -> 554 pure (k, NAT 0 %% askResponse, Just $ CAsk tellId) 555 ReceiptReap{..} -> do 556 case M.lookup cogNum m.val of 557 Nothing -> 558 throwIO INVALID_REAP_RECEIPT_IN_LOGBATCH 559 Just CG_SPINNING{} -> 560 throwIO INVALID_REAP_RECEIPT_IN_LOGBATCH 561 Just val -> do 562 let ef = CStop cogNum 563 pure (k, (NAT 0) %% toNoun val, Just ef) 564 ReceiptStop{..} -> do 565 case M.lookup cogNum m.val of 566 Nothing -> 567 pure (k, NAT 0, Nothing) 568 Just val -> do 569 let ef = CStop cogNum 570 pure (k, (NAT 0) %% toNoun val, Just ef) 571 where 572 getEvalFunAt :: Moment -> CogId -> RequestIdx -> Maybe (Fan, Vector Fan) 573 getEvalFunAt m cogId idx = withRequestAt m cogId idx $ \case 574 ReqEval er -> Just (er.func, er.args) 575 _ -> Nothing 576 577 578 getSpinFunAt :: Moment -> CogId -> RequestIdx -> Maybe Fan 579 getSpinFunAt m cogId idx = withRequestAt m cogId idx $ \case 580 ReqSpin (SR fun) -> Just fun 581 _ -> Nothing 582 583 getAskFunAt :: Moment -> CogId -> CogId -> RequestIdx -> Maybe Fan 584 getAskFunAt m sender receiver idx = withRequestAt m sender idx $ \case 585 ReqAsk (ASKR cogDst _channel msg) 586 | cogDst == receiver -> Just msg 587 | otherwise -> Nothing 588 _ -> Nothing 589 590 getTellFunAt :: Moment -> CogId -> RequestIdx -> Maybe Fan 591 getTellFunAt m sender idx = withRequestAt m sender idx $ \case 592 ReqTell _channel fun -> Just fun 593 _ -> Nothing 594 595 withRequestAt :: Moment -> CogId -> RequestIdx -> (Request -> Maybe a) 596 -> Maybe a 597 withRequestAt m cogId (RequestIdx idx) fun = do 598 case lookup cogId m.val of 599 Nothing -> Nothing 600 Just CG_CRASHED{} -> Nothing 601 Just CG_TIMEOUT{} -> Nothing 602 Just CG_FINISHED{} -> Nothing 603 Just (CG_SPINNING cog) -> do 604 let row = getCurrentReqNoun cog 605 case row V.!? idx of 606 Nothing -> Nothing 607 Just val -> fun $ valToRequest cogId val 608 609 -- | Loads and the last snapshot from disk, and loops over the event loop 610 -- (starting from that event) and processes each Receipt from each LogBatch. 611 -- 612 -- At the end, we return the current batch number and the computed Moment 613 -- (the current state of a running cog). 614 performReplay 615 :: Debug 616 => Cushion 617 -> MachineContext 618 -> ReplayFrom 619 -> IO (BatchNum, Moment) 620 performReplay cache ctx replayFrom = do 621 let mkInitial (a, b) = (b, (MOMENT a 0, mempty)) 622 (bn, (m, _)) <- loadLogBatches replayFrom mkInitial doBatch ctx.lmdb cache 623 pure (bn, m) 624 where 625 doBatch :: (BatchNum, (Moment, Tells)) -> LogBatch 626 -> IO (BatchNum, (Moment, Tells)) 627 doBatch (_, momentTells) LogBatch{batchNum,executed} = 628 loop batchNum momentTells executed 629 630 loop :: BatchNum -> (Moment, Tells) -> [Receipt] 631 -> IO (BatchNum, (Moment, Tells)) 632 loop bn mt [] = pure (bn, mt) 633 loop bn (m, t) (x:xs) = 634 let mybCogFun = case M.lookup x.cogNum m.val of 635 Nothing -> Nothing 636 Just x -> cogSpinningFun x 637 in case x of 638 RECEIPT_TIME_OUT{..} -> do 639 -- Evaluating this bundle of responses timed out the first time 640 -- we ran it, so just don't run it and just recreate the 641 -- timed-out state from the recorded timeout amount. 642 case mybCogFun of 643 Nothing -> throwIO INVALID_TIMEOUT_IN_LOGBATCH 644 Just fun -> do 645 let cog = CG_TIMEOUT timeoutAmount fun 646 pure (bn, (m { val = M.insert x.cogNum cog m.val}, t)) 647 648 RECEIPT_CRASHED{..} -> do 649 -- While it's deterministic whether a plunder computation 650 -- crashes or not, its crash value is not deterministic so we 651 -- must reconstitute the crash from the recorded result. 652 case mybCogFun of 653 Nothing -> throwIO INVALID_CRASHED_IN_LOGBATCH 654 Just final -> do 655 let cog = CG_CRASHED{op,arg,final} 656 pure (bn, (m { val = M.insert x.cogNum cog m.val}, t)) 657 658 RECEIPT_OK{..} -> do 659 -- The original run succeeded, rebuild the value from the 660 -- receipt items. 661 (eRes, eWork) <- runStateT (recomputeEvals m t cogNum inputs) 0 662 663 let arg = TAb $ mapFromList $ map tripleToPair eRes 664 665 let runEffect :: (Moment, Tells) -> Maybe CogReplayEffect 666 -> IO (Moment, Tells) 667 runEffect (m, t) = \case 668 Just CSpin{..} -> pure $ (m { 669 val = M.insert reCogId (CG_SPINNING reFun) m.val }, 670 t) 671 Just CStop{..} -> pure $ (m { 672 val = M.delete reCogId m.val }, t) 673 Just CTell{..} -> pure $ (m, M.insert tellId tell t) 674 Just CAsk{..} -> pure $ (m, M.delete tellId t) 675 Nothing -> pure (m, t) 676 677 (m, t) <- foldlM runEffect (m, t) (map third eRes) 678 679 fun <- case mybCogFun of 680 Nothing -> throwIO INVALID_COGID_IN_LOGBATCH 681 Just fun -> pure fun 682 683 (iWork, outcome) <- evalCheckingCrash fun arg 684 685 newVal <- case outcome of 686 TIMEOUT -> 687 error "performReplay: impossible timeout on replay" 688 CRASH o e -> pure $ CG_CRASHED o e fun 689 OKAY _ result -> pure $ CG_SPINNING result 690 691 let newMoment = MOMENT 692 { work = m.work + eWork + iWork 693 , val = insertMap x.cogNum newVal m.val 694 } 695 696 loop bn (newMoment, t) xs 697 698 -- | Main entry point for starting running a Cog. 699 replayAndCrankMachine 700 :: Debug 701 => Cushion 702 -> MachineContext 703 -> ReplayFrom 704 -> IO Machine 705 replayAndCrankMachine cache ctx replayFrom = do 706 let threadName = encodeUtf8 "Main" 707 708 withProcessName processName (wrapReplay threadName) 709 where 710 processName = "Machine" 711 712 wrapReplay :: ByteString -> IO Machine 713 wrapReplay threadName = do 714 withThreadName threadName $ do 715 setThreadSortIndex (-3) 716 withTracingFlow "Replay" "machine" mempty [] $ do 717 (batchNum, moment) <- performReplay cache ctx replayFrom 718 719 debugText $ "REPLAY TIME: " <> tshow moment.work <> " ns" 720 721 (machine, flows) <- buildMachine threadName batchNum moment 722 pure (flows, [], machine) 723 724 buildMachine 725 :: ByteString 726 -> BatchNum 727 -> Moment 728 -> IO (Machine, [Flow]) 729 buildMachine threadName lastBatch moment = do 730 shutdownLogger <- newTVarIO False 731 shutdownSnapshot <- newTVarIO False 732 shutdownComplete <- newEmptyTMVarIO 733 liveVar <- newTVarIO moment 734 writVar <- newTVarIO (lastBatch, moment) 735 logImmediately <- newTVarIO False 736 logReceiptQueue <- newTBQueueIO receiptQueueMax 737 738 runner <- atomically do 739 vMoment <- newTVar moment 740 vRequests <- newTVar mempty 741 742 reqChannels <- forM (keys moment.val) $ \k -> do 743 channelPools <- newTVar mempty 744 pure (k, channelPools) 745 vAsks <- newTVar $ mapFromList reqChannels 746 747 vTellId <- newTVar 0 748 749 pure RUNNER{vMoment, ctx, vRequests, vTellId, vAsks} 750 751 -- TODO: Hack because I don't understand how this is supposed 752 -- to work. 753 initialFlows <- newEmptyMVar 754 755 machine <- mdo 756 snapshotAsync <- asyncOnCurProcess $ withThreadName "Snapshot" $ do 757 setThreadSortIndex (-1) 758 handle (onErr "snapshot") $ snapshotFun machine 759 760 loggerAsync <- asyncOnCurProcess $ withThreadName "Log" $ do 761 setThreadSortIndex (-2) 762 handle (onErr "log") $ logFun machine 763 764 runnerAsync <- asyncOnCurProcess $ withThreadName threadName $ do 765 handle (onErr "runner") $ 766 runnerFun initialFlows machine processName runner 767 768 let machine = MACHINE{..} 769 pure machine 770 771 flows <- takeMVar initialFlows 772 pure (machine, flows) 773 774 onErr name e = do 775 debugText $ name <> " thread was killed by: " <> pack (displayException e) 776 throwIO (e :: SomeException) 777 778 -- | Synchronously shuts down a machine and wait for it to exit. 779 shutdownMachine :: Machine -> IO () 780 shutdownMachine machine@MACHINE{ctx,runnerAsync} = do 781 -- Kill the runner to immediately cancel any computation being done. 782 cancel runnerAsync 783 -- Kill agentMach's runner, too 784 cancel ctx.agentMach.runnerAsync 785 786 waitForLogAsyncShutdown machine 787 788 waitForLogAsyncShutdown :: Machine -> IO () 789 waitForLogAsyncShutdown MACHINE{..} = do 790 -- Wait for the logging system to finish any logging task it's already 791 -- started on. 792 atomically $ writeTVar shutdownLogger True 793 wait loggerAsync 794 wait snapshotAsync 795 atomically $ putTMVar shutdownComplete () 796 797 -- | Given a Request parsed from the cog, turn it into a LiveRequest that can 798 -- produce a value and that we can listen to. 799 buildLiveRequest 800 :: Debug 801 => Flow 802 -> Runner 803 -> MachineContext 804 -> CogId 805 -> RequestIdx 806 -> Request 807 -> STM ([Flow], LiveRequest, Maybe (STM OnCommitFlow)) 808 buildLiveRequest causeFlow runner ctx cogId reqIdx = \case 809 ReqEval EVAL_REQUEST{..} -> do -- timeoutMs func args -> do 810 let req = EVAL_REQUEST{flow=causeFlow, timeoutMs, func, args, cogId} 811 leRecord <- pleaseEvaluate ctx.eval req 812 pure ([], LiveEval{leIdx=reqIdx, leRecord}, Nothing) 813 814 ReqCall cr -> do 815 callSt <- STVAR <$> newTVar LIVE 816 817 let lcSysCall = SYSCALL cogId cr.device cr.params callSt causeFlow 818 -- If the call requires durability, pass the STM action to start 819 -- it back to the caller. Otherwise, just run the action and 820 -- pass back nothing. 821 (startedFlows, logCallback, lcCancel) <- 822 if cr.durable then do 823 canceled <- newTVar False 824 vCancel <- newTVar (CANCEL $ writeTVar canceled True) 825 826 -- If the cancel happens before the disk sync, then we 827 -- never submit the request to hardware. If the cancel 828 -- happens after `onCommit`, then we run the actual cancel 829 -- action. 830 831 let onCancel = do 832 cancel <- readTVar vCancel 833 cancel.action 834 835 let onCommit = do 836 isCanceled <- readTVar canceled 837 startedFlows <- case isCanceled of 838 True -> pure [] 839 False -> do 840 (cancel, startedFlows) <- 841 callHardware ctx.hw cr.device lcSysCall 842 writeTVar vCancel cancel 843 pure startedFlows 844 pure $ OnCommitFlow causeFlow startedFlows 845 846 pure ([], Just onCommit, CANCEL onCancel) 847 else do 848 (cancel, flows) <- callHardware ctx.hw cr.device lcSysCall 849 pure (flows, Nothing, cancel) 850 851 pure (startedFlows, LiveCall{lcIdx=reqIdx, lcSysCall, lcCancel}, 852 logCallback) 853 854 ReqWhat what -> do 855 -- NOTE: If we ever make the hardware set runtime dynamic, the 856 -- DeviceTable's data should be a `TVar (Map ...)` to let us detect 857 -- changes to the variable. 858 let runtime = S.fromList $ keys $ table ctx.hw 859 pure ([], LiveWhat{lwhIdx=reqIdx,lwhCog=what,lwhRuntime=runtime}, 860 Nothing) 861 862 ReqAsk ASKR{cogDst,channel,msg} -> do 863 asks <- readTVar runner.vAsks 864 case M.lookup cogDst asks of 865 -- TODO: Pretty sure using `error` here is wrong. Figure out what 866 -- the real error handling should be when a cog requests from a cog 867 -- that doesn't exist. 868 Nothing -> error $ "Bad cogDst " <> show cogDst 869 Just vChannels -> do 870 poolId <- 871 channelPoolRegister channel 872 vChannels 873 (PENDING_ASK cogId reqIdx msg) 874 pure ([], LiveAsk{lcrChannel=channel, 875 lcrPoolId=poolId, 876 lcrChannels=vChannels}, Nothing) 877 878 ReqTell channel function -> do 879 asks <- readTVar runner.vAsks 880 channels <- case M.lookup cogId asks of 881 Nothing -> error $ "Listening on a nonexistent cog " <> show cogId 882 Just pool -> pure pool 883 pure ([], LiveTell{ltCogId=cogId, 884 ltIdx=reqIdx, 885 ltChannel=channel, 886 ltChannels=channels, 887 ltFun=function}, Nothing) 888 889 ReqSpin (SR func) -> do 890 pure ([], LiveSpin{lsIdx=reqIdx, lsFun=func}, Nothing) 891 892 ReqReap cogid -> do 893 pure ([], LiveReap{lrIdx=reqIdx, lrCogId=cogid}, Nothing) 894 895 ReqStop cogid -> do 896 pure ([], LiveStop{lstIdx=reqIdx, lstCogId=cogid}, Nothing) 897 898 ReqWait cogid -> do 899 pure ([], LiveWait{lwaIdx=reqIdx, lwaCogId=cogid}, Nothing) 900 901 ReqWho -> do 902 pure ([], LiveWho{lwIdx=reqIdx, lwCogId=cogId}, Nothing) 903 904 UNKNOWN _ -> do 905 pure ([], LiveUnknown, Nothing) 906 907 cancelRequest :: LiveRequest -> STM () 908 cancelRequest LiveEval{..} = leRecord.cancel.action 909 cancelRequest LiveCall{..} = lcCancel.action 910 cancelRequest LiveWhat{} = pure () 911 cancelRequest LiveTell{} = pure () 912 cancelRequest LiveAsk{..} = 913 channelPoolUnregister lcrChannel lcrChannels lcrPoolId 914 cancelRequest LiveSpin{} = pure () -- Not asynchronous 915 cancelRequest LiveReap{} = pure () 916 cancelRequest LiveStop{} = pure () 917 cancelRequest LiveWait{} = pure () 918 cancelRequest LiveWho{} = pure () 919 cancelRequest LiveUnknown = pure () 920 921 {- 922 An eval that was timed out or crashed requires zero work to replay, 923 since we persist the result instead of re-computing it during replay. 924 -} 925 workToReplayEval :: EvalOutcome -> NanoTime 926 workToReplayEval = \case 927 OKAY w _ -> w 928 CRASH{} -> 0 929 TIMEOUT -> 0 930 931 receiveResponse 932 :: Runner 933 -> (Fan, LiveRequest) 934 -> STM (Maybe ResponseTuple) 935 receiveResponse st = \case 936 (_, LiveEval{..}) -> do 937 getEvalOutcome leRecord >>= \case 938 Nothing -> pure Nothing 939 Just (outcome, flow) -> do 940 let work = workToReplayEval outcome 941 pure (Just RTUP{key=leIdx, resp=RespEval outcome, work, flow}) 942 943 (_, LiveCall{..}) -> do 944 getCallResponse lcSysCall >>= \case 945 Nothing -> pure Nothing 946 Just (fan, flow) -> do 947 let resp = RespCall fan lcSysCall 948 -- TODO: Handle runtimeNs here, if we spent seconds on 949 -- an http transfer, we want to commit that as work so 950 -- we wouldn't fetch the same file again when restarting. 951 -- 952 -- TODO: Reconsider the above TODO? Replay does not 953 -- run effects again. 954 pure (Just RTUP{key=lcIdx, resp, work=0, flow}) 955 956 (_, LiveWhat{..}) -> do 957 guard (lwhCog /= lwhRuntime) 958 let resp = RespWhat lwhRuntime 959 pure (Just RTUP{key=lwhIdx,resp,work=0,flow=FlowDisabled}) 960 961 (_, LiveAsk{}) -> 962 -- Asks are never responded to normally, they're responded manually as 963 -- a side effect of a tell so that we maintain atomicity of the 964 -- request/serve pair in the log. 965 retry 966 967 (_, LiveTell{..}) -> do 968 channelPoolTake ltChannel ltChannels >>= \case 969 Nothing -> pure Nothing 970 Just PENDING_ASK{..} -> do 971 tellId <- stateTVar st.vTellId \s -> (TellId s, s + 1) 972 973 let pa = TELL_PAYLOAD { 974 asker=requestor, 975 reqIdx, 976 tellId, 977 ret = ltFun %% (toNoun requestor) %% msg 978 } 979 980 pure (Just RTUP{key=ltIdx, 981 resp=RespTell pa, 982 work=0, 983 -- TODO: Hook up send flows here. 984 flow=FlowDisabled}) 985 986 (_, LiveSpin{..}) -> 987 do 988 cogid <- getRandomNonConflictingId =<< readTVar st.vMoment 989 -- TODO: How should the flows be represented when spinning a new cog!? 990 pure (Just RTUP{key=lsIdx, resp=RespSpin cogid lsFun, work=0, 991 flow=FlowDisabled}) 992 where 993 getRandomNonConflictingId :: Moment -> STM CogId 994 getRandomNonConflictingId moment = do 995 r :: Word64 <- unsafeIOToSTM $ randomIO 996 case lookup (COG_ID r) moment.val of 997 Nothing -> pure $ COG_ID r 998 Just _ -> getRandomNonConflictingId moment 999 1000 (_, LiveReap{..}) -> do 1001 do 1002 myb <- (lookup lrCogId . (.val)) <$> readTVar st.vMoment 1003 case myb of 1004 Nothing -> do 1005 pure (Just RTUP{key=lrIdx, resp=RespReap lrCogId Nothing, 1006 work=0, flow=FlowDisabled}) 1007 Just cogval -> case cogval of 1008 CG_SPINNING{} -> pure Nothing 1009 _ -> do 1010 performStoplike RespReap lrIdx lrCogId cogval 1011 1012 (_, LiveStop{..}) -> do 1013 do 1014 myb <- (lookup lstCogId . (.val)) <$> readTVar st.vMoment 1015 case myb of 1016 Nothing -> 1017 pure (Just RTUP{key=lstIdx, resp=RespStop lstCogId Nothing, 1018 work=0, flow=FlowDisabled}) 1019 Just cogval -> performStoplike RespStop lstIdx lstCogId cogval 1020 1021 (_, LiveWait{..}) -> do 1022 myb <- (lookup lwaCogId . (.val)) <$> readTVar st.vMoment 1023 case myb of 1024 Nothing -> 1025 pure (Just RTUP{key=lwaIdx, resp=RespWait lwaCogId, 1026 work=0, flow=FlowDisabled}) 1027 Just _ -> pure Nothing 1028 1029 (_, LiveWho{..}) -> do 1030 pure (Just RTUP{key=lwIdx, resp=RespWho lwCogId, work=0, 1031 flow=FlowDisabled}) 1032 1033 (_, LiveUnknown) -> do 1034 pure Nothing 1035 1036 where 1037 -- Common implementation of %stop and %reap, building a response while also 1038 -- removing the current cog's state and canceling all open requests. 1039 performStoplike :: (CogId -> Maybe CogState -> Response) 1040 -> RequestIdx 1041 -> CogId 1042 -> CogState 1043 -> STM (Maybe ResponseTuple) 1044 performStoplike mkResponse reqIdx cogId cogVal = do 1045 modifyTVar' st.vAsks $ M.delete cogId 1046 modifyTVar' st.vMoment 1047 \m -> m { val=deleteMap cogId m.val } 1048 1049 reqs <- stateTVar st.vRequests getAndRemoveReqs 1050 mapM_ cancelRequest reqs 1051 1052 pure (Just RTUP{key=reqIdx, resp=mkResponse cogId (Just cogVal), 1053 work=0, flow=FlowDisabled}) 1054 where 1055 getAndRemoveReqs s = 1056 ( getLiveReqs $ fromMaybe mempty $ lookup cogId s 1057 , deleteMap cogId s 1058 ) 1059 getLiveReqs csc = map (\(_,(_,lr)) -> lr) $ mapToList csc 1060 1061 1062 -- Design point: Why not use optics and StateT in Runner? Because StateT in IO 1063 -- doesn't have a MonandUnliftIO instance, which means that we can't bracket 1064 -- our calls to the profiling system, which you really want to do to keep 1065 -- things exception safe. We thus only use it in the one super stateful method, 1066 -- parseRequests. 1067 1068 -- The Cog Runner -------------------------------------------------------------- 1069 1070 runnerFun :: Debug => MVar [Flow] -> Machine -> ByteString -> Runner -> IO () 1071 runnerFun initialFlows machine processName st = 1072 bracket_ registerCogsWithHardware stopCogsWithHardware $ do 1073 -- Process the initial syscall vector 1074 (flows, onCommit) <- atomically (parseAllRequests st) 1075 1076 -- We got here via replay, so any synchronous requests are 1077 -- immediately safe to execute. 1078 for_ onCommit atomically 1079 -- TODO: do this all in one atomically? 1080 1081 -- The code that calls us wants these. 1082 putMVar initialFlows flows 1083 1084 -- Run the event loop until we're forced to stop. 1085 machineTick 1086 1087 -- Force a sync to shutdown. 1088 waitForLogAsyncShutdown machine 1089 where 1090 registerCogsWithHardware :: IO () 1091 registerCogsWithHardware = do 1092 m <- readTVarIO st.vMoment 1093 let k :: [CogId] = keys m.val 1094 for_ k $ \cogid -> 1095 for_ machine.ctx.hw.table $ \d -> d.spin cogid 1096 1097 stopCogsWithHardware :: IO () 1098 stopCogsWithHardware = do 1099 m <- readTVarIO st.vMoment 1100 for_ (keys m.val) $ \cogid -> 1101 for_ machine.ctx.hw.table $ \d -> d.stop cogid 1102 1103 -- We've completed a unit of work. Time to tell the logger about 1104 -- the new state of the world. 1105 exportState :: (Receipt, [STM OnCommitFlow]) -> STM () 1106 exportState (receipt, onCommit) = do 1107 when (length onCommit > 0) do 1108 writeTVar machine.logImmediately True 1109 writeTBQueue machine.logReceiptQueue (receipt, onCommit) 1110 1111 -- When a "tell" gets a response, it must be the only response in the 1112 -- response bundle, so make sure they're handled separately. 1113 separateTells :: [(CogId, CogSysCalls)] -> [(CogId, CogSysCalls)] 1114 separateTells = loop 1115 where 1116 loop [] = [] 1117 loop ((id,calls):xs) = tellList ++ (id, rest) : loop xs 1118 where 1119 (tells, rest) = IM.partition isTell calls 1120 1121 isTell (_, LiveTell{}) = True 1122 isTell (_, _) = False 1123 1124 tellList = map mkEachTell $ IM.toList tells 1125 mkEachTell (k, v) = (id, IM.singleton k v) 1126 1127 -- TODO: Optimize 1128 collectResponses :: IntMap (Maybe a) -> [(Int, a)] 1129 collectResponses imap = 1130 go [] (mapToList imap) 1131 where 1132 go !acc [] = acc 1133 go !acc ((k,Just v) : kvs) = go ((k,v):acc) kvs 1134 go !acc ((_,Nothing) : kvs) = go acc kvs 1135 1136 mkCogResponses :: [(CogId, CogSysCalls)] 1137 -> [STM (CogId, [(Int, ResponseTuple)])] 1138 mkCogResponses = map build 1139 where 1140 build :: (CogId, CogSysCalls) -> STM (CogId, [(Int, ResponseTuple)]) 1141 build (k, sysCalls) = do 1142 returns <- fmap collectResponses 1143 $ traverse (receiveResponse st) 1144 $ sysCalls 1145 1146 when (null returns) retry 1147 1148 pure (k, returns) 1149 1150 -- Collects all responses that are ready for one cog. 1151 takeReturns :: IO (Maybe (CogId, [(Int, ResponseTuple)])) 1152 takeReturns = do 1153 withAlwaysTrace "WaitForResponse" "cog" do 1154 -- This is in a separate atomically block because it doesn't 1155 -- change and we want to minimize contention. 1156 machineSysCalls <- 1157 (separateTells . mapToList) <$> atomically (readTVar st.vRequests) 1158 1159 -- We now have a mapping from cog ids to the CogSysCalls map. We 1160 -- need to verify if waiting on this will ever complete. 1161 -- 1162 -- Note: This is a separate mechanism than CG_FINISHED; the 1163 -- interpreter not understanding a request should trigger an 1164 -- interpreter shutdown but should not trigger a %reap or %stop 1165 -- visible finished state. 1166 case machineHasLiveRequests machineSysCalls of 1167 False -> pure Nothing 1168 True -> do 1169 reordered <- shuffleM machineSysCalls 1170 -- debugText $ "takeReturns: " <> tshow reordered 1171 Just <$> (atomically $ asum $ mkCogResponses reordered) 1172 1173 -- Every machineTick, we try to pick off as much work as possible for cogs 1174 -- to do. 1175 -- 1176 -- TODO: This is the simple way of doing things, there's a ton of runtime 1177 -- gains that could be had at the cost of complexity by having a persistent 1178 -- async for each Cog, but that adds some sync subtlety while this means 1179 -- the machine is as fast as only the slowest cog, but is obviously correct. 1180 machineTick :: IO () 1181 machineTick = takeReturns >>= \case 1182 Nothing -> do 1183 putStrLn "Shutting down..." 1184 -- TODO: Check the state of the machine and print it here. Print if 1185 -- there are timed out cogs. Print if there are cogs which we can't 1186 -- run because of UNKNOWN requests. Print if all cogs exited 1187 -- cleanly with no requests (or 0 requests). 1188 pure () 1189 Just (cogId, valTuples) -> do 1190 results <- cogTick (cogId, valTuples) 1191 atomically $ do 1192 mapM_ exportState results 1193 readTVar st.vMoment >>= writeTVar machine.liveVar 1194 machineTick 1195 1196 -- Returns true if there are any valid, open LiveRequests that aren't 1197 -- UNKNOWN. This is used to end the machine when we will never be able to 1198 -- provide it with work. 1199 machineHasLiveRequests :: [(CogId, CogSysCalls)] -> Bool 1200 machineHasLiveRequests = any \(_, csc) -> cogHasLiveRequests csc 1201 1202 cogHasLiveRequests :: CogSysCalls -> Bool 1203 cogHasLiveRequests = any \(_, req) -> validLiveRequest req 1204 1205 -- 1206 cogTick 1207 :: (CogId, [(Int, ResponseTuple)]) 1208 -> IO [(Receipt, [STM OnCommitFlow])] 1209 cogTick (cogId, valTuples) = 1210 withProcessName processName $ 1211 withThreadName ("Cog: " <> (encodeUtf8 $ tshow cogId)) $ do 1212 let endFlows = (.flow) . snd <$> valTuples 1213 let traceArg = M.singleton "syscall indicies" 1214 $ Right 1215 $ tshow 1216 $ fmap fst valTuples 1217 1218 withTracingFlow "Response" "cog" traceArg endFlows $ do 1219 (flows, receipts) <- runResponse st cogId valTuples 1220 pure (flows, [], receipts) 1221 1222 {- 1223 Given a set of responses to syscalls in a cogs SysCall table, create 1224 a new event value and pass that into the cog, to get the new cog state. 1225 1226 Side Effects: 1227 1228 - The PLAN value for the cog is replaced. 1229 1230 - The cog's requests row is updated to reflect the new set of 1231 requests. 1232 1233 - If we received a COG_TELL message, the corresponding COG_ASK request 1234 is also processed. 1235 1236 - If we spawned a cog or stopped a cog, we inform every hardware 1237 device that this happened. 1238 1239 Results: 1240 1241 - The set of profiling events triggered by this change. 1242 1243 - A list of event-log receipts (and the corresponding actions to 1244 be triggered when those receipts have been committed to disk. 1245 (This is for disk-synchronized syscalls). 1246 -} 1247 runResponse 1248 :: Debug 1249 => Runner 1250 -> CogId 1251 -> [(Int, ResponseTuple)] 1252 -> IO ([Flow], [(Receipt, [STM OnCommitFlow])]) 1253 runResponse st cogNum rets = do 1254 let arg = TAb 1255 $ mapFromList 1256 $ flip fmap rets 1257 $ \(k,v) -> (NAT (fromIntegral k), responseToVal v.resp) 1258 moment <- atomically (readTVar st.vMoment) 1259 let fun = fromMaybe (error "Trying to run a stopped cog") 1260 $ cogSpinningFun 1261 $ fromMaybe (error "Invalid cogid") 1262 $ lookup cogNum moment.val 1263 (runtimeUs, result) <- do 1264 withAlwaysTrace "Eval" "cog" do 1265 let preEvaluate = map (ensureResponseEvaluated . (.resp) . snd) rets 1266 evalWithTimeout thirtySecondsInMicroseconds preEvaluate fun arg 1267 1268 let resultReceipt = case result of 1269 TIMEOUT -> RECEIPT_TIME_OUT{cogNum,timeoutAmount=runtimeUs} 1270 CRASH op arg -> RECEIPT_CRASHED{..} 1271 OKAY{} -> makeOKReceipt cogNum rets 1272 1273 let newState = case result of 1274 OKAY _ resultFan -> case hasNonzeroReqs resultFan of 1275 True -> CG_SPINNING resultFan 1276 False -> CG_FINISHED resultFan 1277 CRASH op arg -> CG_CRASHED op arg fun 1278 TIMEOUT -> CG_TIMEOUT runtimeUs fun 1279 1280 withAlwaysTrace "Tick" "cog" do 1281 let responses = ((.resp) . snd) <$> rets 1282 1283 -- 1) Notify hardware about spins which started a cog. We have to do this 1284 -- before we record the cog state or do parseRequests because the initial 1285 -- state of a newly created cog might try to communicate with the 1286 -- hardware. 1287 traverse_ performAlertHardwareOnSpin responses 1288 1289 -- 2) Perform parseRequest and handle all changes that have to be handled 1290 -- attomically. This has to happen after we notify the hardware about 1291 -- cogs being spun. 1292 (sideEffects, startedFlows, onPersists) <- atomically $ 1293 performStateUpdate newState responses runtimeUs resultReceipt 1294 1295 -- 3) If this cog shut down for any reason, we have to alert the hardware 1296 -- that it has shut down. This has to be done after parsing requests 1297 -- because parsing requests will cancel ongoing hardware events. 1298 case newState of 1299 CG_SPINNING _ -> pure () 1300 _ -> for_ st.ctx.hw.table $ \d -> d.stop cogNum 1301 1302 -- 4) %cog %ask/%tell must execute in pairs. We finally synchronously 1303 -- trigger any %ask callback now that the %tell has processed. 1304 afterResults <- mapM (performAskAck resultReceipt) responses 1305 1306 let (sideEffectFlows, sideEffectPersists) = concatUnzip sideEffects 1307 (afterFlows, afterReceipts) = concatUnzip afterResults 1308 receiptPairs = [ ( resultReceipt 1309 , onPersists ++ sideEffectPersists 1310 ) 1311 ] 1312 ++ afterReceipts 1313 1314 pure ( startedFlows ++ afterFlows ++ sideEffectFlows 1315 , receiptPairs ) 1316 1317 where 1318 delReq :: CogSysCalls -> Int -> CogSysCalls 1319 delReq acc k = IM.delete k acc 1320 1321 ensureResponseEvaluated :: Response -> IO () 1322 ensureResponseEvaluated (RespTell TELL_PAYLOAD{..}) = do 1323 -- We must force evaluate the response thunk so that we make sure it 1324 -- doesn't have a crash, but must do this while crediting the runtime 1325 -- (and possible crash) to the %tell since the %tell provides the 1326 -- function. 1327 evaluate $ force ret 1328 pure () 1329 1330 ensureResponseEvaluated _ = pure () 1331 1332 performAlertHardwareOnSpin :: Response -> IO () 1333 performAlertHardwareOnSpin (RespSpin newCogId _) = do 1334 -- If we just spun up another cog, tell all the hardware 1335 -- devices that it exists now. 1336 for_ st.ctx.hw.table $ \d -> d.spin newCogId 1337 performAlertHardwareOnSpin _ = pure () 1338 1339 performStateUpdate newState responses runtimeUs resultReceipt = do 1340 modifyTVar' st.vMoment \m -> 1341 MOMENT (insertMap cogNum newState m.val) (m.work + runtimeUs) 1342 1343 sideEffects <- case resultReceipt of 1344 RECEIPT_OK{..} -> do 1345 -- Delete the consumed `LiveRequest`s without formally 1346 -- canceling it because it completed. 1347 modifyTVar' st.vRequests \tab -> 1348 adjustMap (\kals -> foldl' delReq kals (fst<$>rets)) 1349 cogNum tab 1350 mapM performSpinEffects responses 1351 _ -> do 1352 -- If we crashed, do nothing. We'll formally cancel all 1353 -- requests when we reparse in the next step, and we have no 1354 -- valid responses to process side effects from. 1355 pure mempty 1356 1357 (startedFlows, onPersists) <- parseRequests st cogNum 1358 1359 pure (sideEffects, startedFlows, onPersists) 1360 1361 performSpinEffects :: Response 1362 -> STM ([Flow], [STM OnCommitFlow]) 1363 performSpinEffects (RespSpin newCogId fun) = do 1364 reqChannelPools <- newTVar mempty 1365 modifyTVar' st.vAsks \reqs -> 1366 M.insert newCogId reqChannelPools reqs 1367 modifyTVar' st.vMoment \m -> 1368 m { val=insertMap newCogId (CG_SPINNING fun) m.val } 1369 parseRequests st newCogId 1370 1371 performSpinEffects _ = pure mempty 1372 1373 -- Performed outside the Big Atomically Block 1374 performAskAck :: Receipt 1375 -> Response 1376 -> IO ([Flow], [(Receipt, [STM OnCommitFlow])]) 1377 performAskAck result (RespTell TELL_PAYLOAD{..}) = do 1378 -- When an IPC message is received, the corresponding 1379 -- ask syscall on the asking cog must also be processed 1380 -- within the same transaction. 1381 -- 1382 -- We do this recursively invoking runResponse for that 1383 -- syscall as well. 1384 -- 1385 -- We are sure that the `reqIdx` still corresponds to 1386 -- an active COG_ASK syscall because of reasons. 1387 let outcome = case result of 1388 RECEIPT_OK{} -> OutcomeOK (fanIdx 0 ret) tellId 1389 _ -> OutcomeCrash 1390 1391 -- TODO: Enable profiling flows; how do we connect 1392 -- the recv completing to this? 1393 let rtup = RTUP { key = reqIdx 1394 , resp = RespAsk outcome 1395 , work = 0 1396 , flow = FlowDisabled 1397 } 1398 runResponse st asker [(reqIdx.int, rtup)] 1399 1400 performAskAck _ _ = pure ([], []) 1401 1402 1403 {- 1404 Hack to avoid comparing SERV requests. 1405 1406 It's not expensive to replace the SERV request, but it is expensive 1407 to check them for equality. 1408 1409 This causes SERV requests to just always be considered non-matching, 1410 unless the two requests happen to be pointer-equals. 1411 1412 TODO: Find a better solution 1413 -} 1414 keep :: Fan -> Fan -> Bool 1415 keep x y = 1416 case reallyUnsafePtrEquality# x y of 1417 1# -> True 1418 _ -> 1419 case x of 1420 ROW rs -> 1421 if length rs == 4 && rs!0 == "http" && rs!2 == "serv" 1422 then False 1423 else x==y 1424 _ -> x==y 1425 1426 1427 concatUnzip :: [([a], [b])] -> ([a], [b]) 1428 concatUnzip a = let (f, s) = unzip a 1429 in (concat f, concat s) 1430 1431 parseAllRequests :: Debug => Runner -> STM ([Flow], [STM OnCommitFlow]) 1432 parseAllRequests runner = do 1433 cogs <- (keys . (.val)) <$> readTVar runner.vMoment 1434 actions <- forM cogs $ parseRequests runner 1435 pure $ concatUnzip actions 1436 1437 -- | Given a noun in a runner, update the `requests` 1438 parseRequests :: Debug => Runner -> CogId -> STM ([Flow], [STM OnCommitFlow]) 1439 parseRequests runner cogId = do 1440 cogState <- (fromMaybe (error "no cogid m") . lookup cogId . (.val)) <$> 1441 readTVar runner.vMoment 1442 syscalls <- (fromMaybe mempty . lookup cogId) <$> 1443 readTVar runner.vRequests 1444 1445 let init = PRS{syscalls, flows=[], onPersist=[]} 1446 1447 st <- flip execStateT init do 1448 let expected = syscalls 1449 let actual = case cogState of 1450 CG_CRASHED{} -> mempty 1451 CG_TIMEOUT{} -> mempty 1452 CG_FINISHED{} -> mempty 1453 CG_SPINNING cogFun -> getCurrentReqNoun cogFun 1454 1455 for_ (mapToList expected) \(i,(v,_)) -> 1456 case actual V.!? i of 1457 Nothing -> cancelReq i 1458 Just w | keep v w -> pure () 1459 Just w -> cancelReq i >> createReq i w 1460 1461 for_ (zip [0..] $ toList actual) \(i,v) -> 1462 unless (member i expected) do 1463 createReq i v 1464 1465 modifyTVar' runner.vRequests $ insertMap cogId st.syscalls 1466 1467 pure (st.flows, st.onPersist) 1468 1469 where 1470 1471 cancelReq :: Int -> StateT ParseRequestsState STM () 1472 cancelReq key = do 1473 -- traceM ("cancelReq:" <> show key) 1474 slot <- use (#syscalls % at key) 1475 case slot of 1476 Nothing -> error "cancelReq: impossible" 1477 Just (_, liveReq) -> do 1478 modifying' #syscalls (deleteMap key) 1479 lift (cancelRequest liveReq) 1480 1481 reqDebugSummary :: Request -> ByteString 1482 reqDebugSummary = \case 1483 ReqEval{} -> "%eval" 1484 ReqCall rc -> "%call " <> (encodeUtf8 $ 1485 describeSyscall runner.ctx.hw rc.device rc.params) 1486 ReqWhat{} -> "%what" 1487 ReqTell{} -> "%tell" 1488 ReqAsk{} -> "%ask" 1489 ReqSpin{} -> "%spin" 1490 ReqReap{} -> "%reap" 1491 ReqStop{} -> "%stop" 1492 ReqWait{} -> "%wait" 1493 ReqWho{} -> "%who" 1494 UNKNOWN{} -> "UNKNOWN" 1495 1496 flowCategory :: Request -> ByteString 1497 flowCategory = \case 1498 ReqEval{} -> "%eval" 1499 ReqCall rc -> "%call " <> (encodeUtf8 $ 1500 syscallCategory runner.ctx.hw rc.device rc.params) 1501 ReqWhat{} -> "%what" 1502 ReqTell{} -> "%tell" 1503 ReqAsk{} -> "%ask" 1504 ReqSpin{} -> "%cog" 1505 ReqReap{} -> "%reap" 1506 ReqStop{} -> "%cog" 1507 ReqWait{} -> "%wait" 1508 ReqWho{} -> "%cog" 1509 UNKNOWN{} -> "UNKNOWN" 1510 1511 createReq :: Int -> Fan -> StateT ParseRequestsState STM () 1512 createReq i v = do 1513 let request = valToRequest cogId v 1514 reqData = reqDebugSummary request 1515 reqName = "Cog " <> (encodeUtf8 $ tshow cogId.int) 1516 <> " idx " 1517 <> encodeUtf8 (tshow i) <> ": " <> reqData 1518 catData = flowCategory request 1519 f <- lift $ allocateRequestFlow reqName (decodeUtf8 catData) 1520 1521 let rIdx = RequestIdx i 1522 1523 (startedFlows, live, onPersist) <- lift $ 1524 buildLiveRequest f runner runner.ctx cogId rIdx request 1525 1526 case onPersist of 1527 Nothing -> pure () 1528 Just cb -> modifying' #onPersist (cb:) 1529 1530 modifying' #syscalls (insertMap i (v, live)) 1531 modifying' #flows (++ (f:startedFlows)) 1532 1533 evalWithTimeout 1534 :: Debug 1535 => Nat 1536 -> [IO ()] 1537 -> Fan 1538 -> Fan 1539 -> IO (NanoTime, EvalOutcome) 1540 evalWithTimeout msTimeout preActions fun arg = do 1541 (runtime, raw) <- withCalcRuntime $ timeout (fromIntegral msTimeout) $ do 1542 try doAllEvals >>= \case 1543 Left (PRIMOP_CRASH op val) -> do debug ("crash"::Text, NAT op, val) 1544 pure (Left (op,val)) 1545 Right f -> pure (Right f) 1546 case raw of 1547 Nothing -> pure (runtime, TIMEOUT) 1548 Just (Left(o,e)) -> pure (runtime, CRASH o e) 1549 Just (Right v) -> pure (runtime, OKAY runtime v) 1550 where 1551 doAllEvals = do 1552 sequence_ preActions 1553 evaluate $ force (fun %% arg) 1554 1555 1556 evalCheckingCrash 1557 :: Debug 1558 => Fan 1559 -> Fan 1560 -> IO (NanoTime, EvalOutcome) 1561 evalCheckingCrash fun arg = do 1562 (runtime, raw) <- withCalcRuntime $ do 1563 try (evaluate $ force (fun %% arg)) >>= \case 1564 Left (PRIMOP_CRASH op val) -> do debug ("crash"::Text, NAT op, val) 1565 pure (Left (op,val)) 1566 Right f -> pure (Right f) 1567 case raw of 1568 Left (o,e) -> pure (runtime, CRASH o e) 1569 Right v -> pure (runtime, OKAY runtime v) 1570 1571 -- The EventLog Routine -------------------------------------------------------- 1572 1573 data LogNext = LOG_NEXT 1574 { shutdown :: Bool 1575 , batchNum :: BatchNum 1576 , receipts :: [(Receipt, [STM OnCommitFlow])] 1577 , moment :: Moment 1578 } 1579 1580 {- 1581 This is the per-cog event-log persistence logic that runs in 1582 `loggerAsync`. 1583 1584 Event logging is almost entirely asynchronous w.r.t ship execution, 1585 the only exception is that some syscalls are "synchronous" and they 1586 need to block until the state that triggered them has been committed 1587 to disk. 1588 1589 Like snapshotting, the decision of whether or not to write a log 1590 batch depends on the amount of cpu-work has been performed since 1591 the last batch. 1592 1593 We also write a final log batch right before shutdown, if the daemon 1594 receives a SIGKILL. This isn't synchronized with snapshot routine, 1595 so the final disk state will often be a snapshot with a single log 1596 batch that needs to be replayed, but this should be cheap in practice. 1597 1598 There is also the `logImmediately` flag, which will cause the log 1599 to be written right away. This is important for "synchronous" events, 1600 since we want to cause those to block for as little time as possible. 1601 -} 1602 logFun :: Debug => Machine -> IO () 1603 logFun machine = 1604 loop 1605 where 1606 loop :: IO () 1607 loop = do 1608 next <- withAlwaysTrace "Block" "log" (atomically getNextBatch) 1609 1610 -- It's possible for next.receipts to be empty if a shutdown 1611 -- is requested and no events have happened since the last batch 1612 -- was written. 1613 withTracingFlow "Log" "log" mempty [] $ do 1614 if null next.receipts then do 1615 pure ([], [], ()) 1616 else do 1617 (startFlows, stepFlows) <- doBatch next 1618 pure (startFlows, stepFlows, ()) 1619 1620 case next.shutdown of 1621 True -> atomically $ writeTVar machine.shutdownSnapshot True 1622 False -> loop 1623 1624 -- Either reads a shutdown command or the next batch of work to log. 1625 getNextBatch :: STM LogNext 1626 getNextBatch = do 1627 shutdown <- readTVar machine.shutdownLogger 1628 live <- readTVar machine.liveVar 1629 (lastBatchNum, writ) <- readTVar machine.writVar 1630 forcedLog <- readTVar machine.logImmediately 1631 fullQ <- isFullTBQueue machine.logReceiptQueue 1632 1633 -- TODO: IIUC "workNs" is currently actually just the runtime. 1634 -- The idea was that we would use "total execution time" to 1635 -- trigger snapshots, in order to bound the amount of time needed 1636 -- to replay. This makes no sense for log batches, and the 1637 -- total-runtime makes no sense for snapshots. Shit is working, 1638 -- however, so there's no need to changes this atm. 1639 1640 let timedLog = live.work > (writ.work + logbatchWorkIntervalInNs) 1641 let shouldLog = shutdown || forcedLog || fullQ || timedLog 1642 1643 unless shouldLog retry 1644 1645 when forcedLog (writeTVar machine.logImmediately False) 1646 1647 receipts <- flushTBQueue machine.logReceiptQueue 1648 moment <- readTVar machine.liveVar 1649 1650 let batchNum = lastBatchNum + 1 1651 pure LOG_NEXT{shutdown, receipts, moment, batchNum} 1652 1653 doBatch :: LogNext -> IO ([Flow], [Flow]) 1654 doBatch next = do 1655 let (receipts, onCommit) = unzip next.receipts 1656 1657 now <- getNanoTime 1658 1659 writeLogBatch machine.ctx.lmdb $ 1660 LogBatch 1661 { batchNum = next.batchNum 1662 , writeTime = now 1663 , executed = receipts 1664 } 1665 1666 atomically $ writeTVar machine.writVar (next.batchNum, next.moment) 1667 1668 -- Perform the associated STM actions that were supposed to be 1669 -- run once committed. This is used for synchronous syscalls 1670 -- which need to block until we have committed to the state that 1671 -- triggered them. 1672 (starts, steps) <- unzipOnCommitFlows <$> 1673 traverse atomically (join onCommit) 1674 pure (starts, steps) 1675 1676 1677 -- The Snapshotting Routine ---------------------------------------------------- 1678 1679 {- 1680 The is the per-machine snapshotting logic that runs in `snapshotAsync`. 1681 1682 Snapshotting is fully asynchronous. It doesn't block normal 1683 execution, so we allow it to lag behind the current state. 1684 1685 When we replay, we load the latest snapshot, and then we find all 1686 of the log batches that come after that point. 1687 1688 Here, we track the mount of CPU-work that will be required to replay those 1689 batches, and we wait until that reaches a certain threshold before 1690 taking a snapshot. 1691 1692 We also take a snapshot right before shutdown, when the daemon receives a 1693 SIGKILL. 1694 -} 1695 snapshotFun :: Debug => Machine -> IO () 1696 snapshotFun machine = 1697 when machine.ctx.enableSnaps do 1698 loop 0 1699 where 1700 loop lastSnapWork = do 1701 let nextSnapWork = lastSnapWork + snapshotWorkIntervalInNs 1702 1703 (shutdown, bn, moment) <- 1704 withAlwaysTrace "Block" "log" $ 1705 atomically do 1706 shutdown <- readTVar machine.shutdownSnapshot 1707 (logBN, logMoment) <- readTVar machine.writVar 1708 1709 let shouldSnap = shutdown || logMoment.work > nextSnapWork 1710 1711 unless shouldSnap retry 1712 1713 pure (shutdown, logBN, logMoment) 1714 1715 withAlwaysTrace "Snapshot" "log" do 1716 writeMachineSnapshot machine.ctx.lmdb bn moment.val 1717 1718 unless shutdown do 1719 loop moment.work