# Copyright 2007 Kevin Reid under the terms of the MIT X license
# found at http://www.opensource.org/licenses/mit-license.html ................

pragma.syntax("0.9")
pragma.enable("accumulator")
pragma.enable("call-pattern")

# Small list-building helpers: apply f to every element of coll and collect
# the results into a list, in iteration order.
def map as DeepFrozen {
  # f is called with each value of coll.
  to v(f, coll) {
    return accum [] for v in coll {_.with(f(v))}}
  # f is called with each key and value of coll.
  to kv(f, coll) {
    return accum [] for k => v in coll {_.with(f(k, v))}}
}

# definitions per http://www.erights.org/elib/distrib/captp/types.html
# NOTE(review): several definitions in this file (VatID, OneArgFunc, the
# make*/Throwable/deSubgraphKit imports, the table-maker map pattern, and
# `once` further down) have an empty right-hand side in this copy. Presumably
# angle-bracket URI expressions were lost in extraction -- restore them from
# the upstream source before relying on this file.
def IncomingPos :DeepFrozen := -2**31..!2**31
def ExportPos :DeepFrozen := 1..!2**31
def AnswerPos :DeepFrozen := -2**31..-1
def ImportPos :DeepFrozen := 0..!2**31
def WireDelta :DeepFrozen := 0..255
def MsgCount :DeepFrozen := 0..!2**63
def MsgName :DeepFrozen := String
def VatID :DeepFrozen :=
def SwissBase :DeepFrozen := int
def SwissNumber :DeepFrozen := int
def SwissHash :DeepFrozen := int
def Nonce :DeepFrozen := int
def OneArgFunc :DeepFrozen :=
def Vine :DeepFrozen := any # should be rcvr (DeepFrozen missing)

# Scope key under which the descriptor maker is exposed to encoded arguments
# (see unscope and buildScope below).
def DESCS_EXIT :DeepFrozen := "CapTP_1_descs"

def makeProxy :DeepFrozen :=
def makeDelayedRedirector :DeepFrozen :=
def makeBrand :DeepFrozen :=
def Throwable :DeepFrozen :=
def makeCycleBreaker :DeepFrozen :=
def deSubgraphKit :DeepFrozen :=
def makeNonceLocator :DeepFrozen :=
def [=> makeAnswersTable :DeepFrozen,
     => makeExportsTable :DeepFrozen,
     => makeProxiesTable :DeepFrozen,
     => makeNearGiftTable :DeepFrozen,
     => makePromiseGiftTable :DeepFrozen,
     => makeVine :DeepFrozen] | _ :=

# Placeholder object used as the "maker" in outgoing portrayals; unscope maps
# it to DESCS_EXIT, which buildScope binds to incomingDescMaker on the
# receiving side.
def descMakerStandin as DeepFrozen {}

