Commit 1c6cc46a by qiuchaofei

1.同步表与表,2.修改从mongo读数的方式,3.修改dependency的存储方式。

parent b196d6db
......@@ -2,6 +2,28 @@ package com.keymobile.metadata.metadataRelation.pojo;
import org.neo4j.ogm.annotation.RelationshipEntity;
@RelationshipEntity(type = "Dependency")
@RelationshipEntity(type = "流向")
/**
 * Relationship entity that extends {@link BaseRelationship} with two extra
 * string attributes: a job id and a description.
 */
public class DependencyRelation extends BaseRelationship {

    /** Job id attached to this relationship. */
    private String jobId;

    /** Description attached to this relationship. */
    private String description;

    /**
     * @return the job id stored on this relationship, possibly {@code null}
     */
    public String getJobId() {
        return this.jobId;
    }

    /**
     * @param jobId the job id to store on this relationship
     */
    public void setJobId(String jobId) {
        this.jobId = jobId;
    }

    /**
     * @return the description stored on this relationship, possibly {@code null}
     */
    public String getDescription() {
        return this.description;
    }

    /**
     * @param description the description to store on this relationship
     */
    public void setDescription(String description) {
        this.description = description;
    }
}
package com.keymobile.metadata.metadataRelation.respository;
import com.keymobile.metadata.metadataRelation.pojo.DependencyRelation;
import org.springframework.data.neo4j.repository.Neo4jRepository;
/**
 * Spring Data Neo4j repository for {@link DependencyRelation} entities, keyed by a {@code Long} id.
 * Declares no custom queries; only the operations inherited from {@link Neo4jRepository} are available.
 */
public interface DependencyRelationResposity extends Neo4jRepository<DependencyRelation, Long> {
}
......@@ -108,4 +108,6 @@ public interface IMetadataService {
void syschro1104Relations(String catalogName);
void syschroTable2TableRelations(String catalogName);
}
......@@ -7,7 +7,6 @@ import java.io.InputStream;
import java.util.*;
import java.util.concurrent.*;
import com.keymobile.metadata.metadataRelation.config.Neo4jConfig;
import com.keymobile.metadata.metadataRelation.pojo.*;
import com.keymobile.metadata.metadataRelation.pojo.metadata.*;
import com.keymobile.metadata.metadataRelation.pojo.mongo.MongoData;
......@@ -24,6 +23,8 @@ import com.keymobile.metadata.metadataRelation.service.ISchemaService;
import com.keymobile.metadata.metadataRelation.service.ITableService;
import com.keymobile.metadata.metadataRelation.util.Neo4jTool;
import com.keymobile.metadata.metadataRelation.util.SchemaLayerObject;
import com.mongodb.BasicDBObject;
import com.mongodb.client.MongoCursor;
import org.apache.commons.lang.StringUtils;
import org.bson.Document;
import org.neo4j.driver.v1.*;
......@@ -36,6 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.system.ApplicationHome;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
......@@ -48,6 +50,10 @@ public class MetadataServiceImpl implements IMetadataService {
@Autowired
private IBaseRelationshipService relationshipService;
@Autowired
private DependencyRelationResposity dependencyRelationResposity;
@Autowired
private CompositionRelationRespository compositionRelationRespository;
@Autowired
......@@ -69,7 +75,8 @@ public class MetadataServiceImpl implements IMetadataService {
@Autowired
private MongoDbServiceImpl mongoDbServiceImpl;
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private MetadataRepoRemoteService metadataRepoRemoteService;
......@@ -104,6 +111,8 @@ public class MetadataServiceImpl implements IMetadataService {
@Autowired
private ISchemaService schemaService;
private String PREFIX_METADATA_NODE = "Metadata_";
@Override
public List<MetaData> findNodeByName(String dataName) {
return metadataRepository.findMetaData(dataName, 3);
......@@ -122,7 +131,11 @@ public class MetadataServiceImpl implements IMetadataService {
private String SystemModelName = "System";
private String TableModelEqual = "Table=";
private String ColumnModelQueal = "Column=";
private String EtlJobModelQueal = "EtlJob=";
private String EtlJobModelQueal = "ETLJob=";
private String EtlScriptModelQueal = "ETLScript=";
private String EtlSqlModelQueal = "SQL=";
private String IntegerString = "INTEGER";
private String JobIdString = "jobId";
private String DescriptionString = "description";
......@@ -1175,7 +1188,6 @@ public class MetadataServiceImpl implements IMetadataService {
List<CompositionRelation> compositionRelationList = new ArrayList<>();
int schemaCount = 1;
int tableCount = 1;
int columnCount = 1;
int viewCount = 1;
int functionCount = 1;
......@@ -1188,7 +1200,7 @@ public class MetadataServiceImpl implements IMetadataService {
Map<String,Integer> objectCountMap = new HashMap<>();
objectCountMap.put(SchemaCountString,schemaCount);
objectCountMap.put(TableCountString,tableCount);
// objectCountMap.put(TableCountString,tableCount);
objectCountMap.put(ColumnCountString,columnCount);
objectCountMap.put(ViewCountString,viewCount);
objectCountMap.put(FunctionCountString,functionCount);
......@@ -1198,6 +1210,7 @@ public class MetadataServiceImpl implements IMetadataService {
objectCountMap.put(EtlScriptCountString,etlScriptCount);
objectCountMap.put(EtlSqlCountString,etlSqlCount);
Map<String, Neo4jSchema> neo4jSchemaMap =new HashMap<>();
for(MongoData catalogData:catalogDataList){
List<MongoData> dataBaseDataList= mongoDbServiceImpl.findDataByparentId(catalogData.get_id(),catalogName);
......@@ -1208,16 +1221,32 @@ public class MetadataServiceImpl implements IMetadataService {
saveServerDataToNeo4j(dataBaseData,catalogName,systemDataMap,compositionRelationList,objectCountMap);
}else if(mongoId.startsWith("Database=")){
//同步 database类的元数据
saveDataBaseDataToNeo4j(dataBaseData,catalogName,systemDataMap,compositionRelationList,objectCountMap);
saveDataBaseDataToNeo4j(dataBaseData,catalogName,systemDataMap,compositionRelationList,objectCountMap,neo4jSchemaMap);
}
}
}
long columnTime0 = System.currentTimeMillis();
Map<String,Neo4jTable> neo4jTableIdMap = new HashMap<>();
saveAllTableByBatch(catalogName,compositionRelationList,neo4jSchemaMap,neo4jTableIdMap);
long columnTime1 = System.currentTimeMillis();
//专门做字段解析,批量获取字段,进行组装。
saveAllColumnByBatch(catalogName,compositionRelationList,neo4jTableIdMap);
long columnTime2 = System.currentTimeMillis();
logger.info("获取字段用时:"+(columnTime2-columnTime1));
logger.info("创建的system数量:"+systemDataMap.size());
logger.info("创建的schema:"+(objectCountMap.get(SchemaCountString)-1));
// logger.info("创建的table数量:"+(objectCountMap.get(TableCountString)-1));
logger.info("创建的etlJob数量:"+(objectCountMap.get(EtlJobCountString)-1));
logger.info("创建的etlScript数量:"+(objectCountMap.get(EtlScriptCountString)-1));
logger.info("创建的etlSql数量:"+(objectCountMap.get(EtlSqlCountString)-1));
long time1 = System.currentTimeMillis();
int count = 0;
List<CompositionRelation> newCompositionList= new ArrayList<>();
for(CompositionRelation compositionRelation:compositionRelationList){
newCompositionList.add(compositionRelation);
if(count%100==0){
if(count % 500==0){
compositionRelationRespository.saveAll(newCompositionList);
newCompositionList.clear();
}
......@@ -1227,30 +1256,87 @@ public class MetadataServiceImpl implements IMetadataService {
compositionRelationRespository.saveAll(newCompositionList);
}
logger.info("创建的system数量:"+systemDataMap.size());
logger.info("创建的schema:"+(objectCountMap.get(SchemaCountString)-1));
logger.info("创建的table数量:"+(objectCountMap.get(TableCountString)-1));
logger.info("创建的column数量:"+(objectCountMap.get(ColumnCountString)-1));
logger.info("创建的etlJob数量:"+(objectCountMap.get(EtlJobCountString)-1));
logger.info("创建的etlScript数量:"+(objectCountMap.get(EtlScriptCountString)-1));
logger.info("创建的etlSql数量:"+(objectCountMap.get(EtlSqlCountString)-1));
long time2 = System.currentTimeMillis();
logger.info("用时:"+(time2-time1));
logger.info("创建组合关系{}条,用时{}:",compositionRelationList.size(),(time2-time1));
}
/**
 * Streams every table document of a catalog out of MongoDB and stores it in Neo4j in batches.
 *
 * <p>Documents are read from the collection {@code PREFIX_METADATA_NODE + catalogName} where
 * {@code _class} equals {@code "Catalog,Database,Schema,Table"}. For each table a
 * {@link Neo4jTable} is built, a "Schema--Table" {@link CompositionRelation} is appended to
 * {@code compositionRelationList}, and the table is registered in {@code neo4jTableIdMap}
 * under its metadata id. Tables are saved through {@code neo4jTableRepository} every
 * {@code batchSize} tables, plus once more for the remainder.
 *
 * @param catalogName             catalog whose collection is read; also stored on each table
 * @param compositionRelationList output list that receives one Schema--Table relation per table
 * @param neo4jSchemaMap          schemas keyed by their metadata id (the table's {@code parentId})
 * @param neo4jTableIdMap         output map that receives every created table keyed by metadata id
 */
private void saveAllTableByBatch(String catalogName, List<CompositionRelation> compositionRelationList,
        Map<String, Neo4jSchema> neo4jSchemaMap, Map<String, Neo4jTable> neo4jTableIdMap) {
    List<Neo4jTable> neo4jTableList = new ArrayList<>();
    // Counts tables actually processed; starting at 0 keeps the batch boundary and the
    // final total exact (starting at 1 flushed after batchSize-1 tables and over-reported by one).
    int tableCount = 0;
    long columnTime0 = System.currentTimeMillis();
    // Read through a cursor rather than page-by-page.
    int batchSize = 5000;
    String tablePrefix = "Catalog,Database,Schema,Table";
    BasicDBObject query = new BasicDBObject();
    query.put("_class", tablePrefix);

    // The cursor is opened with noCursorTimeout(true), so it must be closed explicitly;
    // try-with-resources guarantees that even when a save fails midway.
    try (MongoCursor<Document> tableCursor = this.mongoTemplate
            .getCollection(PREFIX_METADATA_NODE + catalogName)
            .find(query).noCursorTimeout(true).batchSize(batchSize).iterator()) {
        while (tableCursor.hasNext()) {
            Document tableDocument = tableCursor.next();
            String tableId = (String) tableDocument.get("_id");
            String tableName = (String) tableDocument.get("name");
            String schemaId = (String) tableDocument.get("parentId");

            Neo4jSchema neo4jSchema = neo4jSchemaMap.get(schemaId);
            if (neo4jSchema == null) {
                // Without the parent schema neither the path nor the relation can be built.
                logger.warn("Skipping table " + tableId + ": parent schema " + schemaId + " was not synchronized.");
                continue;
            }

            Neo4jTable neo4jTable = new Neo4jTable();
            neo4jTable.setMetadataId(tableId);
            neo4jTable.setName(tableName);
            neo4jTable.setCnName((String) tableDocument.get(CNNameString));
            neo4jTable.setDataPath(neo4jSchema.getDataPath() + ";" + tableName);
            neo4jTable.setIsEnvironment(catalogName);
            neo4jTable.setIsLayer(neo4jSchema.getLabel());
            neo4jTable.setIsSchema(neo4jSchema.getName());
            neo4jTable.setTableSize("" + tableDocument.get("tableSize"));
            neo4jTable.setTableCount("" + tableDocument.get("tableRowsCount"));
            neo4jTable.setUpdateTIme((String) tableDocument.get("lastUpdateTime"));
            neo4jTable.setComment((String) tableDocument.get("comment"));
            neo4jTableList.add(neo4jTable);

            CompositionRelation schem2Table = new CompositionRelation();
            schem2Table.setStart(neo4jSchema);
            schem2Table.setEnd(neo4jTable);
            schem2Table.setName("Schema--Table");
            compositionRelationList.add(schem2Table);

            tableCount++;
            neo4jTableIdMap.put(neo4jTable.getMetadataId(), neo4jTable);

            if (tableCount % batchSize == 0) {
                neo4jTableRepository.saveAll(neo4jTableList);
                logger.info("同步了:" + tableCount + "个表。" + tableId);
                neo4jTableList.clear();
            }
        }
    }

    // Flush whatever is left after the last full batch.
    if (!neo4jTableList.isEmpty()) {
        neo4jTableRepository.saveAll(neo4jTableList);
        logger.info("继续同步同步了:" + neo4jTableList.size() + "个表。");
        neo4jTableList.clear();
    }
    long columnTime1 = System.currentTimeMillis();
    logger.info("创建的table数量:" + tableCount + " 用时:" + (columnTime1 - columnTime0));
}
private void saveDataBaseDataToNeo4j(MongoData dataBaseData, String catalogName, Map<String, Neo4jSystem> systemDataMap,
List<CompositionRelation> compositionRelationList, Map<String, Integer> objectCountMap) {
List<CompositionRelation> compositionRelationList,
Map<String, Integer> objectCountMap, Map<String, Neo4jSchema> neo4jSchemaMap) {
int schemaCount = objectCountMap.get(SchemaCountString);
int tableCount = 0;
Neo4jSystem neo4jSystem = getNeo4jSystem(dataBaseData,systemDataMap);
if(dataBaseData.getName().equalsIgnoreCase("MYSQL-0")){
System.out.println("MYSQL-0");
}
List<MongoData> schemaDataList= mongoDbServiceImpl.findDataByparentId(dataBaseData.get_id(),catalogName);
List<String> schemaIds = getSchemaIds(schemaDataList);
Map<String,String> shemaIdAndLabel = schemaService.getSchemaAndLabelMap(schemaIds);
for(MongoData schemaData:schemaDataList){
//系统与schema的关系
String schemaData_id = schemaData.get_id();
......@@ -1259,13 +1345,8 @@ public class MetadataServiceImpl implements IMetadataService {
neo4jSchema.setName(schemaData.getName());
neo4jSchema.setCnName(schemaData.getCnName());
if(neo4jSchema.getName().equalsIgnoreCase("dev1")){
System.out.println("dev1");
}
String schemaLabel = getNeo4jSchemaLabel(dataBaseData,schemaData_id,shemaIdAndLabel) ;
neo4jSchema.setLabel(schemaLabel);
System.out.println(neo4jSchema.getName()+" : "+ neo4jSchema.getLabel());
String schemaPath = neo4jSystem.getDataPath()+";"+schemaData.getName();
neo4jSchema.setDataPath(schemaPath);
......@@ -1284,29 +1365,80 @@ public class MetadataServiceImpl implements IMetadataService {
// relationshipService.saveRelation(neo4jSystem.getMetadataId(),neo4jSchema.getMetadataId(),"Composition");
neo4jSchema.setDataPath(schemaPath);
neo4jSchemaMap.put(schemaData_id,neo4jSchema);
//根据schema获取表级数据,再按照类型分类:表,视图,作业,函数等
int page =0,pageSize = 300;
long totalElement = mongoDbServiceImpl.countDocumentByParentId(PageRequest.of(page,pageSize),catalogName,schemaData_id);
int totalPageCount = ((int)totalElement/pageSize)+1;
for(; page< totalPageCount;page++ ){
List<Document> tableDocumentList = mongoDbServiceImpl.getDocumentByparentIdByPage(PageRequest.of(page,pageSize,Sort.by("_id")),catalogName,schemaData_id);
saveTableDocument(tableDocumentList,neo4jSchema,objectCountMap,neo4jSystem, compositionRelationList,catalogName);
}
// int page =0,pageSize = 500;
// long totalElement = mongoDbServiceImpl.countDocumentByParentId(PageRequest.of(page,pageSize),catalogName,schemaData_id);
// int totalPageCount = ((int)totalElement/pageSize)+1;
// for(; page< totalPageCount;page++ ){
// List<Document> tableDocumentList = mongoDbServiceImpl.getDocumentByparentIdByPage(PageRequest.of(page,pageSize,Sort.by("_id")),catalogName,schemaData_id);
// saveTableDocument(tableDocumentList,neo4jSchema,objectCountMap,neo4jSystem,
// compositionRelationList,catalogName,neo4jTableIdMap);
// }
}
objectCountMap.put(SchemaCountString,schemaCount);
}
/**
 * Streams every column document of a catalog out of MongoDB and stores it in Neo4j in batches.
 *
 * <p>Documents are read from the collection {@code PREFIX_METADATA_NODE + catalogName} where
 * {@code _class} equals {@code "Catalog,Database,Schema,Table,Column"}. Each column becomes a
 * {@link Neo4jColumn}; when its parent table is present in {@code neo4jTableIdMap}, a
 * "Table--Column" {@link CompositionRelation} is appended to {@code compositionRelationList}.
 *
 * @param catalogName             catalog whose collection is read
 * @param compositionRelationList output list that receives one Table--Column relation per linked column
 * @param neo4jTableIdMap         tables keyed by metadata id (the column's {@code parentId})
 */
private void saveAllColumnByBatch(String catalogName, List<CompositionRelation> compositionRelationList,
        Map<String, Neo4jTable> neo4jTableIdMap) {
    // Number of columns that were linked to a table.
    int columnCount = 0;
    String columnClass = "Catalog,Database,Schema,Table,Column";
    int batchSize = 5000;
    BasicDBObject query = new BasicDBObject();
    query.put("_class", columnClass);
    List<Neo4jColumn> neo4jColumnList = new ArrayList<>();

    // The cursor is opened with noCursorTimeout(true), so it must be closed explicitly;
    // try-with-resources guarantees that even when a save fails midway.
    try (MongoCursor<Document> columnCursor = this.mongoTemplate
            .getCollection(PREFIX_METADATA_NODE + catalogName)
            .find(query).noCursorTimeout(true).batchSize(batchSize).iterator()) {
        while (columnCursor.hasNext()) {
            Document columnDocument = columnCursor.next();
            String columnId = (String) columnDocument.get("_id");
            String metadataName = (String) columnDocument.get("name");
            String metadataCnName = (String) columnDocument.get("cnName");
            String tableId = (String) columnDocument.get("parentId");

            Neo4jColumn neo4jColumn = new Neo4jColumn();
            neo4jColumn.setMetadataId(columnId);
            neo4jColumn.setName(metadataName);
            neo4jColumn.setCnName(metadataCnName);
            // NOTE(review): the column is queued for saving before the parent lookup, so a column
            // whose table is unknown is still persisted, just without a relation. Kept as-is;
            // confirm this is intended.
            neo4jColumnList.add(neo4jColumn);

            Neo4jTable neo4jTable = neo4jTableIdMap.get(tableId);
            if (neo4jTable == null) {
                continue;
            }

            // Only build the relation once the parent table is known.
            CompositionRelation table2Column = new CompositionRelation();
            table2Column.setStart(neo4jTable);
            table2Column.setEnd(neo4jColumn);
            table2Column.setName("Table--Column");
            compositionRelationList.add(table2Column);

            columnCount++;
            if (columnCount % batchSize == 0) {
                logger.info("同步了:" + columnCount + "个字段。" + columnId);
                neo4jColumnRepository.saveAll(neo4jColumnList);
                neo4jColumnList.clear();
            }
        }
    }

    // Flush whatever is left after the last full batch.
    if (!neo4jColumnList.isEmpty()) {
        neo4jColumnRepository.saveAll(neo4jColumnList);
        neo4jColumnList.clear();
    }
    logger.info("创建的column数量:" + columnCount);
}
private void saveTableDocument(List<Document> tableDocumentList, Neo4jSchema neo4jSchema, Map<String, Integer> objectCountMap,
Neo4jSystem neo4jSystem, List<CompositionRelation> compositionRelationList, String catalogName) {
Neo4jSystem neo4jSystem, List<CompositionRelation> compositionRelationList,
String catalogName, Map<String, Neo4jTable> neo4jTableIdMap) {
int schemaCount = objectCountMap.get(SchemaCountString);
int tableCount = objectCountMap.get(TableCountString);
int columnCount = objectCountMap.get(ColumnCountString);
int viewCount = objectCountMap.get(ViewCountString);
int functionCount = objectCountMap.get(FunctionCountString);
int procedureCount = objectCountMap.get(ProcedureCountString);
List<Neo4jTable> neo4jTableList = new ArrayList<>();
List<Neo4jColumn> neo4jColumnList = new ArrayList<>();
List<Neo4jTable> neo4jTableList = new ArrayList<>();
List<Neo4jView> viewTableList = new ArrayList<>();
List<Neo4jFunction> functionTableList = new ArrayList<>();
List<Neo4jProcedure> procedureTableList = new ArrayList<>();
......@@ -1344,32 +1476,29 @@ public class MetadataServiceImpl implements IMetadataService {
schem2Table.setName("Schema--Table");
compositionRelationList.add(schem2Table);
tableCount++;
List<MongoData> columnMongoDataList = mongoDbServiceImpl.findDataByparentId(metadataId, catalogName);
for(MongoData columnMongoData:columnMongoDataList){
String columnId = columnMongoData.get_id();
Neo4jColumn neo4jColumnList1 =null;// neo4jColumnRepository.findNeo4jColumnByMetadataId(columnId);
// if(neo4jColumnList1==null){
neo4jTableIdMap.put(neo4jTable.getMetadataId(),neo4jTable);
// List<MongoData> columnMongoDataList = mongoDbServiceImpl.findDataByparentId(metadataId, catalogName);
// for(MongoData columnMongoData:columnMongoDataList){
// String columnId = columnMongoData.get_id();
// Neo4jColumn neo4jColumnList1 =null;// neo4jColumnRepository.findNeo4jColumnByMetadataId(columnId);
//
// }
Neo4jColumn neo4jColumn = new Neo4jColumn();
neo4jColumn.setMetadataId(columnId);
neo4jColumn.setName(columnMongoData.getName());
neo4jColumn.setCnName(columnMongoData.getCnName());
neo4jColumnList.add(neo4jColumn);
CompositionRelation table2Column = new CompositionRelation();
table2Column.setStart(neo4jTable);
table2Column.setEnd(neo4jColumn);
table2Column.setName("Table--Column");
compositionRelationList.add(table2Column);
columnCount++;
saveColumnEvery200(columnCount,neo4jColumnList);
}
//// if(neo4jColumnList1==null){
////
//// }
// Neo4jColumn neo4jColumn = new Neo4jColumn();
// neo4jColumn.setMetadataId(columnId);
// neo4jColumn.setName(columnMongoData.getName());
// neo4jColumn.setCnName(columnMongoData.getCnName());
// neo4jColumnList.add(neo4jColumn);
//
// CompositionRelation table2Column = new CompositionRelation();
// table2Column.setStart(neo4jTable);
// table2Column.setEnd(neo4jColumn);
// table2Column.setName("Table--Column");
// compositionRelationList.add(table2Column);
// columnCount++;
// saveColumnEvery200(columnCount,neo4jColumnList);
// }
// }
}else if(metadataId.startsWith("View=")){
saveViewEvery200(viewCount,viewTableList);
......@@ -1446,7 +1575,7 @@ public class MetadataServiceImpl implements IMetadataService {
objectCountMap.put(SchemaCountString,schemaCount);
objectCountMap.put(TableCountString,tableCount);
objectCountMap.put(ColumnCountString,columnCount);
// objectCountMap.put(ColumnCountString,columnCount);
objectCountMap.put(ViewCountString,viewCount);
objectCountMap.put(FunctionCountString,functionCount);
objectCountMap.put(ProcedureCountString,procedureCount);
......@@ -1527,15 +1656,11 @@ public class MetadataServiceImpl implements IMetadataService {
}
/**
 * Flushes the pending columns to Neo4j when the running column count reaches a batch boundary.
 *
 * <p>Despite the method name, the boundary is every multiple of 100. On a boundary the list is
 * saved through {@code neo4jColumnRepository} and then emptied; otherwise nothing happens.
 *
 * @param columnCount     running number of columns processed so far
 * @param neo4jColumnList pending columns; cleared after a flush
 */
private void saveColumnEvery200(int columnCount, List<Neo4jColumn> neo4jColumnList) {
    final boolean onBatchBoundary = (columnCount % 100) == 0;
    if (!onBatchBoundary) {
        return;
    }
    logger.info("同步了:" + columnCount + "个字段。");
    neo4jColumnRepository.saveAll(neo4jColumnList);
    neo4jColumnList.clear();
}
private void saveTableEvery200(int tableCount, List<Neo4jTable> neo4jTableList) {
if(tableCount %100==0){
if(tableCount % 1000 == 0){
logger.info("同步了:"+tableCount+"个表。");
neo4jTableRepository.saveAll(neo4jTableList);
neo4jTableList.clear();
......@@ -1832,6 +1957,79 @@ public class MetadataServiceImpl implements IMetadataService {
}
/**
 * Synchronizes dependency relations of a catalog from MongoDB into Neo4j.
 *
 * <p>Relations are read page by page (300 per page, ordered by {@code _id}). For each relation a
 * {@link DependencyRelation} between the source and target nodes is recorded under the key
 * {@code sourceId + "_" + targetId}, and {@code getRelationFromSourceAndTarget} is asked to add
 * the corresponding parent-level relation derived from the id paths. All collected relations are
 * saved at the end through {@code saveRelationFromEdgeIdMap}.
 *
 * @param catalogName catalog whose relations are synchronized
 */
@Override
public void syschroTable2TableRelations(String catalogName) {
    int page = 0, pageSize = 300;
    long time0 = System.currentTimeMillis();
    long totalElement = mongoDbServiceImpl.countRelationByDependency(PageRequest.of(page, pageSize), catalogName);
    long time1 = System.currentTimeMillis();
    logger.info("从mongo获取的关系的总数量是:" + totalElement + " ,耗时:" + (time1 - time0));

    int totalPageCount = ((int) totalElement / pageSize) + 1;
    Map<String, DependencyRelation> dependencyRelationMap = new HashMap<>();
    // Node cache keyed by metadata id. Misses are cached too (as null) so that an id without a
    // Neo4j node is looked up only once instead of on every relation that references it.
    Map<String, BaseNode> baseNodeMap = new HashMap<>();
    logger.info("准备分页从mongo获取关系,每页:" + pageSize);

    for (; page < totalPageCount; page++) {
        logger.info("开始从 mongo 第" + (page + 1) + " 次获取关系。");
        long time3 = System.currentTimeMillis();
        List<Document> relationList = mongoDbServiceImpl.findRelationByDependency(
                PageRequest.of(page, pageSize, Sort.by("_id")), catalogName);
        long time4 = System.currentTimeMillis();
        logger.info("从 mongo 第" + (page + 1) + " 次获取关系结束,耗时:" + (time4 - time3));

        for (Document relation : relationList) {
            String sourceId = relation.getString("source");
            String targetId = relation.getString("target");
            if (sourceId == null || targetId == null) {
                // A relation without both endpoints cannot be resolved to nodes.
                continue;
            }

            String jobId = relation.getString(JobIdString);
            String description = relation.getString(DescriptionString);
            if (description == null) {
                description = "0";
            }
            Map<String, String> attributeMap = new HashMap<>();
            attributeMap.put(JobIdString, jobId);
            attributeMap.put(DescriptionString, description);

            BaseNode startNode;
            if (baseNodeMap.containsKey(sourceId)) {
                startNode = baseNodeMap.get(sourceId);
            } else {
                startNode = getNodeByMetadataId(sourceId);
                baseNodeMap.put(sourceId, startNode);
            }

            BaseNode endNode;
            if (baseNodeMap.containsKey(targetId)) {
                endNode = baseNodeMap.get(targetId);
            } else {
                endNode = getNodeByMetadataId(targetId);
                baseNodeMap.put(targetId, endNode);
            }

            if (startNode == null || endNode == null) {
                continue;
            }

            // The relation between the two referenced nodes themselves is synchronized as well.
            DependencyRelation dependencyRelation = new DependencyRelation();
            dependencyRelation.setJobId(jobId);
            dependencyRelation.setDescription(description);
            dependencyRelation.setStart(startNode);
            dependencyRelation.setEnd(endNode);
            dependencyRelationMap.put(sourceId + "_" + targetId, dependencyRelation);

            // Parent ids are derived from the id paths; without both paths the parent-level
            // relation cannot be computed, so only the direct relation above is kept.
            String sourceIdPath = relation.getString("sourceIdPath");
            String targetIdPath = relation.getString("targetIdPath");
            if (sourceIdPath == null || targetIdPath == null) {
                continue;
            }
            getRelationFromSourceAndTarget(dependencyRelationMap, attributeMap, sourceId, targetId,
                    sourceIdPath, targetIdPath, baseNodeMap);
        }
    }

    long start = System.currentTimeMillis();
    saveRelationFromEdgeIdMap(dependencyRelationMap);
    long end = System.currentTimeMillis();
    logger.info("一共在neo4j创建了多少Table2Table关系:" + dependencyRelationMap.size() + ", 用时:" + (end - start));
}
private String getTargetColumnId(String targetTableId, String targetColumn) {
String targetColumnId = "";
List<Neo4jColumn> targetColumnList = neo4jTableRepository.getColumnByTableId(targetTableId);
......@@ -1872,30 +2070,36 @@ public class MetadataServiceImpl implements IMetadataService {
@Override
public void syschroTable2EtlJobRelations(String catalogName) {
Map<String, String> relationMap = new HashMap<>();
Map<String,Map<String, String>> edgeIdMap = new HashMap<>();
// Map<String,Map<String, String>> edgeIdMap = new HashMap<>();
Map<String,DependencyRelation> dependencyRelationMap = new HashMap<>();
Map<String,BaseNode> baseNodeMap = new HashMap<>();
int page =0,pageSize = 300;
long time0 = System.currentTimeMillis();
long totalElement = mongoDbServiceImpl.countRelation(PageRequest.of(page,pageSize),catalogName);
long totalElement = mongoDbServiceImpl.countRelationByInputOutPut(PageRequest.of(page,pageSize),catalogName);
long time1 = System.currentTimeMillis();
logger.info("从mongo获取的关系的总数量是:"+ totalElement +" ,耗时:" +(time1-time0) );
int totalPageCount = ((int)totalElement/pageSize)+1;
logger.info("准备分页从mongo获取关系,每页:"+ pageSize);
for(; page< totalPageCount;page++){
logger.info("开始从 mongo 第"+page+1 +" 次获取关系。");
logger.info("开始从 mongo 第"+(page+1) +" 次获取关系。");
long time3 = System.currentTimeMillis();
List<Document> relationList = mongoDbServiceImpl.findRelationByPage(PageRequest.of(page,pageSize,Sort.by("_id")),catalogName);
List<Document> relationList = mongoDbServiceImpl.findRelationByPageInputOutPut(PageRequest.of(page,pageSize,Sort.by("_id")),catalogName);
long time4 = System.currentTimeMillis();
logger.info("从 mongo 第"+page+1 +" 次获取关系结束,耗时:" +(time4-time3) );
logger.info("从 mongo 第"+(page+1) +" 次获取关系结束,耗时:" +(time4-time3) );
for(Document relation :relationList){
Map<String,String> attributeMap = new HashMap<>();
String jobId = relation.getString(JobIdString);
if(jobId == null ){
jobId = "0";
}
attributeMap.put(JobIdString,jobId);
String description = relation.getString(DescriptionString);
if(description==null){
if(description==null || description.equals("")){
description = "0";
}
......@@ -1905,27 +2109,43 @@ public class MetadataServiceImpl implements IMetadataService {
String sourceId =relation.getString("source"); //relationMongo.getSource();
String targetId =relation.getString("target"); // relationMongo.getTarget();
//本身的字段级关系也要同步
edgeIdMap.put(sourceId+"_"+targetId,attributeMap);
Map<String, Object> sourceData = metadataRepoRemoteService.getMetadata(sourceId);
Map<String, Object> targetData = metadataRepoRemoteService.getMetadata(targetId);
List<String> types = new ArrayList<>();
types.add(InputString);
types.add(OutputString);
if(!(types.contains(type))){
continue;
// edgeIdMap.put(sourceId+"_"+targetId,attributeMap);
DependencyRelation dependencyRelation = new DependencyRelation();
BaseNode startNode = null;
BaseNode endNode = null;
dependencyRelation.setJobId(jobId);
dependencyRelation.setDescription(description);
if(baseNodeMap.get(sourceId)!=null){
startNode = baseNodeMap.get(sourceId);
}else {
startNode = getNodeByMetadataId(sourceId);
baseNodeMap.put(sourceId,startNode);
}
if(baseNodeMap.get(targetId)!=null){
endNode =baseNodeMap.get(targetId);
} else {
endNode = getNodeByMetadataId(targetId);//neo4jColumnRepository.findNeo4jColumnByMetadataId(sourceId);
baseNodeMap.put(targetId,endNode);
}
if(sourceData==null || targetData==null){
if(startNode == null || endNode == null){
continue;
}
getRelationFromSourceAndTarget(edgeIdMap, attributeMap, sourceId, targetId);
dependencyRelation.setStart(startNode);
dependencyRelation.setEnd(endNode);
dependencyRelationMap.put(sourceId+"_"+targetId,dependencyRelation);
//从idpath 找出父节点id
String sourceIdPath = relation.getString("sourceIdPath");
String targetIdPath = relation.getString("targetIdPath");
getRelationFromSourceAndTarget(dependencyRelationMap, attributeMap,sourceId,targetId, sourceIdPath, targetIdPath,baseNodeMap);
}
}
long start = System.currentTimeMillis();
saveRelationFromEdgeIdMap(edgeIdMap);
saveRelationFromEdgeIdMap(dependencyRelationMap);
long end = System.currentTimeMillis();
logger.info("一共在neo4j,创建了多少关系:"+edgeIdMap.size() + " ,总用时:"+(end-start));
logger.info("一共在neo4j,创建了多少关系:"+ dependencyRelationMap.size() + " ,总用时:"+(end-start));
// List<RelationMongo> relationMongoList = mongoDbServiceImpl.findAllRelationByCatalog(catalogName);
// int size = 0;
......@@ -2011,115 +2231,126 @@ public class MetadataServiceImpl implements IMetadataService {
// }
}
private void getRelationFromSourceAndTarget(Map<String, Map<String, String>> edgeIdMap, Map<String, String> attributeMap, String sourceId, String targetId) {
/**
 * Looks up the Neo4j node for a metadata id, choosing the repository by the id's prefix.
 *
 * <p>Handled prefixes are those held in {@code ColumnModelQueal}, {@code TableModelEqual},
 * {@code EtlScriptModelQueal} and {@code EtlSqlModelQueal}. Any other id, including
 * {@code null} or an empty string, yields {@code null}.
 *
 * @param metadataId metadata id to resolve; may be {@code null}
 * @return the matching node, or {@code null} when the prefix is not handled or the repository
 *         returns nothing
 */
private BaseNode getNodeByMetadataId(String metadataId) {
    if (metadataId == null) {
        // Callers pass ids read straight from Mongo documents, which may lack the field.
        return null;
    }
    if (metadataId.startsWith(ColumnModelQueal)) {
        return neo4jColumnRepository.findNeo4jColumnByMetadataId(metadataId);
    }
    if (metadataId.startsWith(TableModelEqual)) {
        return neo4jTableRepository.findNeo4jTableByMetadataId(metadataId);
    }
    if (metadataId.startsWith(EtlScriptModelQueal)) {
        return neo4jETLScriptRepository.findNeo4jETLScriptByMetadataId(metadataId);
    }
    if (metadataId.startsWith(EtlSqlModelQueal)) {
        return neo4jETLSqlRepository.findNeo4jETLSqlByMetadataId(metadataId);
    }
    return null;
}
private void getRelationFromSourceAndTarget(Map<String, DependencyRelation> dependencyRelationMap, Map<String, String> attributeMap,
String sourceId, String targetId,
String sourceIdPath, String targetIdPath, Map<String, BaseNode> baseNodeMap) {
String startId = "";
String endId = "";
//如果是字段,上升到表,如果是sql,上升到etlscript,其他的模型之间的关系暂不处理
if(sourceId.contains(ColumnModelQueal) && targetId.contains(ColumnModelQueal) ){
// Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId);
// Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
// //要记录字段级别的关系
// if (sourceParent == null ) {
// logger.info("没有找到id:" + sourceId + "的父节点元数据。");
// continue;
// }
// if (targetParent==null) {
// logger.info("没有找到id:" + targetId + "的父节点元数据。");
// continue;
// }
// startId = (String )sourceParent.get("_id");
// endId = (String )targetParent.get("_id");
startId = getParentId(sourceId);
endId = getParentId(targetId);
startId = getParentId(sourceId,sourceIdPath);
endId = getParentId(targetId,targetIdPath);
} else if(sourceId.contains(ColumnModelQueal) && targetId.contains("SQL=") ){
// Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId);
// Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
// if (sourceParent == null ) {
// logger.info("没有找到id:" + sourceId + "的父节点元数据。");
// continue;
// }
// if (targetParent==null) {
// logger.info("没有找到id:" + targetId + "的父节点元数据。");
// continue;
// }
//// String targetParentId = (String) targetParent.get("_id");
//// Map<String, Object> targetParentParent = metadataRepoRemoteService.getParent(targetParentId);
//// if (targetParentParent==null) {
//// logger.info("没有找到id:" + targetId + "的祖父节点元数据。");
//// continue;
//// }
//
// startId = (String )sourceParent.get("_id");
// endId = (String )targetParent.get("_id");
startId = getParentId(sourceId,sourceIdPath);
endId = getParentId(targetId,targetIdPath);
}else if(sourceId.contains("SQL=") && targetId.contains(ColumnModelQueal)){
startId = getParentId(sourceId,sourceIdPath);
endId = getParentId(targetId,targetIdPath);
} else if(sourceId.contains("SQL=") && targetId.contains("SQL=") ){
startId = getParentId(sourceId,sourceIdPath);
endId = getParentId(targetId,targetIdPath);
}
String relationId = startId+"_"+endId;
startId = getParentId(sourceId);
endId = getParentId(targetId);
DependencyRelation dependencyRelation = new DependencyRelation();
BaseNode startNode = null;
BaseNode endNode = null;
}else if(sourceId.contains("SQL=") && targetId.contains(ColumnModelQueal)){
// Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId);
//
// Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
//
// if (sourceParent == null ) {
// logger.info("没有找到id:" + sourceId + "的父节点元数据。");
// continue;
// }
// if (targetParent==null) {
// logger.info("没有找到id:" + targetId + "的父节点元数据。");
// continue;
// }
//// String sourceParenttId = (String) sourceParent.get("_id");
//// Map<String, Object> sourceParentParent = metadataRepoRemoteService.getParent(sourceParenttId);
//// if (sourceParentParent==null) {
//// logger.info("没有找到id:" + sourceId + "的祖父节点元数据。");
//// continue;
//// }
//
// startId = (String )sourceParent.get("_id");
// endId = (String )targetParent.get("_id");
if(baseNodeMap.get(startId)!=null){
startNode = baseNodeMap.get(startId);
}else {
startNode = getNodeByMetadataId(startId) ;//neo4jColumnRepository.findNeo4jColumnByMetadataId(startId);
baseNodeMap.put(startId,startNode);
}
if(baseNodeMap.get(endId)!=null){
endNode =baseNodeMap.get(endId);
} else {
endNode = getNodeByMetadataId(endId);//neo4jColumnRepository.findNeo4jColumnByMetadataId(endId);
baseNodeMap.put(endId,endNode);
}
if(startNode != null && endNode != null){
String jobId = attributeMap.get("jobId");
String description = attributeMap.get("description");
dependencyRelation.setJobId(jobId);
dependencyRelation.setDescription(description);
startId = getParentId(sourceId);
endId = getParentId(targetId);
dependencyRelation.setStart(startNode);
dependencyRelation.setEnd(endNode);
dependencyRelationMap.put(relationId,dependencyRelation);
}
String relationId = startId+"_"+endId;
edgeIdMap.put(relationId,attributeMap);
}
// NOTE(review): two headers for saveRelationFromEdgeIdMap follow each other here (one taking
// an edge-id map, one taking a map of DependencyRelation) and two loop bodies are mixed
// together. This looks like the old and new versions of the method shown interleaved --
// confirm against the real file before relying on this text.
//
// Lines that use edgeIdMap: each key is split on "_" into a start id and an end id, the
// description and job id are read from the key's attribute map, and
// relationshipService.saveRelation is called with or without the description depending on
// whether it is null/empty.
//
// Lines that use dependencyRelationMap: every DependencyRelation value is appended to a
// list; whenever the running counter is a multiple of 500 the list is passed to
// dependencyRelationResposity.saveAll and cleared, and any remainder is saved after the loop.
private void saveRelationFromEdgeIdMap(Map<String, Map<String, String>> edgeIdMap) {
private void saveRelationFromEdgeIdMap(Map<String,DependencyRelation> dependencyRelationMap) {
// Counter starts at 1 and is only incremented in the dependencyRelationMap loop below.
int count=1;
for(Object obj : edgeIdMap.keySet()){
if(count%100 == 0){
logger.info("创建了"+count+"关系:");
}
// The map key has the form "<startId>_<endId>".
String edgeId = (String) obj;
String[] dataIds = edgeId.split("_");
String startId = dataIds[0];
String endId = dataIds[1];
Map<String,String> attributeMap0 = edgeIdMap.get(obj);
String description = attributeMap0.get(DescriptionString);
String jobId = attributeMap0.get(JobIdString);
// String description = edgeIdMap.get(obj);
if(description ==null || description.equalsIgnoreCase("")){
relationshipService.saveRelation(startId,endId,"流向",jobId);
}else{
relationshipService.saveRelation(startId,endId,"流向",jobId,description);
// Buffer of relations waiting to be written in one saveAll call.
List<DependencyRelation > dependencyRelationList = new ArrayList<>();
for(Object obj : dependencyRelationMap.keySet()){
DependencyRelation dependencyRelation = dependencyRelationMap.get(obj);
dependencyRelationList.add(dependencyRelation);
// Flush the buffer every 500 iterations.
if(count % 500 == 0){
dependencyRelationResposity.saveAll(dependencyRelationList);
logger.info("创建了"+count+"关系:");
dependencyRelationList.clear();
}
count++;
// String edgeId = (String) obj;
// String[] dataIds = edgeId.split("_");
// if(dataIds==null || dataIds.length==0){
// continue;
// }
// String startId = dataIds[0];
// String endId = dataIds[1];
// Map<String,String> attributeMap0 = edgeIdMap.get(obj);
// String description = attributeMap0.get(DescriptionString);
// String jobId = attributeMap0.get(JobIdString);
//// String description = edgeIdMap.get(obj);
// if(description ==null || description.equalsIgnoreCase("")){
// relationshipService.saveRelation(startId,endId,"流向",jobId);
// }else{
// relationshipService.saveRelation(startId,endId,"流向",jobId,description);
//
// }
}
// Save whatever is left in the buffer after the loop.
if(dependencyRelationList .size() !=0){
dependencyRelationResposity.saveAll(dependencyRelationList);
dependencyRelationList.clear();
}
}
// NOTE(review): two headers for getParentId follow each other here (a one-argument form
// taking targetId and a two-argument form taking currentId and IdPath), and both bodies are
// present. This looks like the old and new versions shown interleaved -- confirm against
// the real file.
//
// Lines that use targetId: ask metadataRepoRemoteService.getParent for the parent record
// and take its "_id" value; log a message when no parent is returned.
//
// Lines that use IdPath: split the path on "/" and, when it has more than one segment,
// take the second-to-last segment as the parent id.
//
// In both variants the result defaults to the empty string.
private String getParentId(String targetId) {
private String getParentId(String currentId,String IdPath) {
String parentId = "";
Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
if (targetParent==null) {
logger.info("没有找到元数据的targetId=" + targetId + "的父节点。");
}else {
parentId = (String )targetParent.get("_id");
}
String[] sourceIdPathArray = IdPath.split("/");
// The last segment is presumably the element itself, so length-2 is its parent -- TODO confirm the path layout.
if(sourceIdPathArray.length>1){
parentId = sourceIdPathArray[sourceIdPathArray.length-2];
}
// String[] targetIdPathArray = targetIdPath.split("////");
// String targetParentID = targetIdPathArray[targetIdPathArray.length-1];
// Map<String, Object> targetParent = metadataRepoRemoteService.getParent(currentId);
// if (targetParent==null) {
// logger.info("没有找到元数据的targetId=" + currentId + "的父节点。");
// }else {
// parentId = (String )targetParent.get("_id");
// }
return parentId;
}
......@@ -2305,6 +2536,8 @@ public class MetadataServiceImpl implements IMetadataService {
@Override
public List<String> findAllNodeTypes() {
Iterable<MetaData> dataIterable = metadataRepository.findAll();
List<String> labels = new ArrayList<>();
labels.add("All");
......@@ -2450,10 +2683,6 @@ public class MetadataServiceImpl implements IMetadataService {
@Override
public void deleteNodeByCatalogName(String catalogName) {
try{
// Neo4jConfig neo4jConfig = new Neo4jConfig();
// Driver neo4jConnection = neo4jConfig.getNeo4jConnection();
// Session session = neo4jConnection.session();
String cypher = "match (n) where n.metadataId =~'.*="+catalogName+"=.*' detach delete n";
logger.info("删除数据的语句:{}",cypher);
StatementResult statementResult = session.run(cypher);
......
......@@ -157,19 +157,34 @@ public class MongoDbServiceImpl {
return mongoTemplate.count(query, Document.class, PREFIX_MD_RELATION_TEMP_NODE + catalogName);
}
// NOTE(review): two method headers appear back to back (findRelationByPage and
// findRelationByPageInputOutPut) over a single body. This looks like both sides of a
// rename shown together -- confirm which header belongs in the file.
//
// Builds a query matching documents whose "type" is INPUT or OUTPUT, applies the given
// pageable to it, and returns the matching documents from the collection named
// PREFIX_MD_RELATION + catalogName.
public List<Document> findRelationByPage(Pageable pageable, String catalogName) {
public List<Document> findRelationByPageInputOutPut(Pageable pageable, String catalogName) {
Query query = new Query(new Criteria().orOperator(Criteria.where("type").is(INPUT),Criteria.where("type").is(OUTPUT)));
query.with(pageable);
return mongoTemplate.find(query, Document.class, PREFIX_MD_RELATION + catalogName);
}
// NOTE(review): two method headers appear back to back (countRelation and
// countRelationByInputOutPut) over a single body. This looks like both sides of a rename
// shown together -- confirm which header belongs in the file.
//
// Builds a query matching documents whose "type" is INPUT or OUTPUT, applies the given
// pageable to it, and returns the count reported by mongoTemplate for the collection named
// PREFIX_MD_RELATION + catalogName.
public long countRelation(Pageable pageable, String catalogName) {
public long countRelationByInputOutPut(Pageable pageable, String catalogName) {
Query query = new Query(new Criteria().orOperator(Criteria.where("type").is(INPUT),Criteria.where("type").is(OUTPUT)));
// NOTE(review): paging settings are attached to a count query; whether they affect the result may depend on the library version -- confirm.
query.with(pageable);
return mongoTemplate.count(query, Document.class, PREFIX_MD_RELATION + catalogName);
}
/**
 * Returns one page of relation documents whose {@code type} is "Dependency" and whose
 * {@code from} is "Script".
 *
 * @param pageable    paging settings applied to the query
 * @param catalogName suffix appended to {@code PREFIX_MD_RELATION} to form the collection name
 * @return the documents returned by the template for this query
 */
public List<Document> findRelationByDependency(Pageable pageable, String catalogName) {
    Criteria typeIsDependency = Criteria.where("type").is("Dependency");
    Criteria fromIsScript = Criteria.where("from").is("Script");
    Criteria dependencyFromScript = new Criteria().andOperator(typeIsDependency, fromIsScript);

    Query pagedQuery = new Query(dependencyFromScript);
    pagedQuery.with(pageable);

    String collectionName = PREFIX_MD_RELATION + catalogName;
    return mongoTemplate.find(pagedQuery, Document.class, collectionName);
}
/**
 * Counts relation documents whose {@code type} is "Dependency" and whose {@code from} is
 * "Script".
 *
 * <p>NOTE(review): the pageable is attached to the count query, as in the find variant;
 * whether skip/limit influence the count may depend on the library version -- confirm.
 *
 * @param pageable    paging settings applied to the query
 * @param catalogName suffix appended to {@code PREFIX_MD_RELATION} to form the collection name
 * @return the count reported by the template for this query
 */
public long countRelationByDependency(Pageable pageable, String catalogName) {
    Criteria typeIsDependency = Criteria.where("type").is("Dependency");
    Criteria fromIsScript = Criteria.where("from").is("Script");
    Criteria dependencyFromScript = new Criteria().andOperator(typeIsDependency, fromIsScript);

    Query countQuery = new Query(dependencyFromScript);
    countQuery.with(pageable);

    String collectionName = PREFIX_MD_RELATION + catalogName;
    return mongoTemplate.count(countQuery, Document.class, collectionName);
}
public long countRelationByJobId(Pageable pageable, String catalogName,String jobId) {
Query query = new Query(new Criteria() .andOperator(Criteria.where("jobId").is(jobId),
new Criteria().orOperator(Criteria.where("type").is(INPUT),Criteria.where("type").is(OUTPUT))
......@@ -194,4 +209,37 @@ public class MongoDbServiceImpl {
return mongoTemplate.findAll(Document.class, "Relation" + catalogName);
}
/**
 * Counts metadata documents whose {@code _class} field equals the given value.
 *
 * <p>NOTE(review): the pageable is attached to the count query; whether skip/limit
 * influence the count may depend on the library version -- confirm.
 *
 * @param pageable        paging settings applied to the query
 * @param catalogName     suffix appended to {@code PREFIX_METADATA_NODE} to form the collection name
 * @param columnModelPath value the {@code _class} field must equal
 * @return the count reported by the template for this query
 */
public long countColumnByColumnId(Pageable pageable, String catalogName,String columnModelPath) {
    Criteria classMatches = Criteria.where("_class").is(columnModelPath);

    Query countQuery = new Query(classMatches);
    countQuery.with(pageable);

    String collectionName = PREFIX_METADATA_NODE + catalogName;
    return mongoTemplate.count(countQuery, Document.class, collectionName);
}
/**
 * Returns one page of metadata documents whose {@code _class} field equals the given value.
 *
 * @param pageable        paging settings applied to the query
 * @param catalogName     suffix appended to {@code PREFIX_METADATA_NODE} to form the collection name
 * @param columnModelPath value the {@code _class} field must equal
 * @return the documents returned by the template for this query
 */
public List<Document> findColumnByColumnIdByPage(Pageable pageable, String catalogName,String columnModelPath) {
    Criteria classMatches = Criteria.where("_class").is(columnModelPath);

    Query pagedQuery = new Query(classMatches);
    pagedQuery.with(pageable);

    String collectionName = PREFIX_METADATA_NODE + catalogName;
    return mongoTemplate.find(pagedQuery, Document.class, collectionName);
}
/**
BasicDBObject query = new BasicDBObject();
query.put(T_L, null);
FindIterable<Document> iterables = this.mongoTemplate.getCollection(name).find(query);
MongoCursor<Document> cursor = iterables.iterator();
List<Document> results = new ArrayList<>();
int i = 1;
while (cursor.hasNext()) {
results.add(cursor.next());
if (i % 30000 == 0) {
processList(results, name);
LogManager.logInfo(InitCtrl.class, name + "已处理了" + i + "条");
}
i++;
}
*/
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment