This article walks through the detailed process of developing Spark applications in Eclipse: setting up the environment, reading data from Elasticsearch and MySQL, and packaging and running the job.
1. Environment Setup
Install the Scala IDE plugin in Eclipse (for example via the Eclipse Marketplace or the Scala IDE update site).
2. Reading from Elasticsearch and MySQL
First, add the following pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test</groupId>
  <artifactId>test</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>spark</name>

  <properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
    <spark.artifactId.version>2.11</spark.artifactId.version>
    <guava.version>18.0</guava.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${spark.artifactId.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${spark.artifactId.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
      <!-- <scope>compile</scope> -->
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.29</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_${spark.artifactId.version}</artifactId>
      <version>6.2.0</version>
      <scope>compile</scope>
      <exclusions>
        <exclusion>
          <artifactId>log4j-over-slf4j</artifactId>
          <groupId>org.slf4j</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.6</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- logging API; version kept aligned with the slf4j-log4j12 binding below -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.25</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>3.0.2</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <!-- must match the fully qualified name of the main object below -->
              <mainClass>test.querySql</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/lib</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
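With this build configuration, maven-jar-plugin writes a lib/-prefixed Class-Path into the jar manifest and maven-dependency-plugin copies every runtime dependency into target/lib during the package phase. After packaging, the target directory should look roughly like this (jar names derived from the coordinates above):

target/
  test-0.0.1-SNAPSHOT.jar          (manifest Class-Path references lib/)
  lib/
    spark-core_2.11-2.2.0.jar
    spark-sql_2.11-2.2.0.jar
    elasticsearch-spark-20_2.11-6.2.0.jar
    mysql-connector-java-5.1.6.jar
    ... (remaining transitive dependencies)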
Then write the main class:
package test

import java.util.Properties

import org.apache.spark.sql.SparkSession

object querySql {
  def main(args: Array[String]): Unit = {
    // Build a SparkSession configured for Elasticsearch; MySQL is read over JDBC below
    val spark = SparkSession.builder().appName("Java Spark MYSQL basic example")
      .master("local")
      .config("es.nodes", "127.0.0.1")
      .config("es.port", "9200")
      .config("es.mapping.date.rich", "false") // do not map ES date fields to rich date types
      .getOrCreate()

    // MySQL connection settings
    val url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8"
    val table = "sys_user"
    val props = new Properties()
    props.setProperty("dbtable", table)      // table name (also passed directly to jdbc below)
    props.setProperty("user", "root")        // user name
    props.setProperty("password", "123456")  // password

    // val df = spark.read.jdbc(url, table, props)
    // df.show()
    // Add a filter condition (requires: import org.apache.spark.sql.functions.col)
    // val filter = df.filter(col("TABLE_ID").gt("10"))
    // println("mysql count: " + filter.count())

    // Read the visitlog/_doc index from Elasticsearch as a DataFrame
    val esRows = spark.read.format("org.elasticsearch.spark.sql").load("visitlog/_doc")
    // esRows.show()

    // Register a global temp view and query it with Spark SQL
    esRows.createOrReplaceGlobalTempView("table1")
    // val subDf = spark.sql("SELECT userId,ip,createTime,createTime2 FROM global_temp.table1")
    val subDf = spark.sql("SELECT userId, count(userId) FROM global_temp.table1 GROUP BY userId")
    subDf.show()

    spark.close()
  }
}
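The JDBC branch above is commented out; here is a minimal sketch of the MySQL read on its own, in a separate hypothetical object queryMysql, assuming the same local test database, sys_user table, and numeric TABLE_ID column as in the commented code:

package test

import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object queryMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark mysql example")
      .master("local")
      .getOrCreate()

    val url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8"
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")

    // Load the sys_user table as a DataFrame over JDBC
    val df = spark.read.jdbc(url, "sys_user", props)
    df.show()

    // Keep rows where TABLE_ID > 10 (column name assumed from the commented code above)
    val filtered = df.filter(col("TABLE_ID").gt(10))
    println("mysql count: " + filtered.count())

    spark.close()
  }
}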
3. Packaging and Execution
Package command (scala:compile compiles the Scala sources before the standard package phase): mvn clean scala:compile package
Run command (from the target directory, where the jar and lib/ were produced): java -Djava.ext.dirs=lib -cp test-0.0.1-SNAPSHOT.jar test.querySql
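Note that the -Djava.ext.dirs extension mechanism was removed in JDK 9. On Java 9 or later, put the dependency directory on the classpath instead (use ; rather than : as the separator on Windows):

java -cp "test-0.0.1-SNAPSHOT.jar:lib/*" test.querySql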