# Copyright 2005 Kevin Reid, under the terms of the MIT X license
# found at http://www.opensource.org/licenses/mit-license.html
................

Updoc tests for EIO.join. The output streams used here are built on print(),
so everything written to them shows up as stdout in the transcript.

NOTE(review): in this copy several "value:" expectations below are empty, the
quasi-literal in terminate() is empty, and one quoted reference prints as "".
Presumably angle-bracketed text was stripped from the file; verify against the
upstream version before relying on these expectations.

Bind short local names for the EIO constants used below (the dot-props pragma
is presumably what permits the EIO::NAME syntax; confirm).

? pragma.enable("dot-props")
> def [ALL, NOW, ADVANCE, QUERY, ELEMENTS] := \
>   [EIO::ALL, EIO::NOW, EIO::ADVANCE, EIO::QUERY, EIO::ELEMENTS]
> null

NOTE(review): nothing follows "def makeOutStreamShell :=" on its own line, so
as written the definition continues into variablePrintStream. An import
expression was presumably lost here; confirm against the upstream file.

? def makeOutStreamShell :=
> def variablePrintStream() :any {
>   # Builds a char output stream whose implementation prints what it is
>   # given, and returns the pair [stream, printStreamBackend].
>   def stream := makeOutStreamShell(
>     char,
>     def printStreamBackend,
>     def impl {
>       # Print the written elements as a twine.
>       to write(elements) :void { print(__makeTwine.fromSequence(elements)) }
>       # Flushing prints an empty string.
>       to flush() :void { print("") }
>       # Termination prints an empty quasi-literal; the argument t is unused.
>       to terminate(t) :void { print(``) }})
>   # Availability is deliberately not set here (the call is commented out),
>   # leaving it to the caller to invoke setAvailable on the backend.
>   #printStreamBackend.setAvailable(ALL)
>   [stream, printStreamBackend]
> }
# value: 

makePrintStream wraps variablePrintStream: it sets the backend's availability
to ALL and returns only the stream.

? def makePrintStream() :any {
>   def [s, b] := variablePrintStream()
>   b.setAvailable(ALL)
>   s
> }
# value: 

--- Meta-testing

This evaluates to v only when a fresh print stream reports an availability
other than ALL; no value is expected.

? if (makePrintStream().available() =~ v ? (v != ALL)) { v }

--- Join

Trivial join

? EIO.join("abcdef".asStream(), makePrintStream())
# stdout: abcdef
# value: true

Incremental read availability

? def [po, pi] := EIO.pipe([].asMap()); null

? def jr := EIO.join(pi, makePrintStream())
# value: 

? po.write("hi")
# stdout: hi

? po.write("bye")
# stdout: bye

(The quoting is to capture the state of the reference *then* rather than after
all three subexpressions have evaluated; the fourth item is to delay a bit.)

? [E.toQuote(jr), E.toQuote(po.close()), E.toQuote(jr), interp.waitAtTop(false <- not())]
# value: ["", "true", "true", null]
# stdout: 

Incremental write availability

Here the backend is kept so that each setAvailable call below controls how
much of "abcdef" is written.

? def [outStream, streamBackend] := variablePrintStream()
> def jr := EIO.join("abcdef".asStream(), outStream)
# value: 

? streamBackend.setAvailable(2)
# stdout: ab

? streamBackend.setAvailable(1); streamBackend.setAvailable(2)
# stdout: cde

? streamBackend.setAvailable(2)
# stdout: f

XXX Output close propagation
XXX Input failure propagation
XXX Output failure propagation
XXX Non-closing join
XXX Non-synchronous join