Commit 96980932 by qiuchaofei

1.添加etljob,etlscript模型,2修改table的查找,3修改同步流向关系的方法

parent a7b76387
......@@ -145,9 +145,6 @@ public class RelationalGraphController {
@RequestMapping(path = "/getTableBySchemaIdAndKeyWord", method = RequestMethod.GET)
public Map<String, List<ReturnNode>> getTableBySchemaIdAndKeyWord(String schemaId,String keyWord){
//传入一个系统名称/id,返回系统下的所有schema,注意分层
Map<String, List<ReturnNode>> stringListMap = schemaService.getTablesBySchemaId(schemaId);
return tableService.autoMatchBySchemaIdAndInputWord(schemaId,keyWord);
......
......@@ -3,6 +3,7 @@ package com.keymobile.metadata.metadataRelation.pojo.metadata;
import com.keymobile.metadata.metadataRelation.pojo.BaseNode;
import org.neo4j.ogm.annotation.NodeEntity;
@NodeEntity(label="Neo4jJob")
public class Neo4jJob extends BaseNode {
@NodeEntity(label="Neo4jETLJob")
public class Neo4jETLJob extends BaseNode {
}
package com.keymobile.metadata.metadataRelation.pojo.metadata;
import com.keymobile.metadata.metadataRelation.pojo.BaseNode;
import org.neo4j.ogm.annotation.NodeEntity;
@NodeEntity(label="Neo4jETLScript")
public class Neo4jETLScript extends BaseNode {
}
package com.keymobile.metadata.metadataRelation.respository.metadata;
import com.keymobile.metadata.metadataRelation.pojo.metadata.Neo4jETLJob;
import com.keymobile.metadata.metadataRelation.pojo.metadata.Neo4jFunction;
import org.springframework.data.neo4j.repository.Neo4jRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface Neo4jETLJobRepository extends Neo4jRepository<Neo4jETLJob, Long> {
List<Neo4jETLJob> findNeo4jETLJobByMetadataId(String metadataId);
}
package com.keymobile.metadata.metadataRelation.respository.metadata;
import com.keymobile.metadata.metadataRelation.pojo.metadata.Neo4jETLScript;
import org.springframework.data.neo4j.repository.Neo4jRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface Neo4jETLScriptRepository extends Neo4jRepository<Neo4jETLScript, Long> {
List<Neo4jETLScript> findNeo4jETLScriptByMetadataId(String metadataId);
}
......@@ -13,10 +13,19 @@ import java.util.List;
public interface Neo4jTableRepository extends Neo4jRepository<Neo4jTable,Long> {
List<Neo4jTable> findNeo4jTableByMetadataId(String metadataId);
@Query("match (n{metadataId:{tableId}} )<-[r:流向]-(m) return m ")
@Query("match (n{metadataId:{tableId}} )<-[r:流向]-(m1)<-[r1:流向]-(m) return m ")
List<BaseNode> getSourceTable(@Param("tableId") String tableId);
@Query("match (n{metadataId:{tableId}} )-[r:流向]->(m) return m ")
@Query("match (n{metadataId:{tableId}} )-[r1:流向]->(m1)-[r:流向]->(m) return m ")
List<BaseNode> getTargetTable(@Param("tableId") String tableId);
@Query("match (n{metadataId:{tableId}} )<-[r1:流向]-(m1) <-[r:Composition]-(m) return m ")
List<BaseNode> getSourceEtlJob(@Param("tableId") String tableId);
@Query("match (n{metadataId:{tableId}} )-[r1:流向]->(m1) <-[r:Composition]-(m) return m ")
List<BaseNode> getTargetEtlJob(@Param("tableId") String tableId);
}
......@@ -32,13 +32,10 @@ public class AsyncDataFromMongoToNeo4j {
@Autowired
private JdbcTemplate jdbcTemplate;
@Async
public void asyncDataFromMongoToNeo4j(String catalogName) {
long start = System.currentTimeMillis();
String sql = "select scope_id, scope_name from auth_scope";
String sql = "select scope_id, scope_name from auth_scope where domain_id ="+ catalogName;
List<MongoData> mongoDataList = jdbcTemplate.query(sql, new RowMapper<MongoData>() {
@Override
public MongoData mapRow(ResultSet resultSet, int i) throws SQLException {
......
......@@ -87,6 +87,11 @@ public class MetadataServiceImpl implements IMetadataService {
@Autowired
private Neo4jProcedureRepository neo4jProcedureRepository;
@Autowired
private Neo4jETLJobRepository neo4jETLJobRepository;
@Autowired
private Neo4jETLScriptRepository neo4jETLScriptRepository;
@Override
public List<MetaData> findNodeByName(String dataName) {
return metadataRepository.findMetaData(dataName, 3);
......@@ -1281,13 +1286,79 @@ public class MetadataServiceImpl implements IMetadataService {
}
systemDataMap.put(systemData.get_id(),neo4jSystem);
}
List<CompositionRelation> compositionRelationList = new ArrayList<>();
int etlJobCount = 1;
int etlScriptCount = 1;
for(MongoData catalogData:catalogDataList){
List<MongoData> dataBaseDataList= mongoDbServiceImpl.findDataByparentId(catalogData.get_id(),collectionName);
for(MongoData dataBaseData:dataBaseDataList){
String mongoId = dataBaseData.get_id();
if(mongoId.startsWith("Server=")){
//如果是server,就把etljob找出来
List<MongoData> etlJobDataList= mongoDbServiceImpl.findDataByparentId(dataBaseData.get_id(),collectionName);
List<Neo4jETLJob> neo4jETLJobList = new ArrayList<>();
for(MongoData etlJobData:etlJobDataList){
if(etlJobCount%200 == 0){
logger.info("同步了"+etlJobCount);
neo4jETLJobRepository.saveAll(neo4jETLJobList);
neo4jETLJobList.clear();
}
Neo4jETLJob neo4jETLJob = new Neo4jETLJob();
neo4jETLJob.setMetadataId(etlJobData.get_id());
neo4jETLJob.setName(etlJobData.getName());
neo4jETLJob.setCnName(etlJobData.getCnName());
//如果存在,就不用创建
List<Neo4jETLJob> neo4jETLJobList1 = neo4jETLJobRepository.findNeo4jETLJobByMetadataId(neo4jETLJob.getMetadataId());
if(neo4jETLJobList1==null || neo4jETLJobList1.size()==0){
neo4jETLJobList.add(neo4jETLJob);
etlJobCount++;
}
List<Neo4jETLScript> neo4jETLScriptList = new ArrayList<>();
List<MongoData> etlScriptMongoList = mongoDbServiceImpl.findDataByparentId(etlJobData.get_id(),collectionName);
for(MongoData etlScriptMongo:etlScriptMongoList){
if(etlScriptCount%200==0){
neo4jETLScriptRepository.saveAll(neo4jETLScriptList);
neo4jETLScriptList.clear();
}
Neo4jETLScript neo4jETLScript = new Neo4jETLScript();
neo4jETLScript.setMetadataId(etlScriptMongo.get_id());
neo4jETLScript.setName(etlScriptMongo.getName());
neo4jETLScript.setCnName(etlScriptMongo.getCnName());
//如果存在,就不用创建
List<Neo4jETLScript> neo4jScriptList = neo4jETLScriptRepository.findNeo4jETLScriptByMetadataId(neo4jETLScript.getMetadataId());
if(neo4jScriptList==null || neo4jScriptList.size()==0){
neo4jETLScriptList.add(neo4jETLScript);
etlScriptCount++;
CompositionRelation compositionRelation = new CompositionRelation();
compositionRelation.setStart(neo4jETLJob);
compositionRelation.setEnd(neo4jETLScript);
compositionRelation.setName("etlJob--Script");
compositionRelationList.add(compositionRelation);
}
}
if(neo4jETLScriptList !=null && neo4jETLScriptList.size()!=0 ){
neo4jETLScriptRepository.saveAll(neo4jETLScriptList);
}
}
if(neo4jETLJobList !=null && neo4jETLJobList.size()!=0 ){
neo4jETLJobRepository.saveAll(neo4jETLJobList);
}
}else if(mongoId.startsWith("Database=")){
List<MongoData> schemaDataList= mongoDbServiceImpl.findDataByparentId(dataBaseData.get_id(),collectionName);
for(MongoData schemaData:schemaDataList){
//系统与schema的关系
......@@ -1317,6 +1388,7 @@ public class MetadataServiceImpl implements IMetadataService {
List<Neo4jSchema> neo4jSchemaList = neo4jSchemaRepository.findNeo4jSchemaByMetadataId(neo4jSchema.getMetadataId());
if(neo4jSchemaList==null || neo4jSchemaList.size()==0){
neo4jSchemaRepository.save(neo4jSchema);
schemaCount++;
}
CompositionRelation compositionRelation = new CompositionRelation();
compositionRelation.setStart(neo4jSystem);
......@@ -1353,13 +1425,15 @@ public class MetadataServiceImpl implements IMetadataService {
neo4jTableList.clear();
}
List<Neo4jTable> neo4jTableListExist = neo4jTableRepository.findNeo4jTableByMetadataId(metadataId);
if(neo4jTableListExist == null || neo4jTableListExist.size() == 0){
Neo4jTable neo4jTable = new Neo4jTable();
neo4jTable.setMetadataId(tableClassData.get_id());
neo4jTable.setName(tableClassData.getName());
neo4jTable.setCnName(tableClassData.getCnName());
neo4jTable.setDataPath(tablePath);
List<Neo4jTable> neo4jTableListExist = neo4jTableRepository.findNeo4jTableByMetadataId(metadataId);
if(neo4jTableListExist == null || neo4jTableListExist.size() == 0){
neo4jTableList.add(neo4jTable);
CompositionRelation schem2Table = new CompositionRelation();
......@@ -1482,6 +1556,7 @@ public class MetadataServiceImpl implements IMetadataService {
}
}
}
}
int count = 0;
List<CompositionRelation> newCompositionList= new ArrayList<>();
for(CompositionRelation compositionRelation:compositionRelationList){
......@@ -1495,17 +1570,33 @@ public class MetadataServiceImpl implements IMetadataService {
if(newCompositionList.size()!=0){
compositionRelationRespository.saveAll(newCompositionList);
}
logger.info("创建的system数量:"+systemDataMap.size());
logger.info("创建的schema:"+schemaCount);
logger.info("创建的table数量:"+tableCount);
logger.info("创建的作业数量:"+etlJobCount);
logger.info("创建的脚本数量:"+etlScriptCount);
}
@Override
public void syschroTable2EtlJobRelations(String catalogName) {
Map<String, String> relationMap = new HashMap<>();
List<RelationMongo> relationMongoList = mongoDbServiceImpl.findAllRelationByCatalog(catalogName);
int size = 0;
for (RelationMongo relationMongo : relationMongoList) {
String sourceId = relationMongo.getSource();
String targetId = relationMongo.getTarget();
int page =0,pageSize = 300;
long totalElement = mongoDbServiceImpl.countRelation(PageRequest.of(page,pageSize),catalogName);
for(; page< totalElement;page+= pageSize){
List<Document> relationList = mongoDbServiceImpl.findRelationByPage(PageRequest.of(page,pageSize,Sort.by("_id")),catalogName);
for(Document relation :relationList){
String type = relation.getString("type");
if(!(type.equals("Input") || type.equals("Output"))){
continue;
}
String sourceId =relation.getString("source"); //relationMongo.getSource();
String targetId =relation.getString("target"); // relationMongo.getTarget();
Map<String, Object> sourceData = metadataRepoRemoteService.getMetadata(sourceId);
Map<String, Object> targetData = metadataRepoRemoteService.getMetadata(targetId);
if(sourceData==null || targetData==null){
......@@ -1539,19 +1630,19 @@ public class MetadataServiceImpl implements IMetadataService {
logger.info("没有找到id:" + targetId + "的父节点元数据。");
continue;
}
String targetParentId = (String) targetParent.get("_id");
Map<String, Object> targetParentParent = metadataRepoRemoteService.getParent(targetParentId);
if (targetParentParent==null) {
logger.info("没有找到id:" + targetId + "的祖父节点元数据。");
continue;
}
// String targetParentId = (String) targetParent.get("_id");
// Map<String, Object> targetParentParent = metadataRepoRemoteService.getParent(targetParentId);
// if (targetParentParent==null) {
// logger.info("没有找到id:" + targetId + "的祖父节点元数据。");
// continue;
// }
startId = (String )sourceParent.get("_id");
endId = (String )targetParentParent.get("_id");
endId = (String )targetParent.get("_id");
}else if(sourceId.contains("SQL=") && targetId.contains("Column=")){
Map<String, Object> targetParent = metadataRepoRemoteService.getParent(sourceId);
Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId);
Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(targetId);
Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
if (sourceParent == null ) {
logger.info("没有找到id:" + sourceId + "的父节点元数据。");
......@@ -1561,18 +1652,17 @@ public class MetadataServiceImpl implements IMetadataService {
logger.info("没有找到id:" + targetId + "的父节点元数据。");
continue;
}
String sourceParenttId = (String) sourceParent.get("_id");
Map<String, Object> sourceParentParent = metadataRepoRemoteService.getParent(sourceParenttId);
if (sourceParentParent==null) {
logger.info("没有找到id:" + sourceId + "的祖父节点元数据。");
continue;
}
// String sourceParenttId = (String) sourceParent.get("_id");
// Map<String, Object> sourceParentParent = metadataRepoRemoteService.getParent(sourceParenttId);
// if (sourceParentParent==null) {
// logger.info("没有找到id:" + sourceId + "的祖父节点元数据。");
// continue;
// }
startId = (String )sourceParentParent.get("_id");
startId = (String )sourceParent.get("_id");
endId = (String )targetParent.get("_id");
}
String type = relationMongo.getType();
String relationId = startId+"_"+endId;
if(!relationMap.containsKey(relationId)){
if(relationMap.size()%200 == 0){
......@@ -1583,6 +1673,91 @@ public class MetadataServiceImpl implements IMetadataService {
}
}
}
// List<RelationMongo> relationMongoList = mongoDbServiceImpl.findAllRelationByCatalog(catalogName);
// int size = 0;
// for (RelationMongo relationMongo : relationMongoList) {
// String sourceId = relationMongo.getSource();
// String targetId = relationMongo.getTarget();
// Map<String, Object> sourceData = metadataRepoRemoteService.getMetadata(sourceId);
// Map<String, Object> targetData = metadataRepoRemoteService.getMetadata(targetId);
// if(sourceData==null || targetData==null){
// continue;
// }
// String startId = "";
// String endId = "";
// //如果是字段,上升到表,如果是sql,上升到etljob,其他的模型站不处理
// if(sourceId.contains("Column=") && targetId.contains("Column=") ){
//
// Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId);
// Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
// if (sourceParent == null ) {
// logger.info("没有找到id:" + sourceId + "的父节点元数据。");
// continue;
// }
// if (targetParent==null) {
// logger.info("没有找到id:" + targetId + "的父节点元数据。");
// continue;
// }
// startId = (String )sourceParent.get("_id");
// endId = (String )targetParent.get("_id");
// } else if(sourceId.contains("Column=") && targetId.contains("SQL=") ){
// Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId);
// Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
// if (sourceParent == null ) {
// logger.info("没有找到id:" + sourceId + "的父节点元数据。");
// continue;
// }
// if (targetParent==null) {
// logger.info("没有找到id:" + targetId + "的父节点元数据。");
// continue;
// }
// String targetParentId = (String) targetParent.get("_id");
// Map<String, Object> targetParentParent = metadataRepoRemoteService.getParent(targetParentId);
// if (targetParentParent==null) {
// logger.info("没有找到id:" + targetId + "的祖父节点元数据。");
// continue;
// }
//
// startId = (String )sourceParent.get("_id");
// endId = (String )targetParentParent.get("_id");
// }else if(sourceId.contains("SQL=") && targetId.contains("Column=")){
// Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId);
//
// Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
//
// if (sourceParent == null ) {
// logger.info("没有找到id:" + sourceId + "的父节点元数据。");
// continue;
// }
// if (targetParent==null) {
// logger.info("没有找到id:" + targetId + "的父节点元数据。");
// continue;
// }
// String sourceParenttId = (String) sourceParent.get("_id");
// Map<String, Object> sourceParentParent = metadataRepoRemoteService.getParent(sourceParenttId);
// if (sourceParentParent==null) {
// logger.info("没有找到id:" + sourceId + "的祖父节点元数据。");
// continue;
// }
//
// startId = (String )sourceParentParent.get("_id");
// endId = (String )targetParent.get("_id");
// }
//
// String type = relationMongo.getType();
// String relationId = startId+"_"+endId;
// if(!relationMap.containsKey(relationId)){
// if(relationMap.size()%200 == 0){
// logger.info("创建第"+relationMap.size()+1+"多少关系:");
// }
// relationMap.put(relationId,"");
// relationshipService.saveRelation(startId,endId,"流向");
// }
//
// }
logger.info("创建了多少关系:"+relationMap.size());
}
......
......@@ -23,6 +23,7 @@ public class MongoDbServiceImpl {
@Autowired
private MongoTemplate mongoTemplate;
private static final String PREFIX_MD_RELATION = "md_relation_";
......@@ -125,4 +126,16 @@ public class MongoDbServiceImpl {
query.with(pageable);
return mongoTemplate.count(query, Document.class, PREFIX_MD_RELATION_TEMP_NODE + catalogName);
}
public List<Document> findRelationByPage(Pageable pageable, String catalogName) {
Query query = new Query(new Criteria().orOperator(Criteria.where("type").is("Input"),Criteria.where("type").is("Output")));
query.with(pageable);
return mongoTemplate.find(query, Document.class, PREFIX_MD_RELATION + catalogName);
}
public long countRelation(Pageable pageable, String catalogName) {
Query query = new Query(new Criteria().orOperator(Criteria.where("type").is("Input"),Criteria.where("type").is("Output")));
query.with(pageable);
return mongoTemplate.count(query, Document.class, PREFIX_MD_RELATION + catalogName);
}
}
......@@ -50,23 +50,23 @@ public class TableServiceImpl implements ITableService {
//作业
List<ReturnNode> etlJobs = new ArrayList<>();
//获取关联作业,
List<BaseNode> sourceBaseNodes = neo4jTableRepository.getSourceTable(tableId);
List<BaseNode> sourceBaseNodes = neo4jTableRepository.getSourceEtlJob(tableId);
for(BaseNode sourceBaseNode:sourceBaseNodes){
if(sourceBaseNode.getMetadataId().startsWith("Procedure=")){
if(sourceBaseNode.getMetadataId().startsWith("ETLJob=")){
ReturnNode returnNode = new ReturnNode();
returnNode.setId(sourceBaseNode.getMetadataId());
returnNode.setName(sourceBaseNode.getName());
returnNode.setType("Procedure");
returnNode.setType("ETLJob");
etlJobs.add(returnNode);
}
}
List<BaseNode> targetBaseNodes = neo4jTableRepository.getTargetTable(tableId);
List<BaseNode> targetBaseNodes = neo4jTableRepository.getTargetEtlJob(tableId);
for(BaseNode targetBaseNode:targetBaseNodes){
if(targetBaseNode.getMetadataId().startsWith("Procedure=")){
if(targetBaseNode.getMetadataId().startsWith("ETLJob=")){
ReturnNode returnNode = new ReturnNode();
returnNode.setId(targetBaseNode.getMetadataId());
returnNode.setName(targetBaseNode.getName());
returnNode.setType("Procedure");
returnNode.setType("ETLJob");
etlJobs.add(returnNode);
}
}
......@@ -81,6 +81,7 @@ public class TableServiceImpl implements ITableService {
relationObjects.put("模型",modelList);
//资产
List<ReturnNode> assetList = new ArrayList<>();
try{
List<Map<String, String>> dataAssetGraphInfos = dataAssertRemoteService.getDataAssetGraphInfoByMetadataId(tableId);
for(Map<String, String> map : dataAssetGraphInfos){
ReturnNode returnNode = new ReturnNode();
......@@ -89,11 +90,17 @@ public class TableServiceImpl implements ITableService {
returnNode.setCnName(map.get("cnName"));
assetList.add(returnNode);
}
}catch (Exception e){
e.printStackTrace();
}
relationObjects.put("资产",assetList);
//标准
List<ReturnNode> standardList = new ArrayList<>();
Map<String,Object> maps = new HashMap<>();
maps.put("metadataId",tableId);
try{
Map<String, Object> standardMaps = dataStandardRemoteService.findStandardByMetadatId(1, 10,
maps);
List<LinkedHashMap<String,Object>> contentList = (List<LinkedHashMap<String,Object>>) standardMaps.get("content");
......@@ -109,6 +116,11 @@ public class TableServiceImpl implements ITableService {
standardList.add(returnNode);
}
relationObjects.put("标准",standardList);
}catch (Exception e){
e.printStackTrace();
}
List<ReturnNode> neo4jTableList = getCurrentTableInfo(tableId);
relationObjects.put("当前表",neo4jTableList);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment