parallel with groupBy (partitioning by first char) #

Map<String, Integer> map = Maps.newHashMap();
map.put("a", 0);
map.put("b", 0);
map.put("c", 0);
System.out.println(map);
System.out.println(LocalDateTime.now());
Flux<GroupedFlux<Character, String>> groupedFluxFlux = Flux.just("a1", "b1", "c1", "a2", "b2", "c2")
		.groupBy(s -> s.charAt(0));
groupedFluxFlux
		.parallel(10)
		.runOn(Schedulers.boundedElastic())
		.flatMap(groupedFlux ->
				groupedFlux.flatMap(str -> {
					String k = str.substring(0, 1);
					int v = Integer.parseInt(str.substring(1));
					int oldV = map.get(k);
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						System.err.println(e);
					}
					int newV = oldV + v;
					map.put(k, newV);
					return Mono.just(groupedFlux.key() + "=" + str);
				})
		)
		.doOnNext(System.out::println)
		.collectSortedList(Comparator.naturalOrder())
		.blockOptional();
System.out.println(map);
System.out.println(LocalDateTime.now());

  • output
{a=0, b=0, c=0}
2023-09-18T23:30:52.853950
a=a1
b=b1
c=c1
a=a2
c=c2
b=b2
{a=3, b=3, c=3}
2023-09-18T23:30:55.244905


parallel without groupBy #

Map<String, Integer> map = Maps.newHashMap();
map.put("a", 0);
map.put("b", 0);
map.put("c", 0);
System.out.println(map);
System.out.println(LocalDateTime.now());
Flux.just("a1", "b1", "c1", "a2", "b2", "c2")
		.parallel(10)
		.runOn(Schedulers.boundedElastic())
		.flatMap(str -> {
			String k = str.substring(0, 1);
			int v = Integer.parseInt(str.substring(1));
			int oldV = map.get(k);
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				System.err.println(e);
			}
			int newV = oldV + v;
			map.put(k, newV);
			return Mono.just(str);
		})
		.doOnNext(System.out::println)
		.collectSortedList(Comparator.naturalOrder())
		.blockOptional();
System.out.println(map);
System.out.println(LocalDateTime.now());

  • output
{a=0, b=0, c=0}
2023-09-18T23:25:23.627444
b1
a1
b2
c2
a2
c1
{a=1, b=2, c=1}
2023-09-18T23:25:24.925120
Valid XHTML 1.0! Valid CSS! powered by MoniWiki
last modified 2023-09-18 23:33:40
Processing time 0.0055 sec