# Copyright 2008 Kevin Reid, under the terms of the MIT X license
# found at http://www.opensource.org/licenses/mit-license.html ................

pragma.syntax("0.9")

# NOTE(review): the right-hand sides of the next five definitions are absent
# in this copy of the file (presumably import expressions that were lost) --
# restore them from the original before use; they are left exactly as found.
def makeSubOutStream :DeepFrozen :=
def transformOut :DeepFrozen :=
def makeAppendStream :DeepFrozen :=
def wholeNumCoding :DeepFrozen :=
def eventuall :DeepFrozen := # XXX parser bug triggered by using noun-string here

def Octets :DeepFrozen := List[def Octett :DeepFrozen := 0..255] # XXX we'd like to be able to hint to use specialized arrays

# Tag written before each data chunk; it is followed by the chunk size (as
# written by wholeNumCoding) and then the chunk elements themselves.
def TAG_CHUNK :DeepFrozen := [175]
# Tag written once a message's sub-stream has terminated without failure.
def TAG_END_OF_MESSAGE :DeepFrozen := [176]

/**
 * Frames a sequence of messages onto a single octet stream (makeJoiner) and
 * recovers the individual messages from such a stream (makeSplitter).
 *
 * Framing, as produced by makeJoiner: every chunk written to a message is
 * emitted as TAG_CHUNK + size + data, and a message is finished with
 * TAG_END_OF_MESSAGE.
 */
def messageJoining implements ExitViaHere, DeepFrozen {

  /**
   * Returns an input stream whose elements are output streams, one per
   * message. Whatever is written to such a message stream is tagged and
   * forwarded to outStream.
   */
  to makeJoiner(outStream) {
    # 'okay' is true, or a promise resolved when the most recently handed-out
    # message stream has terminated; it keeps messages from interleaving.
    var okay := true
    return def joinerInput {
      to getChunkType() { return List }
      to close() {when (okay) -> {}} # XXX implement
      to fail(_) {} # XXX implement

      /**
       * Returns [] when n is zero; otherwise a promise for a one-element
       * list holding the output stream for the next message. The promise
       * is fulfilled only after the previous message has terminated.
       */
      to takeAtMost(n) {
        if (n.isZero()) { return [] }
        # Wait on the previous message, and install a fresh promise that the
        # following takeAtMost call will wait on in turn.
        def okayNow := okay
        def [op, okRes] := Ref.promise()
        okay := op
        #traceln(`joiner readying`)
        return when (okayNow) -> {
          #traceln(`joiner 1 setting up`)
          # Prefixes a chunk with TAG_CHUNK and the coded chunk size.
          def asMarkedChunk(chunk :List) {
            #traceln(`joiner 1 marking $chunk`)
            def a := TAG_CHUNK.diverge(Octett)
            wholeNumCoding.writingTo(makeAppendStream(a))(chunk.size())
            return a.snapshot() + chunk
          }
          def msgOut := transformOut(Octets, asMarkedChunk, def sub := makeSubOutStream.stopClose(outStream))
          Ref.whenResolvedOnly(sub.terminates(), fn t {
            # Release the next message regardless of how this one ended.
            okRes.resolve(true)
            if (Ref.isBroken(t)) {
              outStream.fail(t)
            } else {
              # XXX transformOut must have a schedule policy and this must match it
              #traceln(`joiner writing end tag`)
              (def osr := outStream.reserve()) <- resolve(TAG_END_OF_MESSAGE)
              #when (osr) -> { traceln(`tag resolver resolved`) }
            }
          })
          [msgOut]
        }
      }
    }
  }

  /**
   * Returns an input stream whose elements are input streams, one per
   * message, each reading its data out of the tagged octets of inStream.
   */
  to makeSplitter(inStream) {
    def takeWholeNum := wholeNumCoding.takingFrom(inStream, eventuall)
    # Unresolved until inStream ends or a read fails; once resolved it is
    # returned from every further splitterInput.takeAtMost call.
    def outerTerminator
    return def splitterInput {

      /**
       * Returns the terminator if the splitter has terminated, [] when n is
       * 0, and otherwise a one-element list holding a new message stream.
       */
      to takeAtMost(n) {
        if (Ref.isResolved(outerTerminator)) { return outerTerminator }
        if (n == 0) { return [] }

        # Number of data elements still unread in the current chunk (possibly
        # a promise for that number), or null when a tag is expected next.
        var optMsgChunkSize := null
        # Unresolved until this message ends (null) or fails (broken).
        def msgTerminator

        return [def splitMessageInput {
          to getChunkType() { return Octets }

          /**
           * Takes data belonging to the current message. A null n means no
           * caller-imposed limit. Reading a tag yields [] (no data) so the
           * caller has to ask again to obtain the following data.
           */
          to takeAtMost(n :nullOk[int]) {
            if (Ref.isResolved(msgTerminator)) { return msgTerminator }
            if (n == 0) { return [] }

            if (!__equalizer.sameYet(optMsgChunkSize, null)) {
              # Inside a data chunk (or its size is still being read).
              return when (optMsgChunkSize) -> {
                #traceln(`splitMessageInput taking data chunk up to $optMsgChunkSize`)
                # Never request more than remains in this chunk, nor more
                # than the caller asked for: the limit is the smaller of the
                # two. (This used max, which could read past the end of the
                # chunk into the next tag or exceed the caller's limit.)
                when (def r := inStream.takeAtMost(if (n == null) { optMsgChunkSize } else { optMsgChunkSize.min(n) })) -> {
                  #traceln(`splitMessageInput took ${r.size()} data $r`)
                  optMsgChunkSize -= r.size()
                  # Chunk exhausted: the next read expects a tag again.
                  if (optMsgChunkSize == 0) { optMsgChunkSize := null }
                  r
                }
              }
            } else {
              # Between chunks: read one tag element and dispatch on it.
              #traceln(`splitMessageInput taking tag from $inStream`)
              return when (def tag := EIO.takeRange(1, 1, inStream)) -> {
                #traceln(`splitMessageInput switching $tag`)
                switch (tag) {
                  match ==null {
                    # inStream ended where a tag was expected: the splitter
                    # terminates and this message is reported as incomplete.
                    bind outerTerminator := null
                    bind msgTerminator := Ref.broken("incomplete message")
                  }
                  match [] { [] }
                  match ==TAG_CHUNK {
                    optMsgChunkSize := takeWholeNum()
                    []
                  }
                  match ==TAG_END_OF_MESSAGE {
                    bind msgTerminator := null
                  }
                }
              } catch p {
                # A failed tag read breaks both this message and the splitter.
                bind msgTerminator := bind outerTerminator := Ref.broken(p)
              }
            }
          }
        }]
      }
    }
  }
}