# Copyright 2005 Kevin Reid, under the terms of the MIT X license
# found at http://www.opensource.org/licenses/mit-license.html ................

pragma.enable("easy-return")
pragma.disable("explicit-result-guard")
pragma.enable("dot-props")

# NOTE(review): the right-hand side of this definition is missing from the
# text as received -- it looks like an import expression was stripped during
# extraction. As written, the following `def NOW` expression would presumably
# be taken as the value, which cannot be what was intended because
# makeInStreamShell is called below as a three-argument maker. Restore the
# original import before using this file.
def makeInStreamShell :=
def NOW := EIO::NOW
def ELEMENTS := EIO::ELEMENTS

# Ends `stream` according to `terminator`: the result of
# Ref.optProblem(terminator) selects between stream.fail(problem) when it is
# non-null and stream.close() when it is null.
def terminate implements DeepFrozen {
  to run(stream, terminator) {
    switch (Ref.optProblem(terminator)) {
      match p :notNull { stream.fail(p) }
      match ==null { stream.close() }
      # Defensive fallback; reached only if neither pattern above matched.
      match _ { throw("reality failure") }
    }
  }
}

# Builds a stream (via makeInStreamShell) whose elements are obtained from
# `sourceStream` and passed, one obtained chunk at a time, through `func`.
#
#   outType      -- guard handed to makeInStreamShell as the first argument
#   sourceStream -- the stream that elements are obtained from
#   func         -- applied to each obtained chunk when elements are requested
#   namer        -- printed as the prefix of the stream's printed form
def chunkedMapStream implements DeepFrozen {
  to run(outType :Guard, sourceStream, func, namer) :any {
    # `def backend` is passed to the shell maker without a value; presumably
    # makeInStreamShell resolves it to the backend facet used by update()
    # below -- TODO confirm against makeInStreamShell.
    def ourStream := makeInStreamShell(outType, def backend, def mapStreamImpl {
      # Prints as <namer>-<quoted sourceStream>.
      to __printOn(tw :TextWriter) {
        tw.print(namer)
        tw.write("-")
        tw.quote(sourceStream)
      }

      # Obtains from the source, passing `count` as both of the first two
      # arguments and NOW as the third. The result goes through `func` only
      # when `report` is ELEMENTS; any other report value yields the source's
      # result unchanged.
      to semiObtain(count :EIO::Range, proceed, report) {
        def original := sourceStream.obtain(count, count, NOW, proceed, report)
        return if (report == ELEMENTS) {
          func(original)
        } else {
          original
        }
      }

      # Forwards termination to the source stream, using the top-level
      # `terminate` function (not this method).
      to terminate(t) {
        terminate(sourceStream, t)
      }
    })

    # Copies the source stream's current state to our backend, then
    # re-registers itself with the source so it runs again when more becomes
    # available.
    def update() {
      backend.setAvailable(sourceStream.available())
      # Only report `remaining` once the source gives something other than
      # EIO::ALL.
      if ((def r := sourceStream.remaining()) != EIO::ALL) {
        backend.resolveRemaining(r)
      }
      # If the source's terminator is already resolved, end our stream the
      # same way.
      if (Ref.isResolved(def t := sourceStream.terminates())) {
        terminate(ourStream, t)
      }
      sourceStream.whenMoreAvailable(update)
    }

    # Initial synchronisation; subsequent runs are triggered by the
    # whenMoreAvailable registration made inside update().
    update()

    return ourStream
  }
}

def mapStream implements DeepFrozen {
  /** Return an InStream which takes elements from the given InStream and
      provides the results of the given function applied to each element. */
  to run(outType, sourceStream, func) :any {
    # One mutable list, created with outType and reused for every chunk.
    def buffer := [].diverge(outType)
    return chunkedMapStream(outType, sourceStream, def unchunker(l) {
      # Resize to the chunk, overwrite each slot with func's result, and hand
      # back a snapshot of the buffer.
      buffer.setSize(l.size())
      for i => x in l {
        buffer.put(i, func(x))
      }
      return buffer.snapshot()
    }, func)  # func is also passed as the namer argument
  }

  /** Return an InStream which takes elements from the given InStream and
      provides the result of the given function applied to lists of
      elements. */
  to chunked(outType, sourceStream, func) :any {
    # func serves as both the chunk function and the namer.
    return chunkedMapStream(outType, sourceStream, func, func)
  }
}

# XXX should there be a failure-problem-mapping function parameter?