Commit d8a6658a by qiuchaofei

多线程获取关系

parent a6a3ba05
...@@ -5,6 +5,7 @@ import java.io.FileInputStream; ...@@ -5,6 +5,7 @@ import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.*; import java.util.*;
import java.util.concurrent.*;
import com.keymobile.metadata.metadataRelation.config.Neo4jConfig; import com.keymobile.metadata.metadataRelation.config.Neo4jConfig;
import com.keymobile.metadata.metadataRelation.pojo.*; import com.keymobile.metadata.metadataRelation.pojo.*;
...@@ -172,7 +173,10 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -172,7 +173,10 @@ public class MetadataServiceImpl implements IMetadataService {
String cypher = "match (n:MetaData ) where n.metadataId =~\"" + modelName + ".*\" return n"; String cypher = "match (n:MetaData ) where n.metadataId =~\"" + modelName + ".*\" return n";
logger.info("cypher:" + cypher); logger.info("cypher:" + cypher);
Map<String, MetaModel> metaModelMap = new HashMap<>(); Map<String, MetaModel> metaModelMap = new HashMap<>();
long starttime = System.currentTimeMillis();
StatementResult result = session.run(cypher); StatementResult result = session.run(cypher);
long endtime = System.currentTimeMillis();
logger.info("查询耗时:"+(endtime-starttime));
List<String> metadataIdList = new ArrayList<>(); List<String> metadataIdList = new ArrayList<>();
...@@ -211,82 +215,120 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -211,82 +215,120 @@ public class MetadataServiceImpl implements IMetadataService {
} }
} }
} }
logger.info("得到的节点数是:"+returnReslult.getNodes().size());
List<String> analysisId = new ArrayList<>(); List<String> analysisId = new ArrayList<>();
int count = 0; int count = 0;
//遍历每两个节点的关系 //遍历每两个节点的关系
for (String metadataId : metadataIdList) { long time0 = System.currentTimeMillis();
logger.info("dataId:" + metadataId); ExecutorService executor = Executors.newFixedThreadPool(10);
analysisId.add(metadataId);
for (String metadataId1 : metadataIdList) {
logger.info("---------dataId1:" + metadataId1);
if (metadataId.equals(metadataId1)) {
continue;
}
if (analysisId.contains(metadataId1)) {
continue;
}
String impactCypher = "";
if(modelName.toLowerCase().equals("table") ){
impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-[r:Composition]->()-->()-->()<-[r1:Composition]-(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("sqlscript")){
impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-[r:Execute]->()-->()-->()<-[r1:Execute]-(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("etljob")){
impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-[r:Execute]->()-[r1:Execute]->()-->()-->()<-[r2:Execute]-()<-[r3:Execute]-(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("column") ||modelName.toLowerCase().equals("sql") ){
impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-->()-->(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
"RETURN p";
}
List<Future<List<Edge>>> relations = new ArrayList<>();
StatementResult impactResult = session.run(impactCypher); final CountDownLatch countDownLatch = new CountDownLatch(10);
if (impactResult.hasNext()) { for(String metadataId:metadataIdList){
logger.info(metadataId + " --->>" + metadataId1); Future<List<Edge>> futures = executor.submit(new neo4jRelaionTask(metadataId,metadataIdList,modelName,session,countDownLatch));
Edge edge = new Edge(); relations.add(futures);
// edge.setEdgeId(); }
edge.setFromId(metadataId); try {
edge.setToId(metadataId1); countDownLatch.await();
edge.setType("流入"); executor.shutdown();
edge.setEdgeId("" + count++); } catch (Exception e) {
returnReslult.getEdges().add(edge); e.printStackTrace();
} } finally {
String lineageCypher = ""; executor.shutdown();
if(modelName.toLowerCase().equals("table")){ }
lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Composition]->() -->()-->()<-[r1:Composition]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("sqlscript")){
lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Execute]->() -->()-->()<-[r1:Execute]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("etljob")){
lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Execute]->()-[r1:Execute]->()-->()-->()<-[r2:Execute]-()<-[r3:Execute]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("column") ||modelName.toLowerCase().equals("sql") ){
lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-->()-->(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
"RETURN p";
}
// String lineageCypher = "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Composition]->() -->()-->()<-[r1:Composition]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
// "RETURN p";
StatementResult lineageResult = session.run(lineageCypher);
if (lineageResult.hasNext()) {
logger.info(metadataId + "<<-- " + metadataId1);
Edge edge = new Edge();
edge.setEdgeId("" + count++); for(Future<List<Edge>> f : relations){
// edge.setEdgeId(); try {
edge.setFromId(metadataId1); List<Edge> relationList = f.get();
edge.setToId(metadataId); for(Edge edge:relationList){
edge.setType("流出"); // System.out.println("relation:"+edge.getEdgeId());
returnReslult.getEdges().add(edge); returnReslult.getEdges().add(edge);
} }
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} }
} }
// for (String metadataId : metadataIdList) {
// logger.info("dataId:" + metadataId);
//
// analysisId.add(metadataId);
// for (String metadataId1 : metadataIdList) {
// logger.info("---------dataId1:" + metadataId1);
// if (metadataId.equals(metadataId1)) {
// continue;
// }
// if (analysisId.contains(metadataId1)) {
// continue;
// }
// String impactCypher = "";
// if(modelName.toLowerCase().equals("table") ){
// impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-[r:Composition]->()-->()-->()<-[r1:Composition]-(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
// "RETURN p";
// }else if(modelName.toLowerCase().equals("sqlscript")){
// impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-[r:Execute]->()-->()-->()<-[r1:Execute]-(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
// "RETURN p";
//
// }else if(modelName.toLowerCase().equals("etljob")){
// impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-[r:Execute]->()-[r1:Execute]->()-->()-->()<-[r2:Execute]-()<-[r3:Execute]-(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
// "RETURN p";
// }else if(modelName.toLowerCase().equals("column") ||modelName.toLowerCase().equals("sql") ){
// impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-->()-->(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
// "RETURN p";
// }
//
//
// StatementResult impactResult = session.run(impactCypher);
// if (impactResult.hasNext()) {
// logger.info(metadataId + " --->>" + metadataId1);
// Edge edge = new Edge();
//// edge.setEdgeId();
// edge.setFromId(metadataId);
// edge.setToId(metadataId1);
// edge.setType("流入");
// edge.setEdgeId("" + count++);
// returnReslult.getEdges().add(edge);
// }
// String lineageCypher = "";
// if(modelName.toLowerCase().equals("table")){
// lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Composition]->() -->()-->()<-[r1:Composition]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
// "RETURN p";
// }else if(modelName.toLowerCase().equals("sqlscript")){
// lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Execute]->() -->()-->()<-[r1:Execute]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
// "RETURN p";
// }else if(modelName.toLowerCase().equals("etljob")){
// lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Execute]->()-[r1:Execute]->()-->()-->()<-[r2:Execute]-()<-[r3:Execute]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
// "RETURN p";
// }else if(modelName.toLowerCase().equals("column") ||modelName.toLowerCase().equals("sql") ){
// lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-->()-->(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
// "RETURN p";
// }
//// String lineageCypher = "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Composition]->() -->()-->()<-[r1:Composition]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
//// "RETURN p";
// StatementResult lineageResult = session.run(lineageCypher);
// if (lineageResult.hasNext()) {
// logger.info(metadataId + "<<-- " + metadataId1);
// Edge edge = new Edge();
//
// edge.setEdgeId("" + count++);
//// edge.setEdgeId();
// edge.setFromId(metadataId1);
// edge.setToId(metadataId);
// edge.setType("流出");
// returnReslult.getEdges().add(edge);
// }
// }
// }
long time1 = System.currentTimeMillis();
logger.info("得到的关系数是:"+returnReslult.getEdges().size());
logger.info("查询关系耗时:" + (time1-time0));
returnReslult.setMetaModelMap(metaModelMap); returnReslult.setMetaModelMap(metaModelMap);
return returnReslult; return returnReslult;
} }
...@@ -637,14 +679,13 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -637,14 +679,13 @@ public class MetadataServiceImpl implements IMetadataService {
int page = 0, pageSize = 500; int page = 0, pageSize = 500;
long totalElement = mongoDbServiceImpl.countTempRelation(PageRequest.of(page, pageSize), catalogName); long totalElement = mongoDbServiceImpl.countTempRelation(PageRequest.of(page, pageSize), catalogName);
Map<String, Long> nameIdMap = new HashMap<>(); Map<String, Long> nameIdMap = new HashMap<>();
long tatalPage = Math.round(Math.ceil(totalElement / (pageSize + 0.0))); for (; page < totalElement; page += pageSize) {
for (; page < tatalPage; page++) {
List<Document> list = mongoDbServiceImpl.findTempRelationByPage(PageRequest.of(page, pageSize, Sort.by("etlJobId")), catalogName); List<Document> list = mongoDbServiceImpl.findTempRelationByPage(PageRequest.of(page, pageSize, Sort.by("etlJobId")), catalogName);
for (Document relation : list) { for (Document relation : list) {
String sqlId = relation.getString("etlSqlId"); String sqlId = relation.getString("etlSqlId");
String etlScriptId = relation.getString("etlScriptId"); String etlScriptId = relation.getString("etlScriptId");
String etlJobId = relation.getString("etlJobId"); String etlJobId = relation.getString("etlJobId");
//deleteTempNodeByCatalogName(sqlId); deleteTempNodeByCatalogName(sqlId);
MetaData metadata = metadataRepository.findMetaDataByMetadataId(sqlId); MetaData metadata = metadataRepository.findMetaDataByMetadataId(sqlId);
if (metadata == null) { if (metadata == null) {
logger.error("id : {} is not found", sqlId); logger.error("id : {} is not found", sqlId);
...@@ -686,3 +727,94 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -686,3 +727,94 @@ public class MetadataServiceImpl implements IMetadataService {
return (List<TempNode>) tempNodeRepository.findAll(); return (List<TempNode>) tempNodeRepository.findAll();
} }
} }
class neo4jRelaionTask implements Callable<List<Edge>>{
private static final Logger logger = LoggerFactory.getLogger(neo4jRelaionTask.class);
private String metadataId ;
private List<String> metadataIdList = new ArrayList<>();
private String modelName;
private Session session;
private CountDownLatch countDownLatch;
public neo4jRelaionTask(String metadataId, List<String> metadataIdList,String modelname, Session session , CountDownLatch countDownLatch) {
this.metadataId = metadataId;
this.metadataIdList = metadataIdList;
this.countDownLatch = countDownLatch;
this.modelName = modelname;
this.session=session;
}
List<Edge> edgeList = new ArrayList<>();
@Override
public List<Edge> call() throws Exception {
long count = System.currentTimeMillis();
for (String metadataId1 : metadataIdList) {
if (metadataId.equals(metadataId1)) {
continue;
}
// if (analysisId.contains(metadataId1)) {
// continue;
// }
String impactCypher = "";
if(modelName.toLowerCase().equals("table") ){
impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-[r:Composition]->()-->()-->()<-[r1:Composition]-(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("sqlscript")){
impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-[r:Execute]->()-->()-->()<-[r1:Execute]-(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("etljob")){
impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-[r:Execute]->()-[r1:Execute]->()-->()-->()<-[r2:Execute]-()<-[r3:Execute]-(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("column") ||modelName.toLowerCase().equals("sql") ){
impactCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId + "\"})-->()-->(b:MetaData{metadataId:\"" + metadataId1 + "\"})\n" +
"RETURN p";
}
StatementResult impactResult = session.run(impactCypher);
if (impactResult.hasNext()) {
logger.info(metadataId + " --->>" + metadataId1);
Edge edge = new Edge();
// edge.setEdgeId();
edge.setFromId(metadataId);
edge.setToId(metadataId1);
edge.setType("流入");
edge.setEdgeId("" + count++);
edgeList.add(edge);
}
String lineageCypher = "";
if(modelName.toLowerCase().equals("table")){
lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Composition]->() -->()-->()<-[r1:Composition]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("sqlscript")){
lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Execute]->() -->()-->()<-[r1:Execute]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("etljob")){
lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Execute]->()-[r1:Execute]->()-->()-->()<-[r2:Execute]-()<-[r3:Execute]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
"RETURN p";
}else if(modelName.toLowerCase().equals("column") ||modelName.toLowerCase().equals("sql") ){
lineageCypher= "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-->()-->(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
"RETURN p";
}
// String lineageCypher = "MATCH p=(a:MetaData{metadataId:\"" + metadataId1 + "\"})-[r:Composition]->() -->()-->()<-[r1:Composition]-(b:MetaData{metadataId:\"" + metadataId + "\"})\n" +
// "RETURN p";
StatementResult lineageResult = session.run(lineageCypher);
if (lineageResult.hasNext()) {
logger.info(metadataId + "<<-- " + metadataId1);
Edge edge = new Edge();
edge.setEdgeId("" + count++);
// edge.setEdgeId();
edge.setFromId(metadataId1);
edge.setToId(metadataId);
edge.setType("流出");
edgeList.add(edge);
}
}
this.countDownLatch.countDown();
return edgeList;
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment