# Copyright 2005-2007 Kevin Reid, under the terms of the MIT X license # found at http://www.opensource.org/licenses/mit-license.html ................ ? pragma.syntax("0.9") > pragma.enable("dot-props") > pragma.enable("call-pattern") > pragma.enable("one-method-object") >def ALL := EIO::ALL > null ? def makeWriterStream := # value: ? var notifyPair := Ref.promise() > def notify() { notifyPair[1].resolve(null); notifyPair := Ref.promise() } # value: ? def makePrintStream() { > return makeWriterStream( > String, > E.call, > def reporter { > to write(elements) :void { print(elements); notify() } > to flush() :void { print(""); notify() } > to close() :void { print(``); notify() } > to fail(p) :void { print(``); notify() } > }) > } # value: --- Join Trivial join ? [EIO.join("abcdef".asStream(), makePrintStream())] # stdout: abcdef # value: [null] The return value is near because both streams are prompt; the output is delayed because our test Incremental read availability ? def [po, pi] := EIO.pipe([].asMap()); null ? def jr := EIO.join(pi, makePrintStream()) # value: ? po.reserve() <- resolve("hi"); interp.waitAtTop(notifyPair[0]) # stdout: hi ? po.reserve() <- resolve("bye"); interp.waitAtTop(notifyPair[0]) # stdout: bye ? po.reserve() <- resolve(null); interp.waitAtTop(notifyPair[0]) # stdout: Incremental write availability XXX stream redesign: rewrite these tests for the new streams x ? def [outStream, streamBackend] := variablePrintStream() x > def jr := EIO.join("abcdef".asStream(), outStream) x # value: x x ? streamBackend.setAvailable(2) x # stdout: ab x x ? streamBackend.setAvailable(1); streamBackend.setAvailable(2) x # stdout: cde x x ? streamBackend.setAvailable(2) x # stdout: f XXX Output close propagation XXX Input failure propagation XXX Output failure propagation XXX Non-closing join XXX Non-synchronous join