Iteratees
Overview
play.api.libs.iteratee.Iteratee[E, +A]
は入力から出力を組み立てる Consumer となる。単に入力 E
に対して、どのように出力 A
を組み立てるかのみを定義する。
ヘルパーメソッド Iteratee.fold[E, A]
により、アキュームレータ A
と入力 E
から、出力 A
を返す関数を指定するだけで Iteratee[E, A]
を実装できる。
val it: Iteratee[String, Int] = Iteratee.fold[String, Int](0) { (acc, x) =>
acc + x.toInt
}
play.api.libs.iteratee.Enumerator[E]
は Iteratee
に入力を与える Producer となる。
val result: Future[Int] = Enumerator("1", "2", "3") |>>> it
result.onComplete(println) // Success(6) ... 1 + 2 + 3
Iteratee
Iteratee は、一つのインスタンスで連続する入力を処理する(ループ等を繰り返す)のではない。ステップ毎に新たな Iteratee を生成して引き継いでいく。
Input
play.api.libs.iteratee.Input
は Iteratee に送る入力を表す。
trait Input[+E]
object Input {
case class El[+E](e: E) extends Input[E]
case object Empty extends Input[Nothing]
case object EOF extends Input[Nothing]
}
Input.El(e)
は入力があることを示す: Iteratee は入力を出力に変換し、次の Iteratee に引き継ぐ。Input.Empty
は入力がないことを示す: Iteratee は何もせずに、次の Iteratee に引き継ぐ。Input.EOF
は入力の終端を示す: Iteratee は処理結果を返す。
DoneIteratee
play.api.libs.iteratee.Done[E, A]
により、どのような入力があっても固定の結果を返す Iteratee を生成できる。Input.EOF
が送られた時に、最終の出力結果を返すために用いる。
val doneIt = Done[String, Int](123, Input.Empty)
// It will print Success(123) regardless of the type of input.
Iteratee.flatten(doneIt.feed(Input.EOF)).run.onComplete(println)
Iteratee.flatten(doneIt.feed(Input.Empty)).run.onComplete(println)
Iteratee.flatten(doneIt.feed(Input.El("345"))).run.onComplete(println)
ContIteratee
play.api.libs.iteratee.Cont[E, A])
により、継続する Iteratee を生成できる。入力に応じた次の Iteratee を返す関数 Input[E] => Iteratee[E, A]
を apply
メソッドに渡せばよい。
def step(acc: Int)(in: Input[String]): Iteratee[String, Int] = in match {
// Add the number converted from a string to the state
case Input.El(e: String) => Cont(step(acc + e.toInt))
// Keep the state
case Input.Empty => Cont(step(acc))
// Done iteration
case Input.EOF => Done(acc, Input.EOF)
}
val contIt = Cont[String, Int](step(0))
Iteratee.flatten(contIt.feed(Input.EOF)).run
.onComplete(a => assert(Success(0) === a))
Iteratee.flatten(contIt.feed(Input.El("123"))).run
.onComplete(a => assert(Success(123) === a))
(for {
it1 <- contIt.feed(Input.El("12"))
it2 <- it1.feed(Input.El("34"))
it3 <- it2.feed(Input.Empty)
it4 <- it3.feed(Input.El("56"))
a <- it4.run
} yield a).onComplete(println) // Success(102) ... 12 + 34 + 56
ErrorIteratee
play.api.libs.iteratee.Error[E]
により、エラーを返す Iteratee を生成できる。Done
と同様に、この Iteratee に引き継がれた場合は、以降の入力は無視される。
def step(acc: Int)(in: Input[String]): Iteratee[String, Int] = in match {
case Input.El(e: String) =>
if (!e.isEmpty) Cont(step(acc + e.toInt))
else Error("empty string", Input.Empty)
case Input.Empty => Cont(step(acc))
case Input.EOF => Done(acc, Input.EOF)
}
(for {
it1 <- errorIt.feed(Input.El("12"))
it2 <- it1.feed(Input.El(""))
// The following feed will be ignored because it just
// returns Error(msg, e) regardless of the type of input.
it3 <- it2.feed(Input.El("56"))
a <- it3.run
} yield a).onComplete(println) // Failure(java.lang.RuntimeException: empty string)
Step
独自の Iteratee を作成するには、Iteratee#fold
メソッドを実装する。
def fold[B](folder: Step[E, A] => Future[B])
(implicit ec: ExecutionContext) : Future[B]
fold
メソッドにより、folder
関数を通じて Iteratee は自身がどのステップ play.api.libs.iteratee.Step
であるかを伝えて処理結果を得る。
trait Step[E, +A]
object Step {
case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
case class Cont[E, +A](k: Input[E] => Iteratee[A, E]) extends Step[E, A]
case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}
folder
関数はどのようなものかは、Iteratee#run
の実装を参考にするとよい。 ContIteratee であれば、Input.EOF
を送って処理結果を得ている。
def run: Future[A] = fold({
case Step.Done(a, _) => Future.successful(a)
case Step.Cont(k) => k(Input.EOF).fold({
case Step.Done(a1, _) => Future.successful(a1)
case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF")
case Step.Error(msg, e) => sys.error(msg)
})(dec)
case Step.Error(msg, e) => sys.error(msg)
})(dec)
Done / Cont / Error の各 Iteratee の fold
の実装は、以下と同等である。
val k(in: Step): Iteratee[String, Int] = in match {
...
}
// val contIteratee = Cont[String, Int](k)
val contIteratee = new Iteratee[String, Int] {
def fold[B](folder: Step[String,Int] => Future[B])
(implicit ec: ExecutionContext) : Future[B] =
folder(Step.Cont(k))
}
// val doneIteratee = Done[String, Int](1, Input.Empty)
val doneIteratee = new Iteratee[String, Int] {
def fold[B](folder: Step[String,Int] => Future[B])
(implicit ec: ExecutionContext) : Future[B] =
folder(Step.Done(1, Input.Empty))
}
// val errorIteratee = Error[String]("something wrong", Input.Empty)
val errorIteratee = new Iteratee[String, Int] {
def fold[B](folder: Step[String,Int] => Future[B])
(implicit ec: ExecutionContext) : Future[B] =
folder(Step.Error("something wrong", Input.Empty)
}
Helper Methods
consume
val it: Iteratee[String, String] = Iteratee.consume[String]()
val result: Future[String] = Enumerator("foo", "bar", "baz") |>>> it
result.onComplete(println) // foobarbaz
foreach
val it: Iteratee[String, Unit] = Iteratee.foreach[String](prinln)
Enumerator("foo", "bar", "baz") |>>> it
// foo
// bar
// baz
flatten
継続する Iteratee は、遅延評価で非同期に得るため Future[Iteratee[E, A]]
となる。Iteratee#run
で Input.EOF
を送るには、flatMap
や for-comprehension を介して行なう必要がある。
val it: Iteratee[String, String] = Iteratee.consume[String]()
val newIt: Future[Iteratee[String, String]] = Enumerator("foo", "bar") |>> it
val result: Future[String] = newIt.flatMap(_.run)
Iteratee.flatten
は Iteratee#fold
内部で flatMap
を行う Iteratee
を生成する。あたかも初回の Iteratee のように振る舞う。
val futureIt: Iteratee[String, String] = Iteratee.flatten(newIt)
val result: Future[String] = futureIt.run
Enumerator
Enumerator は Iteratee に送る入力ストリームを生成する。
val enumerator1: Enumerator[String] = Enumerator("foo", "bar")
val enumerator2: Enumerator[String] = Enumerator("baz", "qux")
val enumerator = enumerator1.andThen(enumerator2)
val it: Iteratee[String, String] = Iteratee.consume[String]()
val newIt: Future[Iteratee[String, String]] = enumerator(it)
val result: Future[String] = Iteratee.flatten(newIt).run
result.onComplete(println) // foobarbazqux
>>> (andThen)
Enumerator#andThen
で Enumerator を連結することができる。エイリアスとして Enumerator#>>>
が提供されている。
val enumerator1: Enumerator[String] = Enumerator("foo", "bar")
val enumerator2: Enumerator[String] = Enumerator("baz", "qux")
//val enumerator = enumerator1.andThen(enumerator2)
val enumerator = enumerator1 >>> enumerator2
|>> (apply)
Enumerator#apply
に Iteratee を渡す事で、Future[Iteratee[E, A]]
が得られる。Iteratee#feed
は内部的には同等のことを行なっている。エイリアスとして Enumerator#|>>
が提供されている。
val it: Iteratee[String, String] = Iteratee.consume[String]()
val enumerator: Enumerator[String] = Enumerator("Foo", "Bar", "Baz")
//val newIt: Future[Iteratee[String, String]] = enumerator(it)
val newIt: Future[Iteratee[String, String]] = enumerator |>> it
|>>> (run)
Enumerator#|>>>
により、入力ストリームの送信後に Input.EOF
を送信して処理結果を得ることができる。
val it: Iteratee[String, String] = Iteratee.consume[String]()
val enumerator: Enumerator[String] = Enumerator("Foo", "Bar", "Baz")
//val result: Future[String] = enumerator(it).flatMap(_.run)
val result: Future[String] = enumerator |>>> it
Future[Iteratee[E, A]]
のまま flatMap
で Iteratee#run
を送って Future[B]
を得るので、Iteratee
に置き換える Iteratee.flatten
とは異なる。
val newIt: Future[Iteratee[String, String]] = enumerator |>> it
val futureIt: Iteratee[String, String] = Iteratee.flatten(newIt)
val result: Future[String] = futureIt.run
Helper Methods
repeatM / generateM
Enumerator は、無限の入力ストリームを扱うことができる。Enumerator.repeatM
に Future[E]
を返す関数を渡すことで、無限に反復実行される。
import play.api.libs.concurrent.Promise
...
val dateEnumerator: Enumerator[Date] = Enumerator.repeatM {
Promise.timeout(new Date(), 1.seconds)
}
Enumerator.generateM
では、Future[Option[E]]
を返す関数を渡すことで、None
の場合に反復実行を停止する。
val endOfTime = System.currentTimeMillis() + 3000L
val dateEnumerator: Enumerator[Date] = Enumerator.generateM {
Promise.timeout({
if (System.currentTimeMillis() < endOfTime) Some(new Date())
else None
}, 1.seconds)
}
fromStream / fromFile
Enumerator.fromStream
では java.io.InputStream
を入力ソースとすることができる。内部的には Enumerator.generateM
を用いており、読み込み中に Some[Array[Byte]]
を返し、読み込み完了後に None
を返している。
val streamEnumerator: Enumerator[Array[Byte]] = {
Enumerator.fromStream(new FileInputStream(new File("/path/to/file")))
}
// or use Enumerator.fromFile
val fileEnumerator: Enumerator[Array[Byte]] = {
Enumerator.fromFile(new File("/path/to/file"))
}
Enumeratee
plap.api.libs.iteratee.Enumeratee[From, To]
により、ストリームデータを変換をすることができる。
ヘルパーメソッド Enumeratee.map
に変換関数を渡せば Enumeratee を生成できる。
val byteToHexStr: Enumeratee[Byte, String] = Enumeratee.map[Byte] { b =>
"%02X".format(b)
}
&>> (transform)
Enumeratee#transform
により、前段に変換を加えた Iteratee を生成できる。エイリアスとして Enumeratee#&>>
が提供されている。
val consume: Iteratee[String, String] = Iteratee.consume[String]()
//val it = byteToHexStr.transform(consume)
val it: Iteratee[Byte, String] = byteToHexStr &>> consume
&> (through)
Enumeratee は Enumerator に対しても適用できる。Enumerater#through
により、後段に変換を加えた Enumerator を生成できる。エイリアスとして Enumerator#&>
が提供されている。
// Make sure that either "&>" or "through" is defined
// on Enumerator, not on Enumeratee.
val enumerator = Enumerator("Hello".getBytes())
//val hexStrEnumerator = enumerator.through(byteHexStr)
val hexStrEnumerator: Enumerator[Byte] = enumerator &> byteToHexStr
apply
元の Iteratee がすでに EOF
を受けて完了していた場合、Enumeratee#transform
で Iteratee を変換したところで、その後の入力は破棄されてしまう。
val sum = Iteratee.fold[Int, Int](0) { (acc, x) =>
acc + x
}
val strToInt = Enumeratee.map[String](_.toInt)
val doneIt = Iteratee.flatten(Enumerator(1, 2) >>> Enumerator.eof |>> sum)
// The iteratee doneIt has been done,
Iteratee.isDoneOrError(doneIt).onComplete(println) // Success(true)
val transformedIt = strToInt &>> doneIt
// so any inputs after that will be ignored.
(Enumerator("3", "4", "5") |>>> transformedIt).onComplete(println) // Success(3)
Enumeratee#apply
は、変換前の Iteratee を出力とする Iteratee を生成する。
// The method apply returns Iteratee[String, Iteratee[Int, Int]],
val adaptedIt: Iteratee[String, Iteratee[Int, Int]] = strToInt(sum)
// so we can get the original iteratee after the adaptedIt is done.
val originalIt: Interatee[Int, Int] = Iteratee.flatten(
Enumerator("1", "2") |>>> adaptedIt)
// The original iteratee has not been done yet because it's just
// an output of the adaptedIt.
Iteratee.isDoneOrError(originalIt).onComplete(println) // Success(false)
(Enumerator(3, 4, 5) |>>> originalIt).onComplete(println) // Success(15)
変換した Iteratee を EOF
で完了させた後でも、出力は変換元の Iteratee であるので、入力を継続できる。すなわち Enumeratee を切り替えながら、異なる Enumerator からの入力を Iteratee にまとめることができる。
Traversable
Enumeratee.(take|drop|takeWhile|dropWhile|...)
等のヘルパーメソッドは、他のコレクション API のように、要素を切り出す Enumeratee を生成できる。
ただし切り出し位置は Enumerator から送信されるチャンク単位になる。
val it = Iteratee.fold[Array[Byte], String]("") { (acc, x) =>
acc ++ x.map(_.toChar).mkString("")
}
val enumerator = Enumerator(
"123".getBytes(),
"456".getBytes(),
"789".getBytes()
)
def limitChunks(n: Int) = {
Enumeratee.take[Array[Byte]](n)
}
(enumerator |>>> limitChunks(2) &>> it)
.onComplete(println) // Success("123456")
入力が scala.collection.TraversableLike
を含んでいれば、play.api.iteratee.Traversable
を使うことで、TraversableLike
の実装に応じて切り出し位置を決定する。つまり Array[Byte]
なら、配列インデックスでカウントされる。
def limitBytes(n: Int) = {
Traversable.take[Array[Byte]](n)
}
(enumerator |>>> limitBytes(5) &>> it)
.onComplete(println) // Success("12345")