# Copyright 2005-2007 Kevin Reid, under the terms of the MIT X license # found at http://www.opensource.org/licenses/mit-license.html ................ pragma.syntax("0.9") pragma.enable("dot-props") def chunkedMapStream as DeepFrozen { to run(OutChunk :Guard, sourceStream, func, namer) :any { def doMap(chunk) { def mapped :OutChunk := func(chunk) if (mapped.size() != chunk.size()) { throw(`mapStream chunk function $func produced ${mapped.size()} elements instead of ${chunk.size()}, ${chunk} to ${mapped}`) } return mapped } def mappingStream { to __printOn(tw :TextWriter) { tw.write("<-") tw.print(namer) tw.write("-") tw.quote(sourceStream) } to takeAtMost(maximum :(EIO::Range)) { return switch (sourceStream.takeAtMost(maximum)) { match p ? (!Ref.isResolved(p)) { when (p) -> { switch (p) { match t ? (t == null || Ref.isBroken(t)) { t } match chunk { doMap(chunk) } } } } match t ? (t == null || Ref.isBroken(t)) { t } # XXX add a restriction that the result list must be the same length match chunk { doMap(chunk) } } } to getChunkType() { return OutChunk } to close() { sourceStream.close() } to fail(p) { sourceStream.fail(p) } } return mappingStream } } def mapStream implements DeepFrozen, ExitViaHere { /** Return an InStream which takes elements from the given InStream and provides the results of the given function applied to each element. */ to run(OutElement, sourceStream, func) :any { def buffer := [].diverge() # XXX support chunk types return chunkedMapStream(List[OutElement], sourceStream, def unchunker(l) { for i => x in l { buffer.put(i, func(x) :OutElement) } return buffer.snapshot() }, func) } /** Return an InStream which takes elements from the given InStream and provides the result of the given function applied to lists of elements. */ to chunked(OutChunk, sourceStream, func) :any { return chunkedMapStream(OutChunk, sourceStream, func, func) } } # XXX should there be a failure-problem-mapping function parameter?