diff --git a/juvenile-prosecution-boot/jeecg-boot-module-system/pom.xml b/juvenile-prosecution-boot/jeecg-boot-module-system/pom.xml
index d8ddb46..8b538e2 100644
--- a/juvenile-prosecution-boot/jeecg-boot-module-system/pom.xml
+++ b/juvenile-prosecution-boot/jeecg-boot-module-system/pom.xml
@@ -45,7 +45,49 @@ 1.4.7 - + + org.apache.spark + spark-core_2.12 + 3.3.0 + + + org.apache.logging.log4j + log4j-slf4j-impl + + + org.apache.logging.log4j + log4j-api + + + + + + + org.apache.logging.log4j + log4j-1.2-api + + + + + + org.apache.spark + spark-hive_2.12 + 3.3.0 + + + + org.apache.spark + spark-sql_2.12 + 3.3.0 + + + + org.codehaus.janino + janino + 3.0.8 + + +
diff --git a/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/SparkUtil.java b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/SparkUtil.java
new file mode 100644
index 0000000..a3a868f
--- /dev/null
+++ b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/SparkUtil.java
@@ -0,0 +1,117 @@
+package org.jeecg.modules.handle;
+
+import cn.hutool.core.map.MapUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.SparkSession;
+
+import java.net.URL;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Spark utility class that lazily creates and caches one local {@link SparkSession}.
+ *
+ * <p>Values registered through {@code setConfig}/{@code addResource} are copied into the session
+ * builder when a session is created; they do not change a session that already exists. Every
+ * method synchronizes on the class because all state is static and shared.
+ *
+ * @Author shaohu
+ * @Date 2022-07-18 16:55
+ */
+public class SparkUtil {
+
+    private static final SparkSession.Builder builder = SparkSession
+            .builder()
+            .master("local[*]")
+            .appName("Spark Prosecution")
+            .config("spark.ui.enabled", false);
+
+    private static SparkSession spark;
+    private static Configuration config;
+
+    /**
+     * Registers a single Spark setting for the next session that is created.
+     *
+     * @param key   setting name
+     * @param value setting value
+     */
+    public static synchronized void setConfig(String key, String value) {
+        configuration().set(key, value);
+    }
+
+    /**
+     * Registers several Spark settings for the next session that is created.
+     *
+     * @param configMap setting names mapped to values; {@code null} or empty adds nothing
+     */
+    public static synchronized void setConfig(Map<String, String> configMap) {
+        Configuration target = configuration();
+        if (MapUtil.isNotEmpty(configMap)) {
+            configMap.forEach(target::set);
+        }
+    }
+
+    /**
+     * Adds a configuration resource, for example hive-site.xml.
+     *
+     * @param url location of the resource
+     */
+    public static synchronized void addResource(URL url) {
+        configuration().addResource(url);
+    }
+
+    /**
+     * Enables Hive support on the session builder.
+     */
+    public static synchronized void enableHiveSupport() {
+        builder.enableHiveSupport();
+    }
+
+    /**
+     * Returns the cached Spark session, building it on first use or after {@link #close()}.
+     *
+     * @return the shared session
+     */
+    public static synchronized SparkSession getSparkInstance() {
+        if (spark == null) {
+            if (config != null) {
+                Iterator<Map.Entry<String, String>> iterator = config.iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<String, String> configEntry = iterator.next();
+                    builder.config(configEntry.getKey(), configEntry.getValue());
+                }
+            }
+            spark = builder.getOrCreate();
+        }
+        return spark;
+    }
+
+    /**
+     * Stops the cached Spark session, if there is one.
+     *
+     * <p>The reference is cleared so that a later {@link #getSparkInstance()} builds a fresh
+     * session instead of returning a stopped one. Calling this without a session does nothing.
+     */
+    public static synchronized void close() {
+        if (spark != null) {
+            try {
+                spark.close();
+            } finally {
+                spark = null;
+            }
+        }
+    }
+
+    /**
+     * Returns the shared Hadoop configuration, creating it on first use.
+     *
+     * @return the configuration that collects the pending settings
+     */
+    private static Configuration configuration() {
+        if (config == null) {
+            config = new Configuration();
+        }
+        return config;
+    }
+
+}
diff --git a/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/service/SparkEtlService.java b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/service/SparkEtlService.java
new file mode 100644
index 0000000..548e6d5
--- /dev/null
+++ b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/service/SparkEtlService.java
@@ -0,0 +1,284 @@
+package org.jeecg.modules.handle.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.jeecg.common.util.DateUtils;
+import org.jeecg.modules.handle.SparkUtil;
+import org.jeecg.modules.system.entity.Family;
+import org.jeecg.modules.system.entity.GaDemographic;
+import org.jeecg.modules.system.entity.Minor;
+import org.jeecg.modules.system.entity.MrSchool;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import scala.Tuple2;
+
+import javax.annotation.PostConstruct;
+import java.io.Serializable;
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Loads the analysis results (minors, family members, school records) into the database
+ * through Spark's JDBC writer.
+ *
+ * @Author shaohu
+ * @Date 2022-07-27 14:39
+ */
+@Slf4j
+@Service
+public class SparkEtlService implements Serializable {
+
+    private static final int SPARK_NUM_PARTITIONS = 10;
+
+    // Save modes available when Spark writes to MySQL:
+    // SaveMode.ErrorIfExists (default) - fails when the table already exists, so nothing is stored;
+    // SaveMode.Append - appends to an existing table, or creates the table first when it is missing;
+    // SaveMode.Overwrite - drops the existing table and its data, recreates it, then inserts;
+    // SaveMode.Ignore - creates and fills a missing table; silently skips the write when it exists.
+    private static final SaveMode SPARK_MODE = SaveMode.Append;
+
+    private static final String TABLE_MINOR = "mr_minor_copy1";
+    private static final String TABLE_FAMILY = "mr_family_copy1";
+    private static final String TABLE_SCHOOL = "mr_school_copy1";
+
+    private static final StructType MINOR_SCHEMA = buildSchema(
+            Arrays.asList("gender", "school"),
+            "id", "name", "sys_org_code", "household_num", "number", "gender", "address", "identity",
+            "school", "school_name", "start_year", "guardian", "relation", "reason", "special_reason",
+            "division", "remark", "create_by", "create_time", "update_by", "update_time");
+
+    private static final StructType FAMILY_SCHEMA = buildSchema(
+            Arrays.asList("gender", "crime"),
+            "id", "name", "sys_org_code", "household_num", "gender", "identity", "number", "relation",
+            "division", "address", "crime", "reason", "other", "create_by", "create_time", "update_by",
+            "update_time");
+
+    private static final StructType SCHOOL_SCHEMA = buildSchema(
+            Collections.<String>emptyList(),
+            "id", "name", "sys_org_code", "identity", "school", "admission_date", "class_name", "address",
+            "member_one", "connect_one", "phone_one", "member_two", "connect_two", "phone_two", "phone",
+            "create_by", "create_time", "update_by", "update_time");
+
+    @Value("${spark.datasource.url}")
+    private String jdbcUrl;
+    @Value("${spark.datasource.username}")
+    private String username;
+    @Value("${spark.datasource.password}")
+    private String password;
+    @Value("${spark.datasource.driver}")
+    private String driver;
+
+    private Properties properties = new Properties();
+
+    /**
+     * Registers the shuffle-partition setting and fills the JDBC connection properties once the
+     * {@code @Value} fields have been injected.
+     */
+    @PostConstruct
+    private void init() {
+        SparkUtil.setConfig("spark.sql.shuffle.partitions", "20");
+        properties.setProperty("user", username);
+        properties.setProperty("password", password);
+        properties.setProperty("driver", driver);
+        // number of partitions handed to the JDBC writer
+        properties.setProperty("numPartitions", String.valueOf(SPARK_NUM_PARTITIONS));
+    }
+
+    /**
+     * Stores minors, their family members and school records, then stops the Spark session.
+     *
+     * <p>The session is stopped even when a write fails. A {@code null} list is handled like an
+     * empty one. Family members are written only when at least one minor carries any.
+     *
+     * @param minors    analysed minors, each optionally carrying its family members
+     * @param mrSchools analysed school records
+     */
+    public void dataToDb(List<Minor> minors, List<MrSchool> mrSchools) {
+        long start = logStart("分析数据入库");
+        List<Minor> minorRecords = minors == null ? Collections.<Minor>emptyList() : minors;
+        List<MrSchool> schoolRecords = mrSchools == null ? Collections.<MrSchool>emptyList() : mrSchools;
+        List<Family> families = new ArrayList<>();
+        for (Minor minor : minorRecords) {
+            if (null != minor.getFamilies() && !minor.getFamilies().isEmpty()) {
+                families.addAll(minor.getFamilies());
+            }
+        }
+        try {
+            this.saveMinor(minorRecords);
+            if (!families.isEmpty()) {
+                this.saveFamilies(families);
+            }
+            this.saveMrSchool(schoolRecords);
+            logEnd("分析数据入库", start);
+        } finally {
+            SparkUtil.close();
+        }
+    }
+
+    /**
+     * Replaces the content of the minors table with the given records.
+     *
+     * @param minors records to store
+     */
+    public void saveMinor(List<Minor> minors) {
+        long start = logStart("未成年人入库");
+        String createTime = currentTimestamp();
+        replaceTable(TABLE_MINOR, minors, MINOR_SCHEMA,
+                pair -> minorRow(pair._1(), pair._2(), createTime));
+        logEnd("未成年人入库", start);
+    }
+
+    /**
+     * Replaces the content of the family-members table with the given records.
+     *
+     * @param families records to store
+     */
+    public void saveFamilies(List<Family> families) {
+        long start = logStart("家庭成员数据入库");
+        String createTime = currentTimestamp();
+        replaceTable(TABLE_FAMILY, families, FAMILY_SCHEMA,
+                pair -> familyRow(pair._1(), pair._2(), createTime));
+        logEnd("家庭成员数据入库", start);
+    }
+
+    /**
+     * Replaces the content of the school table with the given records.
+     *
+     * @param mrSchools records to store
+     */
+    public void saveMrSchool(List<MrSchool> mrSchools) {
+        long start = logStart("学校数据入库");
+        log.info("saveMrSchool 开始:{}", start);
+        String createTime = currentTimestamp();
+        replaceTable(TABLE_SCHOOL, mrSchools, SCHOOL_SCHEMA,
+                pair -> schoolRow(pair._1(), pair._2(), createTime));
+        logEnd("学校数据入库", start);
+    }
+
+    /**
+     * Turns the records into rows, empties the target table and appends the rows over JDBC.
+     *
+     * @param <T>     record type
+     * @param table   target table name
+     * @param records records to store
+     * @param schema  column layout matching the rows produced by {@code toRow}
+     * @param toRow   maps a record and its generated unique id to a row
+     */
+    private <T> void replaceTable(String table, List<T> records, StructType schema,
+                                  Function<Tuple2<T, Long>, Row> toRow) {
+        JavaSparkContext javaSparkContext =
+                JavaSparkContext.fromSparkContext(SparkUtil.getSparkInstance().sparkContext());
+        JavaRDD<Row> rowRDD = javaSparkContext.parallelize(records, SPARK_NUM_PARTITIONS)
+                .zipWithUniqueId()
+                .map(toRow);
+        Dataset<Row> dataset = SparkUtil.getSparkInstance().createDataFrame(rowRDD.rdd(), schema);
+        this.truncateTable(table);
+        dataset.write().mode(SPARK_MODE).jdbc(jdbcUrl, table, properties);
+    }
+
+    /**
+     * Empties the given table.
+     *
+     * <p>Failures are logged with their stack trace and not rethrown, as before; presumably this
+     * lets the first load proceed when the table does not exist yet, since the Append write that
+     * follows creates a missing table.
+     *
+     * @param tableName table to truncate
+     */
+    private void truncateTable(String tableName) {
+        try {
+            Class.forName(driver);
+            try (Connection conn = DriverManager.getConnection(jdbcUrl, properties);
+                 Statement statement = conn.createStatement()) {
+                statement.execute("truncate table " + tableName);
+            }
+        } catch (ClassNotFoundException | SQLException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Builds a schema whose {@code id} column is a non-null string; the listed columns are
+     * nullable integers and every other column is a nullable string.
+     *
+     * @param integerColumns names of the integer columns
+     * @param columns        all column names in row order
+     * @return the schema
+     */
+    private static StructType buildSchema(List<String> integerColumns, String... columns) {
+        List<StructField> schemaFields = new ArrayList<>(columns.length);
+        for (String column : columns) {
+            schemaFields.add(DataTypes.createStructField(
+                    column,
+                    integerColumns.contains(column) ? DataTypes.IntegerType : DataTypes.StringType,
+                    !"id".equals(column)));
+        }
+        return DataTypes.createStructType(schemaFields);
+    }
+
+    /** Returns the current time formatted with {@code DateUtils.datetimeFormat}. */
+    private static String currentTimestamp() {
+        return DateUtils.date2Str(new Date(), DateUtils.datetimeFormat.get());
+    }
+
+    /** Logs the start of a stage and returns the start time in milliseconds. */
+    private static long logStart(String stage) {
+        long start = System.currentTimeMillis();
+        log.info("-----------{} 开始:{}", stage, start);
+        return start;
+    }
+
+    /** Logs the end time and the elapsed milliseconds of a stage. */
+    private static void logEnd(String stage, long start) {
+        long end = System.currentTimeMillis();
+        log.info("-----------{} 结束:{}", stage, end);
+        log.info("-----------{} 耗时:{}", stage, end - start);
+    }
+
+    /** Maps a minor and its generated id to a row laid out like {@code MINOR_SCHEMA}. */
+    private static Row minorRow(Minor minor, Long id, String createTime) {
+        return RowFactory.create(id.toString(), minor.getName(), minor.getSysOrgCode(),
+                minor.getHouseholdNum(), minor.getNumber(), minor.getGender(), minor.getAddress(),
+                minor.getIdentity(), minor.getSchool(), minor.getSchoolName(), minor.getStartYear(),
+                minor.getGuardian(), minor.getRelation(), minor.getReason(), minor.getSpecialReason(),
+                minor.getDivision(), minor.getRemark(), minor.getCreateBy(), createTime,
+                minor.getUpdateBy(), (Object) null);
+    }
+
+    /** Maps a family member and its generated id to a row laid out like {@code FAMILY_SCHEMA}. */
+    private static Row familyRow(Family family, Long id, String createTime) {
+        return RowFactory.create(id.toString(), family.getName(), family.getSysOrgCode(),
+                family.getHouseholdNum(), family.getGender(), family.getIdentity(), family.getNumber(),
+                family.getRelation(), family.getDivision(), family.getAddress(), family.getCrime(),
+                family.getReason(), family.getOther(), family.getCreateBy(), createTime,
+                family.getUpdateBy(), (Object) null);
+    }
+
+    /** Maps a school record and its generated id to a row laid out like {@code SCHOOL_SCHEMA}. */
+    private static Row schoolRow(MrSchool mrSchool, Long id, String createTime) {
+        return RowFactory.create(id.toString(), mrSchool.getName(), mrSchool.getSysOrgCode(),
+                mrSchool.getIdentity(), mrSchool.getSchool(), mrSchool.getAdmissionDate(),
+                mrSchool.getClassName(), mrSchool.getAddress(), mrSchool.getMemberOne(),
+                mrSchool.getConnectOne(), mrSchool.getPhoneOne(), mrSchool.getMemberTwo(),
+                mrSchool.getConnectTwo(), mrSchool.getPhoneTwo(), mrSchool.getPhone(),
+                mrSchool.getCreateBy(), createTime, mrSchool.getUpdateBy(), (Object) null);
+    }
+}
diff --git a/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/system/controller/AnalyzeController.java b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/system/controller/AnalyzeController.java
index 78861c7..915f9f8 100644
--- a/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/system/controller/AnalyzeController.java
+++ b/juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/system/controller/AnalyzeController.java
@@ -10,12 +10,14 @@ import org.jeecg.common.aspect.annotation.AutoLog; import org.jeecg.common.system.base.controller.JeecgController; import org.jeecg.common.system.vo.LoginUser; import org.jeecg.common.util.oConvertUtils; +import org.jeecg.modules.handle.service.SparkEtlService; import org.jeecg.modules.system.entity.*; import org.jeecg.modules.system.service.*; import org.jeecg.modules.system.util.IDNumberUtil; import org.jeecg.modules.system.util.getRandomId; import org.jeecg.modules.system.vo.GaPunishVo; import org.jeecg.modules.system.vo.SpecialStudentVo; +import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; @@ -23,7 +25,6 @@ import java.util.*; import java.util.concurrent.*; - /** * @Description: 数据分析 * @Author: jeecg-boot @@ -35,6 +36,12 @@ import java.util.concurrent.*; @RequestMapping("/sys/analyze") @Slf4j public class AnalyzeController extends JeecgController { + private static final int corePoolSize = Runtime.getRuntime().availableProcessors(); + private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize + 1, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue(1000)); + // private final List resultFamily = new ArrayList<>();//分析后家庭的数据 + private final List resultMinor = new 
ArrayList<>();//分析后未成年人的信息 + private final List resultSchools = new ArrayList<>();//分析后学籍的信息 + private final Set testIdCards = new HashSet<>();//存储户号 @Resource private IGaDemographicService gaDemographicService; @Resource @@ -69,14 +76,11 @@ public class AnalyzeController extends JeecgController resultFamily = new ArrayList<>();//分析后家庭的数据 - private final List resultMinor = new ArrayList<>();//分析后未成年人的信息 - private final List resultSchools = new ArrayList<>();//分析后学籍的信息 - private final Set testIdCards = new HashSet<>();//存储户号 - private static final int corePoolSize = Runtime.getRuntime().availableProcessors(); - private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize + 1, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue(1000)); - private Integer random_number=000000; + @Value("${spark.enabled}") + private Boolean sparkEnabled; + @Resource + private SparkEtlService etlService; + private Integer random_number = 000000; @AutoLog(value = "来源数据管理-分页列表查询") @ApiOperation(value = "来源数据管理-分页列表查询", notes = "来源数据管理-分页列表查询") @@ -87,23 +91,28 @@ public class AnalyzeController extends JeecgController departs = sysDepartService.querySysDeparts(); getData(departs); - System.out.println("开始数据分析" + (System.currentTimeMillis() - startTime) / 1000 + "秒"); + + if (null != sparkEnabled && sparkEnabled) { + etlService.dataToDb(resultMinor, resultSchools); + } else { + System.out.println("开始数据分析" + (System.currentTimeMillis() - startTime) / 1000 + "秒"); // System.out.println("家庭成员数据批量新增总条数" + resultFamily.size()); - System.out.println("未成年人数据批量新增总条数" + resultMinor.size()); - System.out.println("学籍信息数据批量新增总条数" + resultSchools.size()); - long startTime2 = System.currentTimeMillis(); - minorService.insertBatch(resultMinor); - System.out.println("未成年人数据批量新增所用时间" + (System.currentTimeMillis() - startTime2) / 1000 + "秒"); - long startTime1 = System.currentTimeMillis(); + System.out.println("未成年人数据批量新增总条数" + resultMinor.size()); + 
System.out.println("学籍信息数据批量新增总条数" + resultSchools.size()); + long startTime2 = System.currentTimeMillis(); + minorService.insertBatch(resultMinor); + System.out.println("未成年人数据批量新增所用时间" + (System.currentTimeMillis() - startTime2) / 1000 + "秒"); + long startTime1 = System.currentTimeMillis(); // familyService.insertBatch(resultFamily); - System.out.println("家庭成员数据批量新增所用时间" + (System.currentTimeMillis() - startTime1) / 1000 + "秒"); - long startTime3 = System.currentTimeMillis(); - schoolService.insertBatch(resultSchools); - System.out.println("学籍信息入库时间" + (System.currentTimeMillis() - startTime3) / 1000 + "秒"); - long startTime4 = System.currentTimeMillis(); - getSourceData(); - System.out.println("来源数据入库时间" + (System.currentTimeMillis() - startTime4) / 1000 + "秒"); - System.out.println("*********************************总用时" + (System.currentTimeMillis() - startTime) / 1000 + "秒"); + System.out.println("家庭成员数据批量新增所用时间" + (System.currentTimeMillis() - startTime1) / 1000 + "秒"); + long startTime3 = System.currentTimeMillis(); + schoolService.insertBatch(resultSchools); + System.out.println("学籍信息入库时间" + (System.currentTimeMillis() - startTime3) / 1000 + "秒"); + long startTime4 = System.currentTimeMillis(); + getSourceData(); + System.out.println("来源数据入库时间" + (System.currentTimeMillis() - startTime4) / 1000 + "秒"); + System.out.println("*********************************总用时" + (System.currentTimeMillis() - startTime) / 1000 + "秒"); + } // return Result.OK("操作成功"); } catch (Exception e) { e.printStackTrace(); @@ -166,7 +175,7 @@ public class AnalyzeController extends JeecgController familyMap = gaHouseholdService.getGaHouseholds(gaDemographics); + Map familyMap = gaHouseholdService.getGaHouseholds(gaDemographics); // Map familyMap = queryByThread(); System.out.println("户籍信息数据查询时间" + (System.currentTimeMillis() - startTime1) / 1000 + "秒"); System.out.println("判断初高中学生信息和人口基本信息的交集和差集开始时间" + (System.currentTimeMillis() - startTime) / 1000 + "秒"); @@ -301,9 +310,9 @@ public 
class AnalyzeController extends JeecgController highSchools, Map gaDemographics, List departs, List specialStudentVos, List mzUnsupporteds, List mzLeftBehinds, List mzAdoptions, Map gaHouseholds, Map gaPunishVos, List mzOrphans - ) { + ) { try { - System.out.println("判断初高中学生信息和人口基本信息的交集和差集zongsuo"+highSchools.size()); + System.out.println("判断初高中学生信息和人口基本信息的交集和差集zongsuo" + highSchools.size()); long startTime = System.currentTimeMillis(); //不在人口基本信息的初高中学生信息 List highSchoolDifference = new ArrayList<>(); @@ -327,10 +336,10 @@ public class AnalyzeController extends JeecgController primarySchools, Map gaDemographics, List departs, List specialStudentVos, List mzUnsupporteds, List mzLeftBehinds, List mzAdoptions, Map gaHouseholds, Map gaPunishVos, List mzOrphans - ) { + ) { try { - System.out.println("判断幼小学生信息和人口基本信息的交集和差集总数"+primarySchools.size()); + System.out.println("判断幼小学生信息和人口基本信息的交集和差集总数" + primarySchools.size()); long startTime = System.currentTimeMillis(); //不在人口基本信息的幼小学生信息 List primarySchoolDifference = new ArrayList<>(); @@ -371,10 +380,10 @@ public class AnalyzeController extends JeecgController minorMap = new HashMap<>(); - Map schools=new HashMap<>(); + Map schools = new HashMap<>(); System.out.println("***********************交集总数**********************************************" + intersection.size()); System.out.println("***********************人口信息总数**********************************************" + gaHouseholds.size()); System.out.println("交集数据里的所属单位开始" + (System.currentTimeMillis() - startTime) / 1000); @@ -445,20 +454,20 @@ public class AnalyzeController extends JeecgController { Minor minor = new Minor(); - if (oConvertUtils.isNotEmpty(i) && oConvertUtils.isNotEmpty(i.getIdentity()) && IDNumberUtil.checkID(i.getIdentity())) { - String idCard=i.getIdentity(); - if(oConvertUtils.isNotEmpty(minorMap) && oConvertUtils.isNotEmpty(minorMap.get(idCard))){ - Minor minMap=minorMap.get(idCard); - if(oConvertUtils.isNotEmpty(d) && 
oConvertUtils.isNotEmpty(d.getCommonCode()) && oConvertUtils.isNotEmpty(minMap.getSchoolName())){ + if (oConvertUtils.isNotEmpty(i) && oConvertUtils.isNotEmpty(i.getIdentity()) && IDNumberUtil.checkID(i.getIdentity())) { + String idCard = i.getIdentity(); + if (oConvertUtils.isNotEmpty(minorMap) && oConvertUtils.isNotEmpty(minorMap.get(idCard))) { + Minor minMap = minorMap.get(idCard); + if (oConvertUtils.isNotEmpty(d) && oConvertUtils.isNotEmpty(d.getCommonCode()) && oConvertUtils.isNotEmpty(minMap.getSchoolName())) { if (d.getCommonCode().contains(minMap.getSchoolName())) { minMap.setSysOrgCode(d.getOrgCode()); i.setSysOrgCode(d.getOrgCode()); } } - if(oConvertUtils.isNotEmpty(schools) && oConvertUtils.isNotEmpty(schools.get(idCard)) ){ + if (oConvertUtils.isNotEmpty(schools) && oConvertUtils.isNotEmpty(schools.get(idCard))) { schools.get(idCard).setSysOrgCode(i.getSysOrgCode()); } - }else { + } else { if (oConvertUtils.isNotEmpty(d) && oConvertUtils.isNotEmpty(d.getCommonCode()) && oConvertUtils.isNotEmpty(i.getSchool())) { if (d.getCommonCode().contains(i.getSchool())) { minor.setSysOrgCode(d.getOrgCode()); @@ -493,12 +502,12 @@ public class AnalyzeController extends JeecgController mzOrphans, Map gaHouseholds) { try { Map minorMap = new HashMap<>(); - Map schools=new HashMap<>(); + Map schools = new HashMap<>(); long startTime = System.currentTimeMillis(); System.out.println("根据差集数据判断所属单位" + difference.size()); System.out.println("未成年人总数" + difference.size()); @@ -520,20 +529,20 @@ public class AnalyzeController extends JeecgController { Minor minor = new Minor(); - if (oConvertUtils.isNotEmpty(m) && oConvertUtils.isNotEmpty(m.getIdentity()) && IDNumberUtil.checkID(m.getIdentity())) { - String idCard=m.getIdentity(); - if(oConvertUtils.isNotEmpty(minorMap) && oConvertUtils.isNotEmpty(minorMap.get(idCard))){ - Minor minMap=minorMap.get(idCard); - if(oConvertUtils.isNotEmpty(d) && oConvertUtils.isNotEmpty(d.getCommonCode()) && 
oConvertUtils.isNotEmpty(minMap.getSchoolName())){ - if (d.getCommonCode().contains(minMap.getSchoolName())) { - minMap.setSysOrgCode(d.getOrgCode()); - m.setSysOrgCode(d.getOrgCode()); - } - } - if(oConvertUtils.isNotEmpty(schools) && oConvertUtils.isNotEmpty(schools.get(idCard)) ){ + if (oConvertUtils.isNotEmpty(m) && oConvertUtils.isNotEmpty(m.getIdentity()) && IDNumberUtil.checkID(m.getIdentity())) { + String idCard = m.getIdentity(); + if (oConvertUtils.isNotEmpty(minorMap) && oConvertUtils.isNotEmpty(minorMap.get(idCard))) { + Minor minMap = minorMap.get(idCard); + if (oConvertUtils.isNotEmpty(d) && oConvertUtils.isNotEmpty(d.getCommonCode()) && oConvertUtils.isNotEmpty(minMap.getSchoolName())) { + if (d.getCommonCode().contains(minMap.getSchoolName())) { + minMap.setSysOrgCode(d.getOrgCode()); + m.setSysOrgCode(d.getOrgCode()); + } + } + if (oConvertUtils.isNotEmpty(schools) && oConvertUtils.isNotEmpty(schools.get(idCard))) { schools.get(idCard).setSysOrgCode(m.getSysOrgCode()); } - }else { + } else { if (oConvertUtils.isNotEmpty(d) && oConvertUtils.isNotEmpty(d.getCommonCode()) && oConvertUtils.isNotEmpty(m.getSchool())) { if (d.getCommonCode().contains(m.getSchool())) { minor.setSysOrgCode(d.getOrgCode()); @@ -574,7 +583,7 @@ public class AnalyzeController extends JeecgController minorList = new ArrayList<>(); Set familys = new HashSet<>(); //存放已经被放入minor里面的户号,避免数据重复 - Set houseSet=new HashSet<>(); + Set houseSet = new HashSet<>(); Date createTime = new Date(); if (oConvertUtils.isNotEmpty(minors)) { for (Map.Entry m : minors.entrySet()) { @@ -818,12 +827,12 @@ public class AnalyzeController extends JeecgController f=new ArrayList<>(); + List f = new ArrayList<>(); //判断是否是一个家庭的人员 if (oConvertUtils.isNotEmpty(minor.getHouseholdNum())) { String houseNum = minor.getHouseholdNum(); - if(oConvertUtils.isNotEmpty(familyMaps) && oConvertUtils.isNotEmpty(familyMaps.get(houseNum)) ){ - if(!testIdCards.contains(houseNum) ){ + if (oConvertUtils.isNotEmpty(familyMaps) 
&& oConvertUtils.isNotEmpty(familyMaps.get(houseNum))) { + if (!testIdCards.contains(houseNum)) { f.addAll(familyMaps.get(houseNum)); // resultFamily.addAll(familyMaps.get(houseNum)); testIdCards.add(houseNum); @@ -834,12 +843,12 @@ public class AnalyzeController extends JeecgController