diff --git a/juvenile-prosecution-boot/.gitignore b/juvenile-prosecution-boot/.gitignore
index e69de29..b144cb4 100644
--- a/juvenile-prosecution-boot/.gitignore
+++ b/juvenile-prosecution-boot/.gitignore
@@ -0,0 +1,4 @@
+/target/
+/.idea/
+*.iml
+rebel.xml
\ No newline at end of file
diff --git a/juvenile-prosecution-boot/jeecg-boot-module-system/pom.xml b/juvenile-prosecution-boot/jeecg-boot-module-system/pom.xml
index d8ddb46..8b538e2 100644
--- a/juvenile-prosecution-boot/jeecg-boot-module-system/pom.xml
+++ b/juvenile-prosecution-boot/jeecg-boot-module-system/pom.xml
@@ -45,7 +45,49 @@
             <version>1.4.7</version>
-
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>3.3.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-1.2-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.12</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+            <version>3.0.8</version>
+        </dependency>
+
diff --git a/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/SparkUtil.java b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/SparkUtil.java
new file mode 100644
index 0000000..a3a868f
--- /dev/null
+++ b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/SparkUtil.java
@@ -0,0 +1,110 @@
+package org.jeecg.modules.handle;
+
+import cn.hutool.core.map.MapUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.SparkSession;
+
+import java.net.URL;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Spark utility class.
+ *
+ * @Author shaohu
+ * @Date 2022-07-18 16:55
+ */
+public class SparkUtil {
+
+ private static SparkSession spark;
+ private static SparkSession.Builder builder;
+ private static Configuration config;
+
+ static {
+ builder = SparkSession
+ .builder()
+ .master("local[*]")
+ .appName("Spark Prosecution")
+ .config("spark.ui.enabled", false);
+ //.config("spark.driver.memory","2g")
+ }
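+
+    // The builder is configured once here: embedded local mode on all cores
+    // ("local[*]") with the Spark web UI disabled. The session itself is created
+    // lazily in getSparkInstance(), so entries added beforehand via setConfig()
+    // or addResource() are applied when it is first built.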
+
+    /**
+     * Set a single Spark/Hadoop configuration entry.
+     *
+     * @param key   configuration key
+     * @param value configuration value
+     */
+ public static void setConfig(String key, String value) {
+ if (config == null) {
+ config = new Configuration();
+ }
+ config.set(key, value);
+ }
+
+    /**
+     * Set multiple Spark/Hadoop configuration entries at once.
+     *
+     * @param configMap configuration entries to apply
+     */
+    public static void setConfig(Map<String, String> configMap) {
+        if (config == null) {
+            config = new Configuration();
+        }
+        if (MapUtil.isNotEmpty(configMap)) {
+            configMap.forEach((key, value) -> config.set(key, value));
+        }
+    }
+
+    /**
+     * Add a configuration resource, e.g. hive-site.xml.
+     *
+     * @param url location of the resource
+     */
+ public static void addResource(URL url) {
+ if (config == null) {
+ config = new Configuration();
+ }
+ config.addResource(url);
+ }
+
+    /**
+     * Enable Hive support.
+     */
+ public static void enableHiveSupport() {
+ builder.enableHiveSupport();
+ }
+
+
+    /**
+     * Get the shared SparkSession, creating it on first use.
+     *
+     * @return the SparkSession instance
+     */
+ public static SparkSession getSparkInstance() {
+ if (spark == null) {
+            if (config != null) {
+                Iterator<Map.Entry<String, String>> iterator = config.iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<String, String> configEntry = iterator.next();
+ builder.config(configEntry.getKey(), configEntry.getValue());
+ }
+ }
+ spark = builder.getOrCreate();
+ //spark.sparkContext().setLogLevel(Level.ERROR.name());
+ }
+ return spark;
+ }
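+
+    // Note: the lazy initialization in getSparkInstance() is not synchronized;
+    // the session is expected to be obtained from a single thread at a time.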
+
+    /**
+     * Close and discard the Spark session so a fresh one can be created later.
+     */
+    public static void close() {
+        if (spark != null) {
+            spark.close();
+            spark = null;
+        }
+    }
+
+}
diff --git a/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/service/SparkEtlService.java b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/service/SparkEtlService.java
new file mode 100644
index 0000000..548e6d5
--- /dev/null
+++ b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/service/SparkEtlService.java
@@ -0,0 +1,402 @@
+package org.jeecg.modules.handle.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.jeecg.common.util.DateUtils;
+import org.jeecg.modules.handle.SparkUtil;
+import org.jeecg.modules.system.entity.Family;
+import org.jeecg.modules.system.entity.GaDemographic;
+import org.jeecg.modules.system.entity.Minor;
+import org.jeecg.modules.system.entity.MrSchool;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import scala.Tuple2;
+
+import javax.annotation.PostConstruct;
+import java.io.Serializable;
+import java.sql.Statement;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Spark-based ETL service: persists the analysed data to the database
+ * through Spark's JDBC writer.
+ *
+ * @Author shaohu
+ * @Date 2022-07-27 14:39
+ */
+@Slf4j
+@Service
+public class SparkEtlService implements Serializable {
+
+ @Value("${spark.datasource.url}")
+ private String jdbcUrl;
+ @Value("${spark.datasource.username}")
+ private String username;
+ @Value("${spark.datasource.password}")
+ private String password;
+ @Value("${spark.datasource.driver}")
+ private String driver;
+ private int SPARK_NUM_PARTITIONS = 10;
+
+ private Properties properties = new Properties();
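+
+    // The @Value bindings above expect configuration of the following shape in the
+    // application config (the values shown are placeholders, not real settings):
+    //   spark:
+    //     datasource:
+    //       url: jdbc:mysql://localhost:3306/mydb
+    //       username: root
+    //       password: ****
+    //       driver: com.mysql.cj.jdbc.Driver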
+
+    // SaveMode options when Spark writes to MySQL:
+    // - ErrorIfExists (default): if the table already exists, an exception is thrown and nothing is written;
+    // - Append: appends to the table if it exists, otherwise creates it first and then inserts;
+    // - Overwrite: drops the existing table and its data, recreates the table, then inserts the new data;
+    // - Ignore: creates the table and inserts if it does not exist; silently skips the write if it does.
+    private SaveMode SPARK_MODE = SaveMode.Append;
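+    // Append is paired with an explicit JDBC TRUNCATE (truncateTable below) to get
+    // full-refresh semantics while preserving the existing table definition; Overwrite
+    // would let Spark drop and recreate the table, losing indexes and column types.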
+
+ private String TABLE_MINOR = "mr_minor_copy1";
+ private String TABLE_FAMILY = "mr_family_copy1";
+ private String TABLE_SCHOOL = "mr_school_copy1";
+
+    @PostConstruct
+    private void init() {
+        SparkUtil.setConfig("spark.sql.shuffle.partitions", "20");
+        // JDBC connection properties
+        properties.setProperty("user", username);
+        properties.setProperty("password", password);
+        properties.setProperty("driver", driver);
+        // number of partitions Spark uses for the JDBC write
+        properties.setProperty("numPartitions", String.valueOf(SPARK_NUM_PARTITIONS));
+    }
+
+    /**
+     * Persist the analysed data.
+     *
+     * @param minors    analysed minor records (with nested family members)
+     * @param mrSchools analysed school-enrolment records
+     */
+    public void dataToDb(List<Minor> minors, List<MrSchool> mrSchools) {
+        long startTimeMillis1 = System.currentTimeMillis();
+        log.info("----------- persisting analysed data, start: " + startTimeMillis1);
+        // collect the family members nested in the minor records
+        List<Family> families = new ArrayList<>();
+ for (Minor minor : minors) {
+ if (null != minor.getFamilies() && !minor.getFamilies().isEmpty()) {
+ families.addAll(minor.getFamilies());
+ }
+ }
+ this.saveMinor(minors);
+        // save family-member records
+        if (!families.isEmpty()) {
+            this.saveFamilies(families);
+        }
+        this.saveMrSchool(mrSchools);
+        long endTimeMillis1 = System.currentTimeMillis();
+        log.info("----------- persisting analysed data, end: " + endTimeMillis1);
+        log.info("----------- persisting analysed data, elapsed ms: " + (endTimeMillis1 - startTimeMillis1));
+        SparkUtil.close();
+    }
+
+    /**
+     * Truncate the given table via plain JDBC.
+     *
+     * @param tableName name of the table to truncate
+     */
+    private void truncateTable(String tableName) {
+        Connection conn = null;
+        try {
+            Class.forName(driver);
+            conn = DriverManager.getConnection(jdbcUrl, properties);
+            try (Statement stmt = conn.createStatement()) {
+                stmt.execute("truncate table " + tableName);
+            }
+        } catch (Exception e) {
+            log.error("failed to truncate table " + tableName, e);
+        } finally {
+            if (null != conn) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    log.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Save minor (juvenile) records.
+     *
+     * @param minors analysed minor records
+     */
+    public void saveMinor(List<Minor> minors) {
+        long startTimeMillis1 = System.currentTimeMillis();
+        log.info("----------- persisting minors, start: " + startTimeMillis1);
+        JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkUtil.getSparkInstance().sparkContext());
+        JavaRDD<Minor> minorsJavaRDD = javaSparkContext.parallelize(minors, SPARK_NUM_PARTITIONS);
+        String createTime = DateUtils.date2Str(new Date(), DateUtils.datetimeFormat.get());
+
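+        // zipWithUniqueId() pairs each record with a cluster-wide unique Long,
+        // used below as a synthetic primary key for the row.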
+        JavaRDD<Row> rowRDD = minorsJavaRDD.zipWithUniqueId().map(new Function<Tuple2<Minor, Long>, Row>() {
+            @Override
+            public Row call(Tuple2<Minor, Long> v1) throws Exception {
+ String id = v1._2.toString();
+ Minor minor = v1._1;
+ String name = minor.getName();
+ String sys_org_code = minor.getSysOrgCode();
+ String household_num = minor.getHouseholdNum();
+ String number = minor.getNumber();
+ Integer gender = minor.getGender();
+ String address = minor.getAddress();
+ String identity = minor.getIdentity();
+ Integer school = minor.getSchool();
+ String school_name = minor.getSchoolName();
+ String start_year = minor.getStartYear();
+ String guardian = minor.getGuardian();
+ String relation = minor.getRelation();
+ String reason = minor.getReason();
+ String special_reason = minor.getSpecialReason();
+ String division = minor.getDivision();
+ String remark = minor.getRemark();
+ String create_by = minor.getCreateBy();
+ String create_time = createTime;
+ String update_by = minor.getUpdateBy();
+ String update_time = null;
+ return RowFactory.create(id, name, sys_org_code, household_num, number, gender, address, identity, school, school_name,
+ start_year, guardian, relation, reason, special_reason, division, remark, create_by, create_time, update_by, update_time);
+ }
+ });
+
+        List<StructField> schemaFields = new ArrayList<>();
+ schemaFields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
+ schemaFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("sys_org_code", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("household_num", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("number", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("gender", DataTypes.IntegerType, true));
+ schemaFields.add(DataTypes.createStructField("address", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("identity", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("school", DataTypes.IntegerType, true));
+ schemaFields.add(DataTypes.createStructField("school_name", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("start_year", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("guardian", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("relation", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("reason", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("special_reason", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("division", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("remark", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("create_by", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("create_time", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("update_by", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("update_time", DataTypes.StringType, true));
+ StructType schema = DataTypes.createStructType(schemaFields);
+
+        Dataset<Row> minorsDs = SparkUtil.getSparkInstance().createDataFrame(rowRDD.rdd(), schema);
+ String table = TABLE_MINOR;
+ this.truncateTable(table);
+ minorsDs.write().mode(SPARK_MODE).jdbc(jdbcUrl, table, properties);
+ long endTimeMillis1 = System.currentTimeMillis();
+ log.info("-----------未成年人入库 结束:" + endTimeMillis1);
+ log.info("-----------未成年人入库 耗时:" + (endTimeMillis1 - startTimeMillis1));
+ }
+
+    /**
+     * Save family-member records.
+     *
+     * @param families analysed family-member records
+     */
+    public void saveFamilies(List<Family> families) {
+        long startTimeMillis1 = System.currentTimeMillis();
+        log.info("----------- persisting family members, start: " + startTimeMillis1);
+        JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkUtil.getSparkInstance().sparkContext());
+        JavaRDD<Family> familiesJavaRDD = javaSparkContext.parallelize(families, SPARK_NUM_PARTITIONS);
+        String createTime = DateUtils.date2Str(new Date(), DateUtils.datetimeFormat.get());
+        JavaRDD<Row> rowRDD = familiesJavaRDD.zipWithUniqueId().map(new Function<Tuple2<Family, Long>, Row>() {
+            @Override
+            public Row call(Tuple2<Family, Long> v1) throws Exception {
+ String id = v1._2.toString();
+ Family family = v1._1;
+ String name = family.getName();
+ String sys_org_code = family.getSysOrgCode();
+ String household_num = family.getHouseholdNum();
+ Integer gender = family.getGender();
+ String identity = family.getIdentity();
+ String number = family.getNumber();
+ String relation = family.getRelation();
+ String division = family.getDivision();
+ String address = family.getAddress();
+ Integer crime = family.getCrime();
+ String reason = family.getReason();
+ String other = family.getOther();
+ String create_by = family.getCreateBy();
+ String create_time = createTime;
+ String update_by = family.getUpdateBy();
+ String update_time = null;
+ return RowFactory.create(id, name, sys_org_code, household_num, gender, identity, number, relation, division,
+ address, crime, reason, other, create_by, create_time, update_by, update_time);
+ }
+ });
+
+        List<StructField> schemaFields = new ArrayList<>();
+ schemaFields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
+ schemaFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("sys_org_code", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("household_num", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("gender", DataTypes.IntegerType, true));
+ schemaFields.add(DataTypes.createStructField("identity", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("number", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("relation", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("division", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("address", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("crime", DataTypes.IntegerType, true));
+ schemaFields.add(DataTypes.createStructField("reason", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("other", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("create_by", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("create_time", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("update_by", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("update_time", DataTypes.StringType, true));
+
+ StructType schema = DataTypes.createStructType(schemaFields);
+
+        Dataset<Row> familiesDs = SparkUtil.getSparkInstance().createDataFrame(rowRDD.rdd(), schema);
+ String table = TABLE_FAMILY;
+ this.truncateTable(table);
+ familiesDs.write().mode(SPARK_MODE).jdbc(jdbcUrl, table, properties);
+ long endTimeMillis1 = System.currentTimeMillis();
+ log.info("-----------家庭成员数据入库 结束:" + endTimeMillis1);
+ log.info("-----------家庭成员数据入库 耗时:" + (endTimeMillis1 - startTimeMillis1));
+
+
+ }
+
+    /**
+     * Batch-save school-enrolment records.
+     *
+     * @param mrSchools analysed school-enrolment records
+     */
+    public void saveMrSchool(List<MrSchool> mrSchools) {
+        long startTimeMillis1 = System.currentTimeMillis();
+        log.info("----------- persisting school data, start: " + startTimeMillis1);
+        JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkUtil.getSparkInstance().sparkContext());
+        JavaRDD<MrSchool> mrSchoolsJavaRDD = javaSparkContext.parallelize(mrSchools, SPARK_NUM_PARTITIONS);
+        String createTime = DateUtils.date2Str(new Date(), DateUtils.datetimeFormat.get());
+        JavaRDD<Row> rowRDD = mrSchoolsJavaRDD.zipWithUniqueId().map(new Function<Tuple2<MrSchool, Long>, Row>() {
+            @Override
+            public Row call(Tuple2<MrSchool, Long> v1) throws Exception {
+ String id = v1._2.toString();
+ MrSchool mrSchool = v1._1;
+ String name = mrSchool.getName();
+ String sys_org_code = mrSchool.getSysOrgCode();
+ String identity = mrSchool.getIdentity();
+ String school = mrSchool.getSchool();
+ String admission_date = mrSchool.getAdmissionDate();
+ String class_name = mrSchool.getClassName();
+ String address = mrSchool.getAddress();
+ String member_one = mrSchool.getMemberOne();
+ String connect_one = mrSchool.getConnectOne();
+ String phone_one = mrSchool.getPhoneOne();
+ String member_two = mrSchool.getMemberTwo();
+ String connect_two = mrSchool.getConnectTwo();
+ String phone_two = mrSchool.getPhoneTwo();
+ String phone = mrSchool.getPhone();
+ String create_by = mrSchool.getCreateBy();
+ String create_time = createTime;
+ String update_by = mrSchool.getUpdateBy();
+ String update_time = null;
+ return RowFactory.create(id, name, sys_org_code, identity, school, admission_date, class_name,
+ address, member_one, connect_one, phone_one, member_two, connect_two, phone_two, phone,
+ create_by, create_time, update_by, update_time);
+ }
+ });
+
+        List<StructField> schemaFields = new ArrayList<>();
+ schemaFields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
+ schemaFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("sys_org_code", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("identity", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("school", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("admission_date", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("class_name", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("address", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("member_one", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("connect_one", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("phone_one", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("member_two", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("connect_two", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("phone_two", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("phone", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("create_by", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("create_time", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("update_by", DataTypes.StringType, true));
+ schemaFields.add(DataTypes.createStructField("update_time", DataTypes.StringType, true));
+ StructType schema = DataTypes.createStructType(schemaFields);
+
+        Dataset<Row> schoolDs = SparkUtil.getSparkInstance().createDataFrame(rowRDD.rdd(), schema);
+ String table = TABLE_SCHOOL;
+ this.truncateTable(table);
+ schoolDs.write().mode(SPARK_MODE).jdbc(jdbcUrl, table, properties);
+ long endTimeMillis1 = System.currentTimeMillis();
+ log.info("-----------学校数据入库 结束:" + endTimeMillis1);
+ log.info("-----------学校数据入库 耗时:" + (endTimeMillis1 - startTimeMillis1));
+ }
+}
diff --git a/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/system/controller/AnalyzeController.java b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/system/controller/AnalyzeController.java
index adf2be2..a2401cd 100644
--- a/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/system/controller/AnalyzeController.java
+++ b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/system/controller/AnalyzeController.java
@@ -10,12 +10,14 @@ import org.jeecg.common.aspect.annotation.AutoLog;
import org.jeecg.common.system.base.controller.JeecgController;
import org.jeecg.common.system.vo.LoginUser;
import org.jeecg.common.util.oConvertUtils;
+import org.jeecg.modules.handle.service.SparkEtlService;
import org.jeecg.modules.system.entity.*;
import org.jeecg.modules.system.service.*;
import org.jeecg.modules.system.util.IDNumberUtil;
import org.jeecg.modules.system.util.getRandomId;
import org.jeecg.modules.system.vo.GaPunishVo;
import org.jeecg.modules.system.vo.SpecialStudentVo;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@@ -23,7 +25,6 @@ import java.util.*;
import java.util.concurrent.*;
-
/**
* @Description: Data analysis
* @Author: jeecg-boot
@@ -70,51 +71,60 @@ public class AnalyzeController extends JeecgController
    private List<Family> resultFamily = new ArrayList<>(); // family data after analysis
    private List<Minor> resultMinor = new ArrayList<>(); // minor data after analysis
    private List<MrSchool> resultSchools = new ArrayList<>(); // school-enrolment data after analysis
    private Set<String> testIdCards = new HashSet<>(); // household numbers
    private static final int corePoolSize = Runtime.getRuntime().availableProcessors();
    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize + 1, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
- private Integer random_number=000000;
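+    // When spark.enabled=true the analysed data is written through the Spark ETL
+    // service; otherwise the original service-layer batch inserts are used.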
+ @Value("${spark.enabled}")
+ private Boolean sparkEnabled;
+ @Resource
+ private SparkEtlService etlService;
+ private Integer random_number = 000000;
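+    // NOTE: 000000 is an octal integer literal and evaluates to 0.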
+
@AutoLog(value = "来源数据管理-分页列表查询")
@ApiOperation(value = "来源数据管理-分页列表查询", notes = "来源数据管理-分页列表查询")
@PutMapping(value = "/doAnalyzeData")
public void doAnalyzeData() {
try {
- System.out.println("开始数据分析");
+ log.info("开始数据分析");
long startTime = System.currentTimeMillis();
List departs = sysDepartService.querySysDeparts();
getData(departs);
- System.out.println("开始数据分析" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
-// System.out.println("家庭成员数据批量新增总条数" + resultFamily.size());
- System.out.println("未成年人数据批量新增总条数" + resultMinor.size());
- System.out.println("学籍信息数据批量新增总条数" + resultSchools.size());
- long startTime2 = System.currentTimeMillis();
- minorService.insertBatch(resultMinor);
- System.out.println("未成年人数据批量新增所用时间" + (System.currentTimeMillis() - startTime2) / 1000 + "秒");
- long startTime1 = System.currentTimeMillis();
+
+ if (null != sparkEnabled && sparkEnabled) {
+ etlService.dataToDb(resultMinor, resultSchools);
+ } else {
+ log.info("开始数据分析" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
+// log.info("家庭成员数据批量新增总条数" + resultFamily.size());
+ log.info("未成年人数据批量新增总条数" + resultMinor.size());
+ log.info("学籍信息数据批量新增总条数" + resultSchools.size());
+ long startTime2 = System.currentTimeMillis();
+ minorService.insertBatch(resultMinor);
+ log.info("未成年人数据批量新增所用时间" + (System.currentTimeMillis() - startTime2) / 1000 + "秒");
+ long startTime1 = System.currentTimeMillis();
// familyService.insertBatch(resultFamily);
- System.out.println("家庭成员数据批量新增所用时间" + (System.currentTimeMillis() - startTime1) / 1000 + "秒");
- long startTime3 = System.currentTimeMillis();
- schoolService.insertBatch(resultSchools);
- System.out.println("学籍信息入库时间" + (System.currentTimeMillis() - startTime3) / 1000 + "秒");
+ log.info("家庭成员数据批量新增所用时间" + (System.currentTimeMillis() - startTime1) / 1000 + "秒");
+ long startTime3 = System.currentTimeMillis();
+ schoolService.insertBatch(resultSchools);
+ log.info("学籍信息入库时间" + (System.currentTimeMillis() - startTime3) / 1000 + "秒");
+ }
long startTime4 = System.currentTimeMillis();
getSourceData();
- System.out.println("来源数据入库时间" + (System.currentTimeMillis() - startTime4) / 1000 + "秒");
- System.out.println("*********************************总用时" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
-// return Result.OK("操作成功");
+ log.info("来源数据入库时间" + (System.currentTimeMillis() - startTime4) / 1000 + "秒");
+ log.info("*********************************总用时" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
} catch (Exception e) {
e.printStackTrace();
log.info(String.valueOf(e));
-// return Result.OK("操作失败");
}
}
public void getData(List departs) {
try {
- System.out.println("开始数据分析");
+ log.info("开始数据分析");
long startTime = System.currentTimeMillis();
ExecutorService pool = Executors.newFixedThreadPool(50);
CompletableFuture