Lambda (λ)表达式

学习stream之前先了解 Lambda 表达式, 因为大量的流式API配合Lambda 表达式, 起到简化易读的效果

λ表达式某种情况下可以被当做是一个Object (注意是某种情况下); λ表达式的类型, 叫做”目标类型(target type)”;

λ表达式的目标类型是”函数接口(functional interface)”, 这是JDK8新引入的概念; 它的定义是:一个接口, 如果只有一个显式声明的抽象方法, 那么它就是一个函数接口; 一般用@FunctionalInterface标注出来(也可以不标);

或者说它是目标类型的一个函数对象, 因为它的只能是一个接口, 只有一个抽象方法 它表示这个方法/函数
例如: Runnable接口只有一个 run()方法:

新建一个线程可以这样表示

new Thread(
    ()->{
       System.out.println("hello word");
    }
).start();

()->{ ... } 这部分被自行推断为Runable接口,而 System.out.println("hello word"); 就是run方法的代码内容

下面代码是合法的

Runnable runable = ()->{
	System.out.println("hello word");
};

一个λ表达式只有在转型成一个函数接口后才能被当做Object使用;

所以下面这句不能编译:

//错误! 目标类型不明  
System.out.println( () -> {} ); 

必须先转型:

// 正确
System.out.println( (Runnable)() -> {} ); 

假设你自己写了一个函数接口, 长的跟Runnable一模一样:

@FunctionalInterface
public interface MyRunnable {
    public void run();
}

  那么

Runnable r1 = () -> {System.out.println("Hello Lambda!");};
MyRunnable2 r2 = () -> {System.out.println("Hello Lambda!");};

都是正确的写法; 这说明一个λ表达式可以有多个目标类型(函数接口), 只要函数匹配成功即可; 但需注意一个λ表达式必须至少有一个目标类型;

方法引用(Method reference)

任何一个λ表达式都可以代表某个函数接口的唯一方法的匿名描述符; 我们也可以使用某个类的某个具体方法来代表这个描述符, 叫做方法引用; 例如:

Integer::parseInt //静态方法引用
String::length //实例方法引用
Person::new    //构造器引用

通常是调用的方法都是已有的方法, 直接引用已有的方法作为目标类型, 请注意该方法必须能被转为”目标类型”

[> 参考](http://blog.csdn.net/ioriogami/article/details/12782141/)

Stream API

无存储; stream不是一种数据结构, 它只是某种数据源的一个视图, 数据源可以是一个数组, Java容器或I/O channel等; 为函数式编程而生; 对stream的任何修改都不会修改背后的数据源, 比如对stream执行过滤操作并不会删除被过滤的元素, 而是会产生一个不包含被过滤元素的新stream; 惰式执行; stream上的操作并不会立即执行, 只有等到用户真正需要结果的时候才会执行; 可消费性; stream只能被”消费”一次, 一旦遍历过就会失效, 就像容器的迭代器那样, 想要再次遍历必须重新生成;

中间操作

  • 中间操作的一些方法()
concat()
distinct()//去重
filter()//过滤Stream 中的某些数据; (函数表达式返回true即要保留的数据)
flatMap()//将多个Stream 合并成一个 Stream; (函数表达式返回Stream即可)
limit()
map()//将 Stream 中每一个数据, 映射为另外一个数据; (函数表达式返回映射的目标对象)
peek()
skip()
sorted()
parallel()
sequential()
unordered()

结束操作

  • 结束操作的一些方法()
allMatch()//是否至少匹配一个元素
anyMatch()//是否匹配所有元素
noneMatch()//是否所有都不匹配
collect()
count()
findAny()//返回任意元素
findFirst()//有顺序, 返回集合中第一个元素, 一般使用findAny 性能较好一点
forEach()
forEachOrdered()//按顺序
max()
min()
reduce()
toArray()

规约(reduce)

reduce 操作可以实现从一组元素中生成一个值, 比如: sum(), max(), min(), count()等都是reduce操作, 将他们单独设为函数只是因为经常使用;

求流中元素所有字符串的长度值

String[] arr = new String[] {"a","bb","ccc","dddd"};
Stream<String> stream= Arrays.stream(arr);
//stream.reduce 第一个function指明初始值, 第二个function,指明计算方式, 参sum是上下文累加值; (还有第三个function是并发的时候如何计算
int total = stream.reduce(0,  (sum, str) -> sum+str.length(), (a, b) -> a+b).intValue();
System.out.println( total );

过滤(filter)

用于从流中去除不符合条件的元素,只保留那些满足特定条件的元素。

 
List<Integer> numbers = Arrays.asList(1, 2, 3, 10, 15, 20, 5);
// 使用stream API过滤出大于10的数字
List<Integer> filteredNumbers = numbers.stream()
		 .filter(n -> n > 10) // 返回true则保留
		 .collect(Collectors.toList()); // 收集过滤后的结果到一个新的列表
System.out.println(filteredNumbers); // 输出:[15, 20]
 

收集(collect)

String[] arr = new String[] {"a","bb","ccc","dddd"};
Stream<String> stream= Arrays.stream(arr);
//Collectors.toMap 第一个function指明key;这个只接受实例方法的引用,例String::toUpperCase, 无法访问上下文的, 第二个function指明value, 这个可以访问上下文; 还可接受第三个参数, 是key冲突的如何处理的
Map<Integer, Integer> map = stream.collect(Collectors.toMap( String::length ,  t->t.length())); //两种方式的Lambda
System.out.println( map );
 
//转为数组
String[] roleCodes = roles.stream().map(RoleSmallDTO::getCode).toArray(String[]::new);

复杂的自定义

<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner); supplier 是一个生成目标类型实例的方法, 代表着目标容器是什么; accumulator 是将操作的目标数据填充到supplier 生成的目标类型实例中去的方法, 代表着如何将元素添加到容器中; combiner 是将多个supplier 生成的实例整合到一起的方法, 代表着规约操作, 将多个结果合并;

