Skip to content

Commit 77c9db7

Browse files
committed
DataStream 中的算子及 DataSet Sinks
1 parent 448ef3a commit 77c9db7

10 files changed

Lines changed: 298 additions & 1 deletion

File tree

.idea/dataSources.xml

Lines changed: 19 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/dictionaries/sss.xml

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/inspectionProfiles/Project_Default.xml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/misc.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@
8282
<scope>runtime</scope>
8383
</dependency>
8484

85+
<dependency>
86+
<groupId>mysql</groupId>
87+
<artifactId>mysql-connector-java</artifactId>
88+
<version>8.0.13</version>
89+
</dependency>
90+
8591
<dependency>
8692
<groupId>org.projectlombok</groupId>
8793
<artifactId>lombok</artifactId>
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.javaedge.java.chapter4;
2+
3+
import org.apache.flink.api.common.functions.MapFunction;
4+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
5+
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
6+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
7+
8+
/**
9+
* @author JavaEdge
10+
*
11+
* @date 2019-07-21
12+
*/
13+
public class JavaCustomSinkToMySQL {
14+
public static void main(String[] args) throws Exception {
15+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
16+
17+
DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
18+
19+
SingleOutputStreamOperator<Student> studentStream = source.map(new MapFunction<String, Student>() {
20+
@Override
21+
public Student map(String value) throws Exception {
22+
String[] splits = value.split(",");
23+
Student stu = new Student();
24+
stu.setId(Integer.parseInt(splits[0]));
25+
stu.setName(splits[1]);
26+
stu.setAge(Integer.parseInt(splits[2]));
27+
return stu;
28+
}
29+
});
30+
studentStream.addSink(new SinkToMySQL());
31+
env.execute("JavaCustomSinkToMySQL");
32+
}
33+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.javaedge.java.chapter4;
2+
3+
import org.apache.flink.api.common.functions.FilterFunction;
4+
import org.apache.flink.api.common.functions.MapFunction;
5+
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
6+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
7+
import org.apache.flink.streaming.api.datastream.SplitStream;
8+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
9+
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
13+
/**
14+
* @author JavaEdge
15+
* @date 2019-07-20
16+
*/
17+
public class JavaDataStreamTransformationApp {
18+
public static void main(String[] args) throws Exception {
19+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
20+
// filterFunction(env);
21+
// unionFunction(env);
22+
splitSelectFunction(env);
23+
env.execute("JavaDataStreamTransformationApp");
24+
}
25+
26+
public static void splitSelectFunction(StreamExecutionEnvironment env) {
27+
DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());
28+
29+
SplitStream<Long> splits = data.split(new OutputSelector<Long>() {
30+
@Override
31+
public Iterable<String> select(Long value) {
32+
List<String> output = new ArrayList<>();
33+
if (value % 2 == 0) {
34+
output.add("even");
35+
} else {
36+
output.add("odd");
37+
}
38+
return output;
39+
}
40+
});
41+
42+
splits.select("odd").print().setParallelism(1);
43+
}
44+
45+
public static void unionFunction(StreamExecutionEnvironment env) {
46+
DataStreamSource<Long> data1 = env.addSource(new JavaCustomNonParallelSourceFunction());
47+
DataStreamSource<Long> data2 = env.addSource(new JavaCustomNonParallelSourceFunction());
48+
data1.union(data2).print().setParallelism(1);
49+
}
50+
51+
public static void filterFunction(StreamExecutionEnvironment env) {
52+
DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());
53+
data.map(new MapFunction<Long, Long>() {
54+
@Override
55+
public Long map(Long value) throws Exception {
56+
System.out.println("value = [" + value + "]");
57+
return value;
58+
}
59+
}).filter(new FilterFunction<Long>() {
60+
@Override
61+
public boolean filter(Long value) throws Exception {
62+
return value % 2 == 0;
63+
}
64+
}).print().setParallelism(1);
65+
}
66+
67+
68+
}
69+
70+
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.javaedge.java.chapter4;
2+
3+
import org.apache.flink.configuration.Configuration;
4+
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
5+
6+
import java.sql.Connection;
7+
import java.sql.DriverManager;
8+
import java.sql.PreparedStatement;
9+
10+
/**
11+
* @author JavaEdge
12+
*
13+
* @date 2019-07-21
14+
*/
15+
public class SinkToMySQL extends RichSinkFunction<Student>{
16+
17+
Connection connection;
18+
PreparedStatement pstmt;
19+
20+
21+
private Connection getConnection() {
22+
Connection conn = null;
23+
try {
24+
Class.forName("com.mysql.jdbc.Driver");
25+
26+
String url = "jdbc:mysql://localhost:3306/javaedge_flink";
27+
28+
conn = DriverManager.getConnection(url,"root","root");
29+
30+
} catch (Exception e) {
31+
e.printStackTrace();
32+
}
33+
34+
return conn;
35+
}
36+
37+
/**
38+
* 在open方法中建立connection
39+
* @param parameters
40+
* @throws Exception
41+
*/
42+
@Override
43+
public void open(Configuration parameters) throws Exception {
44+
super.open(parameters);
45+
46+
connection = getConnection();
47+
String sql = "insert into student(id,name,age) values (?,?,?)";
48+
pstmt = connection.prepareStatement(sql);
49+
50+
51+
System.out.println("open");
52+
53+
}
54+
55+
/**
56+
* 每条记录插入时调用一次
57+
*
58+
* @param value
59+
* @param context
60+
* @throws Exception
61+
*/
62+
@Override
63+
public void invoke(Student value, Context context) throws Exception {
64+
System.out.println("invoke~~~~~~~~~");
65+
// 未前面的占位符赋值
66+
pstmt.setInt(1, value.getId());
67+
pstmt.setString(2, value.getName());
68+
pstmt.setInt(3, value.getAge());
69+
70+
pstmt.executeUpdate();
71+
72+
}
73+
74+
/**
75+
* 在close方法中要释放资源
76+
* @throws Exception
77+
*/
78+
@Override
79+
public void close() throws Exception {
80+
super.close();
81+
82+
if(pstmt != null) {
83+
pstmt.close();
84+
}
85+
86+
if(connection != null) {
87+
connection.close();
88+
}
89+
}
90+
}
91+
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.javaedge.java.chapter4;
2+
3+
import lombok.Data;
4+
5+
/**
6+
* @author JavaEdge
7+
* @date 2019-07-21
8+
*/
9+
@Data
10+
public class Student {
11+
12+
private int id;
13+
14+
private String name;
15+
16+
private int age;
17+
}
18+
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.javaedge.scala.chapter4
2+
3+
import java.{lang, util}
4+
5+
import org.apache.flink.streaming.api.collector.selector.OutputSelector
6+
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
7+
8+
/**
 * Demonstrates DataStream transformations in Scala: filter, union, and
 * split/select. Exactly one demo is enabled in `main`; the others stay
 * commented out for easy toggling.
 *
 * @author JavaEdge
 * @date 2019-07-20
 */
object DataStreamTransformationApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // filterFunction(env)
    unionFunction(env)
    env.execute("DataStreamTransformationApp")
  }

  /**
   * Tags each element "even" or "odd", then prints both side streams.
   */
  def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val numbers = env.addSource(new CustomNonParallelSourceFunction)

    val tagged = numbers.split(new OutputSelector[Long] {
      override def select(value: Long): lang.Iterable[String] = {
        // Exactly one tag per element.
        val tags = new util.ArrayList[String](1)
        tags.add(if (value % 2 == 0) "even" else "odd")
        tags
      }
    })
    tagged.select("even", "odd").print().setParallelism(1)
  }

  /**
   * Unions two identical sources into one stream and prints it.
   */
  def unionFunction(env: StreamExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val first = env.addSource(new CustomNonParallelSourceFunction)
    val second = env.addSource(new CustomNonParallelSourceFunction)
    first.union(second).print().setParallelism(1)
  }

  /**
   * Logs every element, then keeps only the even ones and prints them.
   */
  def filterFunction(env: StreamExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val numbers = env.addSource(new CustomNonParallelSourceFunction)

    numbers.map { elem =>
      // Side-effect only: trace every element passing through.
      println("received: " + elem)
      elem
    }.filter(_ % 2 == 0).print().setParallelism(1)
  }
}

0 commit comments

Comments
 (0)