Commit 93485ac8bf764baa0afa44316b2c7262bf7bad95

Authored by sh
1 parent 1d604b47

数据入库支持Spark

juvenile-prosecution-boot/jeecg-boot-module-system/pom.xml
... ... @@ -45,7 +45,49 @@
45 45 <version>1.4.7</version>
46 46 </dependency>
47 47  
48   - <!--<dependency>-->
  48 + <dependency>
  49 + <groupId>org.apache.spark</groupId>
  50 + <artifactId>spark-core_2.12</artifactId>
  51 + <version>3.3.0</version>
  52 + <exclusions>
  53 + <exclusion>
  54 + <groupId>org.apache.logging.log4j</groupId>
  55 + <artifactId>log4j-slf4j-impl</artifactId>
  56 + </exclusion>
  57 + <exclusion>
  58 + <groupId>org.apache.logging.log4j</groupId>
  59 + <artifactId>log4j-api</artifactId>
  60 + </exclusion>
  61 + <!--<exclusion>-->
  62 + <!--<groupId>org.apache.logging.log4j</groupId>-->
  63 + <!--<artifactId>log4j-core</artifactId>-->
  64 + <!--</exclusion>-->
  65 + <exclusion>
  66 + <groupId>org.apache.logging.log4j</groupId>
  67 + <artifactId>log4j-1.2-api</artifactId>
  68 + </exclusion>
  69 + </exclusions>
  70 + </dependency>
  71 +
  72 + <dependency>
  73 + <groupId>org.apache.spark</groupId>
  74 + <artifactId>spark-hive_2.12</artifactId>
  75 + <version>3.3.0</version>
  76 + </dependency>
  77 +
  78 + <dependency>
  79 + <groupId>org.apache.spark</groupId>
  80 + <artifactId>spark-sql_2.12</artifactId>
  81 + <version>3.3.0</version>
  82 + </dependency>
  83 +
  84 + <dependency>
  85 + <groupId>org.codehaus.janino</groupId>
  86 + <artifactId>janino</artifactId>
  87 + <version>3.0.8</version>
  88 + </dependency>
  89 +
  90 + <!--<dependency>-->
49 91 <!--<groupId>org.jeecgframework.boot</groupId>-->
50 92 <!--<artifactId>jeecg-boot-module-demo</artifactId>-->
51 93 <!--<version>${jeecgboot.version}</version>-->
... ...
juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/SparkUtil.java 0 → 100644
  1 +package org.jeecg.modules.handle;
  2 +
  3 +import cn.hutool.core.map.MapUtil;
  4 +import org.apache.hadoop.conf.Configuration;
  5 +import org.apache.spark.sql.SparkSession;
  6 +
  7 +import java.net.URL;
  8 +import java.util.Iterator;
  9 +import java.util.Map;
  10 +
  11 +/**
  12 + * 类描述:Spark工具类
  13 + *
  14 + * @Author shaohu
  15 + * @Date 2022-07-18 16:55
  16 + */
  17 +public class SparkUtil {
  18 +
  19 + private static SparkSession spark;
  20 + private static SparkSession.Builder builder;
  21 + private static Configuration config;
  22 +
  23 + static {
  24 + builder = SparkSession
  25 + .builder()
  26 + .master("local[*]")
  27 + .appName("Spark Prosecution")
  28 + .config("spark.ui.enabled", false);
  29 + //.config("spark.driver.memory","2g")
  30 + }
  31 +
  32 + /**
  33 + * 配置spark环境
  34 + *
  35 + * @param
  36 + * @return
  37 + */
  38 + public static void setConfig(String key, String value) {
  39 + if (config == null) {
  40 + config = new Configuration();
  41 + }
  42 + config.set(key, value);
  43 + }
  44 +
  45 + /**
  46 + * 配置spark环境
  47 + *
  48 + * @param configMap
  49 + */
  50 + public static void setConfig(Map<String, String> configMap) {
  51 + if (config == null) {
  52 + config = new Configuration();
  53 + }
  54 + if (MapUtil.isNotEmpty(configMap)) {
  55 + configMap.entrySet().stream().forEach(entry -> {
  56 + String key = entry.getKey();
  57 + String value = entry.getValue();
  58 + config.set(key, value);
  59 + });
  60 + }
  61 + }
  62 +
  63 + /**
  64 + * 添加资源,比如hive-site.xml
  65 + *
  66 + * @param url
  67 + */
  68 + public static void addResource(URL url) {
  69 + if (config == null) {
  70 + config = new Configuration();
  71 + }
  72 + config.addResource(url);
  73 + }
  74 +
  75 + /**
  76 + * 支持hive
  77 + */
  78 + public static void enableHiveSupport() {
  79 + builder.enableHiveSupport();
  80 + }
  81 +
  82 +
  83 + /**
  84 + * 获取spark环境
  85 + *
  86 + * @return
  87 + */
  88 + public static SparkSession getSparkInstance() {
  89 + if (spark == null) {
  90 + if (config != null) {
  91 + Iterator<Map.Entry<String, String>> iterator = config.iterator();
  92 + while (iterator.hasNext()) {
  93 + Map.Entry<String, String> configEntry = iterator.next();
  94 + builder.config(configEntry.getKey(), configEntry.getValue());
  95 + }
  96 + }
  97 + spark = builder.getOrCreate();
  98 + //spark.sparkContext().setLogLevel(Level.ERROR.name());
  99 + }
  100 + return spark;
  101 + }
  102 +
  103 + /**
  104 + * 销毁spark
  105 + */
  106 + public static void close() {
  107 + spark.close();
  108 + }
  109 +
  110 +}
