# Copyright 2005-2007 Kevin Reid, under the terms of the MIT X license
# found at http://www.opensource.org/licenses/mit-license.html ................

Updoc tests for transformIn: basic chunk transformation, upward propagation of close and fail, the optional finish step, and input-size estimation with buffering.

? def ALL := EIO.getALL()

NOTE(review): in this copy the right-hand side of the following definition and its printed value are empty. It looks as though angle-bracketed text (presumably an import expression and the object's print form) was stripped; restore both from the upstream file.

? def transformIn :DeepFrozen :=
# value:

This is an InStream transformer which, unlike mapStream (XXX naming irregularity), does not require input elements to be paired with output elements.

NOTE(review): the expected value of the next case is empty, presumably the stripped print form of wobbler; confirm against upstream.

? var state := 0
> def wobbler(elements) :any {
>   var out := []
>   for e in elements {
>     out += [e] * state
>     state := (state + 1) % 3
>   }
>   return out
> }
# value:

NOTE(review): the printed stream values in the following cases ("<--[...]" and "<-<__main$_>-") may likewise have lost an angle-bracketed component; confirm against upstream.

? def s := transformIn(List, wobbler, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9].asStream())
# value: <--[0, 1, 2, 3, 4, ...].asStream()

? s.getChunkType()
# value: List

? EIO.takeAll(s)
# value: [1, 2, 2, 4, 5, 5, 7, 8, 8]

Upward closure

? def p; def s := transformIn(List, fn x {x}, def testStream { to close() { stderr.println("close") } })
# value: <-<__main$_>-

? s.close()
# stderr: close
#

? def p; def s := transformIn(List, fn x {x}, def testStream { to fail(p) { stderr.println(p) } })
# value: <-<__main$_>-

? s.fail("ding")
# stderr: ding
#

Finishing chunk

? def s := transformIn(String,
>   def _ { to run(x) { return x }
>           to finish() { return "b" }},
>   "a".asStream())
> EIO.takeAll(s)
# value: "ab"

Finishing error

The finish may cause failure; this is used for cases where the input ends in the middle of some sort of segment boundary.

NOTE(review): the expected value of the next case is empty, presumably the stripped print form of the broken reference produced by finish; confirm against upstream.

? def s := transformIn(String,
>   def _ { to run(x) { return x }
>           to finish() { return Ref.broken("foo") }},
>   "".asStream())
> def c := s.takeAtMost(1)
# value:

Estimation

By default, as many input elements are requested as output elements.

? def s1 := "abcdef".asStream()
> def s2 := transformIn(String, fn x { x }, s1)
> s2.takeAtMost(3)
# value: "abc"

? s1
# value: "def".asStream()

This can be customized with an estimate/1 method, which should compute the number of elements to take to produce the given number of output elements. Under- or over-estimation is acceptable.

? def s1 := "abcdef".asStream()
> def s2 := transformIn(String,
>   def _ { to run(c) { return c }
>           to estimate(o) { return o // 3 } },
>   s1)
> s2.takeAtMost(3)
# value: "a"

? s1
# value: "bcdef".asStream()

Overestimation & buffer

If the estimator overestimates, the extra elements will be buffered, and available even if the source stream is terminated.

? def s1 := "abcdef".asStream()
> def s2 := transformIn(String,
>   def _ { to run(c) { return c }
>           to estimate(o) { return o * 3 } },
>   s1)
> s2.takeAtMost(1)
# value: "a"

? s1
# value: "def".asStream()

? s1.close()

? s2.takeAtMost(3)
# value: "bc"

? [s2.takeAtMost(3)]
# value: [null]

Maximal underestimation

? state := 0
> def s := transformIn(List, wobbler, [1, 2, 3].asStream())
> s.takeAtMost(1)
# value: []

This returns the empty list because wobbler returns 0 elements for the first input element. This is non-optimal, but not currently considered erroneous.

Bad estimator

? def s := transformIn(String,
>   def _ { to run(c) { return c }
>           to estimate(o) { return 0 } },
>   "abcdef".asStream())
> s.takeAtMost(1)
# problem: <__main$_> estimated chunk size 0, which cannot be correct