Commit c3ee72b

Streaming Connectors programming

1 parent e9e2ae0 commit c3ee72b

4 files changed

Lines changed: 142 additions & 0 deletions

File tree

pom.xml
src/main/scala/com/javaedge/scala/chapter7/FileSystemSinkApp.scala
src/main/scala/com/javaedge/scala/chapter7/KafkaConnectorConsumerApp.scala
src/main/scala/com/javaedge/scala/chapter7/KafkaConnectorProducerApp.scala

pom.xml

Lines changed: 26 additions & 0 deletions
@@ -30,6 +30,7 @@
        <flink.version>1.8.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
+       <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
@@ -105,6 +106,31 @@
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-connector-filesystem_2.11</artifactId>
+           <version>${flink.version}</version>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.hadoop</groupId>
+           <artifactId>hadoop-client</artifactId>
+           <version>${hadoop.version}</version>
+       </dependency>
+
+       <dependency>
+           <groupId>org.apache.flink</groupId>
+           <artifactId>flink-connector-kafka_2.11</artifactId>
+           <version>${flink.version}</version>
+       </dependency>
+
+<!--        <dependency>-->
+<!--            <groupId>org.apache.flink</groupId>-->
+<!--            <artifactId>flink-avro</artifactId>-->
+<!--            <version>${flink.version}</version>-->
+<!--        </dependency>-->
+
    </dependencies>

    <build>
src/main/scala/com/javaedge/scala/chapter7/FileSystemSinkApp.scala

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
package com.javaedge.scala.chapter7

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

/**
 * @author JavaEdge
 * @date 2019-07-23
 */
object FileSystemSinkApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val data = env.socketTextStream("localhost", 9999)

    val filePath = "file:///Volumes/doc/data/HDFSSink"
    val sink = new BucketingSink[String](filePath)

    sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm")) // one bucket directory per minute
    sink.setWriter(new StringWriter()) // write each record as a plain string
    sink.setBatchRolloverInterval(2000) // roll part files every 2 seconds

    data.addSink(sink)
    env.execute("FileSystemSinkApp")
  }
}
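For reference, later Flink releases supersede BucketingSink with StreamingFileSink, which has shipped in flink-streaming-java since Flink 1.6 and needs no extra connector dependency. Below is a minimal sketch of the same job on that API, assuming the same socket source and output path; unlike the rollover interval above, StreamingFileSink finalizes part files on checkpoints.

package com.javaedge.scala.chapter7

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Sketch only: a StreamingFileSink counterpart to the BucketingSink job above
object StreamingFileSinkApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(4000) // part files are committed on checkpoints

    val data = env.socketTextStream("localhost", 9999)

    // Row format: each record written as a UTF-8 line; the default bucket
    // assigner creates one directory per hour (yyyy-MM-dd--HH)
    val sink = StreamingFileSink
      .forRowFormat(new Path("file:///Volumes/doc/data/HDFSSink"),
        new SimpleStringEncoder[String]("UTF-8"))
      .build()

    data.addSink(sink)
    env.execute("StreamingFileSinkApp")
  }
}

Row format with SimpleStringEncoder matches the StringWriter behavior above; a custom BucketAssigner would be needed to reproduce the per-minute bucketing.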
src/main/scala/com/javaedge/scala/chapter7/KafkaConnectorConsumerApp.scala

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
package com.javaedge.scala.chapter7

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import org.apache.flink.api.scala._

/**
 * @author JavaEdge
 *
 * @date 2019-07-26
 */
object KafkaConnectorConsumerApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Common checkpoint settings
    env.enableCheckpointing(4000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(10000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    val topic = "testJavaEdge"
    val properties = new Properties()

    // To address the broker by the JavaEdge hostname instead of the IP, add a hostname-to-IP mapping on the machine running the IDE
    properties.setProperty("bootstrap.servers", "192.168.0.106:9092")
    properties.setProperty("group.id", "test")

    val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties))

    data.print()

    env.execute("KafkaConnectorConsumerApp")
  }
}
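By default FlinkKafkaConsumer resumes from the offsets committed for group.id in Kafka. The consumer object also exposes explicit start positions; a small sketch to drop in before addSource, with the same topic and properties as above:

    val consumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties)

    consumer.setStartFromGroupOffsets() // default: resume from committed group offsets
    // consumer.setStartFromEarliest()  // replay the topic from the beginning
    // consumer.setStartFromLatest()    // only read records arriving from now on

    val data = env.addSource(consumer)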
src/main/scala/com/javaedge/scala/chapter7/KafkaConnectorProducerApp.scala

Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
package com.javaedge.scala.chapter7

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper


/**
 * @author JavaEdge
 * @date 2019-07-26
 */
object KafkaConnectorProducerApp {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Common checkpoint settings
    env.enableCheckpointing(4000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(10000)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // Receive data from a socket and sink it to Kafka through Flink
    val data = env.socketTextStream("localhost", 9999)

    val topic = "testJavaEdge"
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.0.106:9092")

    // At-least-once variant (the connector's default semantic):
    // val kafkaSink = new FlinkKafkaProducer[String](topic,
    //   new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
    //   properties)
    val kafkaSink = new FlinkKafkaProducer[String](topic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      properties,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    data.addSink(kafkaSink)

    env.execute("KafkaConnectorProducerApp")
  }
}
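One caveat with Semantic.EXACTLY_ONCE, noted here as something to verify against your broker settings: the Flink producer's default transaction.timeout.ms (1 hour) exceeds the Kafka broker's default transaction.max.timeout.ms (15 minutes), so the job fails on startup until one of the two is adjusted. A sketch of the producer-side fix, added to the properties above:

    // Keep the producer transaction timeout within the broker's
    // transaction.max.timeout.ms (15 minutes by default)
    properties.setProperty("transaction.timeout.ms", "600000") // 10 minutes

Alternatively, raise transaction.max.timeout.ms on the broker if transactions need to span longer checkpoint recovery windows.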