... ...
juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/handle/service/SparkEtlService.java 0 → 100644
  1 +package org.jeecg.modules.handle.service;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +import org.apache.spark.api.java.JavaRDD;
  5 +import org.apache.spark.api.java.JavaSparkContext;
  6 +import org.apache.spark.api.java.function.Function;
  7 +import org.apache.spark.sql.Dataset;
  8 +import org.apache.spark.sql.Row;
  9 +import org.apache.spark.sql.RowFactory;
  10 +import org.apache.spark.sql.SaveMode;
  11 +import org.apache.spark.sql.types.DataTypes;
  12 +import org.apache.spark.sql.types.StructField;
  13 +import org.apache.spark.sql.types.StructType;
  14 +import org.jeecg.common.util.DateUtils;
  15 +import org.jeecg.modules.handle.SparkUtil;
  16 +import org.jeecg.modules.system.entity.Family;
  17 +import org.jeecg.modules.system.entity.GaDemographic;
  18 +import org.jeecg.modules.system.entity.Minor;
  19 +import org.jeecg.modules.system.entity.MrSchool;
  20 +import org.springframework.beans.factory.annotation.Value;
  21 +import org.springframework.stereotype.Service;
  22 +import scala.Tuple2;
  23 +
  24 +import javax.annotation.PostConstruct;
  25 +import java.io.Serializable;
  26 +import java.sql.CallableStatement;
  27 +import java.sql.Connection;
  28 +import java.sql.DriverManager;
  29 +import java.sql.SQLException;
  30 +import java.util.ArrayList;
  31 +import java.util.Date;
  32 +import java.util.List;
  33 +import java.util.Properties;
  34 +
  35 +/**
  36 + * 类描述:
  37 + *
  38 + * @Author shaohu
  39 + * @Date 2022-07-27 14:39
  40 + */
  41 +@Slf4j
  42 +@Service
  43 +public class SparkEtlService implements Serializable {
  44 +
  45 + @Value("${spark.datasource.url}")
  46 + private String jdbcUrl;
  47 + @Value("${spark.datasource.username}")
  48 + private String username;
  49 + @Value("${spark.datasource.password}")
  50 + private String password;
  51 + @Value("${spark.datasource.driver}")
  52 + private String driver;
  53 + private int SPARK_NUM_PARTITIONS = 10;
  54 +
  55 + private Properties properties = new Properties();
  56 +
  57 + //Spark数据存入Mysql 几种模式
  58 + //默认为SaveMode.ErrorIfExists模式,该模式下,若数据库中已经存在该表,则会直接报异常,导致数据不能存入数据库;
  59 + //SaveMode.Append 若表已经存在,则追加在该表中;若该表不存在,则会先创建表,再插入数据;
  60 + //SaveMode.Overwrite 重写模式,其本质是先将已有的表及其数据全都删除,再重新创建该表,然后插入新的数据;
  61 + //SaveMode.Ignore 若表不存在,则创建表,并存入数据;若表存在的情况下,直接跳过数据的存储,不会报错
  62 + private SaveMode SPARK_MODE = SaveMode.Append;
  63 +
  64 + private String TABLE_MINOR = "mr_minor_copy1";
  65 + private String TABLE_FAMILY = "mr_family_copy1";
  66 + private String TABLE_SCHOOL = "mr_school_copy1";
  67 +
  68 + @PostConstruct
  69 + private void init() {
  70 + SparkUtil.setConfig("spark.sql.shuffle.partitions", "20");
  71 + //创建Properties对象
  72 + properties.setProperty("user", username); // 用户名
  73 + properties.setProperty("password", password); // 密码
  74 + properties.setProperty("driver", driver);
  75 + //设置分区数
  76 + properties.setProperty("numPartitions", String.valueOf(SPARK_NUM_PARTITIONS));
  77 + }
  78 +
  79 + /**
  80 + * 数据入库
  81 + *
  82 + * @param minors
  83 + * @param mrSchools
  84 + */
  85 + public void dataToDb(List<Minor> minors, List<MrSchool> mrSchools) {
  86 + long startTimeMillis1 = System.currentTimeMillis();
  87 + log.info("-----------分析数据入库 开始:" + startTimeMillis1);
  88 + //家庭成员数据
  89 + List<Family> families = new ArrayList<Family>();
  90 + for (Minor minor : minors) {
  91 + if (null != minor.getFamilies() && !minor.getFamilies().isEmpty()) {
  92 + families.addAll(minor.getFamilies());
  93 + }
  94 + }
  95 + this.saveMinor(minors);
  96 + //保存家庭成员数据
  97 + if (!families.isEmpty()) {
  98 + this.saveFamilies(families);
  99 + }
  100 + this.saveMrSchool(mrSchools);
  101 + long endTimeMillis1 = System.currentTimeMillis();
  102 + log.info("-----------分析数据入库 结束:" + endTimeMillis1);
  103 + log.info("-----------分析数据入库 耗时:" + (endTimeMillis1 - startTimeMillis1));
  104 + SparkUtil.close();
  105 + minors = null;
  106 + mrSchools = null;
  107 + }
  108 +
  109 + /**
  110 + * 根据表名清空指定表
  111 + *
  112 + * @param tableName
  113 + */
  114 + private void truncateTable(String tableName) {
  115 + Connection conn = null;
  116 + try {
  117 + Class.forName(driver);
  118 + conn = DriverManager.getConnection(jdbcUrl, properties);
  119 + CallableStatement sm = conn.prepareCall("truncate table " + tableName);
  120 + sm.execute();
  121 + sm.close();
  122 + } catch (Exception e) {
  123 + log.error(e.getMessage());
  124 + } finally {
  125 + if (null != conn) {
  126 + try {
  127 + conn.close();
  128 + } catch (SQLException e) {
  129 + log.error(e.getMessage());
  130 + }
  131 + }
  132 + }
  133 + }
  134 +
  135 + /**
  136 + * 保存未成年人和家庭成员数据
  137 + *
  138 + * @param minors
  139 + */
  140 + public void saveMinor(List<Minor> minors) {
  141 + long startTimeMillis1 = System.currentTimeMillis();
  142 + log.info("-----------未成年人入库 开始:" + startTimeMillis1);
  143 + JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkUtil.getSparkInstance().sparkContext());
  144 + JavaRDD<Minor> minorsJavaRDD = javaSparkContext.parallelize(minors, SPARK_NUM_PARTITIONS);
  145 + String createTime = DateUtils.date2Str(new Date(), DateUtils.datetimeFormat.get());
  146 +
  147 + JavaRDD<Row> rowRDD = minorsJavaRDD.zipWithUniqueId().map(new Function<Tuple2<Minor, Long>, Row>() {
  148 + @Override
  149 + public Row call(Tuple2<Minor, Long> v1) throws Exception {
  150 + String id = v1._2.toString();
  151 + Minor minor = v1._1;
  152 + String name = minor.getName();
  153 + String sys_org_code = minor.getSysOrgCode();
  154 + String household_num = minor.getHouseholdNum();
  155 + String number = minor.getNumber();
  156 + Integer gender = minor.getGender();
  157 + String address = minor.getAddress();
  158 + String identity = minor.getIdentity();
  159 + Integer school = minor.getSchool();
  160 + String school_name = minor.getSchoolName();
  161 + String start_year = minor.getStartYear();
  162 + String guardian = minor.getGuardian();
  163 + String relation = minor.getRelation();
  164 + String reason = minor.getReason();
  165 + String special_reason = minor.getSpecialReason();
  166 + String division = minor.getDivision();
  167 + String remark = minor.getRemark();
  168 + String create_by = minor.getCreateBy();
  169 + String create_time = createTime;
  170 + String update_by = minor.getUpdateBy();
  171 + String update_time = null;
  172 + return RowFactory.create(id, name, sys_org_code, household_num, number, gender, address, identity, school, school_name,
  173 + start_year, guardian, relation, reason, special_reason, division, remark, create_by, create_time, update_by, update_time);
  174 + }
  175 + });
  176 +
  177 +// schemaFields.add(DataTypes.createStructField("id", DataTypes.LongType, false));
  178 +// schemaFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
  179 +// schemaFields.add(DataTypes.createStructField("sysOrgCode", DataTypes.StringType, true));
  180 +// schemaFields.add(DataTypes.createStructField("householdNum", DataTypes.StringType, true));
  181 +// schemaFields.add(DataTypes.createStructField("number", DataTypes.StringType, true));
  182 +// schemaFields.add(DataTypes.createStructField("gender", DataTypes.IntegerType, true));
  183 +// schemaFields.add(DataTypes.createStructField("address", DataTypes.StringType, true));
  184 +// schemaFields.add(DataTypes.createStructField("identity", DataTypes.StringType, true));
  185 +// schemaFields.add(DataTypes.createStructField("school", DataTypes.IntegerType, true));
  186 +// schemaFields.add(DataTypes.createStructField("schoolName", DataTypes.StringType, true));
  187 +// schemaFields.add(DataTypes.createStructField("startYear", DataTypes.StringType, true));
  188 +// schemaFields.add(DataTypes.createStructField("guardian", DataTypes.StringType, true));
  189 +// schemaFields.add(DataTypes.createStructField("relation", DataTypes.StringType, true));
  190 +// schemaFields.add(DataTypes.createStructField("reason", DataTypes.StringType, true));
  191 +// schemaFields.add(DataTypes.createStructField("specialReason", DataTypes.StringType, true));
  192 +// schemaFields.add(DataTypes.createStructField("division", DataTypes.StringType, true));
  193 +// schemaFields.add(DataTypes.createStructField("remark", DataTypes.StringType, true));
  194 +// schemaFields.add(DataTypes.createStructField("createBy", DataTypes.StringType, true));
  195 +// schemaFields.add(DataTypes.createStructField("createTime", DataTypes.StringType, true));
  196 +// schemaFields.add(DataTypes.createStructField("updateBy", DataTypes.StringType, true));
  197 +// schemaFields.add(DataTypes.createStructField("updateTime", DataTypes.StringType, true));
  198 +
  199 + List<StructField> schemaFields = new ArrayList<StructField>();
  200 + schemaFields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
  201 + schemaFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
  202 + schemaFields.add(DataTypes.createStructField("sys_org_code", DataTypes.StringType, true));
  203 + schemaFields.add(DataTypes.createStructField("household_num", DataTypes.StringType, true));
  204 + schemaFields.add(DataTypes.createStructField("number", DataTypes.StringType, true));
  205 + schemaFields.add(DataTypes.createStructField("gender", DataTypes.IntegerType, true));
  206 + schemaFields.add(DataTypes.createStructField("address", DataTypes.StringType, true));
  207 + schemaFields.add(DataTypes.createStructField("identity", DataTypes.StringType, true));
  208 + schemaFields.add(DataTypes.createStructField("school", DataTypes.IntegerType, true));
  209 + schemaFields.add(DataTypes.createStructField("school_name", DataTypes.StringType, true));
  210 + schemaFields.add(DataTypes.createStructField("start_year", DataTypes.StringType, true));
  211 + schemaFields.add(DataTypes.createStructField("guardian", DataTypes.StringType, true));
  212 + schemaFields.add(DataTypes.createStructField("relation", DataTypes.StringType, true));
  213 + schemaFields.add(DataTypes.createStructField("reason", DataTypes.StringType, true));
  214 + schemaFields.add(DataTypes.createStructField("special_reason", DataTypes.StringType, true));
  215 + schemaFields.add(DataTypes.createStructField("division", DataTypes.StringType, true));
  216 + schemaFields.add(DataTypes.createStructField("remark", DataTypes.StringType, true));
  217 + schemaFields.add(DataTypes.createStructField("create_by", DataTypes.StringType, true));
  218 + schemaFields.add(DataTypes.createStructField("create_time", DataTypes.StringType, true));
  219 + schemaFields.add(DataTypes.createStructField("update_by", DataTypes.StringType, true));
  220 + schemaFields.add(DataTypes.createStructField("update_time", DataTypes.StringType, true));
  221 + StructType schema = DataTypes.createStructType(schemaFields);
  222 +
  223 + Dataset<Row> minorsDs = SparkUtil.getSparkInstance().createDataFrame(rowRDD.rdd(), schema);
  224 + String table = TABLE_MINOR;
  225 + this.truncateTable(table);
  226 + minorsDs.write().mode(SPARK_MODE).jdbc(jdbcUrl, table, properties);
  227 + long endTimeMillis1 = System.currentTimeMillis();
  228 + log.info("-----------未成年人入库 结束:" + endTimeMillis1);
  229 + log.info("-----------未成年人入库 耗时:" + (endTimeMillis1 - startTimeMillis1));
  230 + }
  231 +
  232 + /**
  233 + * 保存家庭成员数据
  234 + *
  235 + * @param families
  236 + */
  237 + public void saveFamilies(List<Family> families) {
  238 + long startTimeMillis1 = System.currentTimeMillis();
  239 + log.info("-----------家庭成员数据入库 开始:" + startTimeMillis1);
  240 + JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkUtil.getSparkInstance().sparkContext());
  241 + JavaRDD<Family> familiesJavaRDD = javaSparkContext.parallelize(families, SPARK_NUM_PARTITIONS);
  242 + String createTime = DateUtils.date2Str(new Date(), DateUtils.datetimeFormat.get());
  243 + JavaRDD<Row> rowRDD = familiesJavaRDD.zipWithUniqueId().map(new Function<Tuple2<Family, Long>, Row>() {
  244 + @Override
  245 + public Row call(Tuple2<Family, Long> v1) throws Exception {
  246 + String id = v1._2.toString();
  247 + Family family = v1._1;
  248 + String name = family.getName();
  249 + String sys_org_code = family.getSysOrgCode();
  250 + String household_num = family.getHouseholdNum();
  251 + Integer gender = family.getGender();
  252 + String identity = family.getIdentity();
  253 + String number = family.getNumber();
  254 + String relation = family.getRelation();
  255 + String division = family.getDivision();
  256 + String address = family.getAddress();
  257 + Integer crime = family.getCrime();
  258 + String reason = family.getReason();
  259 + String other = family.getOther();
  260 + String create_by = family.getCreateBy();
  261 + String create_time = createTime;
  262 + String update_by = family.getUpdateBy();
  263 + String update_time = null;
  264 + return RowFactory.create(id, name, sys_org_code, household_num, gender, identity, number, relation, division,
  265 + address, crime, reason, other, create_by, create_time, update_by, update_time);
  266 + }
  267 + });
  268 +
  269 + List<StructField> schemaFields = new ArrayList<StructField>();
  270 +// schemaFields.add(DataTypes.createStructField("id", DataTypes.LongType, false));
  271 +// schemaFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
  272 +// schemaFields.add(DataTypes.createStructField("sysOrgCode", DataTypes.StringType, true));
  273 +// schemaFields.add(DataTypes.createStructField("householdNum", DataTypes.StringType, true));
  274 +// schemaFields.add(DataTypes.createStructField("gender", DataTypes.IntegerType, true));
  275 +// schemaFields.add(DataTypes.createStructField("identity", DataTypes.StringType, true));
  276 +// schemaFields.add(DataTypes.createStructField("number", DataTypes.StringType, true));
  277 +// schemaFields.add(DataTypes.createStructField("relation", DataTypes.StringType, true));
  278 +// schemaFields.add(DataTypes.createStructField("division", DataTypes.StringType, true));
  279 +// schemaFields.add(DataTypes.createStructField("address", DataTypes.StringType, true));
  280 +// schemaFields.add(DataTypes.createStructField("crime", DataTypes.IntegerType, true));
  281 +// schemaFields.add(DataTypes.createStructField("reason", DataTypes.StringType, true));
  282 +// schemaFields.add(DataTypes.createStructField("other", DataTypes.StringType, true));
  283 +// schemaFields.add(DataTypes.createStructField("createTime", DataTypes.StringType, true));
  284 + schemaFields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
  285 + schemaFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
  286 + schemaFields.add(DataTypes.createStructField("sys_org_code", DataTypes.StringType, true));
  287 + schemaFields.add(DataTypes.createStructField("household_num", DataTypes.StringType, true));
  288 + schemaFields.add(DataTypes.createStructField("gender", DataTypes.IntegerType, true));
  289 + schemaFields.add(DataTypes.createStructField("identity", DataTypes.StringType, true));
  290 + schemaFields.add(DataTypes.createStructField("number", DataTypes.StringType, true));
  291 + schemaFields.add(DataTypes.createStructField("relation", DataTypes.StringType, true));
  292 + schemaFields.add(DataTypes.createStructField("division", DataTypes.StringType, true));
  293 + schemaFields.add(DataTypes.createStructField("address", DataTypes.StringType, true));
  294 + schemaFields.add(DataTypes.createStructField("crime", DataTypes.IntegerType, true));
  295 + schemaFields.add(DataTypes.createStructField("reason", DataTypes.StringType, true));
  296 + schemaFields.add(DataTypes.createStructField("other", DataTypes.StringType, true));
  297 + schemaFields.add(DataTypes.createStructField("create_by", DataTypes.StringType, true));
  298 + schemaFields.add(DataTypes.createStructField("create_time", DataTypes.StringType, true));
  299 + schemaFields.add(DataTypes.createStructField("update_by", DataTypes.StringType, true));
  300 + schemaFields.add(DataTypes.createStructField("update_time", DataTypes.StringType, true));
  301 +
  302 + StructType schema = DataTypes.createStructType(schemaFields);
  303 +
  304 + Dataset<Row> familiesDs = SparkUtil.getSparkInstance().createDataFrame(rowRDD.rdd(), schema);
  305 + String table = TABLE_FAMILY;
  306 + this.truncateTable(table);
  307 + familiesDs.write().mode(SPARK_MODE).jdbc(jdbcUrl, table, properties);
  308 + long endTimeMillis1 = System.currentTimeMillis();
  309 + log.info("-----------家庭成员数据入库 结束:" + endTimeMillis1);
  310 + log.info("-----------家庭成员数据入库 耗时:" + (endTimeMillis1 - startTimeMillis1));
  311 +
  312 +
  313 + }
  314 +
  315 + /**
  316 + * 批量保存学校数据
  317 + *
  318 + * @param mrSchools
  319 + */
  320 + public void saveMrSchool(List<MrSchool> mrSchools) {
  321 + long startTimeMillis1 = System.currentTimeMillis();
  322 + log.info("-----------学校数据入库 开始:" + startTimeMillis1);
  323 + log.info("saveMrSchool 开始:" + startTimeMillis1);
  324 + JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(SparkUtil.getSparkInstance().sparkContext());
  325 + JavaRDD<MrSchool> mrSchoolsJavaRDD = javaSparkContext.parallelize(mrSchools, SPARK_NUM_PARTITIONS);
  326 + String createTime = DateUtils.date2Str(new Date(), DateUtils.datetimeFormat.get());
  327 + JavaRDD<Row> rowRDD = mrSchoolsJavaRDD.zipWithUniqueId().map(new Function<Tuple2<MrSchool, Long>, Row>() {
  328 + @Override
  329 + public Row call(Tuple2<MrSchool, Long> v1) throws Exception {
  330 + String id = v1._2.toString();
  331 + MrSchool mrSchool = v1._1;
  332 + String name = mrSchool.getName();
  333 + String sys_org_code = mrSchool.getSysOrgCode();
  334 + String identity = mrSchool.getIdentity();
  335 + String school = mrSchool.getSchool();
  336 + String admission_date = mrSchool.getAdmissionDate();
  337 + String class_name = mrSchool.getClassName();
  338 + String address = mrSchool.getAddress();
  339 + String member_one = mrSchool.getMemberOne();
  340 + String connect_one = mrSchool.getConnectOne();
  341 + String phone_one = mrSchool.getPhoneOne();
  342 + String member_two = mrSchool.getMemberTwo();
  343 + String connect_two = mrSchool.getConnectTwo();
  344 + String phone_two = mrSchool.getPhoneTwo();
  345 + String phone = mrSchool.getPhone();
  346 + String create_by = mrSchool.getCreateBy();
  347 + String create_time = createTime;
  348 + String update_by = mrSchool.getUpdateBy();
  349 + String update_time = null;
  350 + return RowFactory.create(id, name, sys_org_code, identity, school, admission_date, class_name,
  351 + address, member_one, connect_one, phone_one, member_two, connect_two, phone_two, phone,
  352 + create_by, create_time, update_by, update_time);
  353 + }
  354 + });
  355 +
  356 + List<StructField> schemaFields = new ArrayList<StructField>();
  357 +// schemaFields.add(DataTypes.createStructField("id", DataTypes.LongType, false));
  358 +// schemaFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
  359 +// schemaFields.add(DataTypes.createStructField("sysOrgCode", DataTypes.StringType, true));
  360 +// schemaFields.add(DataTypes.createStructField("identity", DataTypes.StringType, true));
  361 +// schemaFields.add(DataTypes.createStructField("school", DataTypes.StringType, true));
  362 +// schemaFields.add(DataTypes.createStructField("admissionDate", DataTypes.StringType, true));
  363 +// schemaFields.add(DataTypes.createStructField("className", DataTypes.StringType, true));
  364 +// schemaFields.add(DataTypes.createStructField("address", DataTypes.StringType, true));
  365 +// schemaFields.add(DataTypes.createStructField("memberOne", DataTypes.StringType, true));
  366 +// schemaFields.add(DataTypes.createStructField("connectOne", DataTypes.StringType, true));
  367 +// schemaFields.add(DataTypes.createStructField("phoneOne", DataTypes.StringType, true));
  368 +// schemaFields.add(DataTypes.createStructField("memberTwo", DataTypes.StringType, true));
  369 +// schemaFields.add(DataTypes.createStructField("connectTwo", DataTypes.StringType, true));
  370 +// schemaFields.add(DataTypes.createStructField("phoneTwo", DataTypes.StringType, true));
  371 +// schemaFields.add(DataTypes.createStructField("phone", DataTypes.StringType, true));
  372 +// schemaFields.add(DataTypes.createStructField("createTime", DataTypes.StringType, true));
  373 + schemaFields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
  374 + schemaFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
  375 + schemaFields.add(DataTypes.createStructField("sys_org_code", DataTypes.StringType, true));
  376 + schemaFields.add(DataTypes.createStructField("identity", DataTypes.StringType, true));
  377 + schemaFields.add(DataTypes.createStructField("school", DataTypes.StringType, true));
  378 + schemaFields.add(DataTypes.createStructField("admission_date", DataTypes.StringType, true));
  379 + schemaFields.add(DataTypes.createStructField("class_name", DataTypes.StringType, true));
  380 + schemaFields.add(DataTypes.createStructField("address", DataTypes.StringType, true));
  381 + schemaFields.add(DataTypes.createStructField("member_one", DataTypes.StringType, true));
  382 + schemaFields.add(DataTypes.createStructField("connect_one", DataTypes.StringType, true));
  383 + schemaFields.add(DataTypes.createStructField("phone_one", DataTypes.StringType, true));
  384 + schemaFields.add(DataTypes.createStructField("member_two", DataTypes.StringType, true));
  385 + schemaFields.add(DataTypes.createStructField("connect_two", DataTypes.StringType, true));
  386 + schemaFields.add(DataTypes.createStructField("phone_two", DataTypes.StringType, true));
  387 + schemaFields.add(DataTypes.createStructField("phone", DataTypes.StringType, true));
  388 + schemaFields.add(DataTypes.createStructField("create_by", DataTypes.StringType, true));
  389 + schemaFields.add(DataTypes.createStructField("create_time", DataTypes.StringType, true));
  390 + schemaFields.add(DataTypes.createStructField("update_by", DataTypes.StringType, true));
  391 + schemaFields.add(DataTypes.createStructField("update_time", DataTypes.StringType, true));
  392 + StructType schema = DataTypes.createStructType(schemaFields);
  393 +
  394 + Dataset<Row> schoolDs = SparkUtil.getSparkInstance().createDataFrame(rowRDD.rdd(), schema);
  395 + String table = TABLE_SCHOOL;
  396 + this.truncateTable(table);
  397 + schoolDs.write().mode(SPARK_MODE).jdbc(jdbcUrl, table, properties);
  398 + long endTimeMillis1 = System.currentTimeMillis();
  399 + log.info("-----------学校数据入库 结束:" + endTimeMillis1);
  400 + log.info("-----------学校数据入库 耗时:" + (endTimeMillis1 - startTimeMillis1));
  401 + }
  402 +}
