Skip to content

Commit 448ef3a

Browse files
committed
DataStream 中的 DataSet Source 数据源
1 parent cea70a4 commit 448ef3a

8 files changed

Lines changed: 268 additions & 0 deletions
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.javaedge.java.chapter4;
2+
3+
import org.apache.flink.streaming.api.functions.source.SourceFunction;
4+
5+
/**
6+
* @author JavaEdge
7+
* @date 2019-07-20
8+
*/
9+
public class JavaCustomNonParallelSourceFunction implements SourceFunction<Long> {
10+
boolean isRunning = true;
11+
long count = 1;
12+
13+
@Override
14+
public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
15+
while (true) {
16+
ctx.collect(count);
17+
count += 1;
18+
Thread.sleep(1000);
19+
}
20+
}
21+
22+
@Override
23+
public void cancel() {
24+
isRunning = false;
25+
}
26+
}
27+
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.javaedge.java.chapter4;
2+
3+
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
4+
import org.apache.flink.streaming.api.functions.source.SourceFunction;
5+
6+
/**
7+
* @author JavaEdge
8+
* @date 2019-07-20
9+
*/
10+
public class JavaCustomParallelSourceFunction implements ParallelSourceFunction<Long> {
11+
boolean isRunning = true;
12+
long count = 1;
13+
14+
@Override
15+
public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
16+
while (true) {
17+
ctx.collect(count);
18+
count += 1;
19+
Thread.sleep(1000);
20+
}
21+
}
22+
23+
@Override
24+
public void cancel() {
25+
isRunning = false;
26+
}
27+
}
28+
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.javaedge.java.chapter4;
2+
3+
import org.apache.flink.configuration.Configuration;
4+
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
5+
6+
/**
7+
* @author JavaEdge
8+
* @date 2019-07-20
9+
*/
10+
public class JavaCustomRichParallelSourceFunction extends RichParallelSourceFunction<Long> {
11+
boolean isRunning = true;
12+
long count = 1;
13+
14+
@Override
15+
public void run(SourceContext<Long> ctx) throws Exception {
16+
while (true) {
17+
ctx.collect(count);
18+
count += 1;
19+
Thread.sleep(1000);
20+
}
21+
}
22+
23+
@Override
24+
public void open(Configuration parameters) throws Exception {
25+
super.open(parameters);
26+
}
27+
28+
@Override
29+
public void close() throws Exception {
30+
super.close();
31+
}
32+
33+
@Override
34+
public void cancel() {
35+
isRunning = false;
36+
}
37+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.javaedge.java.chapter4;
2+
3+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
4+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
5+
6+
/**
7+
* @author JavaEdge
8+
* @date 2019-07-19
9+
*/
10+
public class JavaDataStreamSourceApp {
11+
12+
public static void main(String[] args) throws Exception {
13+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
14+
15+
// socketFunction(env);
16+
// nonParallelSourceFunction(env);
17+
parallelSourceFunction(env);
18+
19+
env.execute("JavaDataStreamSourceApp");
20+
}
21+
22+
public static void nonParallelSourceFunction(StreamExecutionEnvironment env) {
23+
DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());
24+
data.print().setParallelism(1);
25+
}
26+
27+
public static void socketFunction(StreamExecutionEnvironment env) {
28+
DataStreamSource<String> data = env.socketTextStream("localhost", 9999);
29+
data.print().setParallelism(1);
30+
}
31+
32+
public static void richParallelSourceFunction(StreamExecutionEnvironment env) {
33+
DataStreamSource<Long> data = env.addSource(new JavaCustomRichParallelSourceFunction()).setParallelism(2);
34+
data.print().setParallelism(1);
35+
}
36+
37+
38+
public static void parallelSourceFunction(StreamExecutionEnvironment env) {
39+
DataStreamSource<Long> data = env.addSource(new JavaCustomParallelSourceFunction()).setParallelism(2);
40+
data.print().setParallelism(1);
41+
}
42+
43+
44+
//
45+
}
46+
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.javaedge.scala.chapter4
2+
3+
import org.apache.flink.streaming.api.functions.source.SourceFunction
4+
5+
/**
6+
* 自定义
7+
*
8+
* @author JavaEdge
9+
* @date 2019-07-19
10+
*
11+
*/
12+
class CustomNonParallelSourceFunction extends SourceFunction[Long]{
13+
14+
var count = 1L
15+
var isRunning = true
16+
17+
override def cancel(): Unit = {
18+
isRunning = false
19+
}
20+
21+
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
22+
while(isRunning) {
23+
ctx.collect(count)
24+
count += 1
25+
Thread.sleep(1000)
26+
}
27+
}
28+
}
29+
30+
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.javaedge.scala.chapter4
2+
3+
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
4+
5+
/**
6+
* 实现 ParallelSourceFunction
7+
*
8+
* @author JavaEdge
9+
* @date 2019-07-20
10+
*
11+
*/
12+
class CustomParallelSourceFunction extends ParallelSourceFunction[Long]{
13+
14+
var isRunning = true
15+
var count = 1L
16+
17+
override def cancel(): Unit = {
18+
isRunning = false
19+
}
20+
21+
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
22+
while(isRunning) {
23+
ctx.collect(count)
24+
count += 1
25+
Thread.sleep(1000)
26+
}
27+
}
28+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.javaedge.scala.chapter4
2+
3+
import org.apache.flink.configuration.Configuration
4+
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
5+
6+
/**
7+
* @author JavaEdge
8+
* @date 2019-07-20
9+
*/
10+
class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {
11+
var isRunning = true
12+
var count = 1L
13+
14+
override def cancel(): Unit = {
15+
isRunning = false
16+
}
17+
18+
override def open(parameters: Configuration): Unit = super.open(parameters)
19+
20+
override def close(): Unit = super.close()
21+
22+
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
23+
while (isRunning) {
24+
ctx.collect(count)
25+
count += 1
26+
Thread.sleep(1000)
27+
}
28+
}
29+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.javaedge.scala.chapter4
2+
3+
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
4+
import org.apache.flink.api.scala._
5+
6+
/**
7+
* @author JavaEdge
8+
* @date 2019-07-19
9+
*/
10+
object DataStreamSourceApp {
11+
12+
def main(args: Array[String]): Unit = {
13+
val env = StreamExecutionEnvironment.getExecutionEnvironment
14+
// socketFunction(env)
15+
nonParallelSourceFunction(env)
16+
env.execute("DataStreamSourceApp")
17+
}
18+
19+
def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
20+
val data = env.addSource(new CustomNonParallelSourceFunction)
21+
data.print().setParallelism(1)
22+
}
23+
24+
def socketFunction(env: StreamExecutionEnvironment): Unit = {
25+
26+
val data = env.socketTextStream("localhost", 9999)
27+
data.print().setParallelism(1)
28+
}
29+
30+
// def richParallelSourceFunction(env:StreamExecutionEnvironment): Unit = {
31+
// val data = env.addSource(new CustomRichParallelSourceFunction).setParallelism(2)
32+
// data.print()
33+
// }
34+
//
35+
// def parallelSourceFunction(env:StreamExecutionEnvironment): Unit = {
36+
// val data = env.addSource(new CustomParallelSourceFunction).setParallelism(2)
37+
// data.print()
38+
// }
39+
//
40+
//
41+
42+
}
43+

0 commit comments

Comments
 (0)