# Copyright 2007 Kevin Reid, under the terms of the MIT X license
# found at http://www.opensource.org/licenses/mit-license.html ................

pragma.syntax("0.9")
pragma.enable("call-pattern")

# NOTE(review): ALL is not referenced anywhere in the visible code; it is kept
# because other parts of the project may rely on this definition being
# evaluated -- confirm before removing.
def ALL := EIO.getALL()

/**
 * Makes an out-stream which forwards everything it receives to 'target' by
 * way of 'caller'.
 *
 * Every delivery is performed as caller(target, verb, args), where verb is
 * one of "write", "flush", "close" or "fail".
 *
 * Reservations are numbered in the order in which reserve() is called, and
 * their resolutions are delivered strictly in that order, regardless of the
 * order in which the individual resolvers are resolved.
 *
 * @param Chunk  guard; each non-terminating resolution is passed through
 *               Chunk.coerce before being written.
 * @param caller three-argument function (target, verb, args) performing the
 *               actual delivery.
 * @param target passed as the first argument of every 'caller' invocation.
 * @return the out-stream object.
 */
def makeWriterStream implements DeepFrozen, ExitViaHere {
  to run(Chunk, caller, target) {
    # Serial number of the most recently issued reservation (-1: none yet).
    var lastReservationSerial := -1
    # Serial number of the most recently consumed reservation (-1: none yet).
    var lastDeliverySerial := -1
    # Serial up to which a flush has been requested.
    var flushMark := -1
    # Resolutions supplied so far but not yet consumed, keyed by serial.
    def resolved := [].asMap().diverge()
    # Unresolved until the stream ends; then null (closed) or a broken
    # reference (failed).
    def termination

    /** Delivers, in serial order, every resolution that is ready. */
    def processResolutions() {
      def startingMark := lastDeliverySerial
      for i in (startingMark.next())..lastReservationSerial {
        # Stop at the first reservation which has not been resolved at all.
        def resolution := resolved.fetch(i, __break)
        if (!Ref.isResolved(resolution)) {
          # Resolved with a still-pending promise: try again once it settles.
          # NOTE(review): this 'when' has no catch clause; confirm that the
          # body also runs if the promise becomes broken.
          when (resolution) -> { processResolutions() }
          break
        }
        resolved.removeKey(i)
        lastDeliverySerial := i

        if (Ref.isResolved(termination)) {
          # The stream has already been closed or failed. A reservation made
          # before termination but resolved afterward is consumed (so that
          # the serial bookkeeping stays consistent) and discarded; delivering
          # it would send write/close/fail to an already terminated target,
          # and a second 'bind termination' would throw.
          continue
        }

        switch (resolution) {
          match ==null {
            # null closes the stream.
            caller(target, "close", [])
            bind termination := resolution
          }
          match Ref.broken(p) {
            # A broken reference fails the stream with its problem.
            caller(target, "fail", [p])
            bind termination := resolution
          }
          match _ {
            # Anything else is a chunk; a coercion failure fails the stream.
            escape notChunk {
              caller(target, "write", [Chunk.coerce(resolution, notChunk)])
            } catch coercionFailure {
              caller(target, "fail", [coercionFailure])
              bind termination := Ref.broken(coercionFailure)
            }
          }
        }
      }
      # A flush was requested beyond what had been delivered when this pass
      # started; perform it now.
      if (flushMark > startingMark) {
        caller(target, "flush", [])
        flushMark := lastDeliverySerial
      }
    }

    def outStream { # implements OutStream
      to __printOn(tw :TextWriter) {
        tw.write("->")
        tw.quote(caller)
        tw.write("(")
        tw.quote(target)
        tw.write(", ...)")
      }

      /**
       * Returns a resolver for the next element of the stream, or the
       * termination value if the stream has already terminated.
       */
      to reserve() {
        if (Ref.isResolved(termination)) { return termination }
        def reservationSerial := lastReservationSerial += 1
        return def resolver {
          to resolve(resolution) {
            resolved[reservationSerial] := resolution
            processResolutions()
          }
        }
      }

      /**
       * Requests a flush covering everything reserved so far. The target is
       * flushed immediately if nothing is outstanding; otherwise
       * processResolutions does it later.
       */
      to flush() {
        if (flushMark != lastReservationSerial && lastDeliverySerial == lastReservationSerial) {
          caller(target, "flush", [])
        }
        flushMark := lastReservationSerial
      }

      to getChunkType() { return Chunk }

      /** Returns the termination promise. */
      to terminates() { return termination }
    }
    return outStream
  }
}