Commit 81920ba8 by qiuchaofei

同步 视图,存储过程等对象

parent e5fc656c
...@@ -90,6 +90,8 @@ public class AsyncDataFromMongoToNeo4j { ...@@ -90,6 +90,8 @@ public class AsyncDataFromMongoToNeo4j {
// // 同步其他数据(模型,标准,质量等) // // 同步其他数据(模型,标准,质量等)
// //
// //同步作业内部的临时数据 // //同步作业内部的临时数据
logger.info("开始处理临时节点。。。");
metadataService.deleteTempNodeByCatalogName(catalogName); metadataService.deleteTempNodeByCatalogName(catalogName);
metadataService.createTempNode(catalogName); metadataService.createTempNode(catalogName);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
......
...@@ -23,4 +23,9 @@ public interface ISchemaService { ...@@ -23,4 +23,9 @@ public interface ISchemaService {
Map<String,String> getColumnIdFromSchemaId(String schemaId); Map<String,String> getColumnIdFromSchemaId(String schemaId);
Map<String, String> getViewIdFromSchemaId(String schemaId);
Map<String, String> getFunctionIdFromSchemaId(String schemaId);
Map<String, String> getProcedureIdFromSchemaId(String schemaId);
} }
...@@ -36,7 +36,6 @@ import org.springframework.data.domain.PageRequest; ...@@ -36,7 +36,6 @@ import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort; import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
...@@ -1330,6 +1329,20 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1330,6 +1329,20 @@ public class MetadataServiceImpl implements IMetadataService {
} }
logger.info("需要删除的表个数:"+ tableIdMapFromNeo4j.size()); logger.info("需要删除的表个数:"+ tableIdMapFromNeo4j.size());
List<Neo4jTable> neo4jTables = new ArrayList<>();
for(Object obj : tableIdMapFromNeo4j.keySet()){
String deleteTableId = (String ) obj;
Neo4jTable neo4jTable = neo4jTableRepository.findNeo4jTableByMetadataId(deleteTableId);
neo4jTables.add(neo4jTable);
}
if(neo4jTables.size()!=0){
neo4jTableRepository.deleteAll(neo4jTables);
}
logger.info("需要删除的表完成。");
tableIdMapFromNeo4j.clear();
if(neo4jTableList.size()!=0){ if(neo4jTableList.size()!=0){
neo4jTableRepository.saveAll(neo4jTableList); neo4jTableRepository.saveAll(neo4jTableList);
logger.info("继续同步同步了:"+neo4jTableList.size()+"个表。"); logger.info("继续同步同步了:"+neo4jTableList.size()+"个表。");
...@@ -1357,6 +1370,7 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1357,6 +1370,7 @@ public class MetadataServiceImpl implements IMetadataService {
logger.info("开始同步schema:"+schemaData.getName()); logger.info("开始同步schema:"+schemaData.getName());
//系统与schema的关系 //系统与schema的关系
String schemaData_id = schemaData.get_id(); String schemaData_id = schemaData.get_id();
Neo4jSchema neo4jSchema = new Neo4jSchema(); Neo4jSchema neo4jSchema = new Neo4jSchema();
neo4jSchema.setMetadataId(schemaData_id); neo4jSchema.setMetadataId(schemaData_id);
neo4jSchema.setName(schemaData.getName()); neo4jSchema.setName(schemaData.getName());
...@@ -1386,6 +1400,11 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1386,6 +1400,11 @@ public class MetadataServiceImpl implements IMetadataService {
Map<String,Neo4jTable> neo4jTableIdMap = new HashMap<>(); Map<String,Neo4jTable> neo4jTableIdMap = new HashMap<>();
saveAllTableByBatch(catalogName,compositionRelationList,neo4jSchemaMap,neo4jTableIdMap,neo4jSchema.getMetadataId()); saveAllTableByBatch(catalogName,compositionRelationList,neo4jSchemaMap,neo4jTableIdMap,neo4jSchema.getMetadataId());
long columnTime1 = System.currentTimeMillis(); long columnTime1 = System.currentTimeMillis();
saveAllViewByBatch(catalogName,compositionRelationList,neo4jSchemaMap,neo4jSchema.getMetadataId());
saveAllfunctionByBatch(catalogName,compositionRelationList,neo4jSchemaMap,neo4jSchema.getMetadataId());
saveAllProcedureByBatch(catalogName,compositionRelationList,neo4jSchemaMap,neo4jSchema.getMetadataId());
//专门做字段解析,批量获取字段,进行组装。 //专门做字段解析,批量获取字段,进行组装。
saveAllColumnByBatch(catalogName,compositionRelationList,neo4jTableIdMap,neo4jSchema.getMetadataId()); saveAllColumnByBatch(catalogName,compositionRelationList,neo4jTableIdMap,neo4jSchema.getMetadataId());
// Neo4jSchema neo4jSchemaList = neo4jSchemaRepository.findNeo4jSchemaByMetadataId(neo4jSchema.getMetadataId()); // Neo4jSchema neo4jSchemaList = neo4jSchemaRepository.findNeo4jSchemaByMetadataId(neo4jSchema.getMetadataId());
...@@ -1409,6 +1428,207 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1409,6 +1428,207 @@ public class MetadataServiceImpl implements IMetadataService {
objectCountMap.put(SchemaCountString,schemaCount); objectCountMap.put(SchemaCountString,schemaCount);
} }
private void saveAllProcedureByBatch(String catalogName, List<CompositionRelation> compositionRelationList,
Map<String, Neo4jSchema> neo4jSchemaMap, String schemaId) {
long time1 = System.currentTimeMillis();
Map<String, String> procedureIdMapFromNeo4j = schemaService.getProcedureIdFromSchemaId(schemaId);
long time2 = System.currentTimeMillis();
logger.info("从neo4j 获取 schema:"+ schemaId +" 的Procedure个数:"+procedureIdMapFromNeo4j.size() +" , 耗时:"+(time2-time1));
List<Neo4jProcedure> neo4jProcedureList = new ArrayList<>();
int procedureCount = 0;
long columnTime0 = System.currentTimeMillis();
//用游标的方式,不要用分页获取
int batchSize = 5000;
String tablePrefix = "Catalog,Database,Schema,Procedure";
BasicDBObject query = new BasicDBObject();
query.put("_class", tablePrefix);
query.put("parentId",schemaId);
MongoCursor<Document> procedureCursor = this.mongoTemplate.getCollection(PREFIX_METADATA_NODE + catalogName).
find(query).noCursorTimeout(true).batchSize(batchSize).iterator();
while (procedureCursor.hasNext()) {
Document procedureDocument = procedureCursor.next();
String procedureId =(String) procedureDocument.get("_id");
//存在了,就不写入
if(procedureIdMapFromNeo4j.containsKey(procedureId)){
procedureIdMapFromNeo4j.remove(procedureId);
continue;
}
String procedureName =(String) procedureDocument.get("name");
Neo4jSchema neo4jSchema = neo4jSchemaMap.get(schemaId);
String procedurePath = neo4jSchema.getDataPath()+";"+procedureName;
Neo4jProcedure neo4jProcedure = new Neo4jProcedure();
neo4jProcedure.setMetadataId(procedureId);
neo4jProcedure.setName(procedureName);
neo4jProcedure.setDataPath(procedurePath);
neo4jProcedureList.add(neo4jProcedure);
CompositionRelation schem2Table = new CompositionRelation();
schem2Table.setStart(neo4jSchema);
schem2Table.setEnd(neo4jProcedure);
schem2Table.setName("Schema--procedure");
compositionRelationList.add(schem2Table);
procedureCount++;
if(procedureCount % batchSize ==0){
neo4jProcedureRepository.saveAll(neo4jProcedureList);
logger.info("同步了:"+procedureCount+"个function。"+procedureId);
neo4jProcedureList.clear();
}
}
if(procedureCursor != null){
procedureCursor.close();
}
logger.info("需要删除的function个数:"+ procedureIdMapFromNeo4j.size());
if(neo4jProcedureList.size()!=0){
neo4jProcedureRepository.saveAll(neo4jProcedureList);
logger.info("继续同步同步了:"+neo4jProcedureList.size()+"个function。");
neo4jProcedureList.clear();
}
long columnTime1 = System.currentTimeMillis();
logger.info("新创建的function数量:"+procedureCount+" 用时:"+(columnTime1-columnTime0));
}
private void saveAllfunctionByBatch(String catalogName, List<CompositionRelation> compositionRelationList,
Map<String, Neo4jSchema> neo4jSchemaMap, String schemaId) {
long time1 = System.currentTimeMillis();
Map<String, String> functionIdMapFromNeo4j = schemaService.getFunctionIdFromSchemaId(schemaId);
long time2 = System.currentTimeMillis();
logger.info("从neo4j 获取 schema:"+ schemaId +" 的function个数:"+functionIdMapFromNeo4j.size() +" , 耗时:"+(time2-time1));
List<Neo4jFunction> neo4jFunctionList = new ArrayList<>();
int functionCount = 0;
long columnTime0 = System.currentTimeMillis();
//用游标的方式,不要用分页获取
int batchSize = 5000;
String tablePrefix = "Catalog,Database,Schema,Function";
BasicDBObject query = new BasicDBObject();
query.put("_class", tablePrefix);
query.put("parentId",schemaId);
MongoCursor<Document> functionCursor = this.mongoTemplate.getCollection(PREFIX_METADATA_NODE + catalogName).
find(query).noCursorTimeout(true).batchSize(batchSize).iterator();
while (functionCursor.hasNext()) {
Document functionDocument = functionCursor.next();
String functionId =(String) functionDocument.get("_id");
//存在了,就不写入
if(functionIdMapFromNeo4j.containsKey(functionId)){
functionIdMapFromNeo4j.remove(functionId);
continue;
}
String functionName =(String) functionDocument.get("name");
Neo4jSchema neo4jSchema = neo4jSchemaMap.get(schemaId);
String functionPath = neo4jSchema.getDataPath()+";"+functionName;
Neo4jFunction neo4jFunction = new Neo4jFunction();
neo4jFunction.setMetadataId(functionId);
neo4jFunction.setName(functionName);
neo4jFunction.setDataPath(functionPath);
neo4jFunctionList.add(neo4jFunction);
CompositionRelation schem2Table = new CompositionRelation();
schem2Table.setStart(neo4jSchema);
schem2Table.setEnd(neo4jFunction);
schem2Table.setName("Schema--function");
compositionRelationList.add(schem2Table);
functionCount++;
if(functionCount % batchSize ==0){
neo4jFunctionRepository.saveAll(neo4jFunctionList);
logger.info("同步了:"+functionCount+"个function。"+functionId);
neo4jFunctionList.clear();
}
}
if(functionCursor != null){
functionCursor.close();
}
logger.info("需要删除的function个数:"+ functionIdMapFromNeo4j.size());
if(neo4jFunctionList.size()!=0){
neo4jFunctionRepository.saveAll(neo4jFunctionList);
logger.info("继续同步同步了:"+neo4jFunctionList.size()+"个function。");
neo4jFunctionList.clear();
}
long columnTime1 = System.currentTimeMillis();
logger.info("新创建的function数量:"+functionCount+" 用时:"+(columnTime1-columnTime0));
}
private void saveAllViewByBatch(String catalogName, List<CompositionRelation> compositionRelationList,
Map<String, Neo4jSchema> neo4jSchemaMap,String schemaId) {
long time1 = System.currentTimeMillis();
Map<String, String> viewIdMapFromNeo4j = schemaService.getViewIdFromSchemaId(schemaId);
long time2 = System.currentTimeMillis();
logger.info("从neo4j 获取 schema:"+ schemaId +" 的view个数:"+viewIdMapFromNeo4j.size() +" , 耗时:"+(time2-time1));
List<Neo4jView> neo4jViewList = new ArrayList<>();
int viewCount = 0;
long columnTime0 = System.currentTimeMillis();
//用游标的方式,不要用分页获取
int batchSize = 5000;
String tablePrefix = "Catalog,Database,Schema,View";
BasicDBObject query = new BasicDBObject();
query.put("_class", tablePrefix);
query.put("parentId",schemaId);
MongoCursor<Document> tableCursor = this.mongoTemplate.getCollection(PREFIX_METADATA_NODE + catalogName).
find(query).noCursorTimeout(true).batchSize(batchSize).iterator();
while (tableCursor.hasNext()) {
Document tableDocument = tableCursor.next();
String viewId =(String) tableDocument.get("_id");
//存在了,就不写入
if(viewIdMapFromNeo4j.containsKey(viewId)){
viewIdMapFromNeo4j.remove(viewId);
continue;
}
String viewName =(String) tableDocument.get("name");
Neo4jSchema neo4jSchema = neo4jSchemaMap.get(schemaId);
String tablePath = neo4jSchema.getDataPath()+";"+viewName;
Neo4jView neo4jView = new Neo4jView();
neo4jView.setMetadataId(viewId);
neo4jView.setName(viewName);
neo4jViewList.add(neo4jView);
CompositionRelation schem2Table = new CompositionRelation();
schem2Table.setStart(neo4jSchema);
schem2Table.setEnd(neo4jView);
schem2Table.setName("Schema--Table");
compositionRelationList.add(schem2Table);
viewCount++;
if(viewCount % batchSize ==0){
neo4jViewRepository.saveAll(neo4jViewList);
logger.info("同步了:"+viewCount+"个表。"+viewId);
neo4jViewList.clear();
}
}
if(tableCursor != null){
tableCursor.close();
}
logger.info("需要删除的view个数:"+ viewIdMapFromNeo4j.size());
if(neo4jViewList.size()!=0){
neo4jViewRepository.saveAll(neo4jViewList);
logger.info("继续同步同步了:"+neo4jViewList.size()+"个视图。");
neo4jViewList.clear();
}
long columnTime1 = System.currentTimeMillis();
logger.info("新创建的view数量:"+viewCount+" 用时:"+(columnTime1-columnTime0));
}
private void saveAllColumnByBatch(String catalogName, List<CompositionRelation> compositionRelationList, private void saveAllColumnByBatch(String catalogName, List<CompositionRelation> compositionRelationList,
Map<String, Neo4jTable> neo4jTableIdMap, String schemaId) { Map<String, Neo4jTable> neo4jTableIdMap, String schemaId) {
...@@ -1478,7 +1698,19 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1478,7 +1698,19 @@ public class MetadataServiceImpl implements IMetadataService {
neo4jColumnList.clear(); neo4jColumnList.clear();
} }
} }
logger.info("需要删除的表个数:"+ columnFromSchemaId.size()); logger.info("需要删除的字段个数:"+ columnFromSchemaId.size());
List<Neo4jColumn> neo4jColumns = new ArrayList<>();
for(Object obj : columnFromSchemaId.keySet()){
String deleteColumnId = (String ) obj;
Neo4jColumn neo4jColumn = neo4jColumnRepository.findNeo4jColumnByMetadataId(deleteColumnId);
neo4jColumns.add(neo4jColumn);
}
if(neo4jColumns.size()!=0){
neo4jColumnRepository.deleteAll(neo4jColumns);
}
if(neo4jColumnList.size()!=0){ if(neo4jColumnList.size()!=0){
neo4jColumnRepository.saveAll(neo4jColumnList); neo4jColumnRepository.saveAll(neo4jColumnList);
neo4jColumnList.clear(); neo4jColumnList.clear();
...@@ -1894,7 +2126,7 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1894,7 +2126,7 @@ public class MetadataServiceImpl implements IMetadataService {
List<String> systemIdList = dataBaseData.getSysList(); List<String> systemIdList = dataBaseData.getSysList();
for(String systemId:systemIdList){ for(String systemId:systemIdList){
//系统id //系统id
logger.info("系统ID:"+systemId); // logger.info("系统ID:"+systemId);
if(systemDataMap.containsKey(systemId)){ if(systemDataMap.containsKey(systemId)){
neo4jSystem = systemDataMap.get(systemId); neo4jSystem = systemDataMap.get(systemId);
break; break;
......
...@@ -351,6 +351,87 @@ public class SchemaServiceImpl implements ISchemaService { ...@@ -351,6 +351,87 @@ public class SchemaServiceImpl implements ISchemaService {
return columnIdMap; return columnIdMap;
} }
@Override
public Map<String, String> getViewIdFromSchemaId(String schemaId) {
Map<String,String> viewIdMap = new HashMap<>();
//批量获取指定schema下面的view
String countCypher = "match (n:Neo4jSchema{metadataId:\""+schemaId+"\"})-->(m:Neo4jView) return count(m) as count";
String detailCypher = "match (n:Neo4jSchema{metadataId:\""+schemaId+"\"})-->(m:Neo4jTable) return m skip 10 limit 10";
StatementResult countResult = session.run(countCypher);
int jdcount= 0 ;
while(countResult.hasNext()){
Record record = countResult.next();
jdcount= record.get( "count").asInt();
}
//分页获取 view
int pageSize = 100;
int page = jdcount/pageSize;
for(int i=0;i<page+1;i++){
detailCypher = "match (n:Neo4jSchema{metadataId:\""+schemaId+"\"})-->(m:Neo4jView) return m.metadataId as id skip "+(i*pageSize ) +" limit "+ pageSize;
StatementResult detailResult = session.run(detailCypher);
while (detailResult.hasNext()){
Record record = detailResult.next();
String metadataId = record.get( "id").asString();
viewIdMap.put(metadataId,"");
}
}
return viewIdMap;
}
@Override
public Map<String, String> getFunctionIdFromSchemaId(String schemaId) {
Map<String,String> functionIdMap = new HashMap<>();
//批量获取指定schema下面的view
String countCypher = "match (n:Neo4jSchema{metadataId:\""+schemaId+"\"})-->(m:Neo4jFunction) return count(m) as count";
String detailCypher = "match (n:Neo4jSchema{metadataId:\""+schemaId+"\"})-->(m:Neo4jTable) return m skip 10 limit 10";
StatementResult countResult = session.run(countCypher);
int jdcount= 0 ;
while(countResult.hasNext()){
Record record = countResult.next();
jdcount= record.get( "count").asInt();
}
//分页获取 view
int pageSize = 100;
int page = jdcount/pageSize;
for(int i=0;i<page+1;i++){
detailCypher = "match (n:Neo4jSchema{metadataId:\""+schemaId+"\"})-->(m:Neo4jFunction) return m.metadataId as id skip "+(i*pageSize ) +" limit "+ pageSize;
StatementResult detailResult = session.run(detailCypher);
while (detailResult.hasNext()){
Record record = detailResult.next();
String metadataId = record.get( "id").asString();
functionIdMap.put(metadataId,"");
}
}
return functionIdMap;
}
@Override
public Map<String, String> getProcedureIdFromSchemaId(String schemaId) {
Map<String,String> functionIdMap = new HashMap<>();
//批量获取指定schema下面的view
String countCypher = "match (n:Neo4jSchema{metadataId:\""+schemaId+"\"})-->(m:Neo4jProcedure) return count(m) as count";
String detailCypher = "match (n:Neo4jSchema{metadataId:\""+schemaId+"\"})-->(m:Neo4jTable) return m skip 10 limit 10";
StatementResult countResult = session.run(countCypher);
int jdcount= 0 ;
while(countResult.hasNext()){
Record record = countResult.next();
jdcount= record.get( "count").asInt();
}
//分页获取 view
int pageSize = 100;
int page = jdcount/pageSize;
for(int i=0;i<page+1;i++){
detailCypher = "match (n:Neo4jSchema{metadataId:\""+schemaId+"\"})-->(m:Neo4jProcedure) return m.metadataId as id skip "+(i*pageSize ) +" limit "+ pageSize;
StatementResult detailResult = session.run(detailCypher);
while (detailResult.hasNext()){
Record record = detailResult.next();
String metadataId = record.get( "id").asString();
functionIdMap.put(metadataId,"");
}
}
return functionIdMap;
}
private void saveLabelFromTag(List<String> layer, Map<String, Neo4jSchema> neo4jSchemaMap, Map<String, List<Map<String, Object>>> returnResults) { private void saveLabelFromTag(List<String> layer, Map<String, Neo4jSchema> neo4jSchemaMap, Map<String, List<Map<String, Object>>> returnResults) {
for (Object obj : returnResults.keySet()) { for (Object obj : returnResults.keySet()) {
String schemaId = (String) obj; String schemaId = (String) obj;
......
...@@ -921,37 +921,37 @@ public class TableServiceImpl implements ITableService { ...@@ -921,37 +921,37 @@ public class TableServiceImpl implements ITableService {
getTargetTable(tableId ,targetTables); getTargetTable(tableId ,targetTables);
//质量报告 //质量报告
List<ReturnNode> qualityList = new ArrayList<>(); // List<ReturnNode> qualityList = new ArrayList<>();
try{ // try{
//从表获取字段: // //从表获取字段:
List<Neo4jColumn> neo4jColumnList = neo4jTableRepository.getColumnByTableId(tableId); // List<Neo4jColumn> neo4jColumnList = neo4jTableRepository.getColumnByTableId(tableId);
List<String> reportIdList = new ArrayList<>(); // List<String> reportIdList = new ArrayList<>();
//
for(Neo4jColumn neo4jColumn :neo4jColumnList ){ // for(Neo4jColumn neo4jColumn :neo4jColumnList ){
// // //
String metaId =neo4jColumn.getMetadataId(); // String metaId =neo4jColumn.getMetadataId();
// metaId = "Column=1=55d151b1ba484d0cbe3ccd7e9e2f0bfe"; //// metaId = "Column=1=55d151b1ba484d0cbe3ccd7e9e2f0bfe";
Map<String, String> dataQualityRepost = dataQualityReportRemoteService.getDataAssetGraphInfoByMetadataId(metaId); // Map<String, String> dataQualityRepost = dataQualityReportRemoteService.getDataAssetGraphInfoByMetadataId(metaId);
if(dataQualityRepost ==null || dataQualityRepost.size()==0){ // if(dataQualityRepost ==null || dataQualityRepost.size()==0){
continue; // continue;
} // }
String reportId = dataQualityRepost.get("id"); // String reportId = dataQualityRepost.get("id");
if(reportIdList.contains(reportId)){ // if(reportIdList.contains(reportId)){
continue; // continue;
} // }
reportIdList.add(reportId); // reportIdList.add(reportId);
ReturnNode returnNode = new ReturnNode(); // ReturnNode returnNode = new ReturnNode();
returnNode.setId(reportId); // returnNode.setId(reportId);
returnNode.setName(dataQualityRepost.get(JobAnalyzer)); // returnNode.setName(dataQualityRepost.get(JobAnalyzer));
returnNode.setCnName(dataQualityRepost.get(JobAnalyzer)); // returnNode.setCnName(dataQualityRepost.get(JobAnalyzer));
returnNode.setAttributeMaps(dataQualityRepost); // returnNode.setAttributeMaps(dataQualityRepost);
qualityList.add(returnNode); //// qualityList.add(returnNode);
} // }
}catch (Exception e){ // }catch (Exception e){
e.printStackTrace(); // e.printStackTrace();
} // }
sourceAndTargetTable.put("质量",qualityList); // sourceAndTargetTable.put("质量",qualityList);
sourceAndTargetTable.put("来源",sourceTables); sourceAndTargetTable.put("来源",sourceTables);
sourceAndTargetTable.put("目标",targetTables); sourceAndTargetTable.put("目标",targetTables);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment