maven构建Flink项目读取本地文件统计单词

By qq84628151 没有评论

0.安装Maven,并创建一个Maven项目(例如使用maven-archetype-quickstart原型,包名为com.test.maven)

1.Java代码

package com.test.maven;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Batch word-count job built on the Flink DataSet API.
 *
 * <p>Reads a text file line by line, splits every line into whitespace-separated words and
 * counts how often each word occurs. The counts are written to a print sink whose output is
 * prefixed with {@code "dataSourceSum: "}.
 */
public class App {

    /** Input file used when no path is passed as the first program argument. */
    private static final String DEFAULT_INPUT_PATH = "E:\\hello.txt";

    /**
     * Builds and runs the word-count job.
     *
     * @param args optional program arguments; if present, {@code args[0]} is the input file
     *     path, otherwise {@code E:\hello.txt} is read
     * @throws IllegalStateException if building or executing the Flink job fails; the
     *     original exception is attached as the cause
     */
    public static void main(String[] args) {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource<String> dataSource = env.readTextFile(inputPath);

            // Emit (word, 1) pairs, group by the word (field 0) and sum the counts (field 1).
            AggregateOperator<Tuple2<String, Integer>> dataSourceSum =
                    dataSource.flatMap(new Tokenizer()).groupBy(0).sum(1);

            // print(String) only registers a data sink; it does not trigger execution by
            // itself, so env.execute() below is still required to actually run the job.
            dataSourceSum.print("dataSourceSum: ");

            env.execute();
        } catch (Exception e) {
            // Previously this exception was swallowed, making a failed job (e.g. a missing
            // input file) indistinguishable from a successful one. Surface it with its cause.
            throw new IllegalStateException("Word count job failed for input: " + inputPath, e);
        }
    }

    /**
     * Splits a line into words and emits {@code (word, 1)} for every non-empty word.
     *
     * <p>A static nested class (rather than an anonymous inner class) so it does not capture
     * an enclosing instance when Flink serializes the function.
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Split on runs of whitespace and skip empty tokens: splitting on a single " "
            // produced "" for repeated/leading spaces and blank lines, counting "" as a word.
            for (String word : value.split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}

2.pom.xml添加flink依赖

<dependencies>
    <!-- All Flink dependencies use "provided" scope: the Flink cluster that runs the uploaded
         jar already ships these classes, so they must not be bundled into the job jar.
         To run App directly from an IDE, remove the scope or enable the IDE option that
         puts provided dependencies on the run classpath.
         ${scala.version} must be the Scala *binary* version of the Flink release
         (e.g. 2.11 or 2.12), matching the suffix of the Flink artifacts. -->
    <!-- The DataSet API used by App (ExecutionEnvironment, DataSource) lives in flink-java;
         declare it explicitly instead of relying on it arriving transitively. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

3.在pom.xml中添加maven-shade-plugin插件用于打包jar,并将mainClass修改为当前工程中包含main方法的类的全限定名(本例为com.test.maven.App)

<build>
    <plugins>
        <!-- Builds a runnable job jar during "package" and records the entry class in its
             manifest, so the Flink web UI can find the main class of the uploaded jar. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <!-- 1.2.1 is a very old release; use a current 3.x version. -->
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <!-- Drop signature files of signed dependencies: after shading they
                                 no longer match the jar contents, and the JVM rejects the jar
                                 with "Invalid signature file digest". -->
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <!-- Set mainClass to the fully qualified name of the class with main(). -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.test.maven.App</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4.打开cmd,进入项目根目录(pom.xml所在目录)执行mvn clean install,打包生成的jar位于target目录下

5.启动Flink后打开Flink Web UI(默认地址为http://localhost:8081),在Submit New Job页面上传target目录下生成的jar

6.选中上传的jar,确认入口类(Entry Class)为com.test.maven.App,然后提交运行

7.注意:此步骤必须在第6步提交运行之前完成。在运行Flink的机器的E盘根目录下创建文本文件hello.txt(即代码中读取的E:\hello.txt),然后输入一些英文单词,单词之间用空格隔开

8.作业运行成功后,查看启动Flink时打开的两个命令行窗口(通常分别是JobManager和TaskManager):其中TaskManager窗口会打印出以dataSourceSum为前缀的统计结果,格式为(单词,出现次数)