Lambda (λ)表达式
学习stream之前先了解 Lambda 表达式, 因为大量的流式API配合Lambda 表达式, 起到简化易读的效果
λ表达式某种情况下可以被当做是一个Object (注意是某种情况下); λ表达式的类型, 叫做”目标类型(target type)”;
λ表达式的目标类型是”函数接口(functional interface)”, 这是JDK8新引入的概念;
它的定义是:一个接口, 如果只有一个显式声明的抽象方法, 那么它就是一个函数接口;
一般用@FunctionalInterface标注出来(也可以不标);
或者说它是目标类型的一个函数对象, 因为它的只能是一个接口, 只有一个抽象方法 它表示这个方法/函数
例如: Runnable接口只有一个 run()方法:
新建一个线程可以这样表示
new Thread(
()->{
System.out.println("hello word");
}
).start();
()->{ ... }这部分被自行推断为Runable接口,而System.out.println("hello word");就是run方法的代码内容
下面代码是合法的
Runnable runable = ()->{
System.out.println("hello word");
};一个λ表达式只有在转型成一个函数接口后才能被当做Object使用;
所以下面这句不能编译:
//错误! 目标类型不明
System.out.println( () -> {} ); 必须先转型:
// 正确
System.out.println( (Runnable)() -> {} ); 假设你自己写了一个函数接口, 长的跟Runnable一模一样:
@FunctionalInterface
public interface MyRunnable {
public void run();
}那么
Runnable r1 = () -> {System.out.println("Hello Lambda!");};
MyRunnable2 r2 = () -> {System.out.println("Hello Lambda!");};都是正确的写法; 这说明一个λ表达式可以有多个目标类型(函数接口), 只要函数匹配成功即可; 但需注意一个λ表达式必须至少有一个目标类型;
方法引用(Method reference)
任何一个λ表达式都可以代表某个函数接口的唯一方法的匿名描述符; 我们也可以使用某个类的某个具体方法来代表这个描述符, 叫做方法引用; 例如:
Integer::parseInt //静态方法引用
String::length //实例方法引用
Person::new //构造器引用通常是调用的方法都是已有的方法, 直接引用已有的方法作为目标类型, 请注意该方法必须能被转为”目标类型”
[> 参考](http://blog.csdn.net/ioriogami/article/details/12782141/)
Stream API
无存储; stream不是一种数据结构, 它只是某种数据源的一个视图, 数据源可以是一个数组, Java容器或I/O channel等; 为函数式编程而生; 对stream的任何修改都不会修改背后的数据源, 比如对stream执行过滤操作并不会删除被过滤的元素, 而是会产生一个不包含被过滤元素的新stream; 惰式执行; stream上的操作并不会立即执行, 只有等到用户真正需要结果的时候才会执行; 可消费性; stream只能被”消费”一次, 一旦遍历过就会失效, 就像容器的迭代器那样, 想要再次遍历必须重新生成;
中间操作
- 中间操作的一些方法()
concat()
distinct()//去重
filter()//过滤Stream 中的某些数据; (函数表达式返回true即要保留的数据)
flatMap()//将多个Stream 合并成一个 Stream; (函数表达式返回Stream即可)
limit()
map()//将 Stream 中每一个数据, 映射为另外一个数据; (函数表达式返回映射的目标对象)
peek()
skip()
sorted()
parallel()
sequential()
unordered()结束操作
- 结束操作的一些方法()
allMatch()//是否至少匹配一个元素
anyMatch()//是否匹配所有元素
noneMatch()//是否所有都不匹配
collect()
count()
findAny()//返回任意元素
findFirst()//有顺序, 返回集合中第一个元素, 一般使用findAny 性能较好一点
forEach()
forEachOrdered()//按顺序
max()
min()
reduce()
toArray()规约(reduce)
reduce 操作可以实现从一组元素中生成一个值, 比如: sum(), max(), min(), count()等都是reduce操作, 将他们单独设为函数只是因为经常使用;
求流中元素所有字符串的长度值
String[] arr = new String[] {"a","bb","ccc","dddd"};
Stream<String> stream= Arrays.stream(arr);
//stream.reduce 第一个function指明初始值, 第二个function,指明计算方式, 参sum是上下文累加值; (还有第三个function是并发的时候如何计算
int total = stream.reduce(0, (sum, str) -> sum+str.length(), (a, b) -> a+b).intValue();
System.out.println( total );过滤(filter)
用于从流中去除不符合条件的元素,只保留那些满足特定条件的元素。
List<Integer> numbers = Arrays.asList(1, 2, 3, 10, 15, 20, 5);
// 使用stream API过滤出大于10的数字
List<Integer> filteredNumbers = numbers.stream()
.filter(n -> n > 10) // 返回true则保留
.collect(Collectors.toList()); // 收集过滤后的结果到一个新的列表
System.out.println(filteredNumbers); // 输出:[15, 20]
收集(collect)
String[] arr = new String[] {"a","bb","ccc","dddd"};
Stream<String> stream= Arrays.stream(arr);
//Collectors.toMap 第一个function指明key;这个只接受实例方法的引用,例String::toUpperCase, 无法访问上下文的, 第二个function指明value, 这个可以访问上下文; 还可接受第三个参数, 是key冲突的如何处理的
Map<Integer, Integer> map = stream.collect(Collectors.toMap( String::length , t->t.length())); //两种方式的Lambda
System.out.println( map );
//转为数组
String[] roleCodes = roles.stream().map(RoleSmallDTO::getCode).toArray(String[]::new);复杂的自定义
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
supplier 是一个生成目标类型实例的方法, 代表着目标容器是什么;
accumulator 是将操作的目标数据填充到supplier 生成的目标类型实例中去的方法, 代表着如何将元素添加到容器中;
combiner 是将多个supplier 生成的实例整合到一起的方法, 代表着规约操作, 将多个结果合并;
分区(partitioningBy)
partitioningBy将流中的元素按照给定的条件分成 两个分区(true 和 false)。
List<Student> students = Arrays.asList(
new Student("Alice", 85, "CS"),
new Student("Bob", 55, "Math"),
new Student("Charlie", 92, "CS"),
new Student("David", 78, "Physics"),
new Student("Eve", 45, "Math"),
new Student("Frank", 88, "CS")
);
// 1. 基本分区:按成绩是否及格分区
Map<Boolean, List<Student>> passedFailed = students.stream()
.collect(partitioningBy(s -> s.getScore() >= 60));
System.out.println("=== 按成绩是否及格分区 ===");
System.out.println("及格: " + passedFailed.get(true));
System.out.println("不及格: " + passedFailed.get(false));
// 2. 分区并计算统计信息
Map<Boolean, Long> countByPass = students.stream()
.collect(partitioningBy(s -> s.getScore() >= 60, counting()));
System.out.println("\n=== 及格/不及格人数统计 ===");
System.out.println("及格人数: " + countByPass.get(true));
System.out.println("不及格人数: " + countByPass.get(false));
// 3. 分区并计算平均分
Map<Boolean, Double> avgScoreByPass = students.stream()
.collect(partitioningBy(
s -> s.getScore() >= 60,
averagingInt(Student::getScore)
));
System.out.println("\n=== 及格/不及格平均分 ===");
System.out.println("及格平均分: " + avgScoreByPass.get(true));
System.out.println("不及格平均分: " + avgScoreByPass.get(false));
// 4. 分区并只获取姓名列表
Map<Boolean, List<String>> namesByPass = students.stream()
.collect(partitioningBy(
s -> s.getScore() >= 60,
mapping(Student::getName, toList())
));
System.out.println("\n=== 及格/不及格学生姓名 ===");
System.out.println("及格学生: " + namesByPass.get(true));
System.out.println("不及格学生: " + namesByPass.get(false));
}分组(groupingBy)
groupingBy根据指定的分类函数将元素分组,可以分成 多个组。
List<Student> students = Arrays.asList(
new Student("Alice", 85, "CS"),
new Student("Bob", 55, "Math"),
new Student("Charlie", 92, "CS"),
new Student("David", 78, "Physics"),
new Student("Eve", 45, "Math"),
new Student("Frank", 88, "CS"),
new Student("Grace", 91, "Physics")
);
// 1. 基本分组:按院系分组
Map<String, List<Student>> byDepartment = students.stream()
.collect(groupingBy(Student::getDepartment));
System.out.println("=== 按院系分组 ===");
byDepartment.forEach((dept, list) ->
System.out.println(dept + ": " + list));
// 2. 分组并计数
Map<String, Long> countByDept = students.stream()
.collect(groupingBy(Student::getDepartment, counting()));
System.out.println("\n=== 各院系学生人数 ===");
countByDept.forEach((dept, count) ->
System.out.println(dept + ": " + count + "人"));
// 3. 分组并计算平均分
Map<String, Double> avgScoreByDept = students.stream()
.collect(groupingBy(
Student::getDepartment,
averagingInt(Student::getScore)
));
System.out.println("\n=== 各院系平均分 ===");
avgScoreByDept.forEach((dept, avg) ->
System.out.println(dept + ": " + String.format("%.2f", avg)));
// 4. 分组并获取最高分学生
Map<String, Optional<Student>> topStudentByDept = students.stream()
.collect(groupingBy(
Student::getDepartment,
maxBy(Comparator.comparingInt(Student::getScore))
));
System.out.println("\n=== 各院系最高分学生 ===");
topStudentByDept.forEach((dept, student) ->
System.out.println(dept + ": " + student.orElse(null)));
// 5. 多级分组:先按院系,再按成绩等级
Map<String, Map<String, List<Student>>> multiLevelGrouping = students.stream()
.collect(groupingBy(
Student::getDepartment,
groupingBy(s -> {
if (s.getScore() >= 90) return "优秀";
else if (s.getScore() >= 70) return "良好";
else if (s.getScore() >= 60) return "及格";
else return "不及格";
})
));
System.out.println("\n=== 多级分组:院系 -> 成绩等级 ===");
multiLevelGrouping.forEach((dept, gradeMap) -> {
System.out.println(dept + ":");
gradeMap.forEach((grade, list) ->
System.out.println(" " + grade + ": " + list));
});
// 6. 分组并只获取姓名列表
Map<String, List<String>> namesByDept = students.stream()
.collect(groupingBy(
Student::getDepartment,
mapping(Student::getName, toList())
));
System.out.println("\n=== 各院系学生姓名 ===");
namesByDept.forEach((dept, names) ->
System.out.println(dept + ": " + names));并发流
默认的 ForkJoinPool
String[] arr = new String[] {"a","bb","ccc","dddd"};
Stream<String> stream= Arrays.stream(arr);parallelStream()方法:在集合上调用此方法会返回一个并行流,例如list.parallelStream()。-
parallel()方法:从普通流升级成为并发流,例如stream.parallel()。
底层机制:基于 ForkJoinPool 框架,默认使用公共的 ForkJoinPool.commonPool(),这个线程池是由整个应用程序共享的。该池的大小通常等于可用处理器核心数减一(但可通过系统属性调整)。它不适合阻塞型任务.
ForkJoinPool对阻塞 I/O 任务 不友好?
根本原因在于 ForkJoinPool的工作窃取(Work-Stealing)算法是为 CPU 密集型任务设计的,而阻塞 I/O 会破坏其调度。
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
long currentTimeMillis = System.currentTimeMillis();
System.out.println("开始时间:" + currentTimeMillis);
List<Integer> collect = numbers.parallelStream().map(n -> {
try {
System.out.println("线程名称:" + Thread.currentThread().getName()+" 处理:"+n);
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return n * n;
}).toList();
//如果所有线程阻塞, 整个池会完全停滞, 换句话说: 最大的并发数量就是 ForkJoinPool 该池的大小, 超出后将会无限等待, 直至有空闲的线程处理
System.out.println("结束耗时:" + (System.currentTimeMillis() - currentTimeMillis));// 结束耗时:10031
}
- 每个线程维护一个双端队列(Deque),优先处理自己队列中的任务(LIFO)。
- 当自己的队列为空时,会从其他线程队列的尾部窃取任务(FIFO)。
- **如果所有线程都因 I/O 阻塞,整个池会完全停滞。
自定义线程池
方法 1:使用 ForkJoinPool.submit()+ ForkJoinTask.get()
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
public class ParallelStreamWithCustomPool {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 创建自定义线程池(例如,设置并行度为4)
ForkJoinPool customThreadPool = new ForkJoinPool(4);
try {
// 将并行流操作封装成任务提交到自定义池
List<Integer> squaredNumbers = customThreadPool.submit(() ->
numbers.parallelStream() // 在自定义池中并行执行
.map(n -> {
System.out.println(Thread.currentThread().getName());
return n * n;
})
.collect(Collectors.toList())
).get(); // 阻塞等待结果
System.out.println(squaredNumbers);
} catch (Exception e) {
e.printStackTrace();
} finally {
customThreadPool.shutdown(); // 关闭池
}
}
} 方法 2:使用 CompletableFuture.supplyAsync()(更灵活)
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
public class ParallelStreamWithCustomPoolAsync {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 创建自定义线程池
ForkJoinPool customThreadPool = new ForkJoinPool(4);
CompletableFuture<List<Integer>> future = CompletableFuture.supplyAsync(() ->
numbers.parallelStream()
.map(n -> {
System.out.println(Thread.currentThread().getName());
return n * n;
})
.collect(Collectors.toList()),
customThreadPool // 指定执行任务的线程池
);
// 非阻塞获取结果(或使用 future.get() 阻塞获取)
future.thenAccept(squaredNumbers -> {
System.out.println("Result: " + squaredNumbers);
customThreadPool.shutdown(); // 在回调中关闭池
});
// 主线程继续执行其他任务...
future.join(); // 或在此阻塞等待(根据需求选择)
}
}如果操作包含阻塞I/O,建议用 CompletableFuture+ ExecutorService(如 ThreadPoolExecutor)替代并行流
**
方法 3:使用非并发流, 指定线程池
ExecutorService ioPool = Executors.newFixedThreadPool(50); // 专用 I/O 池
List<CompletableFuture<String>> futures = dataList.stream()
.map(data -> CompletableFuture.supplyAsync(() -> {
// 模拟阻塞 I/O 操作
return queryDatabase(data);
}, ioPool)) // 指定专用线程池
.collect(Collectors.toList());
// 非阻塞组合结果
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);public static void main(String[] args) {
ExecutorService ioPool = Executors.newFixedThreadPool(50); // 专用 I/O 池
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
long currentTimeMillis = System.currentTimeMillis();
System.out.println("开始时间:" + currentTimeMillis);
List<CompletableFuture<Integer>> futures = numbers.stream()
.map(n -> CompletableFuture.supplyAsync(() -> {
try {
System.out.println("线程名称:" + Thread.currentThread().getName()+" 处理:"+n);
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return n * n;
}, ioPool)) // 指定专用线程池
.collect(Collectors.toList());
System.out.println("结束耗时:" + (System.currentTimeMillis() - currentTimeMillis));//结束耗时:18
ioPool.shutdown();
}| 任务类型 | 推荐方案 | 线程池示例 |
|---|---|---|
| 纯 CPU 计算 | parallelStream() | ForkJoinPool.commonPool() |
| 阻塞 I/O | CompletableFuture+ 专用线程池 | ThreadPoolExecutor |
| 混合型任务 | 拆分 + 不同池处理 | CPU 池 + I/O 池双隔离 |
实例
实例 从迭代器创建流
Stream<String> targetStream = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(sourceIterator, Spliterator.ORDERED),
false // false 表示创建一个顺序流,true 则表示并行流(通常不需要)
);实例 拼接字符串集合 (joining)
//将集合转换成字符串
//方法一:join(Collection var0, String var1)
String a = StringUtils.join(secondCateNameList, ",");
System.out.println("AND eod.second_cate_name in("+ a + ")");
//方法二:joining(CharSequence delimiter, CharSequence prefix, CharSequence suffix) //分隔符, 前缀, 后缀
String b = secondCateNameList.stream().collect(Collectors.joining(",","(",")"));
System.out.println("AND eod.second_cate_name in"+ b);
实例 数据分区 (partitioningBy)
key只能布尔值, 二分数据
Map<Boolean, List<Student>> map1 = list1.stream().collect(Collectors.partitioningBy(item -> item.getScore()>= 90));
System.out.println(map1);实例 数据分组 (Map<Object, List>)
List 分组转为 Map<Object, List>
//如果多分数据的话可以使用 分组操作
List<Employee> employeeList = Arrays.asList(
new Employee("Tom Jones", 45, 15000.00),//name, age, salary
new Employee("Harry Andrews", 45, 7000.00),
new Employee("Ethan Hardy", 65, 8000.00),
new Employee("Nancy Smith", 22, 10000.00),
new Employee("Deborah Sprightly", 29, 9000.00)
)
// Collectors.groupingBy 的第一个参数作为分组条件, 第二个参数(Collectors.mapping)为映射方式
// Collectors.mapping 第一个参数的返回值, 会被其第二个参数收集 Collectors.toList()
Object employee = employeeList.stream()
.collect(
Collectors.groupingBy(
Employee::getAge,
Collectors.mapping(e->e.getName(), Collectors.toList())
)
);
System.out.println("List of names:" + employee);
//List of names:{65=[Ethan Hardy], 22=[Nancy Smith], 29=[Deborah Sprightly], 45=[Tom Jones, Harry Andrews]}
// 实例:
Map<MassifACropListDTO, List<BaseAgriCropDTO>> data = content.stream().collect(
Collectors.groupingBy(
item -> {//分组 (即Key)
MassifACropListDTO mal = new MassifACropListDTO();
mal.setId((String) item.get("id"));
mal.setName((String) item.get("name"));
return mal;
},
Collectors.mapping(item->{//分组元素 (即list)
if(item.get("baseAgriCropId") == null){//null 也会调用一次
return null;
}
BaseAgriCropDTO bac = new BaseAgriCropDTO();
bac.setId((String) item.get("baseAgriCropId"));
bac.setPic((String) item.get("baseAgriCropPic"));
return bac;
}, Collectors.toList())
)
);
final List<MassifACropListDTO> ret = new ArrayList<>(data.size());
data.forEach((mal,lst)->{//设置分组对象的 list
if(lst.get(0) != null){//null 也会有一条
mal.setBaseAgriCrops(lst);// 给map 设置list
}
ret.add(mal);//只要key
});
// 简单属性
Map<String, List<ProProjectFileVo>> data = proProjectFiles.stream().collect(
Collectors.groupingBy(
ProProjectFileVo::getFileType,
Collectors.mapping(item-> item, Collectors.toList())
)
);实例 从集合中查找一个 (filter)
Optional<Sl_std_ref> ref_find = std_refs.stream().filter( ref ->{
return e.getName().equals(ref.getName()) ;
}).findAny();
// findAny()//返回任意元素
// findFirst()//有顺序, 返回集合中第一个元素, 一般使用findAny 性能较好一点
if(ref_find.isPresent() ) {
//存在
}
实例 List 转 Map (toMap)
Map<Integer,String> userMap1 = userList.stream()
.collect(Collectors.toMap(User::getId,User::getName));
//key是对象中的某个属性值, value是对象本身(使用Function.identity()的写法)
Map<String, User> userMap3 = userList.stream()
.collect(Collectors.toMap(User::getId, Function.identity()));
实例 将对象List转另一个对象的List (map)
List<TestSampleDto> lpd = lst.stream().map( (e)-> {
TestSampleDto tsd = new TestSampleDto();
tsd.setName(e.getName());
return tsd;
}).collect(Collectors.toList());实例 收集stream里的list 合并为一个list (flatMap)
类似把二维集合, 转成一维的
projects.stream()
.flatMap(p -> p.getComponents().stream())//这里又是一个集合
.collect(Collectors.toList());//没有外层的(projects)!实例 将对象List的某个属性转数组(map)
String[] asName = hr.getElements().stream().map(e->{
return e.getAlias();
}).toArray(String[]::new);实例 Map转另一个对象的集合(map)
List<Attributes> fields = mappName.entrySet().stream().map( (e)->{
return new Attributes(e.getKey(), e.getValue());
}).collect(Collectors.toList() );
实例 判定是否包含元素 anyMatch
boolean hasTask = allTaskList.stream().anyMatch(e -> STATE_DOWNLOADING == e.getSta());
if (hasTask){//有未完成的任务
}