Iteratees

Overview

play.api.libs.iteratee.Iteratee[E, +A] は入力から出力を組み立てる Consumer となる。単に入力 E に対して、どのように出力 A を組み立てるかのみを定義する。

ヘルパーメゾッド Iteratee.fold[E, A] により、アキュームレータ A と入力 E から、出力 A を返す関数を指定するだけで Iteratee[E, A] を実装できる。

val it: Iteratee[String, Int] = Iteratee.fold[String, Int](0) { (acc, x) =>
  acc + x.toInt
}

play.api.libs.iteratee.Enumerator[E]Iteratee に入力を与える Producer となる。

val result: Future[Int] = Enumerator("1", "2", "3") |>>> it
result.onComplete(println) // Success(6) ... 1 + 2 + 3

Iteratee

Iteratee は、一つのインスタンスで連続する入力を処理する(ループ等を繰り返す)のではない。ステップ毎に新たな Iteratee を生成して引き継いでいく。

Input

play.api.libs.iteratee.InputIteratee に送る入力を表す。

trait Input[+E]

object Input {
  case class El[+E](e: E) extends Input[E]
  case object Empty extends Input[Nothing]
  case object EOF extends Input[Nothing]
}
  • Input.El(e) は入力があることを示す: Iteratee は入力を出力に変換し、次の Iteratee に引き継ぐ。
  • Input.Empty は入力がないことを示す: Iteratee は何もせずに、次の Iteratee に引き継ぐ。
  • Input.EOF は入力の終端を示す: Iteratee は処理結果を返す。

DoneIteratee

play.api.libs.iteratee.Done[E, A] により、どのような入力があっても固定の結果を返す Iteratee を生成できる。Input.EOF が送られた時に、最終の出力結果を返すために用いる。

val doneIt = Done[String, Int](123, Input.Empty)
// It will print Success(123) regardless of the type of input.
Iteratee.flatten(doneIt.feed(Input.EOF)).run.onComplete(println)
Iteratee.flatten(doneIt.feed(Input.Empty)).run.onComplete(println)
Iteratee.flatten(doneIt.feed(Input.El("345"))).run.onComplete(println)

ContIteratee

play.api.libs.iteratee.Cont[E, A]) により、継続する Iteratee を生成できる。入力に応じた次の Iteratee を返す関数 Input[E] => Iteratee[E, A]apply メゾッドに渡せばよい。

def step(acc: Int)(in: Input[String]): Iteratee[String, Int] = in match {
  // Add the number converted from a string to the state
  case Input.El(e: String) => Cont(step(acc + e.toInt))
  // Keep the state
  case Input.Empty => Cont(step(acc))
  // Done iteration
  case Input.EOF => Done(acc, Input.EOF)
}

val contIt = Cont[String, Int](step(0))
Iteratee.flatten(contIt.feed(Input.EOF)).run
  .onComplete(a => assert(Success(0) === a))
Iteratee.flatten(contIt.feed(Input.El("123"))).run
  .onComplete(a => assert(Success(123) === a))

(for {
  it1 <- contIt.feed(Input.El("12"))
  it2 <- it1.feed(Input.El("34"))
  it3 <- it2.feed(Input.Empty)
  it4 <- it3.feed(Input.El("56"))
  a <- it4.run
} yield a).onComplete(println) // Success(102) ... 12 + 34 + 56

ErrorIteratee

play.api.libs.iteratee.Error[E] により、エラーを返す Iteratee を生成できる。Done と同様に、この Iteratee に引き継がれた場合は、以降の入力は無視される。

def step(acc: Int)(in: Input[String]): Iteratee[String, Int] = in match {
  case Input.El(e: String) =>
    if (!e.isEmpty) Cont(step(acc + e.toInt))
    else Error("empty string", Input.Empty)
  case Input.Empty => Cont(step(acc))
  case Input.EOF => Done(acc, Input.EOF)
}

(for {
  it1 <- errorIt.feed(Input.El("12"))
  it2 <- it1.feed(Input.El(""))
  // The following feed will be ignored because it just
  // returns Error(msg, e) regardless of the type of input.
  it3 <- it2.feed(Input.El("56"))
  a <- it3.run
} yield a).onComplete(println) // Failure(java.lang.RuntimeException: empty string)

Step

独自の Iteratee を作成するには、Iteratee#fold メゾッドを実装する。

def fold[B](folder: Step[E, A] => Future[B])
           (implicit ec: ExecutionContext) : Future[B]

fold メゾッドにより、folder 関数を通じて Iteratee は自身がどのステップ play.api.libs.iteratee.Step であるかを伝えて処理結果を得る。

trait Step[E, +A]

object Step {
  case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
  case class Cont[E, +A](k: Input[E] => Iteratee[A, E]) extends Step[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]
}

folder 関数はどのようなものかは、Iteratee#run の実装を参考にするとよい。 ContIteratee であれば、Input.EOF を送って処理結果を得ている。

def run: Future[A] = fold({
  case Step.Done(a, _) => Future.successful(a)
  case Step.Cont(k) => k(Input.EOF).fold({
    case Step.Done(a1, _) => Future.successful(a1)
    case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF")
    case Step.Error(msg, e) => sys.error(msg)
  })(dec)
  case Step.Error(msg, e) => sys.error(msg)
})(dec)

Done / Cont / Error の各 Iterateefold の実装は、以下と同等である。

val k(in: Step): Iteratee[String, Int] = in match {
  ...
}
// val contIteratee = Cont[String, Int](k)
val contIteratee = new Iteratee[String, Int] {
  def fold[B](folder: Step[String,Int] => Future[B])
             (implicit ec: ExecutionContext) : Future[B] =
    folder(Step.Cont(k))
}

// val doneIteratee = Done[String, Int](1, Input.Empty)
val doneIteratee = new Iteratee[String, Int] {
  def fold[B](folder: Step[String,Int] => Future[B])
             (implicit ec: ExecutionContext) : Future[B] =
    folder(Step.Done(1, Input.Empty))
}

// val errorIteratee = Error[String]("something wrong", Input.Empty)
val errorIteratee = new Iteratee[String, Int] {
  def fold[B](folder: Step[String,Int] => Future[B])
             (implicit ec: ExecutionContext) : Future[B] =
    folder(Step.Error("something wrong", Input.Empty)
}

Helper Methods

consume

val it: Iteratee[String, String] = Iteratee.consume[String]()
val result: Future[String] = Enumerator("foo", "bar", "baz") |>>> it
result.onComplete(println) // foobarbaz

foreach

val it: Iteratee[String, Unit] = Iteratee.foreach[String](prinln)
Enumerator("foo", "bar", "baz") |>>> it
// foo
// bar
// baz

flatten

継続する Iteratee は、遅延評価で非同期に得るため Future[Iteratee[E, A]] となる。Iteratee#runInput.EOF を送るには、flatMapfor-comprehension を介して行なう必要がある。

val it: Iteratee[String, String] = Iteratee.consume[String]()
val newIt: Future[Iteratee[String, String]] = Enumerator("foo", "bar") |>> it
val result: Future[String] = newIt.flatMap(_.run)

Iteratee.flattenIteratee#fold 内部で flatMap を行う Iteratee を生成する。あたかも初回の Iteratee のように振る舞う。

val futureIt: Iteratee[String, String] = Iteratee.flatten(newIt)
val result: Future[String] = futureIt.run

Enumerator

EnumeratorIteratee に送る入力ストリームを生成する。

val enumerator1: Enumerator[String] = Enumerator("foo", "bar")
val enumerator2: Enumerator[String] = Enumerator("baz", "qux")
val enumerator = enumerator1.andThen(enumerator2)

val it: Iteratee[String, String] = Iteratee.consume[String]()
val newIt: Future[Iteratee[String, String]] = enumerator(it)

val result: Future[String] = Iteratee.flatten(newIt).run
result.onComplete(println) // foobarbazqux

>>> (andThen)

Enumerator#andThenEnumerator を連結することができる。エイリアスとして Enumerator#>>> が提供されている。

val enumerator1: Enumerator[String] = Enumerator("foo", "bar")
val enumerator2: Enumerator[String] = Enumerator("baz", "qux")
//val enumerator = enumerator1.andThen(enumerator2)
val enumerator = enumerator1 >>> enumerator2

|>> (apply)

Enumerator#applyIteratee を渡す事で、Future[Iteratee[E, A]] が得られる。Iteratee#feed は内部的には同等のことを行なっている。エイリアスとして Enumerator#|>> が提供されている。

val it: Iteratee[String, String] = Iteratee.consume[String]()
val enumerator: Enumerator[String] = Enumerator("Foo", "Bar", "Baz")
//val newIt: Future[Iteratee[String, String]] = enumerator(it)
val newIt: Future[Iteratee[String, String]] = enumerator |>> it

|>>> (run)

Enumerator#|>>> により、入力ストリームの送信後に Input.EOF を送信して処理結果を得ることができる。

val it: Iteratee[String, String] = Iteratee.consume[String]()
val enumerator: Enumerator[String] = Enumerator("Foo", "Bar", "Baz")
//val result: Future[String] = enumerator(it).flatMap(_.run)
val result: Future[String] = enumerator |>>> it

Future[Iteratee[E, A]] のまま flatMapIteratee#run を送って Future[B] を得るので、Iteratee に置き換える Iteratee.flatten とは異なる。

val newIt: Future[Iteratee[String, String]] = enumerator |>> it
val futureIt: Iteratee[String, String] = Iteratee.flatten(newIt)
val result: Future[String] = futureIt.run

Helper Methods

repeatM / generateM

Enumerator は、無限の入力ストリームを扱うことができる。Enumerator.repeatMFuture[E] を返す関数を渡すことで、無限に反復実行される。

import play.api.libs.concurrent.Promise
...
val dateEnumerator: Enumerator[Date] = Enumerator.repeatM {
  Promise.timeout(new Date(), 1.seconds)
}

Enumerator.generateM では、Future[Option[E]] を返す関数を渡すことで、None の場合に反復実行を停止する。

val endOfTime = System.currentTimeMillis() + 3000L
val dateEnumerator: Enumerator[Date] = Enumerator.generateM {
  Promise.timeout({
    if (System.currentTimeMillis() < endOfTime) Some(new Date())
    else None
  }, 1.seconds)
}

fromStream / fromFile

Enumerator.fromStream では java.io.InputStream を入力ソースとすることができる。内部的には Enumerator.generateM を用いており、読み込み中に Some[Array[Byte]] を返し、読み込み完了後に None を返している。

val streamEnumerator: Enumerator[Array[Byte]] = {
  Enumerator.fromStream(new FileInputStream(new File("/path/to/file")))
}
// or use Enumerator.fromFile
val fileEnumerator: Enumerator[Array[Byte]] = {
  Enumerator.fromFile(new File("/path/to/file"))
}

Enumeratee

plap.api.libs.iteratee.Enumeratee[From, To] により、ストリームデータを変換をすることができる。

ヘルパーメゾッド Enumeratee.map に変換関数を渡せば Enumeratee を生成できる。

val byteToHexStr: Enumeratee[Byte, String] = Enumeratee.map[Byte] { b =>
  "%02X".format(b)
}

&>> (transform)

Enumeratee#transform により、前段に変換を加えた Iteratee を生成できる。エイリアスとして Enumeratee#&>> が提供されている。

val consume: Iteratee[String, String] = Iteratee.consume[String]()
//val it = byteToHexStr.transform(consume)
val it: Iteratee[Byte, String] = byteToHexStr &>> consume

&> (through)

EnumerateeEnumerator に対しても適用できる。Enumerater#through により、後段に変換を加えた Enumerator を生成できる。エイリアスとして Enumerator#&> が提供されている。

// Make sure that either "&>" or "through" is defined
// on Enumerator, not on Enumeratee.

val enumerator = Enumerator("Hello".getBytes())
//val hexStrEnumerator = enumerator.through(byteHexStr)
val hexStrEnumerator: Enumerator[Byte] = enumerator &> byteToHexStr

apply

元の Iteratee がすでに EOF を受けて完了していた場合、Enumeratee#transformIteratee を変換したところで、その後の入力は破棄されてしまう。

val sum = Iteratee.fold[Int, Int](0) { (acc, x) =>
  acc + x
}
val strToInt = Enumeratee.map[String](_.toInt)

val doneIt = Iteratee.flatten(Enumerator(1, 2) >>> Enumerator.eof |>> sum)
// The iteratee doneIt has been done,
Iteratee.isDoneOrError(doneIt).onComplete(println) // Success(true)
val transformedIt = strToInt &>> doneIt
// so any inputs after that will be ignored.
(Enumerator("3", "4", "5") |>>> transformedIt).onComplete(println) // Success(3)

Enumeratee#apply は、変換前の Iteratee を出力とする Iteratee を生成する。

// The method apply returns Iteratee[String, Iteratee[Int, Int]],
val adaptedIt: Iteratee[String, Iteratee[Int, Int]] = strToInt(sum)
// so we can get the original iteratee after the adaptedIt is done.
val originalIt: Interatee[Int, Int] = Iteratee.flatten(
    Enumerator("1", "2") |>>> adaptedIt)
// The original iteratee has not been done yet because it's just
// an output of the adaptedIt.
Iteratee.isDoneOrError(originalIt).onComplete(println) // Success(false)
(Enumerator(3, 4, 5) |>>> originalIt).onComplete(println) // Success(15)

変換した IterateeEOF で完了させた後でも、出力は変換元の Iteratee であるので、入力を継続できる。すなわち Enumeratee を切り替えながら、異なる Enumerator からの入力を Iteratee にまとめることができる。

Traversable

Enumeratee.(take|drop|takeWhile|dropWhile|...) 等のヘルパーメゾッドは、他のコレクション API のように、要素を切り出す Enumeratee を生成できる。

ただし切り出し位置は Enumerator から送信されるチャンク単位になる。

val it = Iteratee.fold[Array[Byte], String]("") { (acc, x) =>
  acc ++ x.map(_.toChar).mkString("")
}

val enumerator = Enumerator(
  "123".getBytes(),
  "456".getBytes(),
  "789".getBytes()
)

def limitChunks(n: Int) = {
  Enumeratee.take[Array[Byte]](n)
}
(enumerator |>>> limitChunks(2) &>> it)
  .onComplete(println) // Success("123456")

入力が scala.collection.TraversableLike を含んでいれば、play.api.iteratee.Traversable を使うことで、TraversableLike の実装に応じて切り出し位置を決定する。つまり Array[Byte] なら、配列インデックスでカウントされる。

def limitBytes(n: Int) = {
  Traversable.take[Array[Byte]](n)
}
(enumerator |>>> limitBytes(5) &>> it)
  .onComplete(println) // Success("12345")