# Maker for one CapTP connection. run/8 returns the three facets
# [capTPReceiver, outgoingConnection, peerConnection].
def makeCapTPConnection implements ExitViaHere, DeepFrozen {
  # XXX this should be a map
  to run(var outgoingReceiver, swissTable, locatorUnum, whenGarbage, hub, localVatID, remoteSearchPath, remoteVatID) {
    # Per-connection tables. Questions and imports both hold proxy records.
    def answers := makeAnswersTable()
    def exports := makeExportsTable()
    def imports := makeProxiesTable()
    def questions := makeProxiesTable()
    # The gift tables are defined inline as arguments so that they are also
    # available by name to the peerConnection facet below. Note that
    # promiseGifts is constructed with a reference to nonceLocator, which is
    # still being defined at that point.
    def nonceLocator := makeNonceLocator(
      def promiseGifts := makePromiseGiftTable(whenGarbage, nonceLocator),
      def nearGifts := makeNearGiftTable(whenGarbage),
      remoteVatID, hub, swissTable)

    # Forward declarations; each is bound further down.
    def termination # resolved when connection is dead
    def remoteNonceLocatorRecord
    # Currently unused. (Incremented in checkedAndAboutToHandleIncoming but
    # never read in this file.)
    var currentIncomingSerialNumber := 0
    def prepareQuestion

    # Sealer/unsealer pair used to recognize proxies belonging to this
    # connection and recover their [proxy, position] (see
    # handleOptSealedDispatch and goingHomeUncaller).
    def [remoteIncomingPosSealer, remoteIncomingPosUnsealer] := makeBrand("CapTP far ref's remote incomingPos")
    def remoteIncomingPosBrand := remoteIncomingPosSealer.getBrand()

    def once := # DeepFrozen missing
    # Default Data-E scope extended with Ref and the locator unum, but without
    # the descriptor-maker exit.
    def scopeWithoutDescs := {
      def s := deSubgraphKit.getDefaultScope().diverge()
      s["Ref"] := Ref
      s["CapTP_1_locatorUnum"] := locatorUnum
      s.snapshot()
    }
    # Inverse of scopeWithoutDescs, plus descMakerStandin -> DESCS_EXIT; passed
    # to the recognizer below.
    def unscope := {
      def u := makeCycleBreaker.byInverting(scopeWithoutDescs).diverge()
      u[descMakerStandin] := DESCS_EXIT
      u.snapshot()
    }

    # Each uncaller below has a single optUncall method that returns a
    # [maker, verb, args] portrayal when it recognizes the reference, and has
    # no explicit return otherwise.

    /** Uncalls our local NonceLocator being passed out. */
    def nonceLocatorUncaller {
      to optUncall(ref) {
        if (__equalizer.sameYet(ref, nonceLocator)) {
          return [descMakerStandin, "Import", [0]]
        }
      }
    }

    # Portrays pass-by-copy objects via their own __optUncall, and broken
    # references as Ref.broken(problem).
    def pbcUncaller {
      to optUncall(r) {
        if (r =~ p :pbc && r == p) {
          return p.__optUncall()
        } else if (r =~ Ref.broken(problem)) {
          # XXX this discards the identities of unconnected refs -- OK?
          return [Ref, "broken", [problem]]
        }
      }
    }

    /** Uncalls Far refs that are being passed to their home vat. */
    def goingHomeUncaller {
      to optUncall(ref) {
        if (remoteIncomingPosUnsealer.amplify(ref) \
              =~ [[proxy, pos :IncomingPos]]) {
          if (false && !(__equalizer.sameYet(proxy, ref))) {
            # This test is stubbed out because the promise proxies are not the same when they should be. XXX fix proxy implementation, then remove "false && ".
            return [__identityFunc, "run", [Ref.broken("impersonated going-home proxy")]]
          } else {
            return [descMakerStandin, "Incoming", [pos]]
          }
        }
      }
    }

    # Portrays anything that is not pass-by-copy as an export: an "Import"
    # descriptor if it already has an export index (bumping its count), or a
    # "NewFar" descriptor with a fresh export position and swiss hash.
    def proxyUncaller {
      to optUncall(r) {
        # XXX implement Ref.isPassByProxy
        if (!Ref.isPassByCopy(r)) {
          #traceln(`exporting $r`)
          if ((def index := exports.indexFor(r)) != -1) {
            exports.incr(index)
            return [descMakerStandin, "Import", [index]]
          } else {
            def swissNumber := swissTable.getNewSwiss(r)
            return [descMakerStandin, "NewFar", [exports.newFarPos(r), swissNumber.cryptoHash()]]
          }
        }
      }
    }

    /** Uncalls promises, assuming that they are either local or from a foreign comm system; that is, they are not subject to 3-vat introduction. */
    def genericPromiseUncaller {
      to optUncall(r) {
        if (Ref.isEventual(r) && !Ref.isResolved(r)) {
          def rdrSwissBase := swissTable.nextSwiss()
          def [rdrAnswerPos, rdrRecord] := prepareQuestion(true)
          # XXX we're not using rdrSwissBase for anything; it *should* be extractible from the question proxy so that we can pass it to another vat
          # Ask the promise to report its resolution to the new question's
          # local reference.
          E.sendOnly(r, "__whenMoreResolved", [rdrRecord.localReference()])
          return [descMakerStandin, "NewRemotePromise", [
            exports."bind"(r),
            rdrAnswerPos,
            rdrSwissBase]]
        }
      }
    }

    /** Uncalls promises that are part of another CapTP connection of this hub. */
    def otherConnPromiseUncaller {
      to optUncall(r) {
        if (Ref.isEventual(r) && #!Ref.isResolved(r) && # -- until Far3Desc works
            hub.amplifyFor3Desc(r, remoteVatID) \
              =~ descArgs :Tuple[List[String], String, int, any]) {
          return [descMakerStandin, "Promise3", descArgs]
        }
      }
    }

    # XXX document exit table used
    # NOTE(review): the uncaller list below contains an empty element between
    # pbcUncaller and proxyUncaller; presumably an expression was lost in
    # extraction -- restore from upstream. List order is kept as found.
    def encRecognizer := deSubgraphKit.makeRecognizer(
      [goingHomeUncaller, nonceLocatorUncaller, otherConnPromiseUncaller, genericPromiseUncaller, pbcUncaller, , proxyUncaller],
      unscope)

    /** given an index from an incoming message, look up the corresponding local object */
    def lookupIncoming(index :int) {
      # 0 is the nonce locator, negative indices are answers (stored under the
      # negated index), positive indices are exports.
      return if (index.isZero()) {
        nonceLocator
      } else if (index.belowZero()) {
        answers[-index]
      } else { # above zero
        exports[index]
      }
    }

    # Look up the local reference for an import index from an incoming
    # descriptor, counting it as a received reference. Index 0 is the remote
    # nonce locator; negative indices throw.
    def lookupImport(index :int) {
      # NOTE(review): the inner `return` in the first branch appears redundant
      # with the enclosing `return if`.
      return if (index.isZero()) {
        return remoteNonceLocatorRecord.receivedReference()
      } else if (index.belowZero()) {
        #questions[-index]
        throw("can't happen: <0 import")
      } else { # above zero
        imports[index].receivedReference()
      }
    }

    /** Transform a CapTP message argument object into the form for the outgoing receiver to accept. */
    def outEncode(object) {
      # The result is a once-wrapped function that, given a builder, runs the
      # recognizer over the object.
      return once(fn builder { encRecognizer.recognize(object, builder) })
    }

    # Throws if termination has been resolved, i.e. after Terminated has run.
    def requireLive() {
      if (Ref.isResolved(termination)) {
        throw(`this CapTP connection has been terminated (${Ref.optProblem(termination)})`)
      }
    }

    # Common prologue for every incoming message: liveness check plus a bump of
    # the incoming serial number.
    def checkedAndAboutToHandleIncoming() {
      requireLive()
      currentIncomingSerialNumber += 1
    }

    # True when ref is not one of this connection's proxies (our unsealer
    # yields null) but the hub reports it as one of its proxies.
    def isProxyForOtherConnection(ref) {
      return remoteIncomingPosUnsealer.amplify(ref) == null && hub.isOurProxy(ref)
    }

    # Builds the record behind one question or import entry. `far` is passed
    # straight through to makeProxy; `position` may still be a promise when
    # called from prepareQuestion. Returns the proxyRecord object bound at the
    # end of this function.
    def makeProxyRecord(far, position :vow[IncomingPos]) {
      var wireCount := 0 # number of mentions in incoming messages
      var localCount := 0 # number of local proxies with finalizers
      var fresh := true # true until a message has been sent on this ref (cleared in handleSend/handleSendOnly)
      def [resolutionSlot, proxySlotResolver] := Ref.promise()
      def proxyRecord

      /** for compatibility with the classic Redirector implementations - XXX review whether this facet and interface are appropriate */
      def proxyResolver {
        to getProxy() { return proxyRecord.localReference() }
        to resolve(resolution) :void {
          proxySlotResolver.resolve(&resolution)
        }
        to smash(problem) :void {
          proxySlotResolver.resolveRace(__makeFinalSlot(Ref.broken(problem)))
        }
        # Returns a small handler stub only while the slot is unresolved;
        # otherwise there is no explicit return.
        to optHandler() {
          if (!proxySlotResolver.isDone()) {
            return def stubProxyHandler {
              to isFresh() { return fresh }
              to sameConnection(otherRef) {
                return remoteIncomingPosUnsealer.amplify(otherRef) =~ [_]
              }
            }
          }
        }
      }

      # Handler given to makeProxy; turns sends on the proxy into outgoing
      # Deliver/DeliverOnly messages.
      def capTPEventualHandler {
        to handleSend(verb, args) {
          if (verb == "__whenMoreResolved" && args.size() == 1) {
            # Treated as send-only; the caller gets null rather than a
            # question reference.
            capTPEventualHandler.handleSendOnly(verb, args)
            return null
          } else {
            def [answerPos, questionRecord] := prepareQuestion(false)
            def redirector := questionRecord.makeRedirector()
            fresh := false
            E.sendOnly(outgoingReceiver, "Deliver", [answerPos, outEncode(redirector), position, verb, outEncode(args)])
            return questionRecord.localReference()
          }
        }
        to handleSendOnly(verb, args) {
          #traceln(`sendOnly $verb $args`)
          if (verb == "__whenMoreResolved" \
                && args =~ [reactor] \
                && isProxyForOtherConnection(reactor)) {
            # Answered locally: the reactor is handed this record's own local
            # reference and nothing goes out on the wire.
            E.sendOnly(reactor, "run", [proxyRecord.localReference()])
            return null
          } else {
            fresh := false
            E.sendOnly(outgoingReceiver, "DeliverOnly", [position, verb, outEncode(args)])
          }
        }
        to handleOptSealedDispatch(brand) {
          if (brand == remoteIncomingPosBrand) {
            # NOTE(review): this trace calls localReference() twice, and each
            # call creates a proxy and increments localCount (see
            # localReference below). Looks like leftover debugging -- confirm
            # before removing.
            traceln(__equalizer.sameYet(proxyRecord.localReference(), proxyRecord.localReference()))
            return remoteIncomingPosSealer.seal([proxyRecord.localReference(), position])
          } else if (brand == hub.get3DescBrand()) {
            # Seals a function of the recipient ID which registers a gift with
            # the remote nonce locator and returns the Promise3 argument list.
            return hub.get3DescSealer().seal(fn recipID {
              def nonce := swissTable.nextSwiss() # XXX use a long
              # XXX we shouldn't have the authority to specify the path+vatID - it should be baked into our access to the hub
              def farVine := remoteNonceLocatorRecord.localReference() <- provideFor(
                def incomingDescStub implements pbc {to __optUncall() {return [descMakerStandin, "Incoming", [position]]}},
                recipID,
                nonce)
              [remoteSearchPath, remoteVatID, nonce, makeVine(farVine)]
            })
          }
        }
        # NOTE(review): the printed string is empty in this copy; possibly an
        # angle-bracketed label was lost in extraction.
        to __printOn(out :TextWriter) {
          out.print("")
        }
      }

      return bind proxyRecord {
        to smash(problem :Throwable) {
          proxyResolver.smash(problem)
        }
        to makeRedirector() {
          return makeDelayedRedirector(proxyResolver)
        }
        /** make a local reference, and increment the wireCount (number of received descriptors for this local entry) */
        to receivedReference() {
          wireCount += 1
          return proxyRecord.localReference()
        }
        # Creates a new proxy on every call, counts it in localCount, and
        # registers a once-wrapped callback with whenGarbage. When the count
        # drops to zero the callback notifies the other side (GCAnswer for
        # negative positions, GCExport for positive ones) and resets
        # wireCount.
        to localReference() {
          def proxy := makeProxy(capTPEventualHandler, resolutionSlot, far)
          localCount += 1
          whenGarbage(proxy, once(fn {
            localCount -= 1
            if (localCount.atMostZero()) {
              if (position.belowZero()) { # question
                E.sendOnly(outgoingReceiver, "GCAnswer", [position])
                questions.free(-position)
              } else if (position.aboveZero()) { # import
                E.sendOnly(outgoingReceiver, "GCExport", [position, wireCount])
                # no imports.free(...) because the other side might reuse this entry
              }
              wireCount := 0
            }
          }))
          return proxy
        }
      }
    }

    # The remote nonce locator is a Far reference which is unique to this connection -- XXX this representation will have to be changed for ShutdownOp; see connection.updoc
    bind remoteNonceLocatorRecord := makeProxyRecord(true, 0)

    # Allocates a question entry and returns [position, record]. The position
    # is the negated index returned by the questions table; the record is
    # created with `position` before that index is known, which is why
    # makeProxyRecord accepts a vow.
    bind prepareQuestion(resolved :boolean) {
      def position := -(questions."bind"(def record := makeProxyRecord(
        resolved, position)))
      return [position, record]
    }

    # Receiving-side counterpart of descMakerStandin: each method corresponds
    # to a descriptor verb emitted by the uncallers above and returns the local
    # object for that descriptor.
    def incomingDescMaker {
      to NewFar(importPos :ImportPos, swissHash :SwissHash) {
        requireLive()
        imports.put(importPos, makeProxyRecord(true, importPos))
        return imports[importPos].receivedReference()
      }
      to NewRemotePromise(importPos :ImportPos, rdrPos :AnswerPos, rdrBase :SwissBase) {
        requireLive()
        imports.put(importPos, def record := makeProxyRecord(false, importPos))
        # Register the redirector for the new promise as an answer so the
        # other side can deliver the resolution to it.
        {
          def redirector := record.makeRedirector()
          swissTable.registerIdentity(redirector, rdrBase)
          answers.put(-rdrPos, redirector, true)
        }
        return record.receivedReference()
      }
      to Import(importPos :ImportPos) {
        requireLive()
        return lookupImport(importPos)
      }
      to Incoming(incomingPos :IncomingPos) {
        requireLive()
        return lookupIncoming(incomingPos)
      }
      to Promise3(searchPath :List[String], hostID :VatID, nonce :Nonce, vine :Vine) {
        # XXX how does this succeed in waiting long enough?
        return hub[searchPath, hostID] <- nonceLocator() \
          <- acceptFrom(remoteSearchPath, remoteVatID, nonce, makeVine(vine))
      }
    }

    # Scope used when building incoming arguments: scopeWithoutDescs plus the
    # descriptor-maker exit.
    def buildScope := scopeWithoutDescs.with(DESCS_EXIT, incomingDescMaker)

    /** Turns an incoming encoded-object (a closure over some Data-E recognizer) into a local object by providing it with a builder. */
    def inObj(specimen, ejector) {
      # XXX All this would be better done by a Data-E protocol verifier intermediary -- and we ought to be hiding the builder products from the provided recognizer, anyway.
      var builderDead := false
      def baseBuilder := deSubgraphKit.makeBuilder(buildScope)
      # Forwards every message to baseBuilder, but only while the connection is
      # live and this inObj call has not yet finished.
      def capTPArgBuilder {
        match msg {
          requireLive()
          if (builderDead) {
            throw.eject(ejector, `this CapTP argument builder is no longer valid`)
          }
          E.callWithPair(baseBuilder, msg)
        }
      }
      try {
        return specimen(capTPArgBuilder)
      } finally {
        # Invalidate the builder once the specimen has returned or thrown.
        builderDead := true
      }
    }

    # Facet that receives the protocol messages from the other side. Parameters
    # marked `via (inObj)` are decoded by inObj before the guard is applied.
    def capTPReceiver {
      to DeliverOnly(recipPos :IncomingPos, verb :MsgName, via (inObj) args :List[any]) :void {
        checkedAndAboutToHandleIncoming()
        E.sendOnly(lookupIncoming(recipPos), verb, args)
      }
      to Deliver(answerPos :AnswerPos, via (inObj) rdr :OneArgFunc, recipPos :IncomingPos, verb :MsgName, via (inObj) args :List[any]) :void {
        checkedAndAboutToHandleIncoming()
        def answer := E.send(lookupIncoming(recipPos), verb, args)
        answers.put(-answerPos, answer, true)
        # Have the answer report its resolution to the sender's redirector.
        E.sendOnly(answer, "__whenMoreResolved", [rdr])
      }
      to GCExport(exportPos :ExportPos, wireDelta :WireDelta) {
        checkedAndAboutToHandleIncoming()
        exports.decr(exportPos, wireDelta)
        # XXX do we need to do something upon reaching 0?
      }
      to GCAnswer(answerPos :AnswerPos) {
        checkedAndAboutToHandleIncoming()
        answers.free(-answerPos)
      }
      to Terminated(via (inObj) problem :Throwable) {
        checkedAndAboutToHandleIncoming()
        # From here on requireLive() throws.
        bind termination := Ref.broken(problem)

        # Discard our local references, break our proxies
        exports.smash(problem)
        imports.smash(problem)
        answers.smash(problem)
        questions.smash(problem)
        remoteNonceLocatorRecord.smash(problem)

        # Discard our outgoing references, since we will no longer use them
        outgoingReceiver := null

        # XXX review for additional cleanup needed/useful
      }
    }

    # NOTE(review): both __printOn methods below print an empty string in this
    # copy; possibly angle-bracketed labels were lost in extraction.

    /** Facet of this connection provided for the LocatorUnum etc. */
    def outgoingConnection {
      to __printOn(out :TextWriter) {
        out.print("")
      }
      to nonceLocator() {
        return remoteNonceLocatorRecord.localReference()
      }
    }

    # XXX review what the least authority is for this
    /** Facet of this connection provided for its peer connections, to manage 3-vat introductions. */
    def peerConnection {
      to __printOn(out :TextWriter) {
        out.print("")
      }
      to nonceLocator() {
        return remoteNonceLocatorRecord.localReference()
      }
      to getPromiseGiftTable() { return promiseGifts }
      to getNearGiftTable() { return nearGifts }
    }

    return [capTPReceiver, outgoingConnection, peerConnection]
  }
}