Commit 8b9a480b by qiuchaofei

1.优化同步数据到neo4j的性能

parent 6acdf9e7
......@@ -12,6 +12,8 @@ import java.util.List;
public interface Neo4jSystemRepository extends Neo4jRepository<Neo4jSystem, Long> {
List<Neo4jSystem> findNeo4jSystemByMetadataId(String metadataId);
List<Neo4jSystem> findNeo4jSystemByName(String metadataName);
// @Query("MATCH p=(n:MetaData {metadataId:{parentId}})-[r:Composition]->(m) RETURN m limit {limit} ")
@Query("match (n:Neo4jSystem{metadataId:{systemId}} )-[r:Composition]->(m) return m ")
List<Neo4jSchema> getSchemaBySystemId(@Param("systemId") String systemId);
......
package com.keymobile.metadata.metadataRelation.service;
import com.keymobile.metadata.metadataRelation.controller.MetaDataController;
import com.keymobile.metadata.metadataRelation.pojo.mongo.MongoData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
......@@ -22,6 +25,7 @@ import java.util.concurrent.Executor;
@EnableAsync
public class AsyncDataFromMongoToNeo4j {
private static final Logger logger = LoggerFactory.getLogger(AsyncDataFromMongoToNeo4j.class);
@Autowired
private IMetadataService metadataService;
......@@ -33,7 +37,7 @@ public class AsyncDataFromMongoToNeo4j {
@Async
public void asyncDataFromMongoToNeo4j(String catalogName) {
long start = System.currentTimeMillis();
String sql = "select scope_id, scope_name from auth_scope";
List<MongoData> mongoDataList = jdbcTemplate.query(sql, new RowMapper<MongoData>() {
@Override
......@@ -57,8 +61,10 @@ public class AsyncDataFromMongoToNeo4j {
// // 同步其他数据(模型,标准,质量等)
//
// //同步作业内部的临时数据
// metadataService.deleteTempNodeByCatalogName(catalogName);
// metadataService.createTempNode(catalogName);
metadataService.deleteTempNodeByCatalogName(catalogName);
metadataService.createTempNode(catalogName);
long end = System.currentTimeMillis();
logger.info("同步数据完成,用时:"+(end-start));
}
......
......@@ -70,6 +70,7 @@ public class MetadataServiceImpl implements IMetadataService {
@Autowired
private MetadataRepoRemoteService metadataRepoRemoteService;
@Autowired
private Neo4jSystemRepository neo4jSystemRepository;
@Autowired
......@@ -1237,10 +1238,16 @@ public class MetadataServiceImpl implements IMetadataService {
}
//从mongodb同步元数据到neo4j
@Override
public void syschroMetadataFromMongo(String catalogName,List<MongoData> systemDataList) {
int schemaCount = 1;
int tableCount = 1;
int viewCount = 1;
int functionCount = 1;
int procedureCount = 1;
String collectionName = "Metadata_"+catalogName;
List<String> layer = new ArrayList<>();
......@@ -1265,16 +1272,16 @@ public class MetadataServiceImpl implements IMetadataService {
neo4jSystem.setCnName(systemData.getCnName());
String systemPath = catalogName+";"+systemData.getName();
neo4jSystem.setDataPath(systemPath);
//如果存在,就不用创建
List<Neo4jSystem> neo4jSchemaList = neo4jSystemRepository.findNeo4jSystemByMetadataId(systemId);
//用名称查找,如果存在,就不用创建
List<Neo4jSystem> neo4jSchemaList = neo4jSystemRepository.findNeo4jSystemByName(systemData.getName());
if(neo4jSchemaList==null || neo4jSchemaList.size()==0){
neo4jSystemRepository.save(neo4jSystem);
}
systemDataMap.put(systemData.get_id(),neo4jSystem);
}
List<CompositionRelation> compositionRelationList = new ArrayList<>();
for(MongoData catalogData:catalogDataList){
List<MongoData> dataBaseDataList= mongoDbServiceImpl.findDataByparentId(catalogData.get_id(),collectionName);
......@@ -1296,63 +1303,111 @@ public class MetadataServiceImpl implements IMetadataService {
}
String schemaPath = neo4jSystem.getDataPath()+";"+schemaData.getName();
neo4jSchema.setDataPath(schemaPath);
//如果存在,就不用创建
int ran1 = r.nextInt(6);
if(ran1>6){
ran1 = 6;
}
if(neo4jSchema.getLabel()==null || neo4jSchema.getLabel().equals("")){
String label = layer.get(ran1);
neo4jSchema.setLabel(label);
}
//如果存在,就不用创建
List<Neo4jSchema> neo4jSchemaList = neo4jSchemaRepository.findNeo4jSchemaByMetadataId(neo4jSchema.getMetadataId());
if(neo4jSchemaList==null || neo4jSchemaList.size()==0){
neo4jSchemaRepository.save(neo4jSchema);
}
relationshipService.saveRelation(neo4jSystem.getMetadataId(),neo4jSchema.getMetadataId(),"Composition");
CompositionRelation compositionRelation = new CompositionRelation();
compositionRelation.setStart(neo4jSystem);
compositionRelation.setEnd(neo4jSchema);
compositionRelation.setName("System--Schema");
compositionRelationList.add(compositionRelation);
// relationshipService.saveRelation(neo4jSystem.getMetadataId(),neo4jSchema.getMetadataId(),"Composition");
//根据schema获取表级数据,再按照类型分类:表,视图,作业,函数等
List<MongoData> tableClassDataList = mongoDbServiceImpl.findDataByparentId(schemaData.get_id(),collectionName);
int count=0;//获取前100个表测试
int count=1;//获取前100个表测试
List<Neo4jTable> neo4jTableList = new ArrayList<>();
List<Neo4jView> viewTableList = new ArrayList<>();
List<Neo4jFunction> functionTableList = new ArrayList<>();
List<Neo4jProcedure> procedureTableList = new ArrayList<>();
for(MongoData tableClassData:tableClassDataList){
if(count >20){
// break;
}
count++;
String metadataId = tableClassData.get_id();
String endId = tableClassData.get_id();
String tablePath = schemaPath+";"+tableClassData.getName();
neo4jSchema.setDataPath(schemaPath);
if(metadataId.startsWith("Table=")){
if(tableCount %100==0){
// break;
logger.info("同步了:"+tableCount+"个表。");
neo4jTableRepository.saveAll(neo4jTableList);
neo4jTableList.clear();
}
Neo4jTable neo4jTable = new Neo4jTable();
neo4jTable.setMetadataId(tableClassData.get_id());
neo4jTable.setName(tableClassData.getName());
neo4jTable.setCnName(tableClassData.getCnName());
neo4jTable.setDataPath(tablePath);
List<Neo4jTable> neo4jTableList = neo4jTableRepository.findNeo4jTableByMetadataId(metadataId);
if(neo4jTableList==null || neo4jTableList.size()==0){
neo4jTableRepository.save(neo4jTable);
List<Neo4jTable> neo4jTableListExist = neo4jTableRepository.findNeo4jTableByMetadataId(metadataId);
if(neo4jTableListExist == null || neo4jTableListExist.size() == 0){
neo4jTableList.add(neo4jTable);
CompositionRelation schem2Table = new CompositionRelation();
schem2Table.setStart(neo4jSchema);
schem2Table.setEnd(neo4jTable);
schem2Table.setName("System--Schema");
compositionRelationList.add(schem2Table);
tableCount++;
}
}else if(metadataId.startsWith("View=")){
if(viewCount %100==0){
// break;
logger.info("同步了:"+viewCount+"个表。");
neo4jViewRepository.saveAll(viewTableList);
viewTableList.clear();
}
Neo4jView neo4jView = new Neo4jView();
neo4jView.setMetadataId(tableClassData.get_id());
neo4jView.setName(tableClassData.getName());
neo4jView.setCnName(tableClassData.getCnName());
neo4jView.setDataPath(tablePath);
List<Neo4jView> neo4jTableList = neo4jViewRepository.findNeo4jViewByMetadataId(metadataId);
if(neo4jTableList==null || neo4jTableList.size()==0){
neo4jViewRepository.save(neo4jView);
List<Neo4jView> neo4jViewList = neo4jViewRepository.findNeo4jViewByMetadataId(metadataId);
if(neo4jViewList==null || neo4jViewList.size()==0){
neo4jViewList.add(neo4jView);
CompositionRelation schem2View = new CompositionRelation();
schem2View.setStart(neo4jSchema);
schem2View.setEnd(neo4jView);
schem2View.setName("Schema--View");
compositionRelationList.add(schem2View);
}
}else if(metadataId.startsWith("Function=")){
if(functionCount %100==0){
// break;
logger.info("同步了:"+functionCount+"个函数。");
neo4jFunctionRepository.saveAll(functionTableList);
functionTableList.clear();
}
Neo4jFunction neo4jFunction = new Neo4jFunction();
neo4jFunction.setMetadataId(tableClassData.get_id());
neo4jFunction.setName(tableClassData.getName());
......@@ -1361,9 +1416,25 @@ public class MetadataServiceImpl implements IMetadataService {
neo4jFunction.setDataPath(tablePath);
List<Neo4jFunction> neo4jFunctionList = neo4jFunctionRepository.findNeo4jFunctionByMetadataId(metadataId);
if(neo4jFunctionList==null || neo4jFunctionList.size()==0){
neo4jFunctionRepository.save(neo4jFunction);
functionTableList.add(neo4jFunction);
// neo4jFunctionRepository.save(neo4jFunction);
CompositionRelation schem2Function = new CompositionRelation();
schem2Function.setStart(neo4jSchema);
schem2Function.setEnd(neo4jFunction);
schem2Function.setName("Schema--Function");
compositionRelationList.add(schem2Function);
functionCount++;
}
}else if(metadataId.startsWith("Procedure=")){
if(procedureCount %100==0){
// break;
logger.info("同步了:"+procedureCount+"个函数。");
neo4jProcedureRepository.saveAll(procedureTableList);
procedureTableList.clear();
}
Neo4jProcedure neo4jProcedure = new Neo4jProcedure();
neo4jProcedure.setMetadataId(tableClassData.get_id());
neo4jProcedure.setName(tableClassData.getName());
......@@ -1371,23 +1442,59 @@ public class MetadataServiceImpl implements IMetadataService {
neo4jProcedure.setDataPath(tablePath);
List<Neo4jProcedure> neo4jProcedureList = neo4jProcedureRepository.findNeo4jProcedureByMetadataId(metadataId);
if(neo4jProcedureList==null || neo4jProcedureList.size()==0){
neo4jProcedureRepository.save(neo4jProcedure);
neo4jProcedureList.add(neo4jProcedure);
// neo4jProcedureRepository.save(neo4jProcedure);
CompositionRelation schem2Procedure = new CompositionRelation();
schem2Procedure.setStart(neo4jSchema);
schem2Procedure.setEnd(neo4jProcedure);
schem2Procedure.setName("Schema--Procedure");
compositionRelationList.add(schem2Procedure);
procedureCount++;
}
}else{
}
relationshipService.saveRelation(neo4jSchema.getMetadataId(),endId,"Composition");
// relationshipService.saveRelation(neo4jSchema.getMetadataId(),endId,"Composition");
}
if(neo4jTableList!=null && neo4jTableList.size()!=0){
neo4jTableRepository.saveAll(neo4jTableList);
}
}
if(viewTableList!=null && viewTableList.size()!=0){
neo4jViewRepository.saveAll(viewTableList);
}
if(functionTableList!=null && functionTableList.size()!=0){
neo4jFunctionRepository.saveAll(functionTableList);
}
if(procedureTableList != null && procedureTableList.size()!=0){
neo4jProcedureRepository.saveAll(procedureTableList);
}
}
}
}
int count = 0;
List<CompositionRelation> newCompositionList= new ArrayList<>();
for(CompositionRelation compositionRelation:compositionRelationList){
newCompositionList.add(compositionRelation);
if(count%100==0){
compositionRelationRespository.saveAll(newCompositionList);
newCompositionList.clear();
}
count++;
}
if(newCompositionList.size()!=0){
compositionRelationRespository.saveAll(newCompositionList);
}
}
@Override
public void syschroTable2EtlJobRelations(String catalogName) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment