Skip to content

Commit cea70a4

Browse files
committed
DataSet Sink
1 parent 33aac02 commit cea70a4

3 files changed

Lines changed: 56 additions & 1 deletion

File tree

src/main/java/com/javaedge/java/chapter3/DataSetTransformationApp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public static void crossFunction(ExecutionEnvironment env) throws Exception {
3636
List<String> info1 = new ArrayList<>();
3737
info1.add("曼联");
3838
info1.add("曼城");
39-
39+
4040
List<String> info2 = new ArrayList<>();
4141
info2.add("3");
4242
info2.add("1");
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.javaedge.java.chapter3;
2+
3+
/**
4+
* @author sss
5+
* @date 2019-07-18
6+
*/
7+
8+
import org.apache.flink.api.java.ExecutionEnvironment;
9+
import org.apache.flink.api.java.operators.DataSource;
10+
import org.apache.flink.core.fs.FileSystem;
11+
12+
import java.util.ArrayList;
13+
import java.util.List;
14+
15+
/**
16+
* @author JavaEdge
17+
*/
18+
public class JavaDataSetSinkApp {
19+
20+
public static void main(String[] args) throws Exception {
21+
22+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
23+
List<Integer> info = new ArrayList<>();
24+
for (int i = 1; i <= 10; i++) {
25+
info.add(i);
26+
}
27+
28+
String filePath = "/Volumes/doc/data/cp3/sink-out/";
29+
DataSource<Integer> data = env.fromCollection(info);
30+
data.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE);
31+
env.execute("JavaDataSetSinkApp");
32+
}
33+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.javaedge.scala.chapter3
2+
3+
import org.apache.flink.api.scala.ExecutionEnvironment
4+
import org.apache.flink.core.fs.FileSystem.WriteMode
5+
import org.apache.flink.api.scala._
6+
/**
7+
* @author JavaEdge
8+
*
9+
* @date 2019-07-18
10+
*/
11+
object DataSetSinkApp {
12+
def main(args: Array[String]): Unit = {
13+
14+
val env = ExecutionEnvironment.getExecutionEnvironment
15+
val data = 1.to(10)
16+
val text = env.fromCollection(data)
17+
18+
val filePath = "/Volumes/doc/data/cp3/sink-out/"
19+
text.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(5)
20+
env.execute("DataSetSinkApp")
21+
}
22+
}

0 commit comments

Comments
 (0)