Skip to content

Commit e9e2ae0

Browse files
committed
Time & Windows编程
1 parent 77c9db7 commit e9e2ae0

11 files changed

Lines changed: 322 additions & 3 deletions

File tree

.idea/dictionaries/sss.xml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/inspectionProfiles/Project_Default.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pom.xml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<version>1.0</version>
99
<packaging>jar</packaging>
1010

11-
<name>Flink Quickstart Job</name>
11+
<name>Flink Tutorial</name>
1212
<url>http://www.myorganization.org</url>
1313

1414
<repositories>
@@ -48,6 +48,18 @@
4848
<scope>provided</scope>
4949
</dependency>
5050

51+
<dependency>
52+
<groupId>org.apache.flink</groupId>
53+
<artifactId>flink-table-planner_2.11</artifactId>
54+
<version>${flink.version}</version>
55+
</dependency>
56+
57+
<dependency>
58+
<groupId>org.apache.flink</groupId>
59+
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
60+
<version>${flink.version}</version>
61+
</dependency>
62+
5163
<!-- Scala Library, provided by Flink as well. -->
5264
<dependency>
5365
<groupId>org.scala-lang</groupId>

src/main/java/TestJava.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,27 @@
1+
import org.relaxng.datatype.DatatypeException;
2+
3+
import java.text.ParseException;
4+
import java.text.SimpleDateFormat;
5+
import java.util.Date;
6+
17
/**
 * Scratch program that parses two fixed timestamps and prints each of them
 * followed by the number of milliseconds between them.
 *
 * @author JavaEdge
 *
 * @date 2019-07-16
 */
public class TestJava {

    /**
     * Parses two hard-coded timestamps, prints both, then prints their distance in milliseconds.
     *
     * @param args ignored
     * @throws ParseException if a timestamp literal does not match the pattern
     */
    public static void main(String[] args) throws ParseException {
        // "dd" is day-of-month. The former "DD" is day-of-year in SimpleDateFormat,
        // which only yields the intended date while the month is January.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");

        Date begin = sdf.parse("1900/01/01 00:00:00");
        System.out.println(begin);

        Date target = sdf.parse("1900/01/02 00:00:01");
        System.out.println(target);

        // Date#getTime() is epoch milliseconds, so the difference is in milliseconds too.
        long elapsedMillis = target.getTime() - begin.getTime();
        System.out.println(elapsedMillis);
    }

}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.javaedge.java.chapter5;
2+
3+
import org.apache.flink.api.java.DataSet;
4+
import org.apache.flink.api.java.ExecutionEnvironment;
5+
import org.apache.flink.table.api.Table;
6+
import org.apache.flink.table.api.java.BatchTableEnvironment;
7+
import org.apache.flink.types.Row;
8+
9+
/**
10+
* @author JavaEdge
11+
* @date 2019-07-21
12+
*/
13+
public class JavaTableSQLAPI {
14+
15+
public static void main(String[] args) throws Exception {
16+
17+
18+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
19+
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
20+
21+
String filePath = "file:///Volumes/doc/IDEAProjects/Flink-Tuitiorial/flink-train-scala/src/main/resources/sales.csv";
22+
23+
DataSet<Sales> csv = env.readCsvFile(filePath)
24+
.ignoreFirstLine()
25+
.pojoType(Sales.class,"transactionId","customerId","itemId","amountPaid");
26+
//csv.print();
27+
28+
Table sales = tableEnv.fromDataSet(csv);
29+
tableEnv.registerTable("sales", sales);
30+
Table resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId");
31+
32+
DataSet<Row> result = tableEnv.toDataSet(resultTable, Row.class);
33+
result.print();
34+
}
35+
36+
public static class Sales{
37+
public String transactionId;
38+
public String customerId;
39+
public String itemId;
40+
public Double amountPaid;
41+
}
42+
}
43+
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.javaedge.java.chapter6;
2+
3+
import org.apache.flink.api.common.functions.FlatMapFunction;
4+
import org.apache.flink.api.java.tuple.Tuple2;
5+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
6+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
7+
import org.apache.flink.streaming.api.windowing.time.Time;
8+
import org.apache.flink.util.Collector;
9+
10+
/**
11+
* @author JavaEdge
12+
*
13+
* @date 2019-07-23
14+
*/
15+
public class JavaWindowsApp {
16+
17+
public static void main(String[] args) throws Exception {
18+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
19+
DataStreamSource<String> text = env.socketTextStream("localhost",9999);
20+
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
21+
@Override
22+
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
23+
String[] tokens = value.toLowerCase().split(",");
24+
for(String token : tokens) {
25+
if(token.length() > 0) {
26+
out.collect(new Tuple2<>(token, 1));
27+
}
28+
}
29+
}
30+
}).keyBy(0)
31+
.timeWindow(Time.seconds(5))
32+
.sum(1)
33+
.print()
34+
.setParallelism(1);
35+
env.execute("JavaWindowsApp");
36+
}
37+
}
38+
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.javaedge.java.chapter6;
2+
3+
import org.apache.flink.api.common.functions.FlatMapFunction;
4+
import org.apache.flink.api.java.tuple.Tuple;
5+
import org.apache.flink.api.java.tuple.Tuple2;
6+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
7+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
8+
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
9+
import org.apache.flink.streaming.api.windowing.time.Time;
10+
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
11+
import org.apache.flink.util.Collector;
12+
13+
/**
14+
* @author JavaEdge
15+
*
16+
* @date 2019-07-23
17+
*/
18+
public class JavaWindowsProcessApp {
19+
20+
public static void main(String[] args) throws Exception {
21+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
22+
DataStreamSource<String> text = env.socketTextStream("localhost", 9999);
23+
24+
text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() {
25+
@Override
26+
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
27+
String[] tokens = value.toLowerCase().split(",");
28+
for (String token : tokens) {
29+
if (token.length() > 0) {
30+
out.collect(new Tuple2<>(1, Integer.parseInt(token)));
31+
}
32+
}
33+
}
34+
}).keyBy(0)
35+
.timeWindow(Time.seconds(5))
36+
.process(new ProcessWindowFunction<Tuple2<Integer, Integer>, Object, Tuple, TimeWindow>() {
37+
@Override
38+
public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<Object> out) throws Exception {
39+
long count = 0;
40+
for (Tuple2<Integer, Integer> in : elements) {
41+
count++;
42+
}
43+
out.collect("Window: " + context.window() + "count: " + count);
44+
}
45+
})
46+
.print()
47+
.setParallelism(1);
48+
env.execute("JavaWindowsReduceApp");
49+
}
50+
}
51+
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.javaedge.java.chapter6;
2+
3+
import org.apache.flink.api.common.functions.FlatMapFunction;
4+
import org.apache.flink.api.common.functions.ReduceFunction;
5+
import org.apache.flink.api.java.tuple.Tuple2;
6+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
7+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
8+
import org.apache.flink.streaming.api.windowing.time.Time;
9+
import org.apache.flink.util.Collector;
10+
11+
/**
12+
* @author JavaEdge
13+
*
14+
* @date 2019-07-23
15+
*/
16+
public class JavaWindowsReduceApp {
17+
18+
public static void main(String[] args) throws Exception {
19+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
20+
DataStreamSource<String> text = env.socketTextStream("localhost",9999);
21+
22+
text.flatMap(new FlatMapFunction<String, Tuple2<Integer,Integer>>() {
23+
@Override
24+
public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception {
25+
String[] tokens = value.toLowerCase().split(",");
26+
for(String token : tokens) {
27+
if(token.length() > 0) {
28+
out.collect(new Tuple2<>(1, Integer.parseInt(token)));
29+
}
30+
}
31+
}
32+
}).keyBy(0)
33+
.timeWindow(Time.seconds(5))
34+
.reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
35+
@Override
36+
public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
37+
System.out.println("value1 = [" + value1 + "], value2 = [" + value2 + "]");
38+
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
39+
}
40+
})
41+
.print()
42+
.setParallelism(1);
43+
env.execute("JavaWindowsReduceApp");
44+
}
45+
}
46+
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.javaedge.scala.chapter5
2+
3+
import org.apache.flink.api.scala.ExecutionEnvironment
4+
import org.apache.flink.table.api.TableEnvironment
5+
import org.apache.flink.types.Row
6+
import org.apache.flink.api.scala._
7+
8+
/**
  * Batch example: loads a sales CSV into a DataSet, registers it as the table
  * "sales" and prints the summed amount paid per customer using SQL.
  *
  * @author JavaEdge
  * @date 2019-07-21
  *
  */
object TableSQLAPI {

  /** CSV location used when no path is passed on the command line. */
  private val DefaultSalesCsv =
    "file:///Volumes/doc/IDEAProjects/Flink-Tuitiorial/flink-train-scala/src/main/resources/sales.csv"

  /**
    * Runs the query and prints one row per customer.
    *
    * @param args optional; the first argument overrides the default CSV location
    */
  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // The default is a path on the author's machine; allow callers to supply their own.
    val filePath = args.headOption.getOrElse(DefaultSalesCsv)

    // The file is now available as a DataSet[SalesLog]; its first line is skipped.
    val csv = env.readCsvFile[SalesLog](filePath, ignoreFirstLine = true)
    //csv.print()

    // DataSet ==> Table
    val salesTable = tableEnv.fromDataSet(csv)

    // Table ==> table name usable from SQL
    tableEnv.registerTable("sales", salesTable)

    // SQL
    val resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")

    // Table ==> DataSet[Row] so the result can be printed
    tableEnv.toDataSet[Row](resultTable).print()

  }

  /** One record of the sales CSV. */
  case class SalesLog(transactionId: String,
                      customerId: String,
                      itemId: String,
                      amountPaid: Double)

}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.javaedge.scala.chapter6
2+
3+
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
4+
import org.apache.flink.streaming.api.windowing.time.Time
5+
import org.apache.flink.api.scala._
6+
7+
/**
  * Streaming example: counts comma-separated tokens read from a local socket
  * over a 10-second time window that advances every 5 seconds.
  *
  * @author JavaEdge
  *
  * @date 2019-07-23
  */
object WindowsApp {

  /**
    * Builds the pipeline and starts the job.
    *
    * @param args ignored
    */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val lines = env.socketTextStream("localhost", 9999)

    // One (token, 1) pair per comma-separated token.
    val pairs = lines
      .flatMap(line => line.split(","))
      .map(word => (word, 1))

    // Single-argument alternative kept from the original: .timeWindow(Time.seconds(5))
    val counts = pairs
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)

    counts.print().setParallelism(1)

    env.execute("WindowsApp")
  }
}

0 commit comments

Comments
 (0)