对于习惯使用函数式编程语言(比如Clojure)的程序员来说,Java 8提供的Lambda表达式及Stream API无疑是个福音。虽说仍缺少immutable数据,也没有简单的async支持,但想对于早年的Java已经算是有很大的进步了。本文从Modern Java Recipes一书中摘取部分要点介绍。
Lambda表达式
Lambda表达式本质上是Java的匿名类的对象。下面是一些创建/声明Lambda表达式的方法:
# 完整形式的Lambda表达式定义
Function<Long, Long> add2 = (x) -> {return x +2;};
# 函数体只有一个语句时可以简写为
Function<Long, Long> add2 = x -> x +2;
# 带两个参数的函数
BiFunction<Long, Long, Long> a = (x, y) -> x + y;
# 无返回值的函数
Consumer<Object> printer = x -> System.out.println("--------\n" + x);
printer.accept(add2.andThen(time3).apply(8L));
# 对象方法做为Lambda表达式使用, String::length等同于 s -> s.length()
# 注意用了`var`关键字作了类型推导
var compLen = Comparator.comparingInt(String::length);
# 类方法做为Lambda表达式使用,System.out::println 等同于 s -> System.out::println(s)
Stream.generate(Math::random).limit(10).forEach(System.out::println);
从上面的例子可见,Lambda表达式实现了Function, BiFunction或Consumer等接口。 这些接口的相同点是它们都使用了@FunctionInterface
标注。
自定义Lambda表达式接口十分简单,比如定义可带多个参数的Lambda表达式接口。
Lambda表达式在Stream API中使用很广泛,使用时我们很少关注表达式的对象类型,但明确一个Lambda表达式即为一个Java对象这一点十分重要。
组合多个函数
你可以把多个Lambda表达式通过andThen, compose
等方法组合为一个:
// 在App.Java里:
private static void timedPrinter(Object o) {
System.out.println(LocalTime.now() +" " + o);
}
// 在main函数:
// ...
Consumer<Object> printer = x -> System.out.println("-------------------------\n" + x);
printer = printer.andThen(App::timedPrinter);
printer.accept("这是带时间的打印输出");
// ...
捕获异常
正是因为Lambda表达式是匿名对象,实现Lambda表达式时必须要在方法体类捕获异常。
- 对于Checked Exception,编译器会强制你在Lambda表达式内捕获异常;
- 对于需要处理的Runtime Exception,如果不做异常捕获处理,在出错时你可能看不到任何错误信息。
Stream API
Stream的几个特点
- 延迟运算,利用这点可以生成无限序列
- 一个Stream对象只能被使用一次
- 可转换为并行运算
创建Stream
你可以使用Stream接口的一些静态方法创建Stream对象,比如of(), generate(), iterate()
等:
// 创建Stream并按字符串长度排序
Stream.of("apple", "banana", "candy")
.sorted(Comparator.comparnigInt(String::length))
.forEach(System.out::println);
//找出最短的词语
Stream.of("apple", "banana", "candy")
.min(Comparator.comparingInt(String::length)) // 这里返回的是Optional
.ifPresent(System.out::println);
也可以通过集合对象的工具方法把集合转换为Stream,例如List
接口的默认方法stream()
和parallelStream()
可把List
分别转换为串行与并行Stream。
List<Monkey> sorted = monkeys.stream() // 把List转换为Stream
Stream的延迟运算
Stream默认不使用并行,并且延迟运算:
Stream.generate(Math::random) // 无穷随机数序列
.limit(8) // 取前8个
.mapToDouble(x->x) // 将Stream<Double>转换为DoubleStream
.sum(); // 求和
// 使用iterate创建无穷序列。这里生成间隔一周的日期序列
Stream<LocalDate> dates = Stream.iterate(LocalDate.now(), (d)-> d.plusWeeks(1));
// 获取并打印2020年剩余的周数
TemporalField woy = WeekFields.of(Locale.getDefault()).weekOfWeekBasedYear();
List<Integer> wks = dates.takeWhile((d)->d.getYear()<2021)
.map((d)-> d.get(woy))
.collect(Collectors.toList()); // 由于Stream只能被使用一次,我们想获取返回数据必须要转换为集合类对象。
System.out.println(wks);
对Stream排序
// 按Monkey的年龄与名字排序:
List<Monkey> monkeys = Arrays.asList(new Monkey(12, "Jonh", Arrays.asList("cat", "dog")),
new Monkey(13, "Doh", Arrays.asList("cat", "pig")),
new Monkey(28, "Apple", Arrays.asList("cat", "pig")),
new Monkey(28, "Sam", Arrays.asList("bird", "pig")));
List<Monkey> sorted = monkeys.stream() // 把List转换为Stream
.sorted(Comparator.comparingInt(Monkey::getAge) // 按年龄
.reversed() // 从大到小排序
.thenComparing(Comparator.comparing(Monkey::getName).reversed())) 年龄相同时按姓名排序
.collect(Collectors.toList()); // 把Stream转换为List返回
使用flatMap合并Stream
使用flatMap合并Stream,与Clojure的mapcat
类似。
Set<String> toys = monkeys.stream()
.flatMap((d)->d.getToys().stream())
.collect(Collectors.toSet());
System.out.println(toys);
使用groupingBy按条件函数分类
使用groupingBy按条件函数(pred-function)分类,类似于SQL的GROUP BY
或Clojure的group-by
函数。
// 从系统读取单词列表,按单词长度统计词频
try(Stream<String> lines = Files.lines(Paths.get("/usr/share/dict/words"))){
Map<Integer, Long> byLen = lines
.collect(
// groupingBy的第二个参数可选,不提供时返回列表
Collectors.groupingBy(String::length, Collectors.counting()));
System.out.println(byLen);
}
使用collect将Stream转换为集合对象
// 把List转换为Map对象是开发时常见的一个需求
// 例如这里把monkeys对象列表转换为Map,Key/Value分别对应姓名与Monkey对象:
Map<String, Monkey> monkeyMap = monkeys.stream()
.collect(Collectors.toMap(Monkey::getName, Function.identity()));
修改对象的注意事项
// # Stream虽然是不可修改的,但你依然可以修改对象
// 如果涉及到对象共享或并发时,需要小心。
monkeys.stream()
.forEach((m)->m.setAge(Double.valueOf(Math.random()*88).intValue()));
System.out.println(monkeys); // monkeys已被修改!
并行运算
你可以注释掉下面.parallel()一行,对比查看串行与并行的差异。
monkeys.stream()
// .sequential() // 默认即是串行,不必调用此方法
.parallel() // 后面的处理使用并行
.forEach(Monkey::sleep);
Stream并行运算使用的是一个全局共享的ForkJoinPool 线程池,存在两个问题
- 此线程池默认创建CPU数量等同的线程,一旦存在多个耗时的任务,新提交的任务会等待
- 此线程池默认创建的是Daemon线程,程序退出时会直接放弃当前任务退出
对第一个问题,可以使用自定义的ForkJoinPool:
ForkJoinPool pool = new ForkJoinPool(4);
ForkJoinTask<Long> result = pool.submit(() -> LongStream.rangeClosed(1, 1_000)
.parallel()
.map(App::pow2)
.sum());
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
对第二个问题,可在程序退出前调用ForkJoinPool的awaitQuiescence
方法在退出前等待:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
// 等待任务执行完,并且设置最多等待10秒
commonPool.awaitQuiescence(10, TimeUnit.SECONDS);
// 对于Web应用,可注册为ShutdownHook,在程序退出时自动等待
Runtime.getRuntime()
.addShutdownHook(new Thread(() -> commonPool.awaitQuiescence(10, TimeUnit.SECONDS)));
Java.util.Objects工具类
Java程序里null相关的处理一直是个繁琐且让人头疼的问题, Java.util.Objects添加了一些考虑了null情况的方法,如requireNonNull, requireNonNullElse, hashCode, requireNonNullElseGet
等。
// obj为null时会抛出NullPointerException,一般在需要fail-fast的场景使用
Objects.requireNonNull(obj);
// 若obj为null返回第二个参数,否则返回obj
var t = Objects.requireNonNullElse(obj, defaultObj);
// requireNonNullElseGet与requireNonNullElse类似,
// 但第二个参数为 Supplier,与前一种形式的差别为使用此方式可延迟运算。使用此方法避免不不要的运算。
var t = Objects.requireNonNullElse(obj, ()->slowObjectCreator());
Map接口的新方法
Map接口添加一些实用的方法:
- 创建不可修改的Map对象:
// 创建不可修改Map Map<String, Integer> m = Map.of("john", 18, "sam", 22);
- 更新或添加键值:
// 键值不存在时赋值 m.putIfAbsent("sam", 22); // 键值存在时更新 m.computeIfPresent("sam", (k, n)->n+1); // 键值不存在时赋值,通过Lambda表达式延迟运算 m.computeIfAbsent("jerry", k->12);
forEach()
方法对Map的key-val迭代运算,可替代原来需要entrySet()
方法才能实现的代码:Map<String, Integer> m = Map.of("john", 18, "sam", 22); m.forEach((k, v)-> System.out.println(String.format("Name: %s Age: %d", k, v)));
异步运算/求值
CompletableFuture用于在新线程里执行任务并获取运算结果,与Clojure的delay/future类似。
注意CompletableFuture
默认使用全局ForkJoinPool,存在前面讨论过的相同问题。
CompletableFuture<String> cf = CompletableFuture.supplyAsync(App::nextInt)
.thenApply(Object::toString);
System.out.println(cf.get()); // 阻塞当前线程,等待计算结果
另外,你也还可以使用supplyAsync
方法第二个参数使用自定义线程池。