# Copyright 2004-2007 Kevin Reid, under the terms of the MIT X license # found at http://www.opensource.org/licenses/mit-license.html ................ pragma.syntax("0.9") def ALL :DeepFrozen := EIO.getALL() def InStream :DeepFrozen := def defaultEstimator as DeepFrozen { to estimate(size) { return size } } def whenMaybeNow(ref, reactor) as DeepFrozen { return if (Ref.isResolved(ref)) { reactor(ref) } else { Ref.whenResolved(ref, reactor) } } # interface ChunkFilter[InChunk, OutChunk] { # to run(chunk :InChunk) :OutChunk # optional to finish() :OutChunk # optional to estimate(_ :SizeFor[OutChunk]) :SizeFor[InChunk] # } /** Transforms an InStream with a stateful function. ... When the source stream becomes closed, chunkFilter.finish() will be invoked to provide a final chunk, if it __respondsTo that message. ... If chunkFilter.estimate/1 never overestimates, then transformIn will never take unneeded elements from the underlying stream. */ def transformIn implements DeepFrozen { to run(Chunk :Guard, chunkFilter, underlying :InStream) { def sizeEstimator := if (chunkFilter.__respondsTo("estimate", 1)) { chunkFilter } else { defaultEstimator } var optBuffer := null var shouldFinish := true def takeFromBuffer(maximum) { return if (maximum == ALL || optBuffer.size() <= maximum) { def r := optBuffer optBuffer := null r } else { def r := optBuffer(0, maximum) optBuffer := optBuffer(maximum) r } } def doBuffer(chunk, maximum) { require(optBuffer == null) return if (chunk == null || Ref.isBroken(chunk)) { # We could choose to terminate the underlying stream here. We don't, # because the information in the problem shouldn't necessarily be # revealed to upstream, and there might be reason to discard this # transformer and start using the underlying stream directly. # # XXX There should be an option to change this behavior, because some # clients will want automatic upstream termination of the underlying # stream. chunk } else { optBuffer := chunk takeFromBuffer(maximum) } } return def transformInStream { to __printOn(out :TextWriter) { out.write("<-") out.print(chunkFilter) out.write("-") out.printSame(underlying) } to getChunkType() { return Chunk } to fail(p) { underlying.fail(p) } to close() { underlying.close() } to takeAtMost(maximum) { return if (optBuffer != null) { takeFromBuffer(maximum) } else { def estimate := if (maximum == ALL || maximum.isZero()) { maximum } else { def estimate := sizeEstimator.estimate(maximum) if (estimate.isZero()) { throw(`$sizeEstimator estimated chunk size $estimate, which cannot be correct`) } estimate } whenMaybeNow(underlying.takeAtMost(estimate), fn uchunk { switch (uchunk) { match b ? (b == null || Ref.isBroken(b)) { if (shouldFinish && (shouldFinish := false chunkFilter.__respondsTo("finish", 0))) { doBuffer(chunkFilter.finish(), maximum) } else { b } } match _ { doBuffer(chunkFilter(uchunk), maximum) } } }) } } } } }