... ...
juvenile-prosecution-boot/jeecg-boot-module-system/src/main/java/org/jeecg/modules/system/controller/AnalyzeController.java
... ... @@ -10,12 +10,14 @@ import org.jeecg.common.aspect.annotation.AutoLog;
10 10 import org.jeecg.common.system.base.controller.JeecgController;
11 11 import org.jeecg.common.system.vo.LoginUser;
12 12 import org.jeecg.common.util.oConvertUtils;
  13 +import org.jeecg.modules.handle.service.SparkEtlService;
13 14 import org.jeecg.modules.system.entity.*;
14 15 import org.jeecg.modules.system.service.*;
15 16 import org.jeecg.modules.system.util.IDNumberUtil;
16 17 import org.jeecg.modules.system.util.getRandomId;
17 18 import org.jeecg.modules.system.vo.GaPunishVo;
18 19 import org.jeecg.modules.system.vo.SpecialStudentVo;
  20 +import org.springframework.beans.factory.annotation.Value;
19 21 import org.springframework.web.bind.annotation.*;
20 22  
21 23 import javax.annotation.Resource;
... ... @@ -23,7 +25,6 @@ import java.util.*;
23 25 import java.util.concurrent.*;
24 26  
25 27  
26   -
27 28 /**
28 29 * @Description: 数据分析
29 30 * @Author: jeecg-boot
... ... @@ -35,6 +36,12 @@ import java.util.concurrent.*;
35 36 @RequestMapping("/sys/analyze")
36 37 @Slf4j
37 38 public class AnalyzeController extends JeecgController<GaDemographic, IGaDemographicService> {
  39 + private static final int corePoolSize = Runtime.getRuntime().availableProcessors();
  40 + private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize + 1, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000));
  41 + // private final List<Family> resultFamily = new ArrayList<>();//分析后家庭的数据
  42 + private final List<Minor> resultMinor = new ArrayList<>();//分析后未成年人的信息
  43 + private final List<MrSchool> resultSchools = new ArrayList<>();//分析后学籍的信息
  44 + private final Set<String> testIdCards = new HashSet<>();//存储户号
38 45 @Resource
39 46 private IGaDemographicService gaDemographicService;
40 47 @Resource
... ... @@ -69,14 +76,11 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
69 76 private ISourceService sourceService;
70 77 @Resource
71 78 private ISysDictService dictService;
72   -
73   -// private final List<Family> resultFamily = new ArrayList<>();//分析后家庭的数据
74   - private final List<Minor> resultMinor = new ArrayList<>();//分析后未成年人的信息
75   - private final List<MrSchool> resultSchools = new ArrayList<>();//分析后学籍的信息
76   - private final Set<String> testIdCards = new HashSet<>();//存储户号
77   - private static final int corePoolSize = Runtime.getRuntime().availableProcessors();
78   - private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize + 1, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000));
79   - private Integer random_number=000000;
  79 + @Value("${spark.enabled}")
  80 + private Boolean sparkEnabled;
  81 + @Resource
  82 + private SparkEtlService etlService;
  83 + private Integer random_number = 000000;
80 84  
81 85 @AutoLog(value = "来源数据管理-分页列表查询")
82 86 @ApiOperation(value = "来源数据管理-分页列表查询", notes = "来源数据管理-分页列表查询")
... ... @@ -87,23 +91,28 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
87 91 long startTime = System.currentTimeMillis();
88 92 List<SysDepart> departs = sysDepartService.querySysDeparts();
89 93 getData(departs);
90   - System.out.println("开始数据分析" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
  94 +
  95 + if (null != sparkEnabled && sparkEnabled) {
  96 + etlService.dataToDb(resultMinor, resultSchools);
  97 + } else {
  98 + System.out.println("开始数据分析" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
91 99 // System.out.println("家庭成员数据批量新增总条数" + resultFamily.size());
92   - System.out.println("未成年人数据批量新增总条数" + resultMinor.size());
93   - System.out.println("学籍信息数据批量新增总条数" + resultSchools.size());
94   - long startTime2 = System.currentTimeMillis();
95   - minorService.insertBatch(resultMinor);
96   - System.out.println("未成年人数据批量新增所用时间" + (System.currentTimeMillis() - startTime2) / 1000 + "秒");
97   - long startTime1 = System.currentTimeMillis();
  100 + System.out.println("未成年人数据批量新增总条数" + resultMinor.size());
  101 + System.out.println("学籍信息数据批量新增总条数" + resultSchools.size());
  102 + long startTime2 = System.currentTimeMillis();
  103 + minorService.insertBatch(resultMinor);
  104 + System.out.println("未成年人数据批量新增所用时间" + (System.currentTimeMillis() - startTime2) / 1000 + "秒");
  105 + long startTime1 = System.currentTimeMillis();
98 106 // familyService.insertBatch(resultFamily);
99   - System.out.println("家庭成员数据批量新增所用时间" + (System.currentTimeMillis() - startTime1) / 1000 + "秒");
100   - long startTime3 = System.currentTimeMillis();
101   - schoolService.insertBatch(resultSchools);
102   - System.out.println("学籍信息入库时间" + (System.currentTimeMillis() - startTime3) / 1000 + "秒");
103   - long startTime4 = System.currentTimeMillis();
104   - getSourceData();
105   - System.out.println("来源数据入库时间" + (System.currentTimeMillis() - startTime4) / 1000 + "秒");
106   - System.out.println("*********************************总用时" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
  107 + System.out.println("家庭成员数据批量新增所用时间" + (System.currentTimeMillis() - startTime1) / 1000 + "秒");
  108 + long startTime3 = System.currentTimeMillis();
  109 + schoolService.insertBatch(resultSchools);
  110 + System.out.println("学籍信息入库时间" + (System.currentTimeMillis() - startTime3) / 1000 + "秒");
  111 + long startTime4 = System.currentTimeMillis();
  112 + getSourceData();
  113 + System.out.println("来源数据入库时间" + (System.currentTimeMillis() - startTime4) / 1000 + "秒");
  114 + System.out.println("*********************************总用时" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
  115 + }
107 116 // return Result.OK("操作成功");
108 117 } catch (Exception e) {
109 118 e.printStackTrace();
... ... @@ -166,7 +175,7 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
166 175 System.out.println("线程池查询用时" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
167 176 long startTime1 = System.currentTimeMillis();
168 177 //户籍信息
169   - Map<String,Family> familyMap = gaHouseholdService.getGaHouseholds(gaDemographics);
  178 + Map<String, Family> familyMap = gaHouseholdService.getGaHouseholds(gaDemographics);
170 179 // Map<String,Family> familyMap = queryByThread();
171 180 System.out.println("户籍信息数据查询时间" + (System.currentTimeMillis() - startTime1) / 1000 + "秒");
172 181 System.out.println("判断初高中学生信息和人口基本信息的交集和差集开始时间" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
... ... @@ -301,9 +310,9 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
301 310 public void judgeHighSchoolsAndDemographics(List<MrSchool> highSchools, Map<String, String> gaDemographics, List<SysDepart> departs,
302 311 List<SpecialStudentVo> specialStudentVos, List<String> mzUnsupporteds, List<String> mzLeftBehinds,
303 312 List<String> mzAdoptions, Map<String, Family> gaHouseholds, Map<String, GaPunishVo> gaPunishVos, List<String> mzOrphans
304   - ) {
  313 + ) {
305 314 try {
306   - System.out.println("判断初高中学生信息和人口基本信息的交集和差集zongsuo"+highSchools.size());
  315 + System.out.println("判断初高中学生信息和人口基本信息的交集和差集zongsuo" + highSchools.size());
307 316 long startTime = System.currentTimeMillis();
308 317 //不在人口基本信息的初高中学生信息
309 318 List<MrSchool> highSchoolDifference = new ArrayList<>();
... ... @@ -327,10 +336,10 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
327 336 }
328 337 });
329 338 }
330   - System.out.println("初高中交集数"+highSchoolIntersection.size());
331   - System.out.println("不在初高中学生信息里的的人口基本信息"+gaDemographicDifference.size());
332   - System.out.println("不在人口基本信息的初高中学生信息"+highSchoolDifference.size());
333   - doSchoolDifference(highSchoolDifference, departs, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans,gaHouseholds);
  339 + System.out.println("初高中交集数" + highSchoolIntersection.size());
  340 + System.out.println("不在初高中学生信息里的的人口基本信息" + gaDemographicDifference.size());
  341 + System.out.println("不在人口基本信息的初高中学生信息" + highSchoolDifference.size());
  342 + doSchoolDifference(highSchoolDifference, departs, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans, gaHouseholds);
334 343 doIntersection(highSchoolIntersection, departs, gaHouseholds, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans);
335 344 doDemographicDifference(gaDemographicDifference, departs, gaHouseholds, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans);
336 345 } catch (Exception e) {
... ... @@ -345,9 +354,9 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
345 354 public void judgePrimarySchoolsAndDemographics(List<MrSchool> primarySchools, Map<String, String> gaDemographics, List<SysDepart> departs,
346 355 List<SpecialStudentVo> specialStudentVos, List<String> mzUnsupporteds, List<String> mzLeftBehinds,
347 356 List<String> mzAdoptions, Map<String, Family> gaHouseholds, Map<String, GaPunishVo> gaPunishVos, List<String> mzOrphans
348   - ) {
  357 + ) {
349 358 try {
350   - System.out.println("判断幼小学生信息和人口基本信息的交集和差集总数"+primarySchools.size());
  359 + System.out.println("判断幼小学生信息和人口基本信息的交集和差集总数" + primarySchools.size());
351 360 long startTime = System.currentTimeMillis();
352 361 //不在人口基本信息的幼小学生信息
353 362 List<MrSchool> primarySchoolDifference = new ArrayList<>();
... ... @@ -371,10 +380,10 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
371 380 }
372 381 });
373 382 }
374   - System.out.println("不在人口基本信息的幼小学生信息"+primarySchoolDifference.size());
375   - System.out.println("不在幼小学生信息里的的人口基本信息"+gaDemographicDifference.size());
376   - System.out.println("幼小学生信息和人口基本信息的交集"+primarySchoolIntersection.size());
377   - doSchoolDifference(primarySchoolDifference, departs, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans,gaHouseholds);
  383 + System.out.println("不在人口基本信息的幼小学生信息" + primarySchoolDifference.size());
  384 + System.out.println("不在幼小学生信息里的的人口基本信息" + gaDemographicDifference.size());
  385 + System.out.println("幼小学生信息和人口基本信息的交集" + primarySchoolIntersection.size());
  386 + doSchoolDifference(primarySchoolDifference, departs, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans, gaHouseholds);
378 387 doIntersection(primarySchoolIntersection, departs, gaHouseholds, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans);
379 388 doDemographicDifference(gaDemographicDifference, departs, gaHouseholds, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans);
380 389 } catch (Exception e) {
... ... @@ -422,7 +431,7 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
422 431 }
423 432 System.out.println("处理有人口基本信息但没有学籍信息的数据的所属单位结束" + (System.currentTimeMillis() - startTime) / 1000);
424 433 System.out.println("处理有人口基本信息但没有学籍信息的数据jieshu**********************" + minorMap.size());
425   - getSpecialStudent(minorMap, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans,gaHouseholds);
  434 + getSpecialStudent(minorMap, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans, gaHouseholds);
426 435 } catch (Exception e) {
427 436 e.printStackTrace();
428 437 log.info(String.valueOf(e));
... ... @@ -436,7 +445,7 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
436 445 long startTime = System.currentTimeMillis();
437 446 try {
438 447 Map<String, Minor> minorMap = new HashMap<>();
439   - Map<String, MrSchool> schools=new HashMap<>();
  448 + Map<String, MrSchool> schools = new HashMap<>();
440 449 System.out.println("***********************交集总数**********************************************" + intersection.size());
441 450 System.out.println("***********************人口信息总数**********************************************" + gaHouseholds.size());
442 451 System.out.println("交集数据里的所属单位开始" + (System.currentTimeMillis() - startTime) / 1000);
... ... @@ -445,20 +454,20 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
445 454 for (SysDepart d : departs) {
446 455 intersection.stream().forEach(i -> {
447 456 Minor minor = new Minor();
448   - if (oConvertUtils.isNotEmpty(i) && oConvertUtils.isNotEmpty(i.getIdentity()) && IDNumberUtil.checkID(i.getIdentity())) {
449   - String idCard=i.getIdentity();
450   - if(oConvertUtils.isNotEmpty(minorMap) && oConvertUtils.isNotEmpty(minorMap.get(idCard))){
451   - Minor minMap=minorMap.get(idCard);
452   - if(oConvertUtils.isNotEmpty(d) && oConvertUtils.isNotEmpty(d.getCommonCode()) && oConvertUtils.isNotEmpty(minMap.getSchoolName())){
  457 + if (oConvertUtils.isNotEmpty(i) && oConvertUtils.isNotEmpty(i.getIdentity()) && IDNumberUtil.checkID(i.getIdentity())) {
  458 + String idCard = i.getIdentity();
  459 + if (oConvertUtils.isNotEmpty(minorMap) && oConvertUtils.isNotEmpty(minorMap.get(idCard))) {
  460 + Minor minMap = minorMap.get(idCard);
  461 + if (oConvertUtils.isNotEmpty(d) && oConvertUtils.isNotEmpty(d.getCommonCode()) && oConvertUtils.isNotEmpty(minMap.getSchoolName())) {
453 462 if (d.getCommonCode().contains(minMap.getSchoolName())) {
454 463 minMap.setSysOrgCode(d.getOrgCode());
455 464 i.setSysOrgCode(d.getOrgCode());
456 465 }
457 466 }
458   - if(oConvertUtils.isNotEmpty(schools) && oConvertUtils.isNotEmpty(schools.get(idCard)) ){
  467 + if (oConvertUtils.isNotEmpty(schools) && oConvertUtils.isNotEmpty(schools.get(idCard))) {
459 468 schools.get(idCard).setSysOrgCode(i.getSysOrgCode());
460 469 }
461   - }else {
  470 + } else {
462 471 if (oConvertUtils.isNotEmpty(d) && oConvertUtils.isNotEmpty(d.getCommonCode()) && oConvertUtils.isNotEmpty(i.getSchool())) {
463 472 if (d.getCommonCode().contains(i.getSchool())) {
464 473 minor.setSysOrgCode(d.getOrgCode());
... ... @@ -493,12 +502,12 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
493 502 });
494 503 }
495 504 }
496   - System.out.println("交集数据里的所属单位结束总数"+intersection.size()+"交集数据里的所属单位结束" + (System.currentTimeMillis() - startTime) / 1000);
  505 + System.out.println("交集数据里的所属单位结束总数" + intersection.size() + "交集数据里的所属单位结束" + (System.currentTimeMillis() - startTime) / 1000);
497 506 System.out.println("户籍信息里的未成年人的单位" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
498 507 long startTime1 = System.currentTimeMillis();
499 508 resultSchools.addAll(schools.values());
500 509 System.out.println("人口信息he交集总数总数" + minorMap.size());
501   - getSpecialStudent(minorMap, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans,gaHouseholds);
  510 + getSpecialStudent(minorMap, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans, gaHouseholds);
502 511 } catch (Exception e) {
503 512 e.printStackTrace();
504 513 log.info(String.valueOf(e));
... ... @@ -511,7 +520,7 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
511 520 List<String> mzOrphans, Map<String, Family> gaHouseholds) {
512 521 try {
513 522 Map<String, Minor> minorMap = new HashMap<>();
514   - Map<String, MrSchool> schools=new HashMap<>();
  523 + Map<String, MrSchool> schools = new HashMap<>();
515 524 long startTime = System.currentTimeMillis();
516 525 System.out.println("根据差集数据判断所属单位" + difference.size());
517 526 System.out.println("未成年人总数" + difference.size());
... ... @@ -520,20 +529,20 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
520 529 for (SysDepart d : departs) {
521 530 difference.stream().forEach(m -> {
522 531 Minor minor = new Minor();
523   - if (oConvertUtils.isNotEmpty(m) && oConvertUtils.isNotEmpty(m.getIdentity()) && IDNumberUtil.checkID(m.getIdentity())) {
524   - String idCard=m.getIdentity();
525   - if(oConvertUtils.isNotEmpty(minorMap) && oConvertUtils.isNotEmpty(minorMap.get(idCard))){
526   - Minor minMap=minorMap.get(idCard);
527   - if(oConvertUtils.isNotEmpty(d) && oConvertUtils.isNotEmpty(d.getCommonCode()) && oConvertUtils.isNotEmpty(minMap.getSchoolName())){
528   - if (d.getCommonCode().contains(minMap.getSchoolName())) {
529   - minMap.setSysOrgCode(d.getOrgCode());
530   - m.setSysOrgCode(d.getOrgCode());
531   - }
532   - }
533   - if(oConvertUtils.isNotEmpty(schools) && oConvertUtils.isNotEmpty(schools.get(idCard)) ){
  532 + if (oConvertUtils.isNotEmpty(m) && oConvertUtils.isNotEmpty(m.getIdentity()) && IDNumberUtil.checkID(m.getIdentity())) {
  533 + String idCard = m.getIdentity();
  534 + if (oConvertUtils.isNotEmpty(minorMap) && oConvertUtils.isNotEmpty(minorMap.get(idCard))) {
  535 + Minor minMap = minorMap.get(idCard);
  536 + if (oConvertUtils.isNotEmpty(d) && oConvertUtils.isNotEmpty(d.getCommonCode()) && oConvertUtils.isNotEmpty(minMap.getSchoolName())) {
  537 + if (d.getCommonCode().contains(minMap.getSchoolName())) {
  538 + minMap.setSysOrgCode(d.getOrgCode());
  539 + m.setSysOrgCode(d.getOrgCode());
  540 + }
  541 + }
  542 + if (oConvertUtils.isNotEmpty(schools) && oConvertUtils.isNotEmpty(schools.get(idCard))) {
534 543 schools.get(idCard).setSysOrgCode(m.getSysOrgCode());
535 544 }
536   - }else {
  545 + } else {
537 546 if (oConvertUtils.isNotEmpty(d) && oConvertUtils.isNotEmpty(d.getCommonCode()) && oConvertUtils.isNotEmpty(m.getSchool())) {
538 547 if (d.getCommonCode().contains(m.getSchool())) {
539 548 minor.setSysOrgCode(d.getOrgCode());
... ... @@ -574,7 +583,7 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
574 583 // schoolService.insertBatch(difference);
575 584 resultSchools.addAll(schools.values());
576 585 System.out.println("有学籍信息但没有人口基本信息的数据" + minorMap.size());
577   - getSpecialStudent(minorMap, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans,gaHouseholds);
  586 + getSpecialStudent(minorMap, specialStudentVos, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans, gaHouseholds);
578 587 } catch (Exception e) {
579 588 e.printStackTrace();
580 589 log.info(String.valueOf(e));
... ... @@ -602,7 +611,7 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
602 611 }
603 612 }
604 613 System.out.println("根据特殊学校信息判断未成年人重点关注原因和备注" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
605   - getUnsupported(minors, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans,gaHouseholds);
  614 + getUnsupported(minors, mzUnsupporteds, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans, gaHouseholds);
606 615 } catch (Exception e) {
607 616 e.printStackTrace();
608 617 log.info(String.valueOf(e));
... ... @@ -618,20 +627,20 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
618 627 long startTime = System.currentTimeMillis();
619 628 try {
620 629 System.out.println("无人抚养");
621   - String reason="无人抚养";
  630 + String reason = "无人抚养";
622 631 if (oConvertUtils.isNotEmpty(minors) && oConvertUtils.isNotEmpty(mzUnsupporteds)) {
623 632 for (String s : mzUnsupporteds) {
624   - if (oConvertUtils.isNotEmpty(s) && oConvertUtils.isNotEmpty(minors.get(s)) ) {
625   - if(oConvertUtils.isNotEmpty(minors.get(s).getReason()) && !minors.get(s).getReason().contains(reason)){
626   - minors.get(s).setReason(minors.get(s).getReason()+","+reason);
627   - }else {
  633 + if (oConvertUtils.isNotEmpty(s) && oConvertUtils.isNotEmpty(minors.get(s))) {
  634 + if (oConvertUtils.isNotEmpty(minors.get(s).getReason()) && !minors.get(s).getReason().contains(reason)) {
  635 + minors.get(s).setReason(minors.get(s).getReason() + "," + reason);
  636 + } else {
628 637 minors.get(s).setReason(reason);
629 638 }
630 639 }
631 640 }
632 641 }
633 642 System.out.println("无人抚养" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
634   - getAdoption(minors, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans,gaHouseholds);
  643 + getAdoption(minors, mzLeftBehinds, mzAdoptions, gaPunishVos, mzOrphans, gaHouseholds);
635 644 } catch (Exception e) {
636 645 e.printStackTrace();
637 646 log.info(String.valueOf(e));
... ... @@ -646,20 +655,20 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
646 655 long startTime = System.currentTimeMillis();
647 656 try {
648 657 System.out.println("收养");
649   - String reason="收养";
  658 + String reason = "收养";
650 659 if (oConvertUtils.isNotEmpty(minors) && oConvertUtils.isNotEmpty(mzAdoptions)) {
651 660 for (String s : mzAdoptions) {
652   - if (oConvertUtils.isNotEmpty(s) && oConvertUtils.isNotEmpty(minors.get(s)) ) {
653   - if(oConvertUtils.isNotEmpty(minors.get(s).getReason()) && !minors.get(s).getReason().contains(reason)){
654   - minors.get(s).setReason(minors.get(s).getReason()+","+reason);
655   - }else {
  661 + if (oConvertUtils.isNotEmpty(s) && oConvertUtils.isNotEmpty(minors.get(s))) {
  662 + if (oConvertUtils.isNotEmpty(minors.get(s).getReason()) && !minors.get(s).getReason().contains(reason)) {
  663 + minors.get(s).setReason(minors.get(s).getReason() + "," + reason);
  664 + } else {
656 665 minors.get(s).setReason(reason);
657 666 }
658 667 }
659 668 }
660 669 }
661 670 System.out.println("收养" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
662   - getChildren(minors, mzLeftBehinds, gaPunishVos, mzOrphans,gaHouseholds);
  671 + getChildren(minors, mzLeftBehinds, gaPunishVos, mzOrphans, gaHouseholds);
663 672 } catch (Exception e) {
664 673 e.printStackTrace();
665 674 log.info(String.valueOf(e));
... ... @@ -674,20 +683,20 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
674 683 long startTime = System.currentTimeMillis();
675 684 try {
676 685 System.out.println("留守儿童");
677   - String reason="留守儿童";
  686 + String reason = "留守儿童";
678 687 if (oConvertUtils.isNotEmpty(minors) && oConvertUtils.isNotEmpty(mzLeftBehinds)) {
679 688 for (String s : mzLeftBehinds) {
680 689 if (oConvertUtils.isNotEmpty(s) && oConvertUtils.isNotEmpty(minors.get(s))) {
681   - if(oConvertUtils.isNotEmpty(minors.get(s).getReason()) && !minors.get(s).getReason().contains(reason)){
682   - minors.get(s).setReason(minors.get(s).getReason()+","+reason);
683   - }else {
  690 + if (oConvertUtils.isNotEmpty(minors.get(s).getReason()) && !minors.get(s).getReason().contains(reason)) {
  691 + minors.get(s).setReason(minors.get(s).getReason() + "," + reason);
  692 + } else {
684 693 minors.get(s).setReason(reason);
685 694 }
686 695 }
687 696 }
688 697 }
689 698 System.out.println("留守儿童" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
690   - getOrphan(minors, mzOrphans, gaPunishVos,gaHouseholds);
  699 + getOrphan(minors, mzOrphans, gaPunishVos, gaHouseholds);
691 700 } catch (Exception e) {
692 701 e.printStackTrace();
693 702 log.info(String.valueOf(e));
... ... @@ -702,20 +711,20 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
702 711 long startTime = System.currentTimeMillis();
703 712 try {
704 713 System.out.println("孤儿");
705   - String reason="孤儿";
  714 + String reason = "孤儿";
706 715 if (oConvertUtils.isNotEmpty(minors) && oConvertUtils.isNotEmpty(mzOrphans)) {
707 716 for (String s : mzOrphans) {
708   - if (oConvertUtils.isNotEmpty(s) && oConvertUtils.isNotEmpty(minors.get(s)) ) {
709   - if(oConvertUtils.isNotEmpty(minors.get(s).getReason()) && !minors.get(s).getReason().contains(reason)){
710   - minors.get(s).setReason(minors.get(s).getReason()+","+reason);
711   - }else {
  717 + if (oConvertUtils.isNotEmpty(s) && oConvertUtils.isNotEmpty(minors.get(s))) {
  718 + if (oConvertUtils.isNotEmpty(minors.get(s).getReason()) && !minors.get(s).getReason().contains(reason)) {
  719 + minors.get(s).setReason(minors.get(s).getReason() + "," + reason);
  720 + } else {
712 721 minors.get(s).setReason(reason);
713 722 }
714 723 }
715 724 }
716 725 }
717 726 System.out.println("孤儿" + (System.currentTimeMillis() - startTime) / 1000 + "秒");
718   - getFamilyRelationship(minors, gaHouseholds,gaPunishVos);
  727 + getFamilyRelationship(minors, gaHouseholds, gaPunishVos);
719 728 } catch (Exception e) {
720 729 e.printStackTrace();
721 730 log.info(String.valueOf(e));
... ... @@ -758,7 +767,7 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
758 767 } else {
759 768 family.setCrime(Family.IS_NOT_CRIME);
760 769 }
761   - if(IDNumberUtil.checkID(mapKey)){
  770 + if (IDNumberUtil.checkID(mapKey)) {
762 771 family.setGender(Integer.valueOf(IDNumberUtil.judgeGender(mapKey)));
763 772 }
764 773 family.setId(String.valueOf(id));
... ... @@ -777,7 +786,7 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
777 786 familyMaps.put(houseNum, list);
778 787 }
779 788 if (oConvertUtils.isNotEmpty(family.getCrime()) && family.getCrime().equals(Family.IS_CRIME)) {
780   - idCards.put(houseNum,true);
  789 + idCards.put(houseNum, true);
781 790 }
782 791 if (oConvertUtils.isNotEmpty(family.getRelation()) && family.getRelation().equals("户主")) {
783 792 guardians.put(houseNum, family);
... ... @@ -808,7 +817,7 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
808 817 List<Minor> minorList = new ArrayList<>();
809 818 Set<Family> familys = new HashSet<>();
810 819 //存放已经被放入minor里面的户号,避免数据重复
811   - Set<String> houseSet=new HashSet<>();
  820 + Set<String> houseSet = new HashSet<>();
812 821 Date createTime = new Date();
813 822 if (oConvertUtils.isNotEmpty(minors)) {
814 823 for (Map.Entry<String, Minor> m : minors.entrySet()) {
... ... @@ -818,12 +827,12 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
818 827 if (oConvertUtils.isNotEmpty(user) && oConvertUtils.isNotEmpty(user.getId())) {
819 828 minor.setCreateBy(user.getId());
820 829 }
821   - List<Family> f=new ArrayList<>();
  830 + List<Family> f = new ArrayList<>();
822 831 //判断是否是一个家庭的人员
823 832 if (oConvertUtils.isNotEmpty(minor.getHouseholdNum())) {
824 833 String houseNum = minor.getHouseholdNum();
825   - if(oConvertUtils.isNotEmpty(familyMaps) && oConvertUtils.isNotEmpty(familyMaps.get(houseNum)) ){
826   - if(!testIdCards.contains(houseNum) ){
  834 + if (oConvertUtils.isNotEmpty(familyMaps) && oConvertUtils.isNotEmpty(familyMaps.get(houseNum))) {
  835 + if (!testIdCards.contains(houseNum)) {
827 836 f.addAll(familyMaps.get(houseNum));
828 837 // resultFamily.addAll(familyMaps.get(houseNum));
829 838 testIdCards.add(houseNum);
... ... @@ -834,12 +843,12 @@ public class AnalyzeController extends JeecgController&lt;GaDemographic, IGaDemogra
834 843 if (oConvertUtils.isNotEmpty(houseNum) && oConvertUtils.isNotEmpty(guardians) && oConvertUtils.isNotEmpty(guardians.get(houseNum))) {
835 844 minor.setGuardian(guardians.get(houseNum).getName());
836 845 }
837   - String reason="家庭成员有犯罪记录";
  846 + String reason = "家庭成员有犯罪记录";
838 847 //判断家庭成员是否犯罪
839 848 if (oConvertUtils.isNotEmpty(idCard) && oConvertUtils.isNotEmpty(idCards) && oConvertUtils.isNotEmpty(idCards.get(houseNum))) {
840   - if(oConvertUtils.isNotEmpty(minor.getReason()) && !minor.getReason().contains(reason)){
841   - minor.setReason(minor.getReason()+","+reason);
842   - }else {
  849 + if (oConvertUtils.isNotEmpty(minor.getReason()) && !minor.getReason().contains(reason)) {
  850 + minor.setReason(minor.getReason() + "," + reason);
  851 + } else {
843 852 minor.setReason(reason);
844 853 }
845 854 }
... ...
juvenile-prosecution-boot/jeecg-boot-module-system/src/main/resources/application-dev.yml
... ... @@ -311,4 +311,12 @@ third-app:
311 311 client-id: ??
312 312 # appSecret
313 313 client-secret: ??
314   - agent-id: ??
315 314 \ No newline at end of file
  315 + agent-id: ??
  316 +#Spark配置
  317 +spark:
  318 + datasource:
  319 + url: jdbc:mysql://192.168.1.201:3306/juvenile-prosecution?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true
  320 + username: root
  321 + password: mx123456
  322 + driver: com.mysql.cj.jdbc.Driver
  323 + enabled: true
316 324 \ No newline at end of file
... ...