# Copyright 2005 Kevin Reid, under the terms of the MIT X license # found at http://www.opensource.org/licenses/mit-license.html ................ ? def ALL := EIO.getALL() ? def makeOutStreamShell := # value: ? def stream := makeOutStreamShell(float64, def streamBackend, def impl { > to write(elements) :void { print(elements) } > to flush() :void { print("flush") } > to terminate(t) :void { print(`terminate $t`) } > }) # value: -> ? stream.available() # value: 0 ? def wmatest(i) :void { stream.whenMoreAvailable(thunk{ print(`more $i`); wmatest(i + 1) }) }; wmatest(1) ? streamBackend.setAvailable(10) # stdout: more 1 ? stream.available() # value: 10 ? stream.write([1.0, 2.0, 3.0]) # stdout: [1.0, 2.0, 3.0] ? stream.available() # value: 7 insufficient availability ? stream.write("abcdefgh") # problem: test ALL while writing ? streamBackend.setAvailable(ALL) # stdout: more 2 ? stream.available() == ALL # value: true ? stream.write([1000.0]) # stdout: [1000.0] ? stream.available() == ALL # value: true misc methods ? stream.flush() # stdout: flush XXX impl should get to give an opinion ? stream.maxAvailable() == ALL # value: true XXX test that this is enforced ? stream.getElementType() # value: float64 ? def terminator := stream.terminates() # value: ? stream.isTerminated() # value: false termination and consequences XXX Java FQN ? stream.fail("biff" :) # stdout: terminate # value: ? stream.fail("blaff" :) # problem: this resolver's ref has already been resolved, therefore cannot be resolved to ? stream.close() # value: ? stream.write([]) # problem: biff ? stream.flush() -- no effect XXX check that setAvailable may not now be used ? stream.available() # value: 0 ? stream.getElementType() # value: float64 ? [terminator, stream.terminates() == terminator] # value: [, true] ? stream.maxAvailable() # value: 0 closure (testing with fresh stream) ? 
def stream := makeOutStreamShell(float64, def streamBackend, def impl { > to write(elements) :void { print(elements) } > to flush() :void { print("flush") } > to terminate(_) :void {} > }) # value: -> ? def terminator := stream.terminates() # value: ? stream.close() # value: true ? stream.close() # value: true ? stream.flush() -- no effect ? stream.write([]) # problem: ? stream.available() # value: 0 ? stream.getElementType() # value: float64 ? [terminator, stream.terminates() == terminator] # value: [true, true] ? stream.maxAvailable() # value: 0 whenAvailable ? def stream := makeOutStreamShell(float64, def streamBackend, def impl { > to write(elements) :void { print(elements) } > to flush() :void { print("flush") } > to terminate(_) :void {} > to wants(n) :void { print(`wants $n`) } > }) # value: -> ? stream.whenAvailable(0, thunk { print("wa1"); '1' }) # stdout: wa1 # value: '1' ? def wap2 := stream.whenAvailable(4, thunk { print("wa2"); '2' }) # stdout: wants 4 # value: ? stream.whenAvailable(4, thunk { print("wa3"); '3' }) # problem: whenAvailable reactor <_> already registered ? streamBackend.setAvailable(2) ? streamBackend.setAvailable(6); "set" # stdout: wa2 # value: "set" ? wap2 # value: '2' ? def wap4 := stream.whenAvailable(7, thunk { throw("blargh") }) # stdout: wants 7 # value: ? streamBackend.setAvailable(7) ? E.toQuote(wap4) # value: ">" ? wap4() # problem: blargh XXX ejectors for kinds of failure from write() --- Input streams --- ? def makeInStreamShell := # value: ? pragma.enable("dot-props") > def NOW := EIO::NOW > def LATER := EIO::LATER > def WAIT := EIO::WAIT > def ADVANCE := EIO::ADVANCE > def QUERY := EIO::QUERY > def ELEMENTS := EIO::ELEMENTS > def STATUS := EIO::STATUS > null read may return more elements than currently available; wants exists so that stream#whenAvailable can create suction if the stream is lazy. ? 
def buffer := [].diverge(int) > pragma.enable("accumulator") > def stream := makeInStreamShell(int, def streamBackend, def impl { > to semiObtain(n, proceed, report) :any { > def r := if (report == ELEMENTS) { buffer(0, n) } > if (proceed == ADVANCE) { buffer(0, n) := [] } > return r > } > to wants(minimum) :void { print(`wants $minimum`) } > to terminate(_) :void {} > }) # value: <- Testing bad arguments to obtain: ? stream.obtain(-1, 10, NOW, ADVANCE, ELEMENTS) # problem: -1 is not in the region (int >= 0) ? stream.obtain(5, 4, NOW, ADVANCE, ELEMENTS) # problem: doesn't match any of [nullOk, (int >= 5)] ? stream.obtain(ALL, 10, NOW, ADVANCE, ELEMENTS) # XXX should be a more specific error - the current one refers to that atLeast=ALL requires atMost=ALL # problem: must be null ? stream.obtain(1, 1, 43, ADVANCE, ELEMENTS) # problem: must be one of ["NOW", "LATER", "WAIT"]: 43 ? stream.obtain(1, 1, NOW, "foo", ELEMENTS) # problem: must be one of ["ADVANCE", "QUERY"]: "foo" ? stream.obtain(1, 1, NOW, ADVANCE, def el extends ELEMENTS {}) # XXX should this case be allowed instead? # problem: must be one of ["ELEMENTS", "STATUS"]: -- actual usage ? stream.whenAvailable(1, thunk { print("first inStream whenAvailable") }) # stdout: wants 1 # value: ? stream.obtain(5, 15, NOW, ADVANCE, ELEMENTS) # problem: ? buffer.append(__makeList.fromIteratableValues(1..18)) ? streamBackend.setAvailable(buffer.size()) # stdout: first inStream whenAvailable ? [stream.obtain(5, 10, NOW, ADVANCE, ELEMENTS), stream.available()] # value: [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 8] ? [stream.obtain(4, 5, NOW, QUERY, ELEMENTS), stream.available()] # value: [[11, 12, 13, 14, 15], 8] ? [stream.obtain(4, 6, NOW, QUERY, ELEMENTS), stream.available()] # value: [[11, 12, 13, 14, 15, 16], 8] ? def t := stream.obtain(6, 6, NOW, ADVANCE, STATUS) # value: ? t == stream.terminates() # value: true testing schedule==LATER ? stream.available() # value: 2 peek ? 
stream.obtain(0, ALL, NOW, QUERY, ELEMENTS) # value: [17, 18] requesting more elements than currently available ? def elementsVow := stream.obtain(3, 4, LATER, ADVANCE, ELEMENTS) # value: they are visibly claimed: ? stream.available() # value: 0 ? stream.obtain(0, ALL, NOW, QUERY, ELEMENTS) # value: [] ? stream.obtain(1, ALL, NOW, QUERY, ELEMENTS) # problem: XXX proper plurals This whenAvailable does not trigger immediately, even though there are 2 elements actually in the stream, because they have been claimed by the obtain for 3..4. ? stream.whenAvailable(2, thunk { print("whenAvailable from 17...") }) # stdout: wants 2 # value: ? elementsVow # value: After enough elements are available, the promise will be resolved. ? buffer.append([303, 116, 191, 222]) ? streamBackend.setAvailable(buffer.size()) # stdout: whenAvailable from 17... ? elementsVow # value: [17, 18, 303, 116] Termination ? def pwA := stream.whenAvailable(100, thunk {print(`termination test whenAvailable -- ${stream.terminates()}`); 0}) # stdout: wants 100 # value: ? def poAE := stream.obtain(50, 50, LATER, ADVANCE, ELEMENTS) # value: ? def poAS := stream.obtain(50, 50, LATER, ADVANCE, STATUS) # value: ? interp.waitAtTop(pwA) > stream.fail("boo") # value: # stdout: termination test whenAvailable -- ? [pwA, poAE, poAS] # value: [0, failed: problem: boo>, failed: problem: boo>] -- Input stream behavior on a dying impl ? def stream := makeInStreamShell(int, def streamBackend, def impl { > to semiObtain(count, proceed, report) :any { > throw("SPLORK") > } > to terminate(_) :void {} > }) # value: <- ? streamBackend.setAvailable(100) ? stream.terminates() # value: ? stream.obtain(1, 1, NOW, ADVANCE, ELEMENTS) # problem: SPLORK ? stream.terminates() # value: > ? stream.obtain(1, 1, NOW, ADVANCE, ELEMENTS) # problem: <- failed: --- remaining and obtain(ALL, ...) ? 
def buffer := [0, 1, 2, 3, 4, 5, 6].diverge(int) > def stream := makeInStreamShell(int, def streamBackend, def impl { > to semiObtain(count, proceed, report) :any { > def r := buffer(0, count) > if (proceed == ADVANCE) { buffer(0, count) := [] } > return r > } > to terminate(_) :void {} > }) # value: <- ? streamBackend.setAvailable(buffer.size()) ? [stream.available(), stream.remaining(), stream.terminates()] # value: [7, null, ] ? stream.obtain(ALL, ALL, NOW, ADVANCE, ELEMENTS) # problem: > ? streamBackend.resolveRemaining(buffer.size()) ? [stream.available(), stream.remaining(), stream.terminates()] # value: [7, 7, ] ? stream.obtain(1, 1, NOW, ADVANCE, ELEMENTS) # value: [0] ? [stream.available(), stream.remaining(), stream.terminates()] # value: [6, 6, ] ? stream.obtain(ALL, ALL, NOW, ADVANCE, ELEMENTS) # value: [1, 2, 3, 4, 5, 6] shell automatically terminates on reaching remaining ? stream.terminates() # value: true --- stream with 0 remaining should be closed ? def stream := makeInStreamShell(int, def streamBackend, def impl { > to semiObtain(count, proceed, report) :any { > return [] > } > to terminate(_) :void {} > }) # value: <- ? streamBackend.resolveRemaining(0); stream.terminates() # value: true --- obtain shorthands NOTE: These tests, for convenience, assume that a string's stream is implemented using this InStreamShell. ' ? def stream := "abcdefghijklmnopqrstuvwxyz".asStream() # value: "abcdefghijklmnopqrst\...".asStream() ? stream.read(1, 2) # value: "ab" ? interp.waitAtTop(def p := stream.readLater(1, 2)); p # value: ? p # value: "cd" ? stream.readOptOne() # value: 'e' XXX this test won't work until atLeast is fixed: if the stream is closed, fewer than atLeast elements may be returned, but we don't implement this x ? "".asStream().readOptOne() x # value: null ? "abcdef".asStream().readAll() # value: "abcdef" XXX this is not working for mysterious reasons; test and implementation disabled until later x ? 
interp.waitAtTop(def p := "abcdef".asStream().readAllLater(); p) x # value: x x ? p x # value: "abcdef" ? stream.peek(1, 2) # value: "fg" ? stream.peek(1, 2) # value: "fg" ? def p := stream.skip(2) # value: ? p == stream.terminates() # value: true XXX becomesReady (not done yet because of inconsistent documentation: defined as returning false==STATUS, but that would not result in resolution upon availability as documented in text) -- XXX write tests that STATUS is correct: according to http://www.erights.org/elib/concurrency/eio/obtaining.html the return value of STATUS is "the termination status of the stream *after* these elements have been obtained" (emphasis mine) XXX for both directions of streams: notify impl synchronously upon termination? (terminator provides eventual-or-poll notification) XXX InStream sugar methods XXX maxAvailable