# Copyright 2005-2007 Kevin Reid, under the terms of the MIT X license
# found at http://www.opensource.org/licenses/mit-license.html ................

pragma.syntax("0.9")
pragma.enable("dot-props")

# Sentinel passed as `maximum` to takeAtMost to mean "take the whole chunk".
def ALL :DeepFrozen := EIO.getALL()

# NOTE(review): the next two definitions have no visible right-hand side in
# this copy of the file (possibly angle-bracket import expressions that were
# stripped in transit -- confirm against the upstream source). As written,
# each `:=` continues into the following `def`. InStream and OutStream are
# used below as the `implements` auditors of the two pipe ends.
def InStream :DeepFrozen :=
def OutStream :DeepFrozen :=

# Appends `item` to a queue built as a chain of [element, rest] pairs.
#
# `tail` is passed as a slot (`&tail`) so the caller's variable is updated:
# the object currently in the slot is resolved to [item, next], and the slot
# is then pointed at the new end of the chain.
def enqueue(item, &tail) as DeepFrozen {
    def resolver := tail
    # presumably the value of the bare `def next` expression is the resolver
    # for `next`, so `tail` always holds a resolver -- confirm against E's
    # forward-declaration semantics.
    tail := def next
    resolver.resolve([item, next])
}

# Makes a pipe: a connected [outStream, inStream] pair in which chunks
# written through outStream's reserved holes can be read from inStream.
#
# XXX revise parameter map for new stream scheme
def makeSimplePipe implements DeepFrozen, ExitViaHere {
    # Named parameters (all optional):
    #   Chunk       -- guard each written item is coerced through (default any)
    #   bufferLimit -- only EIO::ALL is accepted (checked by require below)
    #   synchronous -- only false is accepted (checked by require below)
    #   label       -- used only when printing the two stream ends
    # Returns [outStream, inStream].
    to run([=> Chunk :Guard := any,
            => bufferLimit :(EIO::Range) := EIO::ALL,
            => synchronous :boolean := false,
            => label :DeepFrozen := "anonymous"]) {
        # Other modes are not implemented; reject them up front.
        require(synchronous == false)
        require(bufferLimit == EIO::ALL)

        # Two queues, each kept as a head (read side) and a tail (the
        # resolver for the end of the chain, as used by enqueue):
        #   chunk queue -- coerced chunks waiting to be taken by inStream
        #   hole queue  -- resolvers waiting to be handed out by reserve()
        def [var chunkQHead, var chunkQTail] := Ref.promise()
        def [var holeQHead, var holeQTail ] := Ref.promise()

        # Resolved by terminate(); exposed through outStream.terminates().
        def terminator

        # Ends the stream with terminator value `t` (null for a normal close,
        # a broken reference for failure).
        # NOTE(review): `bind terminator` is executed on every call, so a
        # second terminate (e.g. close() twice) may fail -- confirm whether
        # that is intended.
        def terminate(t) {
            bind terminator := t
            # Self-referential pair: every element of this "queue" is t, so
            # it can be read from any number of times.
            def closedLoop := [t, closedLoop]
            chunkQTail.resolve(closedLoop)
            # Later enqueues onto the chunk queue resolve this stand-in and
            # are thereby discarded.
            chunkQTail := def deadQResolver { to resolve(_) {} }
            # From now on reserve() yields t instead of a real hole.
            holeQHead := closedLoop
        }

        # Opens one slot for writing: puts a fresh resolver (`hole`) on the
        # hole queue and arranges for whatever it is resolved to (`item`) to
        # be moved onto the chunk queue.
        def makeAvailable() {
            def [item, hole] := Ref.promise()
            enqueue(hole, &holeQTail)
            when (item) -> {
                if (item == null || Ref.isBroken(item)) {
                    # The writer used the hole to signal end-of-stream.
                    terminate(item)
                } else {
                    # Coerce through the Chunk guard; a coercion failure
                    # exits via `bogus` and terminates the stream with the
                    # problem.
                    escape bogus {
                        enqueue(Chunk.coerce(item, bogus), &chunkQTail)
                    } catch p {
                        terminate(Ref.broken(p))
                    }
                }
            }
        }

        # Start with exactly one hole available; takeAtMost opens another
        # each time a chunk is fully consumed.
        makeAvailable()

        def outStream implements OutStream {
            to __printOn(tw :TextWriter) {
                tw.write("->")
                tw.quote(label)
                tw.write(" pipe")
            }

            /** Returns a promise for the next element of the hole queue: a resolver to write one chunk into, or the terminator once the stream has been terminated. */
            to reserve() {
                def qCur := holeQHead
                # Advance the head immediately to a forward-declared rest, so
                # a further reserve() made before qCur resolves waits on the
                # following element rather than the same one.
                def qRest
                holeQHead := qRest
                return when (qCur) -> {
                    def [r, bind qRest] := qCur
                    r
                }
            }

            /** Flushing has no effect for this variety of stream. */
            to flush() {}

            /** Returns the guard given as the Chunk parameter. */
            to getChunkType() { return Chunk }

            /** Returns the terminator, which terminate() binds when the stream ends. */
            to terminates() { return terminator }
        }

        def inStream implements InStream {
            to __printOn(tw :TextWriter) {
                tw.write("<-")
                tw.quote(label)
                tw.write(" pipe")
            }

            /** Returns a promise for up to `maximum` elements of the next chunk (the whole chunk if maximum is ALL), or for the terminator if the stream has ended. Throws if chunkQHead is currently broken, which is the state left while an earlier take is still outstanding. */
            to takeAtMost(maximum) {
                if (Ref.isBroken(chunkQHead)) {
                    throw(Ref.optProblem(chunkQHead))
                }
                def qCur := chunkQHead
                # The eventual send of get(0) is used to wait until the head
                # pair exists; the body then reads the pair from qCur itself.
                def takenChunk := when (qCur <- get(0)) -> {
                    def [availableChunk, qRest] := qCur
                    if (availableChunk == null || Ref.isBroken(availableChunk) || maximum == ALL || availableChunk.size() <= maximum) {
                        # chunk completely used, or stream closed
                        makeAvailable()
                        chunkQHead := qRest
                        availableChunk
                    } else {
                        # chunk partly used
                        # This branch is only reached when size() > maximum,
                        # so `taken` equals `maximum` here.
                        def taken := availableChunk.size().min(maximum)
                        # Put the unread remainder back at the head of the
                        # queue and hand out the first `taken` elements.
                        chunkQHead := [availableChunk(taken), qRest]
                        availableChunk(0, taken)
                    }
                } catch p {
                    # XXX should be stream failure
                    chunkQHead := Ref.broken(p)
                }
                # Runs before the `when` body above: marks the head as busy
                # so an overlapping takeAtMost is rejected by the check at
                # the top; the body assigns a usable head again when it runs.
                chunkQHead := Ref.broken("There is already an outstanding take request.")
                return takenChunk
            }

            /** Returns the guard given as the Chunk parameter. */
            to getChunkType() { return Chunk }

            /** Terminates the stream with a broken reference made from `problem`. */
            to fail(problem) { terminate(Ref.broken(problem)) }

            /** Terminates the stream normally (terminator null). */
            to close() { terminate(null) }
        }

        return [outStream, inStream]
    }
}