




Only three parts need code: 1. the data source, 2. the transformation, 3. the sink, which specifies the output format.

val env = ExecutionEnvironment.getExecutionEnvironment
This statement is essentially boilerplate.
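The three-part structure (source, transformation, sink) can be illustrated with plain Scala collections. This is only an analogy, not Flink code; all names here are made up for illustration:

```scala
// Plain-Scala sketch of the source -> transformation -> sink structure.
object PipelineSketch {
  // 1. source: a fixed collection stands in for a real data source
  def source(): List[String] = List("hello flink", "hello scala")

  // 2. transformation: split lines into words and count occurrences
  def transform(lines: List[String]): Map[String, Int] =
    lines.flatMap(_.split(" ")).groupBy(identity).map { case (w, ws) => (w, ws.size) }

  // 3. sink: here we just print; Flink would write to a configured sink
  def main(args: Array[String]): Unit =
    transform(source()).foreach(println)
}
```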




import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object readText {
  def main(args: Array[String]): Unit = {
    // create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // create datasource
    val dataStream = env.readTextFile(filePath = "C:\\doc\\temp\\1.正行项目介绍.txt")
    // print
    dataStream.print()
    // execute
    env.execute()
  }
}
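As a rough plain-Scala sketch of what readTextFile emits — one record per line of the file — the following reads a temporary file with only the standard library (this is an illustration, not Flink's actual implementation; the object and function names are made up):

```scala
import java.nio.file.Files
import scala.io.Source

// Plain-Scala sketch: read a text file and emit one record per line.
object ReadLinesSketch {
  def readLines(path: String): List[String] = {
    val src = Source.fromFile(path)
    try src.getLines().toList finally src.close()
  }

  def main(args: Array[String]): Unit = {
    // use a temp file so the example is self-contained
    val tmp = Files.createTempFile("demo", ".txt")
    Files.write(tmp, "line1\nline2".getBytes("UTF-8"))
    readLines(tmp.toString).foreach(println)
    Files.delete(tmp)
  }
}
```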

Use nc (netcat), which ships with Ubuntu, to create a socket data source (e.g. nc -lk 9999), then write code that listens for the socket data:
import org.apache.flink.streaming.api.scala._

object socketSourceTest {
  def main(args: Array[String]): Unit = {
    // create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // create datasource
    val socketDataStream = env.socketTextStream("localhost", 9999, '\n')
    // print
    socketDataStream.print()
    // execute
    env.execute()
  }
}
Output:
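What socketTextStream consumes is just newline-delimited text over TCP. That can be sketched with plain JDK sockets, no Flink and no nc required (the tiny in-process server below stands in for nc -lk 9999; all names are illustrative):

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.{ServerSocket, Socket}

// Plain-JDK sketch: a server writes newline-delimited text,
// a client reads it line by line, like socketTextStream would.
object SocketSketch {
  // read every line from the socket until the peer closes it
  def readAll(host: String, port: Int): List[String] = {
    val sock = new Socket(host, port)
    val in = new BufferedReader(new InputStreamReader(sock.getInputStream))
    // readLine() returns null once the connection is closed
    val lines = Iterator.continually(in.readLine()).takeWhile(_ != null).toList
    sock.close()
    lines
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(0) // port 0 = any free port
    new Thread(() => {
      val conn = server.accept()
      val out = new PrintWriter(conn.getOutputStream, true)
      out.println("hello")
      out.println("flink")
      conn.close()
    }).start()

    readAll("localhost", server.getLocalPort).foreach(println)
    server.close()
  }
}
```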


import org.apache.flink.api.scala._

object CollectionSourceTest {
  def main(args: Array[String]): Unit = {
    // create env
    val env = ExecutionEnvironment.getExecutionEnvironment
    // collection data source (Tuple1 cannot hold two values; use pairs instead)
    val collectionDataStream = env.fromElements((1L, 2L), (3L, 4L))
    // print (in the batch API, print() also triggers execution)
    collectionDataStream.print()
  }
}
Output:
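The printed records follow the tuples' own toString, which this plain-Scala sketch (not Flink) demonstrates; the object name is made up:

```scala
// Plain-Scala sketch: tuple records print via Tuple2.toString, e.g. (1,2).
object TupleFormatSketch {
  def format(t: (Long, Long)): String = t.toString

  def main(args: Array[String]): Unit =
    List((1L, 2L), (3L, 4L)).map(format).foreach(println)
}
```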


Topic: a collection of data of one category.

Partition: a physical partition of a Topic's data.

Producer: responsible for producing data into the Kafka brokers.

Consumer Group: each consumer is assigned to a corresponding consumer group.
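The concepts above map onto the standard kafka-clients configuration keys. The sketch below only builds java.util.Properties (pure JDK, runnable without a broker); creating an actual KafkaProducer or KafkaConsumer would additionally require the kafka-clients dependency. The object and helper names are illustrative:

```scala
import java.util.Properties

// Sketch of standard kafka-clients producer/consumer configuration.
object KafkaConfigSketch {
  def producerProps(bootstrap: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrap) // broker address list
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  def consumerProps(bootstrap: String, groupId: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrap)
    props.put("group.id", groupId) // the consumer group this consumer joins
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props
  }

  def main(args: Array[String]): Unit = {
    val props = producerProps("10.31.126.10:9092")
    props.stringPropertyNames().forEach(k => println(s"$k=${props.getProperty(k)}"))
  }
}
```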

Download kafka_2.12-3.2.0 (e.g. from the CSDN resource page), then extract it:
tar -zxvf kafka_2.12-3.2.0.tgz
vi /opt/kafka/config/server.properties
Add three settings:
listeners=PLAINTEXT://10.31.126.10:9092
advertised.listeners=PLAINTEXT://10.31.126.10:9092
zookeeper.connect=10.31.126.10:2181
1. First start the ZooKeeper service
cd /opt/kafka
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
After starting it, keep this window open, otherwise the ZooKeeper service will stop.
Note: if you get the error /opt/kafka/bin/kafka-run-class.sh: line 342: /opt/kafka/echo/bin/java: No such file or directory,
check whether echo $JAVA_HOME prints a path.
Fix: run source /etc/profile, then echo $JAVA_HOME again to confirm it is set.
2. Start the Kafka service
cd /opt/kafka
./bin/kafka-server-start.sh ./config/server.properties
After starting it, keep this window open, otherwise the Kafka service will stop.
3. Test Kafka
bin/kafka-topics.sh --create --bootstrap-server 10.31.126.10:9092 --replication-factor 1 --partitions 1 --topic wordTest
Note: newer Kafka versions (2.2 and later) no longer take a ZooKeeper connection string (--zookeeper localhost:2181); pass the Kafka broker address with --bootstrap-server localhost:9092 instead.
For versions below 2.2, create the topic with:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest

bin/kafka-topics.sh --bootstrap-server 10.31.126.10:9092 --list
bin/kafka-topics.sh --describe --bootstrap-server 10.31.126.10:9092 --topic wordTest
bin/kafka-topics.sh --delete --bootstrap-server hp2:9092 --topic hello