Commit f909c3cf by qiuchaofei

1 关系添加属性:jobId,2 添加接口,给解析sql调用,更新局部关系。

parent cb037974
......@@ -556,6 +556,21 @@ public class MetaDataController {
logger.info("返回结果,后台继续运行。");
return flag;
}
@ApiOperation(tags = "", value = "根据环境id与jobId同步元数据关系的接口")
@RequestMapping(path = "/synchroRelationByJobId", method = RequestMethod.POST)
public String synchroRelationByJobId(String catalogName,String jobId) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
LogManager.logInfo(LogConstants.CTX_Relation, "用户 "+ getUserName() + " 在 "+ sdf.format(new Date())+"同步"+catalogName+" JobId:"+jobId+"的元数据。" );
logger.info("开始异步迁移元数据:从mongo到neo4j");
String flag = "success";
asyncDataFromMongoToNeo4j.asyncJobFromMongo2Neo4j(catalogName,jobId);
logger.info("返回结果,后台继续运行。");
return flag;
}
private String getUserName() {
Object obj = SecurityContextHolder.getContext().getAuthentication().getPrincipal();
if (obj instanceof String) {
......
......@@ -22,6 +22,8 @@ public class AsyncDataFromMongoToNeo4j {
private static final Logger logger = LoggerFactory.getLogger(AsyncDataFromMongoToNeo4j.class);
@Autowired
private IMetadataService metadataService;
@Autowired
private IMetadataRelationService metadataRelationService;
@Autowired
private JdbcTemplate jdbcTemplate;
......@@ -83,6 +85,12 @@ public class AsyncDataFromMongoToNeo4j {
logger.info("同步数据完成,用时:"+(end-start));
}
@Async
public void asyncJobFromMongo2Neo4j(String catalogName,String jobId) {
//先按照环境id,作业id,删除关系,然后同步关系,
// 一般来说,关系的字段已经存在neo4j,但是sql不一定存在,
// 如果sql不存在,需要先同步sql ,srcipt,job,再建立关系。
metadataRelationService.deleteRelationByJobId(catalogName,jobId);
metadataRelationService.syschroRelationFromMongo(catalogName,jobId);
}
}
......@@ -69,6 +69,8 @@ public interface IBaseRelationshipService {
void saveRelation(String startMetadataId, String endMetadataid, String composition);
void saveRelation(String startId, String endId, String relationType,String description);
void saveRelation(String startId, String endId, String relationType,String jobId);
void saveRelation(String startId, String endId, String relationType,String jobId,String description);
}
......@@ -8,4 +8,8 @@ public interface IMetadataRelationService {
ReturnReslult filterJosnByModelName(String reulstJson,String filterModel,String retainModel);
//删除关系
void deleteRelationByJobId(String catalogName, String jobId);
//同步关系,如果元数据不存在,要先同步元数据
void syschroRelationFromMongo(String catalogName, String jobId);
}
......@@ -183,7 +183,7 @@ public class BaseRelationshipServiceImpl implements IBaseRelationshipService {
}
@Override
public void saveRelation(String startId, String endId, String relationType,String description) {
public void saveRelation(String startId, String endId, String relationType,String jobId) {
Neo4jConfig neo4jConfig = new Neo4jConfig();
Driver neo4jConnection = neo4jConfig.getNeo4jConnection();
Session session = neo4jConnection.session();
......@@ -191,9 +191,26 @@ public class BaseRelationshipServiceImpl implements IBaseRelationshipService {
// (m{metadataId:"Table=1=d644b631fa8c434e928bcd1f1665b060"})
// create (n)<-[r:Composition]-(m)
// return n,m,r
String cypher = " match (n{metadataId:\""+startId+"\"}),(m{metadataId:\""+endId+"\"}) merge (n)-[r:"+relationType+"]->(m) set r.description ="+description;
String cypher = " match (n{metadataId:\""+startId+"\"}),(m{metadataId:\""+endId+"\"}) merge (n)-[r:"+relationType+"]->(m) set r.jobId ="+jobId;
// logger.info("运行保存关系完成:" + cypher);
session.run(cypher);
}
@Override
public void saveRelation(String startId, String endId, String relationType,String jobId,String description) {
Neo4jConfig neo4jConfig = new Neo4jConfig();
Driver neo4jConnection = neo4jConfig.getNeo4jConnection();
Session session = neo4jConnection.session();
// match (n{metadataId:"Column=1=d9c2d67e56a3428e8e4f22918782437f"}),
// (m{metadataId:"Table=1=d644b631fa8c434e928bcd1f1665b060"})
// create (n)<-[r:Composition]-(m)
// return n,m,r
String cypher = " match (n{metadataId:\""+startId+"\"}),(m{metadataId:\""+endId+"\"}) " +
"merge (n)-[r:"+relationType+"]->(m) set r.jobId ="+description + " and r.description = "+ description;
// logger.info("运行保存关系完成:" + cypher);
session.run(cypher);
}
}
package com.keymobile.metadata.metadataRelation.service.impl;
import com.keymobile.metadata.metadataRelation.config.Neo4jConfig;
import com.keymobile.metadata.metadataRelation.pojo.MetaData;
import com.keymobile.metadata.metadataRelation.pojo.*;
import com.keymobile.metadata.metadataRelation.pojo.metadata.Neo4jETLJob;
import com.keymobile.metadata.metadataRelation.pojo.metadata.Neo4jETLScript;
import com.keymobile.metadata.metadataRelation.pojo.metadata.Neo4jETLSql;
import com.keymobile.metadata.metadataRelation.pojo.returnBean.MetaModel;
import com.keymobile.metadata.metadataRelation.pojo.returnBean.ReturnEdge;
import com.keymobile.metadata.metadataRelation.pojo.returnBean.ReturnNode;
import com.keymobile.metadata.metadataRelation.pojo.returnBean.ReturnReslult;
import com.keymobile.metadata.metadataRelation.remote.MetadataRepoRemoteService;
import com.keymobile.metadata.metadataRelation.respository.metadata.Neo4jETLJobRepository;
import com.keymobile.metadata.metadataRelation.respository.metadata.Neo4jETLScriptRepository;
import com.keymobile.metadata.metadataRelation.respository.metadata.Neo4jETLSqlRepository;
import com.keymobile.metadata.metadataRelation.service.IBaseRelationshipService;
import com.keymobile.metadata.metadataRelation.service.IMetadataRelationService;
import com.keymobile.metadata.metadataRelation.util.Neo4jTool;
import net.sf.json.JSON;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.bson.Document;
import org.neo4j.driver.v1.*;
import org.neo4j.driver.v1.types.Node;
import org.neo4j.driver.v1.types.Path;
import org.neo4j.driver.v1.types.Relationship;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
......@@ -30,6 +42,21 @@ public class MetadataRelationServiceImpl implements IMetadataRelationService {
private static final Logger logger = LoggerFactory.getLogger(MetadataRelationServiceImpl.class);
@Autowired
private MetadataRepoRemoteService metadataRepoRemoteService;
@Autowired
private MongoDbServiceImpl mongoDbServiceImpl;
@Autowired
private Neo4jETLJobRepository neo4jETLJobRepository;
@Autowired
private Neo4jETLScriptRepository neo4jETLScriptRepository;
@Autowired
private Neo4jETLSqlRepository neo4jETLSqlRepository;
@Autowired
private IBaseRelationshipService relationshipService;
@Override
public ReturnReslult expandNodeByMetadataId(String metadataId, String direction){
//方向有,forward,backword,all
......@@ -320,6 +347,220 @@ public class MetadataRelationServiceImpl implements IMetadataRelationService {
return newReslult;
}
@Override
public void deleteRelationByJobId(String catalogName, String jobId) {
try{
Neo4jConfig neo4jConfig = new Neo4jConfig();
Driver neo4jConnection = neo4jConfig.getNeo4jConnection();
Session session = neo4jConnection.session();
String cypher = "match (n)-[r:`流向`]-(m) where r.jobId ="+jobId +" delete r";
logger.info("删除数据的语句:"+cypher);
StatementResult statementResult = session.run(cypher);
while (statementResult.hasNext()){
logger.info("statementResult.next():"+statementResult.next());
}
}catch(Exception e){
logger.info("删除数据异常:"+e.getMessage());
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void syschroRelationFromMongo(String catalogName, String jobId) {
Map<String,String> edgeIdMap = new HashMap<>();
int page =0,pageSize = 300;
long time0 = System.currentTimeMillis();
long totalElement = mongoDbServiceImpl.countRelationByJobId(PageRequest.of(page,pageSize),catalogName,jobId);
long time1 = System.currentTimeMillis();
logger.info("从mongo获取的关系的总数量是:"+ totalElement +" ,耗时:" +(time1-time0) );
int totalPageCount = ((int)totalElement/pageSize)+1;
logger.info("准备分页从mongo获取关系,每页:"+ pageSize);
Map<String,String > etlSqlIdMap = new HashMap<>();
Map<String,String > etlJobIdMap = new HashMap<>();
Map<String,String > etlScriptIdMap = new HashMap<>();
for(; page< totalPageCount;page++){
logger.info("开始从 mongo 第"+page+1 +" 次获取关系。");
String description = "";
long time3 = System.currentTimeMillis();
List<Document> relationList = mongoDbServiceImpl.findRelationByJobIdByPage(PageRequest.of(page,pageSize, Sort.by("_id")),catalogName,jobId);
long time4 = System.currentTimeMillis();
logger.info("从 mongo 第"+page+1 +" 次获取关系结束,耗时:" +(time4-time3) );
for(Document relation :relationList){
description = relation.getString("description");
String type = relation.getString("type");
if(!(type.equals("Input") || type.equals("Output"))){
continue;
}
String sourceId =relation.getString("source"); //relationMongo.getSource();
String targetId =relation.getString("target"); // relationMongo.getTarget();
//本身的字段级关系也要同步
edgeIdMap.put(sourceId+"_"+targetId,description);
Map<String, Object> sourceData = metadataRepoRemoteService.getMetadata(sourceId);
Map<String, Object> targetData = metadataRepoRemoteService.getMetadata(targetId);
if(sourceData==null || targetData==null){
continue;
}
String startId = "";
String endId = "";
//如果是字段,上升到表,如果是sql,上升到etlscript,其他的模型之间的关系暂不处理
// 先同步etlsql等模型。
if(sourceId.contains("Column=") && targetId.contains("Column=") ){
Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId);
Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
//要记录字段级别的关系
if (sourceParent == null ) {
logger.info("没有找到id:" + sourceId + "的父节点元数据。");
continue;
}
if (targetParent==null) {
logger.info("没有找到id:" + targetId + "的父节点元数据。");
continue;
}
startId = (String )sourceParent.get("_id");
endId = (String )targetParent.get("_id");
} else if(sourceId.contains("Column=") && targetId.contains("SQL=") ){
etlSqlIdMap.put(targetId,"");
Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId);
Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
if (sourceParent == null ) {
logger.info("没有找到id:" + sourceId + "的父节点元数据。");
continue;
}
if (targetParent==null) {
logger.info("没有找到id:" + targetId + "的父节点元数据。");
continue;
}
String targetParentId = (String) targetParent.get("_id");
Map<String, Object> targetParentParent = metadataRepoRemoteService.getParent(targetParentId);
if (targetParentParent==null) {
logger.info("没有找到id:" + targetId + "的祖父节点元数据。");
continue;
}
startId = (String )sourceParent.get("_id");
endId = (String )targetParent.get("_id");
etlScriptIdMap.put(endId,"");
String parentParentId = (String ) targetParentParent.get("_id");
etlJobIdMap.put(parentParentId,"");
}else if(sourceId.contains("SQL=") && targetId.contains("Column=")){
etlSqlIdMap.put(sourceId,"");
Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId);
Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
if (sourceParent == null ) {
logger.info("没有找到id:" + sourceId + "的父节点元数据。");
continue;
}
if (targetParent==null) {
logger.info("没有找到id:" + targetId + "的父节点元数据。");
continue;
}
String sourceParenttId = (String) sourceParent.get("_id");
etlScriptIdMap.put(sourceParenttId,"");
Map<String, Object> sourceParentParent = metadataRepoRemoteService.getParent(sourceParenttId);
if (sourceParentParent==null) {
logger.info("没有找到id:" + sourceId + "的祖父节点元数据。");
continue;
}
String parentParentId = (String ) sourceParentParent.get("_id");
etlJobIdMap.put(parentParentId,"");
startId = (String )sourceParent.get("_id");
endId = (String )targetParent.get("_id");
}
String relationId = startId+"_"+endId;
edgeIdMap.put(relationId,description);
}
}
for(Object obj : etlJobIdMap.keySet()){
logger.info("开始同步 etlJob,id="+ obj);
String metadataId = (String )obj;
Neo4jETLJob neo4jETLJob1 = neo4jETLJobRepository.findNeo4jETLJobByMetadataId(metadataId);
if(neo4jETLJob1 ==null ){
//不存在就从 mongo同步
Map<String, Object> attribute = metadataRepoRemoteService.getMetadata(metadataId);
String metadataName = (String ) attribute.get("name");
Neo4jETLJob neo4jETLJob = new Neo4jETLJob();
neo4jETLJob.setMetadataId(metadataId);
neo4jETLJob.setName(metadataName);
neo4jETLJobRepository.save(neo4jETLJob);
}
}
for(Object obj : etlScriptIdMap.keySet()){
String metadataId = (String )obj;
logger.info("开始同步 etlScript,id="+ metadataId);
Neo4jETLScript neo4jETLJob1 = neo4jETLScriptRepository.findNeo4jETLScriptByMetadataId(metadataId);
if(neo4jETLJob1 ==null ){
//不存在就从 mongo同步
Map<String, Object> attribute = metadataRepoRemoteService.getMetadata(metadataId);
String metadataName = (String ) attribute.get("name");
Neo4jETLScript neo4jETLJob = new Neo4jETLScript();
neo4jETLJob.setMetadataId(metadataId);
neo4jETLJob.setName(metadataName);
neo4jETLScriptRepository.save(neo4jETLJob);
}
}
for(Object obj : etlSqlIdMap.keySet()){
logger.info("开始同步 etlsql,id="+ obj);
String metadataId = (String )obj;
Neo4jETLSql neo4jETLsql1 = neo4jETLSqlRepository.findNeo4jETLSqlByMetadataId(metadataId);
if(neo4jETLsql1 ==null ){
//不存在就从 mongo同步
Map<String, Object> attribute = metadataRepoRemoteService.getMetadata(metadataId);
String metadataName = (String ) attribute.get("name");
Neo4jETLSql neo4jETLJob = new Neo4jETLSql();
neo4jETLJob.setMetadataId(metadataId);
neo4jETLJob.setName(metadataName);
neo4jETLSqlRepository.save(neo4jETLJob);
}
}
long start = System.currentTimeMillis();
int count=1;
for(Object obj : edgeIdMap.keySet()){
if(count%100 == 0){
logger.info("创建了"+count+"关系:");
}
String edgeId = (String) obj;
String[] dataIds = edgeId.split("_");
String startId = dataIds[0];
String endId = dataIds[1];
String description = edgeIdMap.get(obj);
if(description ==null || description.equalsIgnoreCase("")){
relationshipService.saveRelation(startId,endId,"流向",jobId);
}else{
relationshipService.saveRelation(startId,endId,"流向",jobId,description);
}
}
long end = System.currentTimeMillis();
logger.info("一共在neo4j,创建了多少关系:"+edgeIdMap.size() + " ,总用时:"+(end-start));
}
private ReturnReslult convertorToReturnResult(String reulstJson){
ReturnReslult returnReslult = new ReturnReslult();
......
......@@ -1852,13 +1852,11 @@ public class MetadataServiceImpl implements IMetadataService {
relationshipService.saveRelation(sourceTableId,targetTableId,"流向");
}
}
@Override
public void syschroTable2EtlJobRelations(String catalogName) {
Map<String, String> relationMap = new HashMap<>();
Map<String,String> edgeIdMap = new HashMap<>();
Map<String,Map<String, String>> edgeIdMap = new HashMap<>();
int page =0,pageSize = 300;
long time0 = System.currentTimeMillis();
long totalElement = mongoDbServiceImpl.countRelation(PageRequest.of(page,pageSize),catalogName);
......@@ -1869,13 +1867,17 @@ public class MetadataServiceImpl implements IMetadataService {
logger.info("准备分页从mongo获取关系,每页:"+ pageSize);
for(; page< totalPageCount;page++){
logger.info("开始从 mongo 第"+page+1 +" 次获取关系。");
String description = "";
long time3 = System.currentTimeMillis();
List<Document> relationList = mongoDbServiceImpl.findRelationByPage(PageRequest.of(page,pageSize,Sort.by("_id")),catalogName);
long time4 = System.currentTimeMillis();
logger.info("从 mongo 第"+page+1 +" 次获取关系结束,耗时:" +(time4-time3) );
for(Document relation :relationList){
description = relation.getString("description");
Map<String,String> attributeMap = new HashMap<>();
String jobId = relation.getString("jobId");
attributeMap.put("jobId",jobId);
String description = relation.getString("description");
attributeMap.put("description",description);
String type = relation.getString("type");
if(!(type.equals("Input") || type.equals("Output"))){
continue;
......@@ -1884,7 +1886,7 @@ public class MetadataServiceImpl implements IMetadataService {
String targetId =relation.getString("target"); // relationMongo.getTarget();
//本身的字段级关系也要同步
edgeIdMap.put(sourceId+"_"+targetId,description);
edgeIdMap.put(sourceId+"_"+targetId,attributeMap);
Map<String, Object> sourceData = metadataRepoRemoteService.getMetadata(sourceId);
Map<String, Object> targetData = metadataRepoRemoteService.getMetadata(targetId);
......@@ -1953,7 +1955,7 @@ public class MetadataServiceImpl implements IMetadataService {
endId = (String )targetParent.get("_id");
}
String relationId = startId+"_"+endId;
edgeIdMap.put(relationId,description);
edgeIdMap.put(relationId,attributeMap);
}
}
......@@ -1967,11 +1969,14 @@ public class MetadataServiceImpl implements IMetadataService {
String[] dataIds = edgeId.split("_");
String startId = dataIds[0];
String endId = dataIds[1];
String description = edgeIdMap.get(obj);
Map<String,String> attributeMap0 = edgeIdMap.get(obj);
String description = attributeMap0.get("description");
String jobId = attributeMap0.get("jobId");
// String description = edgeIdMap.get(obj);
if(description ==null || description.equalsIgnoreCase("")){
relationshipService.saveRelation(startId,endId,"流向");
relationshipService.saveRelation(startId,endId,"流向",jobId);
}else{
relationshipService.saveRelation(startId,endId,"流向",description);
relationshipService.saveRelation(startId,endId,"流向",jobId,description);
}
}
......
......@@ -165,6 +165,27 @@ public class MongoDbServiceImpl {
return mongoTemplate.count(query, Document.class, PREFIX_MD_RELATION + catalogName);
}
public long countRelationByJobId(Pageable pageable, String catalogName,String jobId) {
Query query = new Query(new Criteria() .andOperator(Criteria.where("jobId").is(jobId),
new Criteria().orOperator(Criteria.where("type").is("Input"),Criteria.where("type").is("Output"))
)
);
// Query query = new Query(new Criteria().orOperator(Criteria.where("type").is("Input"),Criteria.where("type").is("Output")));
query.with(pageable);
return mongoTemplate.count(query, Document.class, PREFIX_MD_RELATION + catalogName);
}
public List<Document> findRelationByJobIdByPage(Pageable pageable, String catalogName,String jobId) {
Query query = new Query(new Criteria() .andOperator(Criteria.where("jobId").is(jobId),
new Criteria().orOperator(Criteria.where("type").is("Input"),Criteria.where("type").is("Output"))
)
);
// Query query = new Query(new Criteria().orOperator(Criteria.where("type").is("Input"),Criteria.where("type").is("Output")));
query.with(pageable);
return mongoTemplate.find(query, Document.class, PREFIX_MD_RELATION + catalogName);
}
public List<Document> find1104Relations( String catalogName) {
return mongoTemplate.findAll(Document.class, "Relation" + catalogName);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment