package com.javaedge.java.chapter3;
22
import com.javaedge.scala.chapter3.DBUtils;
import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
611
712import java .util .ArrayList ;
813import java .util .List ;
public class DataSetTransformationApp {
1520
1621 public static void main (String [] args ) throws Exception {
1722 ExecutionEnvironment env = ExecutionEnvironment .getExecutionEnvironment ();
18- mapFunction (env );
23+ // mapFunction(env);
24+ // filterFunction(env);
25+ // mapPartitionFunction(env);
26+ // firstFunction(env);
27+ // flatMapFunction(env);
28+ // distinctFunction(env);
29+ // joinFunction(env);
30+ crossFunction (env );
31+
32+ }
33+
34+
35+ public static void crossFunction (ExecutionEnvironment env ) throws Exception {
36+ List <String > info1 = new ArrayList <>();
37+ info1 .add ("曼联" );
38+ info1 .add ("曼城" );
39+
40+ List <String > info2 = new ArrayList <>();
41+ info2 .add ("3" );
42+ info2 .add ("1" );
43+ info2 .add ("0" );
44+
45+ DataSource <String > data1 = env .fromCollection (info1 );
46+ DataSource <String > data2 = env .fromCollection (info2 );
47+
48+ data1 .cross (data2 ).print ();
49+ }
50+
51+ public static void outerJoinFunction (ExecutionEnvironment env ) throws Exception {
52+ List <Tuple2 <Integer , String >> info1 = new ArrayList <>();
53+ info1 .add (new Tuple2 (1 , "JavaEdge" ));
54+ info1 .add (new Tuple2 (2 , "公众号" ));
55+ info1 .add (new Tuple2 (3 , "全是干货" ));
56+ info1 .add (new Tuple2 (4 , "进击架构师" ));
57+
58+
59+ List <Tuple2 <Integer , String >> info2 = new ArrayList <>();
60+ info2 .add (new Tuple2 (1 , "北京" ));
61+ info2 .add (new Tuple2 (2 , "上海" ));
62+ info2 .add (new Tuple2 (3 , "成都" ));
63+ info2 .add (new Tuple2 (5 , "杭州" ));
64+
65+ DataSource <Tuple2 <Integer , String >> data1 = env .fromCollection (info1 );
66+ DataSource <Tuple2 <Integer , String >> data2 = env .fromCollection (info2 );
67+
68+ data1 .leftOuterJoin (data2 ).where (0 ).equalTo (0 ).with (new JoinFunction <Tuple2 <Integer , String >, Tuple2 <Integer , String >, Tuple3 <Integer , String , String >>() {
69+ @ Override
70+ public Tuple3 <Integer , String , String > join (Tuple2 <Integer , String > first , Tuple2 <Integer , String > second ) throws Exception {
71+ if (second == null ) {
72+ return new Tuple3 <Integer , String , String >(first .f0 , first .f1 , "-" );
73+
74+ } else {
75+ return new Tuple3 <Integer , String , String >(first .f0 , first .f1 , second .f1 );
76+ }
77+ }
78+ }).print ();
79+
80+ data1 .rightOuterJoin (data2 ).where (0 ).equalTo (0 ).with (new JoinFunction <Tuple2 <Integer , String >, Tuple2 <Integer , String >, Tuple3 <Integer , String , String >>() {
81+ @ Override
82+ public Tuple3 <Integer , String , String > join (Tuple2 <Integer , String > first , Tuple2 <Integer , String > second ) throws Exception {
83+ if (first == null ) {
84+ return new Tuple3 <Integer , String , String >(second .f0 , "-" , second .f1 );
85+ } else {
86+ return new Tuple3 <Integer , String , String >(first .f0 , first .f1 , second .f1 );
87+ }
88+ }
89+ }).print ();
90+
91+
92+ data1 .fullOuterJoin (data2 ).where (0 ).equalTo (0 ).with (new JoinFunction <Tuple2 <Integer , String >, Tuple2 <Integer , String >, Tuple3 <Integer , String , String >>() {
93+ @ Override
94+ public Tuple3 <Integer , String , String > join (Tuple2 <Integer , String > first , Tuple2 <Integer , String > second ) throws Exception {
95+ if (first == null ) {
96+ return new Tuple3 <Integer , String , String >(second .f0 , "-" , second .f1 );
97+ } else if (second == null ) {
98+ return new Tuple3 <Integer , String , String >(first .f0 , first .f1 , "-" );
99+ } else {
100+ return new Tuple3 <Integer , String , String >(first .f0 , first .f1 , second .f1 );
101+ }
102+ }
103+ }).print ();
104+
105+ }
106+
107+ public static void joinFunction (ExecutionEnvironment env ) throws Exception {
108+ List <Tuple2 <Integer , String >> info1 = new ArrayList <>();
109+ info1 .add (new Tuple2 (1 , "JavaEdge" ));
110+ info1 .add (new Tuple2 (2 , "公众号" ));
111+ info1 .add (new Tuple2 (3 , "全是干货" ));
112+ info1 .add (new Tuple2 (4 , "进击架构师" ));
113+
114+
115+ List <Tuple2 <Integer , String >> info2 = new ArrayList <>();
116+ info2 .add (new Tuple2 (1 , "北京" ));
117+ info2 .add (new Tuple2 (2 , "上海" ));
118+ info2 .add (new Tuple2 (3 , "成都" ));
119+ info2 .add (new Tuple2 (5 , "杭州" ));
120+
121+ DataSource <Tuple2 <Integer , String >> data1 = env .fromCollection (info1 );
122+ DataSource <Tuple2 <Integer , String >> data2 = env .fromCollection (info2 );
123+
124+ data1 .join (data2 ).where (0 ).equalTo (0 ).with (new JoinFunction <Tuple2 <Integer , String >, Tuple2 <Integer , String >, Tuple3 <Integer , String , String >>() {
125+ @ Override
126+ public Tuple3 <Integer , String , String > join (Tuple2 <Integer , String > first , Tuple2 <Integer , String > second ) throws Exception {
127+ return new Tuple3 <>(first .f0 , first .f1 , second .f1 );
128+ }
129+ }).print ();
130+
131+ }
132+
133+ public static void distinctFunction (ExecutionEnvironment env ) throws Exception {
134+ List <String > info = new ArrayList <>();
135+ info .add ("hadoop,spark" );
136+ info .add ("hadoop,flink" );
137+ info .add ("flink,flink" );
138+
139+ DataSource <String > data = env .fromCollection (info );
140+
141+ data .flatMap (new FlatMapFunction <String , String >() {
142+ @ Override
143+ public void flatMap (String input , Collector <String > collector ) throws Exception {
144+ String [] splits = input .split ("," );
145+ for (String split : splits ) {
146+ collector .collect (split );
147+ }
148+ }
149+ }).distinct ().print ();
150+ }
151+
152+ public static void flatMapFunction (ExecutionEnvironment env ) throws Exception {
153+ List <String > info = new ArrayList <>();
154+ info .add ("hadoop,spark" );
155+ info .add ("hadoop,flink" );
156+ info .add ("flink,flink" );
157+
158+ DataSource <String > data = env .fromCollection (info );
159+
160+ data .flatMap (new FlatMapFunction <String , String >() {
161+ @ Override
162+ public void flatMap (String input , Collector <String > collector ) throws Exception {
163+ String [] splits = input .split ("," );
164+ for (String split : splits ) {
165+ collector .collect (split );
166+ }
167+ }
168+ }).map (new MapFunction <String , Tuple2 <String , Integer >>() {
169+ @ Override
170+ public Tuple2 <String , Integer > map (String s ) throws Exception {
171+ return new Tuple2 <>(s , 1 );
172+ }
173+ }).groupBy (0 ).sum (1 ).print ();
174+
175+ }
176+
177+ public static void firstFunction (ExecutionEnvironment env ) throws Exception {
178+ List <Tuple2 <Integer , String >> info = new ArrayList <>();
179+ info .add (new Tuple2 (1 , "HDFS" ));
180+ info .add (new Tuple2 (1 , "Spark" ));
181+ info .add (new Tuple2 (1 , "Flink" ));
182+ info .add (new Tuple2 (2 , "Java" ));
183+ info .add (new Tuple2 (2 , "Spring Cloud" ));
184+ info .add (new Tuple2 (3 , "Linux" ));
185+ info .add (new Tuple2 (4 , "Vue" ));
186+
187+ DataSource <Tuple2 <Integer , String >> data = env .fromCollection (info );
188+
189+ data .first (3 ).print ();
190+ System .out .println ("======~这只是一道分割线~======" );
191+
192+ data .groupBy (0 ).first (2 ).print ();
193+ System .out .println ("======~这只是一道分割线~======" );
194+
195+ data .groupBy (0 ).sortGroup (1 , Order .DESCENDING ).first (2 ).print ();
196+
197+ }
198+
199+ public static void mapPartitionFunction (ExecutionEnvironment env ) throws Exception {
200+ List <String > list = new ArrayList <>(10 );
201+ for (int i = 0 ; i < 100 ; i ++) {
202+ list .add ("student: " + i );
203+ }
204+ DataSource <String > data = env .fromCollection (list ).setParallelism (6 );
205+
206+ // data.map((MapFunction<String, String>) input -> {
207+ // String connection = DBUtils.getConnection();
208+ // System.out.println("connection = [" + connection + "]");
209+ // DBUtils.returnConnection(connection);
210+ // return input;
211+ // }).print();
212+
213+ data .mapPartition (new MapPartitionFunction <String , String >() {
214+ @ Override
215+ public void mapPartition (Iterable <String > values , Collector <String > out ) throws Exception {
216+ String connection = DBUtils .getConnection ();
217+ System .out .println ("connection = [" + connection + "]" );
218+ DBUtils .returnConnection (connection );
219+ }
220+ }).print ();
221+ }
222+
223+ public static void filterFunction (ExecutionEnvironment env ) throws Exception {
224+ List <Integer > list = new ArrayList <>(10 );
225+ for (int i = 0 ; i < 10 ; i ++) {
226+ list .add (i );
227+ }
228+ DataSource <Integer > data = env .fromCollection (list );
229+ data .map ((MapFunction <Integer , Integer >) input -> input + 1 )
230+ .filter ((FilterFunction <Integer >) input -> input > 5 )
231+ .print ();
19232 }
20233
21234 public static void mapFunction (ExecutionEnvironment env ) throws Exception {
0 commit comments