Java SE 8ストリームを使用したデータの処理、パート1

Raoul-Gabriel Urma

ストリーム運用を使用して高度なデータ処理クエリを表現します。

コレクションなしで何をするのでしょうか。ほぼすべてのJavaアプリケーションが、コレクションを作成および処理します。コレクションは多くのプログラミング作業にとって基本的なもので、データをグループ化して処理することができます。たとえば、顧客の取引明細書を表す銀行取引のコレクションを作成できます。次に、回収全体を処理して、顧客が費やした金額を調べることもできます。その重要性にもかかわらず、Javaではコレクションの処理は完璧とは言えません。

  • 原本は、Java Magazineの2014年3月/4月号に掲載されました。今すぐ購読する

まず、コレクションに対する一般的な処理パターンは、「finding」(たとえば、最も価値の高いトランザクションを見つける)や「grouping」(たとえば、食料品の買い物に関連するすべてのトランザクションをグループ化する)といったSQLのような操作に似ています。ほとんどのデータベースでは、このような操作を宣言的に指定できます。たとえば、次のSQLクエリでは、"SELECT id, MAX(value) from transactions"という値が最も大きいトランザクションIDを検索できます。

おわかりのように、最大値を計算する方法を導入する必要はありません(たとえば、ループや最大値を追跡する変数を使用する)。期待しているものだけを表現すればよいのです。この基本的な考え方は、このようなクエリの明示的な実装方法について心配する必要がないことを意味します。なぜコレクションで同じようなことができないのでしょうか。ループを使った操作を何度も何度も再実装していないでしょうか。

では、どうすれば本当に大きなコレクションを効率的に処理できるのでしょうか。理想的には、処理を高速化するには、マルチコア・アーキテクチャを活用する必要があります。ただし、パラレル・コードの記述は困難でエラーが発生しやすくなります。

Java SE 8で解決Java API設計者は、宣言的な方法でデータを処理できるストリームと呼ばれる新しい抽象化を使用してAPIを更新しています。さらに、ストリームは、マルチスレッド・コードの単一行を記述しなくても、マルチコア・アーキテクチャを利用できます。良さそうですよね。この一連の記事では、その点を探っていきます。

驚くべき考えがあります。この2つの操作は、要素を「永遠に」作り出すことができるのです。

ストリームで何ができるかを詳しく調べる前に、Java SE 8ストリームで新しいプログラミング・スタイルを理解できるように、例を見てみましょう。たとえば、groceryタイプのすべてのトランザクションを検索し、トランザクション値の降順でソートされたトランザクションIDのリストを返す必要があるとします。Java SE 7では、リスト1に示すようにこれを行います。Java SE 8では、リスト2に示すように実行します。

List<Transaction> groceryTransactions = new Arraylist<>();
for(Transaction t: transactions){
  if(t.getType() == Transaction.GROCERY){
    groceryTransactions.add(t);
  }
}
Collections.sort(groceryTransactions, new Comparator(){
  public int compare(Transaction t1, Transaction t2){
    return t2.getValue().compareTo(t1.getValue());
  }
});
List<Integer> transactionIds = new ArrayList<>();
for(Transaction t: groceryTransactions){
  transactionsIds.add(t.getId());
}

リスト1

List<Integer> transactionsIds = 
    transactions.stream()
                .filter(t -> t.getType() == Transaction.GROCERY)
                .sorted(comparing(Transaction::getValue).reversed())
                .map(Transaction::getId)
                .collect(toList());

リスト2

図1に、Java SE 8のコードを示します。最初に、Listで使用可能なstream()メソッドを使用して、トランザクションのリスト(データ)からストリームを取得します。次に、複数の操作(filtersortedmapcollect)が連鎖してパイプラインを形成します。これは、データに対する問合せを形成しているとみなすことができます。

ストリーム-f1
図1

では、コードを並列化する方法は?Java SE 8では簡単です。リスト3に示すように、stream()parallel Stream()に置き換えるだけで、Streams APIは問合せを内部的に分解してコンピュータ上の複数のコアを活用できます。

List<Integer> transactionsIds = 
    transactions.parallelStream()
                .filter(t -> t.getType() == Transaction.GROCERY)
                .sorted(comparing(Transaction::getValue).reversed())
                .map(Transaction::getId)
                .collect(toList());

リスト3

