# Copyright 2007 Kevin Reid under the terms of the MIT X license # found at http://www.opensource.org/licenses/mit-license.html ................ pragma.syntax("0.9") pragma.enable("accumulator") def messageDescs :DeepFrozen := def makeAppendStream :DeepFrozen := def typeEncodings := [] def [=> OpCode :DeepFrozen] | _ := typeEncodings def now :DeepFrozen := # XXX parser bug: shouldn't need line continuation here def msgDecoders :DeepFrozen := accum [].asMap() for rec in messageDescs[typeEncodings] { _.with( def [verb :String, opCode :(OpCode.asType()), decoders :DeepFrozen] := rec opCode, def decodeMsg(ins, rec) implements DeepFrozen { E.send(rec, verb, accum [] for i => decoder in decoders { _.with(decoder.takingFrom(ins, now)()) }) } ) } def makeCapTPUnserializer(receiver) implements ExitViaHere, DeepFrozen { # XXX this should not make a stream available until the previous one is closed -- potential ordering problem return def capTPUnserializerInput { to getChunkType() { return List } to close() {} to fail(_) {} to flush() {} to takeAtMost(n) { if (n.isZero()) { return [] } def buffer := [].diverge(0..!2**8) def s := makeAppendStream(buffer) # XXX could skip the buffer and use a pipe instead, except that the decoder is not prepared to deal with non-immediate availability Ref.whenResolvedOnly(s.terminates(), fn t { if (Ref.isBroken(t)) { traceln(`CapTP unserializer for $receiver discarding broken message of length ${buffer.size()} (${Ref.optProblem(t)})`) } else { def ds := buffer.snapshot().asStream() def opcode := OpCode.takingFrom(ds, now)() def handler := msgDecoders.fetch(opcode, fn{throw(`Unrecognized CapTP opcode: $opcode`)}) handler(ds, receiver) } }) return [s] } } }