Skip to content

Commit f53f25b

Browse files
committed
transformation算子
1 parent d6b3f6e commit f53f25b

10 files changed

Lines changed: 218 additions & 28 deletions

File tree

.idea/misc.xml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# 0 Flink 从入门到实战 - Scala版本
1+
# Flink 从入门到实战 - Scala/Java双语言版本
22

33
# 1 核心知识点
44
![](https://upload-images.jianshu.io/upload_images/16782311-dc4156dc0a34d557.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
@@ -79,19 +79,11 @@ Flink常见的监控方式
7979

8080
常见的Flink调优策略
8181

82-
# 2 实战项目:基于Flink的互联网直播平台日志分析
83-
#### 互联网直播平台日志分析(以Scala语言开发)
84-
![](https://upload-images.jianshu.io/upload_images/16782311-ff3bd3a8d89089ba.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
85-
86-
# 3 以下知名公司都在使用Flink
82+
# 2 以下知名公司都在使用Flink
8783
![](https://upload-images.jianshu.io/upload_images/16782311-9447a7c96178832b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
8884

8985
# X 交流学习
9086
![](https://upload-images.jianshu.io/upload_images/16782311-8d7acde57fdce062.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
91-
9287
## [Java交流群](https://jq.qq.com/?_wv=1027&k=5UB4P1T)
93-
![](https://upload-images.jianshu.io/upload_images/16782311-11d3533436ffbbdf.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
94-
9588
## [博客](http://www.shishusheng.com)
96-
9789
## [Github](https://github.com/Wasabi1234)

pom.xml

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,3 @@
1-
<!--
2-
Licensed to the Apache Software Foundation (ASF) under one
3-
or more contributor license agreements. See the NOTICE file
4-
distributed with this work for additional information
5-
regarding copyright ownership. The ASF licenses this file
6-
to you under the Apache License, Version 2.0 (the
7-
"License"); you may not use this file except in compliance
8-
with the License. You may obtain a copy of the License at
9-
10-
http://www.apache.org/licenses/LICENSE-2.0
11-
12-
Unless required by applicable law or agreed to in writing,
13-
software distributed under the License is distributed on an
14-
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15-
KIND, either express or implied. See the License for the
16-
specific language governing permissions and limitations
17-
under the License.
18-
-->
191
<project xmlns="http://maven.apache.org/POM/4.0.0"
202
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
213
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -99,6 +81,12 @@ under the License.
9981
<version>1.2.17</version>
10082
<scope>runtime</scope>
10183
</dependency>
84+
85+
<dependency>
86+
<groupId>org.projectlombok</groupId>
87+
<artifactId>lombok</artifactId>
88+
<version>1.18.8</version>
89+
</dependency>
10290
</dependencies>
10391

10492
<build>

src/main/java/TestJava.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/**
2+
* @author JavaEdge
3+
*
4+
* @date 2019-07-16
5+
*/
6+
public class TestJava {
7+
8+
public static void main(String[] args) {
9+
System.out.println("Hello Java!");
10+
}
11+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.javaedge.java.chapter3;
2+
3+
import org.apache.flink.api.common.functions.MapFunction;
4+
import org.apache.flink.api.java.ExecutionEnvironment;
5+
import org.apache.flink.api.java.operators.DataSource;
6+
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
10+
/**
11+
* @author JavaEdge
12+
* @date 2019-07-17
13+
*/
14+
public class DataSetTransformationApp {
15+
16+
public static void main(String[] args) throws Exception {
17+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
18+
mapFunction(env);
19+
}
20+
21+
public static void mapFunction(ExecutionEnvironment env) throws Exception {
22+
List<Integer> list = new ArrayList<>(10);
23+
for (int i = 0; i < 10; i++) {
24+
list.add(i);
25+
}
26+
DataSource<Integer> data = env.fromCollection(list);
27+
data.map((MapFunction<Integer, Integer>) input -> input + 1).print();
28+
}
29+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.javaedge.java.chapter3;
2+
3+
import org.apache.flink.api.java.ExecutionEnvironment;
4+
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
8+
/**
9+
* @author JavaEdge
10+
*
11+
* @date 2019-07-16
12+
*/
13+
public class JavaDataSetDataSourceApp {
14+
15+
public static void main(String[] args) throws Exception {
16+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
17+
// fromCollection(env);
18+
textFile(env);
19+
}
20+
21+
public static void textFile(ExecutionEnvironment env) throws Exception {
22+
String filePath = "file:///Volumes/doc/data/data.txt";
23+
env.readTextFile(filePath).print();
24+
System.out.println("===========~这是一个分割线~============");
25+
26+
filePath = "file:///Volumes/doc/data/inputs";
27+
env.readTextFile(filePath).print();
28+
}
29+
public static void fromCollection(ExecutionEnvironment env) throws Exception {
30+
List<Integer> list = new ArrayList<>(10);
31+
for (int i = 0; i < 10; i++) {
32+
list.add(i);
33+
}
34+
env.fromCollection(list).print();
35+
}
36+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.javaedge.java.chapter3;
2+
3+
import lombok.Data;
4+
import lombok.ToString;
5+
6+
/**
7+
* @author JavaEdge
8+
*
9+
* @date 2019-07-17
10+
*/
11+
@Data
12+
@ToString
13+
public class Person {
14+
private String name;
15+
private int age;
16+
private String job;
17+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.javaedge.scala.chapter3
2+
3+
import org.apache.flink.api.scala.ExecutionEnvironment
4+
import org.apache.flink.api.scala._
5+
6+
/**
7+
* @author JavaEdge
8+
* @date 2019-07-17
9+
*
10+
*/
11+
object DataSetTransformationApp {
12+
13+
def main(args: Array[String]): Unit = {
14+
val env = ExecutionEnvironment.getExecutionEnvironment
15+
// mapFunction(env)
16+
filterFunction(env)
17+
}
18+
19+
def filterFunction(env: ExecutionEnvironment): Unit = {
20+
// val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
21+
// data.map(_ + 1).filter(_ > 5).print()
22+
23+
env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
24+
.map(_ + 1)
25+
.filter(_ > 5)
26+
.print()
27+
}
28+
29+
def mapFunction(env: ExecutionEnvironment): Unit = {
30+
import org.apache.flink.api.scala._
31+
val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
32+
33+
// data.print()
34+
// 对data中的元素都做加一处理
35+
// data.map((x: Int) => x + 1).print()
36+
// data.map((x) => x + 1).print()
37+
// data.map(x => x + 1).print()
38+
// 终极写法
39+
data.map(_ + 1).print()
40+
}
41+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.javaedge.scala.chapter3
2+
3+
import com.javaedge.java.chapter3.Person
4+
import org.apache.flink.api.scala.ExecutionEnvironment
5+
import org.apache.flink.configuration.Configuration
6+
7+
/**
8+
* @author JavaEdge
9+
* @date 2019-07-16
10+
*/
11+
object ScalaDataSetDataSourceApp {
12+
def main(args: Array[String]): Unit = {
13+
val env = ExecutionEnvironment.getExecutionEnvironment
14+
// fromCollection(env)
15+
// textFile(env)
16+
// csvFile(env)
17+
readCompressionFiles(env)
18+
}
19+
20+
def readCompressionFiles(env:ExecutionEnvironment): Unit ={
21+
val filePath = "file:///Volumes/doc/data/compress"
22+
env.readTextFile(filePath).print()
23+
}
24+
25+
def readRecursiveFiles(env: ExecutionEnvironment): Unit = {
26+
//
27+
val filePath = "file:///Volumes/doc/data/nested"
28+
env.readTextFile(filePath).print()
29+
println("===========~这是一个分割线~============")
30+
31+
// ✔️
32+
val parameters = new Configuration
33+
parameters.setBoolean("recursive.file.enumeration", true)
34+
env.readTextFile(filePath).withParameters(parameters).print()
35+
}
36+
37+
def csvFile(env: ExecutionEnvironment): Unit = {
38+
import org.apache.flink.api.scala._
39+
val filePath = "file:///Volumes/doc/data/data.csv"
40+
// env.readCsvFile[(String, Int, String)](filePath, ignoreFirstLine = true).print()
41+
// env.readCsvFile[(String, Int)](filePath, ignoreFirstLine = true,includedFields = Array(0, 1)).print()
42+
43+
/*无法正常运行
44+
case class MyCaseClass(name: String, age: Int)
45+
env.readCsvFile[MyCaseClass](filePath,ignoreFirstLine = true,includedFields = Array(0, 1)).print()
46+
*/
47+
env.readCsvFile[Person](filePath, ignoreFirstLine = true, pojoFields = Array("name", "age", "job")).print()
48+
}
49+
50+
def textFile(env: ExecutionEnvironment): Unit = {
51+
val filePath = "file:///Volumes/doc/data/inputs"
52+
env.readTextFile(filePath).print()
53+
}
54+
55+
def fromCollection(env: ExecutionEnvironment): Unit = {
56+
import org.apache.flink.api.scala._
57+
val data = 1 to 10
58+
env.fromCollection(data).print()
59+
}
60+
}

src/test/scala/TestScala.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/**
2+
* @author JavaEdge
3+
*
4+
* @date 2019-07-16
5+
*
6+
*/
7+
object TestScala {
8+
9+
def main(args: Array[String]): Unit = {
10+
println("Hello Scala!")
11+
}
12+
13+
}

0 commit comments

Comments
 (0)