このコードに少々圧倒されても心配無用です。これがどのように機能するかについては、次のセクションで説明します。ただし、ラムダ式(たとえば、t-> t.getCategory() == Transaction.GROCERY)およびメソッド参照(たとえば、Transaction::getId)が使用されていることに注意します。(ラムダ式について知識を深めるには、Java Magazineの過去の記事や、この記事のエンドツーエンドの他のリソースを参照してください。)

現在は、ストリームを、データ・コレクションに対する効率的なSQLのような操作を表現するための抽象化として確認できます。また、これらの操作はラムダ式で簡潔にパラメータ化できます。

Java SE 8ストリームに関するこの一連の記事の最後に、Streams APIを使用してリスト3のようなコードを記述し、強力なクエリを表現できます。

ここから始めよう

少し理論から始めましょう。ストリームはどのように定義できるでしょうか。簡単に定義すると、「集計操作をサポートする、ソースからの要素のシーケンス」です。それを分解してみましょう:

  • 要素のシーケンス: ストリームは、固有の要素タイプの値のシーケンス・セットへのインターフェイスを提供します。ただし、ストリームには実際にはエレメントは格納されず、オンデマンドで計算されます。
  • ソース:ストリームは、コレクション、配列、I/Oリソースなど、データ・ソースを提供します。
  • 集計操作: Streamsは、filtermapreducefindmatchsortedなどの機能プログラミング言語からのSQLのような操作および共通操作をサポートします。

さらに、ストリーム操作には、収集操作と大きく異なる2つの基本的な特性があります。

  • パイプライン化:多くのストリーム操作は、ストリーム自体を返します。これにより、操作を連鎖させて、より大きなパイプラインにできます。これにより、lazinessshort-circuitingなどの特定の最適化が可能になります。この最適化については、あとで説明があります。
  • 内部反復:明示的に反復されるコレクション(外部反復)とは対照的に、ストリーム操作はバックグラウンドで反復を実行します。

前述のコード例を見直して、これらのアイデアを説明します。図2に、リスト2の詳細を示します。

ストリーム-f2
図2

最初に、stream()メソッドを呼び出して、トランザクションのリストからストリームを取得します。データソースはトランザクションのリストであり、ストリームに一連の要素を提供します。次に、ストリームに一連の集計操作を適用します。filter (述語が指定された要素をフィルタリング)、sorted (比較対象を指定して要素をソート)、およびmap (情報の抽出)です。collect以外のすべての操作では、Streamが返されるため、これらを連結してソースに対するクエリと見なすことができるパイプラインを形成することができます。

collectが呼び出されるまで、実際には作業は行われません。collect操作によってパイプラインの処理が開始され、結果(Streamではないもの、ここではList)が返されます。collectについては、この先詳しく説明しますので、今は気にしないでください。現時点では、collectストリームの要素を要約結果に蓄積するためのさまざまなレシピを引数に取る操作として見ることができます。ここで、toList()は、StreamListに変換するためのレシピを示します。

ストリームで使用可能な様々なメソッドを調べる前に、ストリームとコレクションの概念上の違いを一時停止して反映することをお薦めします。

Streamsとコレクション

コレクションの既存のJava概念とストリームの新しい概念の両方が、一連の要素へのインタフェースを提供します。では、その違いは何でしょうか。要するに、コレクションはデータに関するものであり、ストリームは計算に関するものです。

DVDに保存されている映画を考えてみましょう。これはデータ構造全体を含むので、(おそらくバイトかフレームの、ここではどちらでも関係ない)コンテナです。ここで、同じ動画がインターネットでストリーミング配信されている場合を考えてみましょう。これは(バイトまたはフレームの)ストリームになります。ストリーミング動画プレーヤーは、ユーザーが視聴している場所の数フレーム先までしかダウンロードしていない必要があるため、ストリーム内のほとんどの値が計算される前に、ストリームの先頭から値を表示し始めることができます(サッカーの試合のライブストリーミングを考えてみましょう)。

最もわかりやすい用語では、コレクションとストリームの違いは、計算時に関係します。コレクションはインメモリデータ構造で、データ構造が現在持つすべての値を保持します。コレクションに追加する前に、コレクション内のすべての要素を計算する必要があります。一方、ストリームは、要素がオンデマンドで計算される概念的に固定されたデータ構造です。

Collectionインタフェースを使用するには、ユーザーが反復処理を実行する必要があります(たとえば、foreachという拡張forループを使用します)。これは外部反復と呼ばれます。

対照的に、Streamsライブラリでは内部反復が使用され、反復処理が実行され、結果のストリーム値がどこかに保存されます。実行する処理を指定する関数を提供するだけです。リスト4のコード(コレクションでの外部反復)とリスト5(ストリームでの内部反復)は、この違いを示しています。

List<String> transactionIds = new ArrayList<>(); 
for(Transaction t: transactions){
    transactionIds.add(t.getId()); 
}

リスト4

List<Integer> transactionIds = 
    transactions.stream()
                .map(Transaction::getId)
                .collect(toList());

リスト5

リスト4では、トランザクションのリストを明示的に反復して各トランザクションIDを抽出し、それを累計に追加します。対照的に、ストリームを使用する場合、明示的な反復はありません。リスト5のコードでは問合せが作成され、map操作はトランザクションIDを抽出するためにパラメータ化され、collect操作は結果のStreamListに変換します。

ストリームとは何か、およびストリームで何ができるかをよく理解する必要があります。ストリームでサポートされている様々な操作を見て、独自のデータ処理問合せを表現できるようにします。

ストリーム操作: ストリームの展開によるデータの処理

java.util .stream.StreamStreamインタフェースは、2つのカテゴリにグループ化できる多くの操作を定義します。図1の例では、次の操作を確認できます。

  • filtersortedおよびmap。パイプラインを形成するために接続できます。
  • collect: パイプラインを閉じ、結果を返しました

接続可能なストリーム操作は、中間操作と呼ばれます。戻り型がStreamであるため、これらをまとめて接続できます。ストリーム・パイプラインをクローズする操作は、ターミナル操作と呼ばれます。ListIntegervoid (Stream以外の任意の型)などのパイプラインから結果が生成されます。

なぜこの区別が重要なのか、不思議に思われるかもしれません。中間操作は、ストリーム・パイプラインで終端操作が呼び出されるまで処理を実行しません。これは遅延です。これは、通常、中間操作をマージして、端末操作によって単一のパスに処理できるためです。

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
List<Integer> twoEvenSquares = 
    numbers.stream()
           .filter(n -> {
                    System.out.println("filtering " + n); 
                    return n % 2 == 0;
                  })
           .map(n -> {
                    System.out.println("mapping " + n);
                    return n * n;
                  })
           .limit(2)
           .collect(toList());

リスト6

たとえば、リスト6のコードで、特定の数値リストから2つの偶数正方形を計算するとします。次のように表示されることに当然ながら驚かれることでしょう。

filtering 1
filtering 2
mapping 2
filtering 3
filtering 4
mapping 4

これは、limit(2)short-circuitingを使用しているためです。結果を返すには、ストリームの一部のみを処理する必要があり、すべてを処理する必要はありません。これは、and演算子で連鎖した大規模なブール式の評価に似ています。1つの式がfalseを返すとすぐに、すべての式を評価せずに式全体がfalseであると推測できます。ここで、操作limitはサイズ2のストリームを返します。

また、操作filtermapは同じパスでマージされています。

これまでに学習した内容を要約するには、一般的にストリームを操作するには、次の3つのことが必要です。

  • クエリを実行するデータソース(コレクションなど)
  • ストリーム・パイプラインを形成する中間操作のチェーン
  • 1つの端末操作。ストリーム・パイプラインを実行し、結果を生成します。

ストリームAPIは、クエリを内部的に分解して、コンピュータ上の複数のコアを活用します。

それでは、ストリームで利用可能な操作をいくつか見てみましょう。その他の例については、完全なリストおよびこの記事の最後にあるリソースについては、java.util .stream.Streamインタフェースを参照してください。

フィルタリング。ストリームから要素をフィルタするために使用できる操作はいくつかあります。

  • filter(Predicate): 引数として述語(java.util.function.Predicate)を取り、与えられた述語にマッチする全ての要素を含むストリームを返します。
  • distinct: 一意な要素を含むストリームを返しますequals(ストリーム要素に対するequalsの実装に従います)。
  • limit(n): 指定されたサイズn以下のストリームを返します
  • skip(n): 最初のn個の要素が破棄されたストリームを返します

検索とマッチング一般的なデータ処理パターンは、一部の要素が特定のプロパティと一致するかどうかを決定することです。これを支援するために、anyMatchallMatchおよびnoneMatchの各操作を使用できます。これらはすべて、引数として述語を取得し、結果としてbooleanを返します(つまり、ターミナル操作です)。たとえば、リスト7に示すように、allMatchを使用して、トランザクションのストリーム内のすべての要素の値が100より大きいことを確認できます。