分区(partitioningBy)

partitioningBy将流中的元素按照给定的条件分成 两个分区(true 和 false)。

List<Student> students = Arrays.asList(
	new Student("Alice", 85, "CS"),
	new Student("Bob", 55, "Math"),
	new Student("Charlie", 92, "CS"),
	new Student("David", 78, "Physics"),
	new Student("Eve", 45, "Math"),
	new Student("Frank", 88, "CS")
);
 
// 1. 基本分区:按成绩是否及格分区
Map<Boolean, List<Student>> passedFailed = students.stream()
	.collect(partitioningBy(s -> s.getScore() >= 60));
 
System.out.println("=== 按成绩是否及格分区 ===");
System.out.println("及格: " + passedFailed.get(true));
System.out.println("不及格: " + passedFailed.get(false));
 
// 2. 分区并计算统计信息
Map<Boolean, Long> countByPass = students.stream()
	.collect(partitioningBy(s -> s.getScore() >= 60, counting()));
 
System.out.println("\n=== 及格/不及格人数统计 ===");
System.out.println("及格人数: " + countByPass.get(true));
System.out.println("不及格人数: " + countByPass.get(false));
 
// 3. 分区并计算平均分
Map<Boolean, Double> avgScoreByPass = students.stream()
	.collect(partitioningBy(
		s -> s.getScore() >= 60, 
		averagingInt(Student::getScore)
	));
 
System.out.println("\n=== 及格/不及格平均分 ===");
System.out.println("及格平均分: " + avgScoreByPass.get(true));
System.out.println("不及格平均分: " + avgScoreByPass.get(false));
 
// 4. 分区并只获取姓名列表
Map<Boolean, List<String>> namesByPass = students.stream()
	.collect(partitioningBy(
		s -> s.getScore() >= 60,
		mapping(Student::getName, toList())
	));
 
System.out.println("\n=== 及格/不及格学生姓名 ===");
System.out.println("及格学生: " + namesByPass.get(true));
System.out.println("不及格学生: " + namesByPass.get(false));
}

分组(groupingBy)

groupingBy根据指定的分类函数将元素分组,可以分成 多个组

 List<Student> students = Arrays.asList(
	new Student("Alice", 85, "CS"),
	new Student("Bob", 55, "Math"),
	new Student("Charlie", 92, "CS"),
	new Student("David", 78, "Physics"),
	new Student("Eve", 45, "Math"),
	new Student("Frank", 88, "CS"),
	new Student("Grace", 91, "Physics")
);
 
// 1. 基本分组:按院系分组
Map<String, List<Student>> byDepartment = students.stream()
	.collect(groupingBy(Student::getDepartment));
 
System.out.println("=== 按院系分组 ===");
byDepartment.forEach((dept, list) -> 
	System.out.println(dept + ": " + list));
 
// 2. 分组并计数
Map<String, Long> countByDept = students.stream()
	.collect(groupingBy(Student::getDepartment, counting()));
 
System.out.println("\n=== 各院系学生人数 ===");
countByDept.forEach((dept, count) -> 
	System.out.println(dept + ": " + count + "人"));
 
// 3. 分组并计算平均分
Map<String, Double> avgScoreByDept = students.stream()
	.collect(groupingBy(
		Student::getDepartment, 
		averagingInt(Student::getScore)
	));
 
System.out.println("\n=== 各院系平均分 ===");
avgScoreByDept.forEach((dept, avg) -> 
	System.out.println(dept + ": " + String.format("%.2f", avg)));
 
// 4. 分组并获取最高分学生
Map<String, Optional<Student>> topStudentByDept = students.stream()
	.collect(groupingBy(
		Student::getDepartment,
		maxBy(Comparator.comparingInt(Student::getScore))
	));
 
System.out.println("\n=== 各院系最高分学生 ===");
topStudentByDept.forEach((dept, student) -> 
	System.out.println(dept + ": " + student.orElse(null)));
 
// 5. 多级分组:先按院系,再按成绩等级
Map<String, Map<String, List<Student>>> multiLevelGrouping = students.stream()
	.collect(groupingBy(
		Student::getDepartment,
		groupingBy(s -> {
			if (s.getScore() >= 90) return "优秀";
			else if (s.getScore() >= 70) return "良好";
			else if (s.getScore() >= 60) return "及格";
			else return "不及格";
		})
	));
 
System.out.println("\n=== 多级分组:院系 -> 成绩等级 ===");
multiLevelGrouping.forEach((dept, gradeMap) -> {
	System.out.println(dept + ":");
	gradeMap.forEach((grade, list) -> 
		System.out.println("  " + grade + ": " + list));
});
 
// 6. 分组并只获取姓名列表
Map<String, List<String>> namesByDept = students.stream()
	.collect(groupingBy(
		Student::getDepartment,
		mapping(Student::getName, toList())
	));
 
System.out.println("\n=== 各院系学生姓名 ===");
namesByDept.forEach((dept, names) -> 
	System.out.println(dept + ": " + names));

并发流

默认的 ForkJoinPool

String[] arr = new String[] {"a","bb","ccc","dddd"};
Stream<String> stream= Arrays.stream(arr);
  • parallelStream()方法​​:在集合上调用此方法会返回一个并行流,例如 list.parallelStream()
  • parallel()方法​​:从普通流升级成为并发流,例如 stream.parallel()

​底层机制​​:基于 ForkJoinPool 框架,默认使用公共的 ForkJoinPool.commonPool(),这个线程池是由整个应用程序共享的。该池的大小通常等于可用处理器核心数减一(但可通过系统属性调整)。它不适合阻塞型任务.

ForkJoinPool对阻塞 I/O 任务 不友好?​​

根本原因在于 ​ForkJoinPool的工作窃取(Work-Stealing)算法是为 CPU 密集型任务设计的​​,而阻塞 I/O 会破坏其调度。

public static void main(String[] args) {
	List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
	long currentTimeMillis = System.currentTimeMillis();
	System.out.println("开始时间:" + currentTimeMillis);
	List<Integer> collect = numbers.parallelStream().map(n -> {
		try {
			System.out.println("线程名称:" + Thread.currentThread().getName()+" 处理:"+n);
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
		return n * n;
	}).toList();
	//如果所有线程阻塞, 整个池会完全停滞, 换句话说: 最大的并发数量就是 ForkJoinPool 该池的大小, 超出后将会无限等待, 直至有空闲的线程处理
	System.out.println("结束耗时:" + (System.currentTimeMillis() - currentTimeMillis));// 结束耗时:10031
}
  1. 每个线程维护一个双端队列(Deque),优先处理自己队列中的任务(LIFO)。
  2. 当自己的队列为空时,会从其他线程队列的​​尾部​​窃取任务(FIFO)。
  3. **如果所有线程都因 I/O 阻塞,整个池会完全停滞。

自定义线程池

方法 1:使用 ForkJoinPool.submit()ForkJoinTask.get()

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
 
public class ParallelStreamWithCustomPool {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 
        // 创建自定义线程池(例如,设置并行度为4)
        ForkJoinPool customThreadPool = new ForkJoinPool(4);
 
        try {
            // 将并行流操作封装成任务提交到自定义池
            List<Integer> squaredNumbers = customThreadPool.submit(() ->
                numbers.parallelStream() // 在自定义池中并行执行
                       .map(n -> {
                           System.out.println(Thread.currentThread().getName());
                           return n * n;
                       })
                       .collect(Collectors.toList())
            ).get(); // 阻塞等待结果
 
            System.out.println(squaredNumbers);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            customThreadPool.shutdown(); // 关闭池
        }
    }
}

 方法 2:使用 CompletableFuture.supplyAsync()(更灵活)

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
 
public class ParallelStreamWithCustomPoolAsync {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 
        // 创建自定义线程池
        ForkJoinPool customThreadPool = new ForkJoinPool(4);
 
        CompletableFuture<List<Integer>> future = CompletableFuture.supplyAsync(() ->
            numbers.parallelStream()
                   .map(n -> {
                       System.out.println(Thread.currentThread().getName());
                       return n * n;
                   })
                   .collect(Collectors.toList()),
            customThreadPool // 指定执行任务的线程池
        );
 
        // 非阻塞获取结果(或使用 future.get() 阻塞获取)
        future.thenAccept(squaredNumbers -> {
            System.out.println("Result: " + squaredNumbers);
            customThreadPool.shutdown(); // 在回调中关闭池
        });
 
        // 主线程继续执行其他任务...
        future.join(); // 或在此阻塞等待(根据需求选择)
    }
}

如果操作包含阻塞I/O,建议用 CompletableFuture+ ExecutorService(如 ThreadPoolExecutor)替代并行流

**

方法 3:使用非并发流, 指定线程池

ExecutorService ioPool = Executors.newFixedThreadPool(50); // 专用 I/O 池
 
List<CompletableFuture<String>> futures = dataList.stream()
    .map(data -> CompletableFuture.supplyAsync(() -> {
        // 模拟阻塞 I/O 操作
        return queryDatabase(data); 
    }, ioPool)) // 指定专用线程池
    .collect(Collectors.toList());
 
// 非阻塞组合结果
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
    .thenApply(v -> futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList())
    );
public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(50); // 专用 I/O 池
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
 
        long currentTimeMillis = System.currentTimeMillis();
        System.out.println("开始时间:" + currentTimeMillis);
        List<CompletableFuture<Integer>> futures = numbers.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> {
                    try {
                        System.out.println("线程名称:" + Thread.currentThread().getName()+" 处理:"+n);
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return n * n;
                }, ioPool)) // 指定专用线程池
                .collect(Collectors.toList());
        System.out.println("结束耗时:" + (System.currentTimeMillis() - currentTimeMillis));//结束耗时:18
        ioPool.shutdown();
    }
任务类型​​推荐方案​​线程池示例​
​纯 CPU 计算​parallelStream()ForkJoinPool.commonPool()
​阻塞 I/O​CompletableFuture+ 专用线程池ThreadPoolExecutor
​混合型任务​拆分 + 不同池处理CPU 池 + I/O 池双隔离

实例

实例 从迭代器创建流

Stream<String> targetStream = StreamSupport.stream(
    Spliterators.spliteratorUnknownSize(sourceIterator, Spliterator.ORDERED),
    false // false 表示创建一个顺序流,true 则表示并行流(通常不需要)
);

实例 拼接字符串集合 (joining)

//将集合转换成字符串
//方法一:join(Collection var0, String var1)
String a = StringUtils.join(secondCateNameList, ",");
System.out.println("AND eod.second_cate_name in("+ a + ")");
 
 
//方法二:joining(CharSequence delimiter, CharSequence prefix, CharSequence suffix) //分隔符, 前缀, 后缀
String b = secondCateNameList.stream().collect(Collectors.joining(",","(",")"));
System.out.println("AND eod.second_cate_name in"+ b);
 

实例 数据分区 (partitioningBy)

key只能布尔值, 二分数据

Map<Boolean, List<Student>> map1 = list1.stream().collect(Collectors.partitioningBy(item -> item.getScore()>= 90));
System.out.println(map1);

实例 数据分组 (Map<Object, List>)

List 分组转为 Map<Object, List>

//如果多分数据的话可以使用 分组操作 
List<Employee> employeeList = Arrays.asList(
    new Employee("Tom Jones", 45, 15000.00),//name, age, salary
    new Employee("Harry Andrews", 45, 7000.00),
    new Employee("Ethan Hardy", 65, 8000.00),
    new Employee("Nancy Smith", 22, 10000.00),
    new Employee("Deborah Sprightly", 29, 9000.00)
)
// Collectors.groupingBy 的第一个参数作为分组条件, 第二个参数(Collectors.mapping)为映射方式
// Collectors.mapping 第一个参数的返回值, 会被其第二个参数收集 Collectors.toList()
Object employee = employeeList.stream()
    .collect(
        Collectors.groupingBy(
            Employee::getAge, 
            Collectors.mapping(e->e.getName(), Collectors.toList())
        )
    );
System.out.println("List of  names:" + employee);
//List of  names:{65=[Ethan Hardy], 22=[Nancy Smith], 29=[Deborah Sprightly], 45=[Tom Jones, Harry Andrews]}
 
// 实例: 
Map<MassifACropListDTO, List<BaseAgriCropDTO>> data = content.stream().collect(
    Collectors.groupingBy(
            item -> {//分组 (即Key)
                MassifACropListDTO mal = new MassifACropListDTO();
                mal.setId((String) item.get("id"));
                mal.setName((String) item.get("name"));
                return mal;
            },
            Collectors.mapping(item->{//分组元素 (即list)
                if(item.get("baseAgriCropId") == null){//null 也会调用一次
                    return null;
                }
                BaseAgriCropDTO bac = new BaseAgriCropDTO();
                bac.setId((String) item.get("baseAgriCropId"));
                bac.setPic((String) item.get("baseAgriCropPic"));
                return bac;
            }, Collectors.toList())
    )
);
final List<MassifACropListDTO> ret = new ArrayList<>(data.size());
data.forEach((mal,lst)->{//设置分组对象的 list
    if(lst.get(0) != null){//null 也会有一条
        mal.setBaseAgriCrops(lst);// 给map 设置list
    }
    ret.add(mal);//只要key
});
 
// 简单属性
Map<String, List<ProProjectFileVo>> data = proProjectFiles.stream().collect(  
        Collectors.groupingBy(  
                ProProjectFileVo::getFileType,  
                Collectors.mapping(item-> item, Collectors.toList())  
        )  
);

实例 从集合中查找一个 (filter)

Optional<Sl_std_ref> ref_find =	std_refs.stream().filter( ref ->{
    return e.getName().equals(ref.getName()) ;
}).findAny();
// findAny()//返回任意元素
// findFirst()//有顺序, 返回集合中第一个元素, 一般使用findAny 性能较好一点
if(ref_find.isPresent() ) {
    //存在
}
 

实例 List 转 Map (toMap)

 Map<Integer,String> userMap1 = userList.stream()
    .collect(Collectors.toMap(User::getId,User::getName));
//key是对象中的某个属性值, value是对象本身(使用Function.identity()的写法)
Map<String, User> userMap3 = userList.stream()
    .collect(Collectors.toMap(User::getId, Function.identity()));
 

实例 将对象List转另一个对象的List (map)

List<TestSampleDto> lpd = lst.stream().map( (e)-> {
        	TestSampleDto tsd = new TestSampleDto();
        	tsd.setName(e.getName());
    
        	return tsd;
        }).collect(Collectors.toList());

实例 收集stream里的list 合并为一个list (flatMap)

类似把二维集合, 转成一维的

projects.stream()
	.flatMap(p -> p.getComponents().stream())//这里又是一个集合
	.collect(Collectors.toList());//没有外层的(projects)!

实例 将对象List的某个属性转数组(map)

String[] asName = hr.getElements().stream().map(e->{
	return e.getAlias();
}).toArray(String[]::new);

实例 Map转另一个对象的集合(map)

List<Attributes> fields = mappName.entrySet().stream().map( (e)->{
	return new Attributes(e.getKey(), e.getValue());
}).collect(Collectors.toList() );
 

实例 判定是否包含元素 anyMatch

boolean hasTask = allTaskList.stream().anyMatch(e -> STATE_DOWNLOADING == e.getSta());
if (hasTask){//有未完成的任务
 
}