Commit c5d98f29 by qiuchaofei

1.修改同步节点的方法,2去掉system.out的打印。3.同步过程中在map中移除已经存在的节点。

parent 5f373286
......@@ -52,7 +52,18 @@
<artifactId>neo4j-ogm-http-driver</artifactId>
<version>3.0.0</version>
</dependency>
<!-- add this dependency if you want to use the embedded driver -->
<!-- add this dependency if you want to use the embedded driver
<dependency>
<groupId>com.steelbridgelabs.oss</groupId>
<artifactId>neo4j-gremlin-bolt</artifactId>
<version>0.3.1</version>
</dependency>
-->
<dependency> <!-- If you're using the Bolt driver -->
<groupId>org.neo4j</groupId>
<artifactId>neo4j-ogm-bolt-driver</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-ogm-embedded-driver</artifactId>
......@@ -99,11 +110,7 @@
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
<dependency>
<groupId>com.steelbridgelabs.oss</groupId>
<artifactId>neo4j-gremlin-bolt</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
......
......@@ -305,7 +305,7 @@ public class RelationalGraphController {
@RequestMapping(path = "/getIp", method = RequestMethod.GET)
public String getIp(Integer numbers) {
String ip = getIp();
System.out.println("请求的ip是:"+ip);
// System.out.println("请求的ip是:"+ip);
return ip;
}
......
......@@ -93,7 +93,7 @@ public class AsyncDataFromMongoToNeo4j {
metadataService.deleteTempNodeByCatalogName(catalogName);
metadataService.createTempNode(catalogName);
long end = System.currentTimeMillis();
logger.info("同步数据完成,用时:"+(end-start));
logger.info("同步数据完成,总共用时:"+(end-start));
}
@Async
public void asyncJobFromMongo2Neo4j(String catalogName,String jobId) {
......@@ -101,7 +101,7 @@ public class AsyncDataFromMongoToNeo4j {
CompletableFuture<String> future = asyncRelation.asyncJobFromMongo2Neo4j(catalogName, jobId);
try {
String message = future.get();
System.out.println("message:"+message);
// System.out.println("message:"+message);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
......@@ -111,7 +111,7 @@ public class AsyncDataFromMongoToNeo4j {
//先按照环境id,作业id,删除关系,然后同步关系,
// 一般来说,关系的字段已经存在neo4j,但是sql不一定存在,
// 如果sql不存在,需要先同步sql ,srcipt,job,再建立关系。
// metadataRelationService.deleteRelationByJobId(catalogName,jobId);
// metadataRelationService.syschroRelationFromMongo(catalogName,jobId);
metadataRelationService.deleteRelationByJobId(catalogName,jobId);
metadataRelationService.syschroRelationFromMongo(catalogName,jobId);
}
}
......@@ -52,7 +52,7 @@ public class BaseRelationshipServiceImpl implements IBaseRelationshipService {
List<BaseRelationship> baseRelationshipList = new ArrayList<>();
for (CompositionRelation compositionRelation : compositionRelationList) {
System.out.println("compositionRelation:"+compositionRelation);
// System.out.println("compositionRelation:"+compositionRelation);
if(compositionRelation == null){
continue;
}
......
......@@ -400,7 +400,7 @@ public class MetadataRelationServiceImpl implements IMetadataRelationService {
// Driver neo4jConnection = neo4jConfig.getNeo4jConnection();
// Session session = neo4jConnection.session();
String cypher = "match (n)-[r:`流向`]-(m) where r.jobId ="+jobId +" delete r";
String cypher = "match (n)-[r:`流向`]-(m) where r.jobId =\""+jobId +"\" delete r";
logger.info("删除数据的语句:"+cypher);
StatementResult statementResult = session.run(cypher);
while (statementResult.hasNext()){
......
......@@ -50,7 +50,8 @@ public class MetadataServiceImpl implements IMetadataService {
@Autowired
private IBaseRelationshipService relationshipService;
@Autowired
IMetadataRelationService metadataRelationService;
@Autowired
private DependencyRelationResposity dependencyRelationResposity;
......@@ -578,7 +579,7 @@ public class MetadataServiceImpl implements IMetadataService {
Record record = result.next();
List<Value> values = record.values();
for (Value value : values) {
System.out.println(TypeString+value.type().name());
// System.out.println(TypeString+value.type().name());
if(value.type().name().equals("NODE")){
Node node = value.asNode();
......@@ -605,7 +606,7 @@ public class MetadataServiceImpl implements IMetadataService {
String cypher = "match p=(n:softplatform{name:\""+metadataName+"\"}) <-[r]-(m:softplatform) return p";
int count = returnReslult.getCount();
System.out.println(count+" ;cypher:"+cypher);
// System.out.println(count+" ;cypher:"+cypher);
if(count > layer){
return;
}
......@@ -707,7 +708,7 @@ public class MetadataServiceImpl implements IMetadataService {
String cypher = "match p=(n:softplatform{name:\""+metadataName+"\"}) -[r]->(m:softplatform) return p";
int count = returnReslult.getCount();
System.out.println(count+" ;cypher:"+cypher);
// System.out.println(count+" ;cypher:"+cypher);
if(count > layer){
return;
}
......@@ -868,7 +869,7 @@ public class MetadataServiceImpl implements IMetadataService {
Record record = result.next();
List<Value> values = record.values();
for (Value value : values) {
System.out.println(TypeString+value.type().name());
// System.out.println(TypeString+value.type().name());
if(value.type().name().equals("NODE")){
Node node = value.asNode();
......@@ -1259,12 +1260,12 @@ public class MetadataServiceImpl implements IMetadataService {
Map<String, Neo4jSchema> neo4jSchemaMap, Map<String, Neo4jTable> neo4jTableIdMap, String schemaId) {
long time1 = System.currentTimeMillis();
Map<String, String> tableIdMap = schemaService.getTableIdFromSchemaId(schemaId);
Map<String, String> tableIdMapFromNeo4j = schemaService.getTableIdFromSchemaId(schemaId);
long time2 = System.currentTimeMillis();
logger.info("从neo4j 获取 schema:"+ schemaId +" 的表个数:"+tableIdMap.size() +" , 耗时:"+(time2-time1));
logger.info("从neo4j 获取 schema:"+ schemaId +" 的表个数:"+tableIdMapFromNeo4j.size() +" , 耗时:"+(time2-time1));
List<Neo4jTable> neo4jTableList = new ArrayList<>();
int tableCount = 1;
int tableCount = 0;
long columnTime0 = System.currentTimeMillis();
//用游标的方式,不要用分页获取
int batchSize = 5000;
......@@ -1279,7 +1280,8 @@ public class MetadataServiceImpl implements IMetadataService {
Document tableDocument = tableCursor.next();
String tableId =(String) tableDocument.get("_id");
//存在了,就不写入
if(tableIdMap.containsKey(tableId)){
if(tableIdMapFromNeo4j.containsKey(tableId)){
tableIdMapFromNeo4j.remove(tableId);
continue;
}
......@@ -1316,17 +1318,18 @@ public class MetadataServiceImpl implements IMetadataService {
compositionRelationList.add(schem2Table);
neo4jTableIdMap.put(neo4jTable.getMetadataId(),neo4jTable);
tableCount++;
if(tableCount % batchSize ==0){
neo4jTableRepository.saveAll(neo4jTableList);
logger.info("同步了:"+tableCount+"个表。"+tableId);
neo4jTableList.clear();
}
tableCount++;
}
if(tableCursor != null){
tableCursor.close();
}
logger.info("需要删除的表个数:"+ tableIdMapFromNeo4j.size());
if(neo4jTableList.size()!=0){
neo4jTableRepository.saveAll(neo4jTableList);
logger.info("继续同步同步了:"+neo4jTableList.size()+"个表。");
......@@ -1351,6 +1354,7 @@ public class MetadataServiceImpl implements IMetadataService {
Map<String, ReturnNode> schemaMap = schemaService.getSchemaBySystemId(neo4jSystem.getMetadataId());
for(MongoData schemaData:schemaDataList){
logger.info("开始同步schema:"+schemaData.getName());
//系统与schema的关系
String schemaData_id = schemaData.get_id();
Neo4jSchema neo4jSchema = new Neo4jSchema();
......@@ -1445,6 +1449,7 @@ public class MetadataServiceImpl implements IMetadataService {
Document columnDocument = columnCursor.next();
String columnId =(String) columnDocument.get("_id");
if(columnFromSchemaId.containsKey(columnId)){
columnFromSchemaId.remove(columnId);
continue;
}
String metadataName =(String) columnDocument.get("name");
......@@ -1473,6 +1478,7 @@ public class MetadataServiceImpl implements IMetadataService {
neo4jColumnList.clear();
}
}
logger.info("需要删除的表个数:"+ columnFromSchemaId.size());
if(neo4jColumnList.size()!=0){
neo4jColumnRepository.saveAll(neo4jColumnList);
neo4jColumnList.clear();
......@@ -1764,7 +1770,7 @@ public class MetadataServiceImpl implements IMetadataService {
Neo4jSystem neo4jSystem = getNeo4jSystem(dataBaseData,systemDataMap);
logger.info("获取的系统是:"+neo4jSystem);
// logger.info("获取的系统是:"+neo4jSystem);
Neo4jETLJob neo4jETLJob = new Neo4jETLJob();
neo4jETLJob.setMetadataId(etlJobData.get_id());
neo4jETLJob.setName(etlJobData.getName());
......@@ -2022,6 +2028,7 @@ public class MetadataServiceImpl implements IMetadataService {
logger.info("从mongo获取的关系的总数量是:"+ totalElement +" ,耗时:" +(time1-time0) );
int totalPageCount = ((int)totalElement/pageSize)+1;
Map<String,String> jobidMap = new HashMap<>();
Map<String,DependencyRelation> dependencyRelationMap = new HashMap<>();
Map<String,BaseNode> baseNodeMap = new HashMap<>();
......@@ -2038,6 +2045,8 @@ public class MetadataServiceImpl implements IMetadataService {
Map<String,String> attributeMap = new HashMap<>();
String jobId = relation.getString(JobIdString);
attributeMap.put(JobIdString,jobId);
jobidMap.put(jobId,"");
String description = relation.getString(DescriptionString);
if(description==null){
description = "0";
......@@ -2078,6 +2087,13 @@ public class MetadataServiceImpl implements IMetadataService {
getRelationFromSourceAndTarget(dependencyRelationMap, attributeMap, sourceId, targetId,sourceIdPath,targetIdPath, baseNodeMap);
}
}
//先删除neo4j中的关系
for(Object obj :jobidMap.keySet()){
String jobId = (String)obj;
metadataRelationService.deleteRelationByJobId(catalogName,jobId);
}
long start = System.currentTimeMillis();
saveRelationFromEdgeIdMap(dependencyRelationMap);
long end = System.currentTimeMillis();
......@@ -2138,7 +2154,7 @@ public class MetadataServiceImpl implements IMetadataService {
long time1 = System.currentTimeMillis();
logger.info("从mongo获取的关系的总数量是:"+ totalElement +" ,耗时:" +(time1-time0) );
int totalPageCount = ((int)totalElement/pageSize)+1;
Map<String,String> jobidMap = new HashMap<>();
logger.info("准备分页从mongo获取关系,每页:"+ pageSize);
for(; page< totalPageCount;page++){
logger.info("开始从 mongo 第"+(page+1) +" 次获取关系。");
......@@ -2153,6 +2169,7 @@ public class MetadataServiceImpl implements IMetadataService {
if(jobId == null ){
jobId = "0";
}
jobidMap.put(jobId,"");
attributeMap.put(JobIdString,jobId);
String description = relation.getString(DescriptionString);
if(description==null || description.equals("")){
......@@ -2199,6 +2216,13 @@ public class MetadataServiceImpl implements IMetadataService {
}
}
long start = System.currentTimeMillis();
//先删除neo4j中的关系
for(Object obj :jobidMap.keySet()){
String jobId = (String)obj;
metadataRelationService.deleteRelationByJobId(catalogName,jobId);
}
saveRelationFromEdgeIdMap(dependencyRelationMap);
long end = System.currentTimeMillis();
logger.info("一共在neo4j,创建了多少关系:"+ dependencyRelationMap.size() + " ,总用时:"+(end-start));
......@@ -2301,9 +2325,9 @@ public class MetadataServiceImpl implements IMetadataService {
baseNode = neo4jETLSqlRepository.findNeo4jETLSqlByMetadataId(metadataId);
}
long time2 = System.currentTimeMillis();
logger.info("查询:"+metadataId +" ,用时:"+ (time2-time1));
// logger.info("查询:"+metadataId +" ,用时:"+ (time2-time1));
}catch (Exception e){
logger.info("采用cypher查询。");
logger.info("采用cypher查询:"+metadataId);
String cypher = "match(n{metadataId:\""+metadataId+"\"}) return n limit 1 ";
StatementResult statementResult = session.run(cypher);
while (statementResult.hasNext()){
......@@ -2845,7 +2869,7 @@ class neo4jRelaionTask implements Callable<List<ReturnEdge>>{
" return s.metadataId as metadataId ";
}
System.out.println("impactCypher:"+impactCypher);
// System.out.println("impactCypher:"+impactCypher);
StatementResult impactResult1 = session.run(impactCypher);
while (impactResult1.hasNext()) {
......@@ -2855,7 +2879,7 @@ class neo4jRelaionTask implements Callable<List<ReturnEdge>>{
for (Value value : values) {
String id = value.asString("metadataId");
System.out.println(metadataId+"-->>"+id);
// System.out.println(metadataId+"-->>"+id);
ReturnEdge edge = new ReturnEdge();
// edge.setEdgeId();
......
......@@ -382,7 +382,7 @@ public class MultiModelServiceImpl implements MultiModelService {
for(Object obj : objectMap.keySet()){
if(obj.equals("name")){
System.out.println("obj :"+ obj + " value :"+ objectMap.get("name"));
// System.out.println("obj :"+ obj + " value :"+ objectMap.get("name"));
}
}
}
......
......@@ -313,7 +313,7 @@ public class SchemaServiceImpl implements ISchemaService {
int pageSize = 100;
int page = jdcount/pageSize;
for(int i=0;i<page+1;i++){
detailCypher = "match (n:Neo4jSchema{metadataId:\""+schemaId+"\"})-->(m:Neo4jTable) return m.metadataId as id skip "+(page*pageSize ) +" limit "+ pageSize;
detailCypher = "match (n:Neo4jSchema{metadataId:\""+schemaId+"\"})-->(m:Neo4jTable) return m.metadataId as id skip "+(i*pageSize ) +" limit "+ pageSize;
StatementResult detailResult = session.run(detailCypher);
while (detailResult.hasNext()){
Record record = detailResult.next();
......
......@@ -596,12 +596,12 @@ public class TableServiceImpl implements ITableService {
ReturnEdge returnEdge = returnEdgeMap.get(obj);
returnReslult.getEdges().add(returnEdge);
}
System.out.println("查询run次数:"+runCypherCount +" , 时间:"+runCypherTime);
// System.out.println("查询run次数:"+runCypherCount +" , 时间:"+runCypherTime);
System.out.println("查询respossitory次数:"+respossitoryCount +" , 时间:"+respossitoryTime);
// System.out.println("查询respossitory次数:"+respossitoryCount +" , 时间:"+respossitoryTime);
long timeEnd = System.currentTimeMillis();
System.out.println("总耗时:"+(timeEnd-timeStart));
// System.out.println("总耗时:"+(timeEnd-timeStart));
return returnReslult;
}
......
......@@ -29,7 +29,7 @@ public class Neo4jTool {
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("transMap2Bean Error " + e);
// System.out.println("transMap2Bean Error " + e);
}
return;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment