Java: 使用Lambda表达式与Stream API

对于习惯使用函数式编程语言(比如Clojure)的程序员来说,Java 8提供的Lambda表达式及Stream API无疑是个福音。虽说仍缺少immutable数据,也没有简单的async支持,但想对于早年的Java已经算是有很大的进步了。本文从Modern Java Recipes一书中摘取部分要点介绍。

Lambda表达式

Lambda表达式本质上是Java的匿名类的对象。下面是一些创建/声明Lambda表达式的方法:

# 完整形式的Lambda表达式定义
Function<Long, Long> add2 = (x) -> {return x +2;};

# 函数体只有一个语句时可以简写为
Function<Long, Long> add2 = x -> x +2;

# 带两个参数的函数
BiFunction<Long, Long, Long> a = (x, y) -> x + y;

# 无返回值的函数
Consumer<Object> printer = x -> System.out.println("--------\n" + x);
printer.accept(add2.andThen(time3).apply(8L));

# 对象方法做为Lambda表达式使用, String::length等同于 s -> s.length()
# 注意用了`var`关键字作了类型推导
var compLen = Comparator.comparingInt(String::length);

# 类方法做为Lambda表达式使用,System.out::println 等同于 s -> System.out::println(s)
Stream.generate(Math::random).limit(10).forEach(System.out::println);

从上面的例子可见,Lambda表达式实现了Function, BiFunction或Consumer等接口。 这些接口的相同点是它们都使用了@FunctionInterface标注。

自定义Lambda表达式接口十分简单,比如定义可带多个参数的Lambda表达式接口

Lambda表达式在Stream API中使用很广泛,使用时我们很少关注表达式的对象类型,但明确一个Lambda表达式即为一个Java对象这一点十分重要。

组合多个函数

你可以把多个Lambda表达式通过andThen, compose等方法组合为一个:

// 在App.Java里:
private static void timedPrinter(Object o) {
        System.out.println(LocalTime.now() +" " + o);
}

// 在main函数:
// ...
Consumer<Object> printer = x -> System.out.println("-------------------------\n" + x);
printer = printer.andThen(App::timedPrinter);
printer.accept("这是带时间的打印输出");
// ...

捕获异常

正是因为Lambda表达式是匿名对象,实现Lambda表达式时必须要在方法体类捕获异常。

  • 对于Checked Exception,编译器会强制你在Lambda表达式内捕获异常;
  • 对于需要处理的Runtime Exception,如果不做异常捕获处理,在出错时你可能看不到任何错误信息。

Stream API

Stream的几个特点

  1. 延迟运算,利用这点可以生成无限序列
  2. 一个Stream对象只能被使用一次
  3. 可转换为并行运算

创建Stream

你可以使用Stream接口的一些静态方法创建Stream对象,比如of(), generate(), iterate()等:

// 创建Stream并按字符串长度排序
Stream.of("apple", "banana", "candy")
    .sorted(Comparator.comparnigInt(String::length))
    .forEach(System.out::println);

//找出最短的词语
Stream.of("apple", "banana", "candy")
    .min(Comparator.comparingInt(String::length)) // 这里返回的是Optional
    .ifPresent(System.out::println);

也可以通过集合对象的工具方法把集合转换为Stream,例如List接口的默认方法stream()parallelStream()可把List分别转换为串行与并行Stream。

List<Monkey> sorted = monkeys.stream() // 把List转换为Stream

Stream的延迟运算

Stream默认不使用并行,并且延迟运算:

Stream.generate(Math::random) // 无穷随机数序列
    .limit(8)                 // 取前8个
    .mapToDouble(x->x)        // 将Stream<Double>转换为DoubleStream
    .sum();                   // 求和
    
// 使用iterate创建无穷序列。这里生成间隔一周的日期序列
Stream<LocalDate> dates = Stream.iterate(LocalDate.now(), (d)-> d.plusWeeks(1));
// 获取并打印2020年剩余的周数
TemporalField woy = WeekFields.of(Locale.getDefault()).weekOfWeekBasedYear();
List<Integer> wks = dates.takeWhile((d)->d.getYear()<2021)
    .map((d)-> d.get(woy))
    .collect(Collectors.toList());  // 由于Stream只能被使用一次,我们想获取返回数据必须要转换为集合类对象。
    System.out.println(wks);

对Stream排序

// 按Monkey的年龄与名字排序:
List<Monkey> monkeys = Arrays.asList(new Monkey(12, "Jonh", Arrays.asList("cat", "dog")),
    new Monkey(13, "Doh", Arrays.asList("cat", "pig")),
    new Monkey(28, "Apple", Arrays.asList("cat", "pig")),
    new Monkey(28, "Sam", Arrays.asList("bird", "pig")));
List<Monkey> sorted = monkeys.stream() // 把List转换为Stream
    .sorted(Comparator.comparingInt(Monkey::getAge) // 按年龄
        .reversed()                                 // 从大到小排序
        .thenComparing(Comparator.comparing(Monkey::getName).reversed())) 年龄相同时按姓名排序
    .collect(Collectors.toList());     // 把Stream转换为List返回

使用flatMap合并Stream

使用flatMap合并Stream,与Clojure的mapcat类似。

Set<String> toys = monkeys.stream()
    .flatMap((d)->d.getToys().stream())
    .collect(Collectors.toSet());
System.out.println(toys);

使用groupingBy按条件函数分类

使用groupingBy按条件函数(pred-function)分类,类似于SQL的GROUP BY或Clojure的group-by函数。

// 从系统读取单词列表,按单词长度统计词频
try(Stream<String> lines = Files.lines(Paths.get("/usr/share/dict/words"))){
    Map<Integer, Long> byLen = lines
        .collect(
            // groupingBy的第二个参数可选,不提供时返回列表
            Collectors.groupingBy(String::length, Collectors.counting()));
    System.out.println(byLen);
}

使用collect将Stream转换为集合对象

// 把List转换为Map对象是开发时常见的一个需求
// 例如这里把monkeys对象列表转换为Map,Key/Value分别对应姓名与Monkey对象:
Map<String, Monkey> monkeyMap = monkeys.stream()
    .collect(Collectors.toMap(Monkey::getName, Function.identity()));

修改对象的注意事项

// # Stream虽然是不可修改的,但你依然可以修改对象
// 如果涉及到对象共享或并发时,需要小心。
monkeys.stream()
    .forEach((m)->m.setAge(Double.valueOf(Math.random()*88).intValue()));
System.out.println(monkeys); // monkeys已被修改!

并行运算

你可以注释掉下面.parallel()一行,对比查看串行与并行的差异。

monkeys.stream()
    // .sequential() // 默认即是串行,不必调用此方法
    .parallel() // 后面的处理使用并行
    .forEach(Monkey::sleep);

Stream并行运算使用的是一个全局共享的ForkJoinPool 线程池,存在两个问题

  1. 此线程池默认创建CPU数量等同的线程,一旦存在多个耗时的任务,新提交的任务会等待
  2. 此线程池默认创建的是Daemon线程,程序退出时会直接放弃当前任务退出

对第一个问题,可以使用自定义的ForkJoinPool:

ForkJoinPool pool = new ForkJoinPool(4);
ForkJoinTask<Long> result = pool.submit(() -> LongStream.rangeClosed(1, 1_000)
    .parallel()
    .map(App::pow2)
    .sum());

try {
    System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

对第二个问题,可在程序退出前调用ForkJoinPool的awaitQuiescence方法在退出前等待:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
// 等待任务执行完,并且设置最多等待10秒
commonPool.awaitQuiescence(10, TimeUnit.SECONDS);
// 对于Web应用,可注册为ShutdownHook,在程序退出时自动等待
Runtime.getRuntime()
    .addShutdownHook(new Thread(() -> commonPool.awaitQuiescence(10, TimeUnit.SECONDS)));

Java.util.Objects工具类

Java程序里null相关的处理一直是个繁琐且让人头疼的问题, Java.util.Objects添加了一些考虑了null情况的方法,如requireNonNull, requireNonNullElse, hashCode, requireNonNullElseGet​等。

// obj为null时会抛出NullPointerException,一般在需要fail-fast的场景使用
Objects.requireNonNull(obj);

// 若obj为null返回第二个参数,否则返回obj
var t = Objects.requireNonNullElse(obj, defaultObj);
// requireNonNullElseGet​与requireNonNullElse类似,
// 但第二个参数为 Supplier,与前一种形式的差别为使用此方式可延迟运算。使用此方法避免不不要的运算。
var t = Objects.requireNonNullElse(obj, ()->slowObjectCreator());

Map接口的新方法

Map接口添加一些实用的方法:

  • 创建不可修改的Map对象:
    // 创建不可修改Map
    Map<String, Integer> m = Map.of("john", 18, "sam", 22);
    
  • 更新或添加键值:
    // 键值不存在时赋值
    m.putIfAbsent("sam", 22);
    
    // 键值存在时更新
    m.computeIfPresent("sam", (k, n)->n+1);
    
    // 键值不存在时赋值,通过Lambda表达式延迟运算
    m.computeIfAbsent("jerry", k->12);
    
  • forEach()方法对Map的key-val迭代运算,可替代原来需要entrySet()方法才能实现的代码:
    Map<String, Integer> m = Map.of("john", 18, "sam", 22);
    m.forEach((k, v)->
        System.out.println(String.format("Name: %s Age: %d", k, v)));
    

异步运算/求值

CompletableFuture用于在新线程里执行任务并获取运算结果,与Clojure的delay/future类似。

注意CompletableFuture默认使用全局ForkJoinPool,存在前面讨论过的相同问题。

CompletableFuture<String> cf = CompletableFuture.supplyAsync(App::nextInt)
    .thenApply(Object::toString);
System.out.println(cf.get()); // 阻塞当前线程,等待计算结果

另外,你也还可以使用supplyAsync方法第二个参数使用自定义线程池。

参考

Comment