boolean expensive =
    transactions.stream()
                .allMatch(t -> t.getValue() > 100);

リスト7

また、Streamインタフェースは、ストリームから任意の要素を取得するための操作findFirstおよびfindAnyを提供します。これらは、filterなどの他のストリーム操作と組み合わせて使用できます。リスト8で示すように、findFirstfindAnyの両方がOptionalオブジェクトを返します。

Optional<Transaction> = 
    transactions.stream()
                .filter(t -> t.getType() == Transaction.GROCERY)
                .findAny();

リスト8

Optional<T>クラス(java.util .Optional)は、値の有無を表すコンテナ・クラスです。リスト8では、findAnygrocery型のトランザクションを検出しない可能性があります。Optionalクラスには、要素の存在をテストするための複数のメソッドが含まれています。たとえば、トランザクションが存在する場合、リスト9(ここでは単にトランザクションを表示します)で示されるように、ifPresentメソッドを使用することで、オプションのオブジェクトに操作を適用することを選択できます。

transactions.stream()
              .filter(t -> t.getType() == Transaction.GROCERY)
              .findAny()
              .ifPresent(System.out::println);

リスト9

マッピング。ストリームはメソッドmapをサポートしており、関数(java.util.function.Function)を引数に取り、ストリームの要素を別の形式に投影します。関数は各要素に適用され、新しい要素にマッピングされます。

たとえば、ストリームの各要素から情報を抽出するために使用できます。リスト10の例では、リストから各単語の長さのリストを返します。削減これまでに見た端末操作は、boolean (allMatchなど)、void (forEach)またはOptionalオブジェクト(findAnyなど)を返します。また、collectを使用して、Stream内のすべての要素をListに結合しています。

List<String> words = Arrays.asList("Oracle", "Java", "Magazine");
 List<Integer> wordLengths = 
    words.stream()
         .map(String::length)
         .collect(toList());

リスト10

ただし、ストリーム内の全てのエレメントを組み合わせて、"IDが最も高いトランザクションとは、"全てのトランザクションの値の合計を計算"など、より複雑なプロセス・クエリを作成することもできます。これは、ストリームに対してreduce操作を使用することで可能であり、結果が生成されるまで、各要素に対して操作を繰り返し適用します(たとえば、2つの数値を追加します)。関数型プログラミングではフォールド操作と呼ばれることがよくあります。これは、この操作を、折り返し操作の結果である1つの小さな正方形になるまで、繰り返し長い用紙(ストリーム)として繰り返すことができるためです。

これは、まず、forループを使用してリストの合計を計算する方法を調べるのに役立ちます。

int sum = 0;
for (int x : numbers) {
    sum += x; 
}

数値のリストの各要素は、加算演算子を使用して反復的に結合され、結果が生成されます。基本的に、数字のリストを1つの数字に減らしました。このコードには2つのパラメータがあります。sum変数の初期値(この場合は0)と、リストのすべての要素(この場合は+)を組み合せる操作です。

ストリームでreduceメソッドを使用すると、リスト11に示すように、ストリームのすべての要素を合計できます。reduceメソッドは、次の2つの引数を取ります。

int sum = numbers.stream().reduce(0, (a, b) -> a + b);

リスト11 

  • 初期値(ここでは0)
  • 2つの要素を組み合せて新しい値を生成するBinaryOperator<T>

reduceメソッドは、基本的に繰り返されるアプリケーションのパターンを抽象化します。「製品の計算」や「最大値の計算」などの他の問合せ(リスト12を参照)は、reduceメソッドの特殊なユースケースになります。

int product = numbers.stream().reduce(1, (a, b) -> a * b);
int product = numbers.stream().reduce(1, Integer::max);

リスト12

数値ストリーム

reduceメソッドを使用して整数のストリームの合計を計算できることがわかりました。ただし、多くのボクシング操作を実行して、Integerオブジェクトを繰り返し追加するコストが発生します。リスト13に示すように、sumメソッドを呼び出して、コードの意図をより明確にすることは適切ではないでしょうか。

int statement = 
    transactions.stream()
                .map(Transaction::getValue)
                .sum(); // error since Stream has no sum method

リスト13

Java SE 8では、この問題(IntStreamDoubleStreamおよびLongStream)に取り組むための3つのプリミティブ専用ストリーム・インタフェースが導入され、それぞれストリームの要素をintdoubleおよびlongに特殊化しています。

