# Copyright 2005 Kevin Reid, under the terms of the MIT X license
# found at http://www.opensource.org/licenses/mit-license.html ................

pragma.enable("easy-return")
pragma.disable("explicit-result-guard")
pragma.enable("dot-props")

# NOTE(review): the right-hand sides of these two bindings were missing from
# the text under review (most likely import-URI literals stripped as markup).
# The paths below are an assumption -- confirm against the repository layout.
def makeOutStreamShell := <import:org.cubik.cle.io.makeOutStreamShell>
def makeInStreamShell := <import:org.cubik.cle.io.makeInStreamShell>

/** Makes a connected pair of streams sharing one in-memory buffer: elements
  * written to the out-stream become obtainable from the in-stream. */
def makeSimplePipe implements DeepFrozen {
  /** Create a pipe and return [outStream, inStream].
    *
    * Named arguments (all optional):
    *   Element     -- guard for the buffered elements; defaults to any.
    *   bufferLimit -- initial availability reported to the writing side;
    *                  EIO::ALL (the default) means it is never recomputed
    *                  as the buffer drains.
    *   synchronous -- if true, the reading side is told about newly written
    *                  elements by an immediate call; otherwise by an
    *                  eventual send. Defaults to false.
    *   label       -- quoted in the printed form of both stream
    *                  implementations; defaults to "anonymous".
    */
  to run([=> Element :Guard := any,
          => bufferLimit :(EIO::Range) := EIO::ALL,
          => synchronous :boolean := false,
          => label :DeepFrozen := "anonymous"]) {

    # Elements written but not yet consumed by the reader.
    def buffer := [].diverge(Element)

    # Forward declarations: each side's implementation refers to the other.
    def inBackend
    def inStream

    def outStream := makeOutStreamShell(Element, def outBackend, def outImpl {
      to __printOn(tw :TextWriter) {
        tw.quote(label)
        tw.write(" pipe")
      }

      /** Append the elements to the buffer and tell the reading side how
        * many elements are now available. */
      to write(elements :List) {
        buffer.append(elements)
        if (synchronous) {
          inBackend.setAvailable(buffer.size())
        } else {
          inBackend <- setAvailable(buffer.size())
        }
      }

      /** Propagate termination of the writing side to the reading side,
        * as a failure if t is broken and as a normal close otherwise. */
      to terminate(t) {
        if (!inStream.isTerminated()) {
          if (Ref.isBroken(t)) {
            inStream.fail(Ref.optProblem(t))
          } else {
            inStream.close()
          }
        }
      }
    })

    bind inStream := makeInStreamShell(Element, inBackend__Resolver, def inImpl {
      to __printOn(tw :TextWriter) {
        tw.quote(label)
        tw.write(" pipe")
      }

      /** Obtain up to count elements from the front of the buffer.
        *
        * Returns the elements if report is EIO::ELEMENTS, otherwise null.
        * If proceed is EIO::ADVANCE the elements are also removed from the
        * buffer and both sides' availability is refreshed. */
      to semiObtain(count, proceed, report) :any {
        # Never take more than is buffered. (This was previously max(),
        # which handed over the whole buffer for small requests and indexed
        # past the end of the buffer for large ones.)
        def n := if (count == EIO::ALL) {
          buffer.size()
        } else {
          count.min(buffer.size())
        }

        def r := if (report == EIO::ELEMENTS) { buffer(0, n) }

        if (proceed == EIO::ADVANCE) {
          buffer(0, n) := []
          inBackend.setAvailable(buffer.size())
          if (bufferLimit != EIO::ALL) {
            # Consuming elements frees room for the writer.
            outBackend.setAvailable(bufferLimit - buffer.size())
          }
        }
        return r
      }

      /** Propagate termination of the reading side to the writing side,
        * as a failure if t is broken and as a normal close otherwise. */
      to terminate(t) {
        if (!outStream.isTerminated()) {
          if (Ref.isBroken(t)) {
            outStream.fail(Ref.optProblem(t))
          } else {
            outStream.close()
          }
        }
      }
    })

    # The buffer starts empty, so the writer may fill it up to the limit.
    outBackend.setAvailable(bufferLimit)

    return [outStream, inStream]
  }
}