ストリームを特殊なバージョンに変換するために使用する最も一般的なメソッドは、mapToIntmapToDoubleおよびmapToLongです。これらのメソッドは、前に見たメソッドmapとまったく同じように機能しますが、Stream<T>のかわりに特殊なストリームを返します。たとえば、リスト14に示すように、リスト13のコードを改善できます。boxed操作を使用して、プリミティブ・ストリームからオブジェクトのストリームに変換することもできます。

int statementSum = 
    transactions.stream()
                .mapToInt(Transaction::getValue)
                .sum(); // works!

リスト14

最後に、数値ストリームのもう1つの有用な形式は数値範囲です。たとえば、1から100までのすべての数値を生成できます。Java SE 8では、IntStreamDoubleStreamおよびLongStreamで使用可能な2つの静的メソッドが導入され、このような範囲(rangeおよびrangeClosed)の生成を支援します。

どちらのメソッドも、最初のパラメータとして範囲の開始値を、2番目のパラメータとして範囲の終了値を受け取ります。ただし、rangeは排他的ですが、rangeClosedは包含的です。リスト15は、rangeClosedを使用して、10から30までのすべての奇数のストリームを返す例です。

IntStream oddNumbers = 
    IntStream.rangeClosed(10, 30)
             .filter(n -> n % 2 == 1);

リスト15

ストリームの作成

ストリームを作成するには、いくつかの方法があります。コレクションからストリームを取得する方法はおわかりいただけたと思います。さらに、数値のストリームも使いました。値、配列またはファイルからストリームを作成することもできます。また、関数からストリームを生成して無限のストリームを生成することもできます。

値または配列からのストリームの作成は簡単です。リスト16に示すように、値には静的メソッドStream .of、配列にはArrays.streamを使用します。

Stream<Integer> numbersFromValues = Stream.of(1, 2, 3, 4);
int[] numbers = {1, 2, 3, 4};
IntStream numbersFromArray = Arrays.stream(numbers);

リスト16

明示的に反復されるコレクション(外部反復)とは対照的に、ストリーム操作はバックグラウンドで反復を実行します。

Files.lines静的メソッドを使用して、行のストリーム内のファイルを変換することもできます。たとえば、リスト17では、ファイル内の行数をカウントします。

long numberOfLines = 
    Files.lines(Paths.get(“yourFile.txt”), Charset.defaultCharset())
         .count();

リスト17

無限ストリーム最後に、ストリームに関するこの最初の記事を締めくくる前に、素晴らしいアイデアをご紹介しましょう。ここまでで、ストリームの要素はオンデマンドで生成されることを理解したはずです。(Stream.iterateおよびStream .generate)という2つの静的メソッドがあり、関数からストリームを生成できます。しかし、要素はオンデマンドで計算されるため、これら2つの操作は要素を「永遠に」生成することができます。これは無限ストリームと呼ばれるもので、固定されたコレクションからストリームを作成するときのように、固定されたサイズを持たないストリームです。

リスト18は、iterateを使用して、10の倍数であるすべての数値のストリームを作成する例です。The iterateメソッドは初期値(ここでは 0)とラムダ(UnaryOperator<T>型)を受け取り、生成された新しい値に対して連続して適用します。

Stream<Integer> number = Stream.iterate(0, n -> n + 10);

リスト18

limit操作を使用して、無限ストリームを固定サイズのストリームに変換できます。たとえば、リスト19に示すように、ストリームのサイズを5に制限できます。

numbers.limit(5).forEach(System.out::println); // 0、10、20、30、40

リスト19

結論

Java SE 8では、高度なデータ処理クエリを表現できるStreams APIが導入されています。この記事では、ストリームがfiltermapreduceiterateなど、簡潔で表現的なデータ処理クエリを記述するために結合できる多くの操作をサポートしていることがわかりました。この新しいコードの記述方法は、Java SE 8より前のコレクションの処理方法とは大きく異なります。しかし、それには多くのメリットがあります。第1に、Streams APIでは、遅延や短絡などのいくつかの手法を使用してデータ処理クエリを最適化します。次に、ストリームを自動的にパラレル化して、マルチコア・アーキテクチャを活用できます。このシリーズの次の記事では、flatMapcollectなど、より高度な操作について説明します。ご期待ください

Raoul-Gabriel Urma
Raoul-Gabriel Urmaは現在、ケンブリッジ大学のコンピュータ・サイエンスでPhDを完了しており、そこでプログラミング言語の研究を行っています。また、Java 8 in Action: Lambdas、 Streams and Functional-style Programming (Manning、 2014)の著者でもあります。