Commit 1ab4c956 by qiuchaofei

1.添加关系的属性,2 查询column,sql等模型

parent 185611fd
...@@ -6,7 +6,9 @@ import org.jasypt.encryption.StringEncryptor; ...@@ -6,7 +6,9 @@ import org.jasypt.encryption.StringEncryptor;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@EnableFeignClients @EnableFeignClients
@SpringBootApplication @SpringBootApplication
public class MetadataRelationApplication { public class MetadataRelationApplication {
......
...@@ -47,13 +47,6 @@ public class RelationalGraphController { ...@@ -47,13 +47,6 @@ public class RelationalGraphController {
@Autowired @Autowired
private TagRemoteService tagRemoteService; private TagRemoteService tagRemoteService;
//测试标签的接口的接口
@ApiOperation(tags = "", value = "测试标准的接口")
@RequestMapping(path = "/testTagService", method = RequestMethod.GET)
public void testTagService(String metadataId){
}
//测试标准的接口 //测试标准的接口
@ApiOperation(tags = "", value = "测试标准的接口") @ApiOperation(tags = "", value = "测试标准的接口")
@RequestMapping(path = "/dataAssertRemoteService", method = RequestMethod.GET) @RequestMapping(path = "/dataAssertRemoteService", method = RequestMethod.GET)
......
package com.keymobile.metadata.metadataRelation.pojo.metadata;
import com.keymobile.metadata.metadataRelation.pojo.BaseNode;
import org.neo4j.ogm.annotation.NodeEntity;
@NodeEntity(label="Neo4jColumn")
public class Neo4jColumn extends BaseNode{
}
package com.keymobile.metadata.metadataRelation.pojo.metadata;
import com.keymobile.metadata.metadataRelation.pojo.BaseNode;
import org.neo4j.ogm.annotation.NodeEntity;
@NodeEntity(label="Neo4jETLSql")
public class Neo4jETLSql extends BaseNode {
}
...@@ -15,6 +15,19 @@ public class ReturnNode { ...@@ -15,6 +15,19 @@ public class ReturnNode {
private String cnName; private String cnName;
private String dataPath; private String dataPath;
private Map<String ,String> attributeMaps = new HashMap<>();
private Map<String ,String> relationMaps = new HashMap<>();
public Map<String, String> getRelationMaps() {
return relationMaps;
}
public void setRelationMaps(Map<String, String> relationMaps) {
this.relationMaps = relationMaps;
}
public String getDataPath() { public String getDataPath() {
return dataPath; return dataPath;
...@@ -33,7 +46,6 @@ public class ReturnNode { ...@@ -33,7 +46,6 @@ public class ReturnNode {
this.attributeMaps = attributeMaps; this.attributeMaps = attributeMaps;
} }
Map<String ,String> attributeMaps = new HashMap<>();
public String getCnName() { public String getCnName() {
return cnName; return cnName;
......
package com.keymobile.metadata.metadataRelation.respository.metadata;
import com.keymobile.metadata.metadataRelation.pojo.metadata.Neo4jColumn;
import org.springframework.data.neo4j.repository.Neo4jRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface Neo4jColumnRepository extends Neo4jRepository<Neo4jColumn, Long> {
Neo4jColumn findNeo4jColumnByMetadataId(String metadataId);
}
package com.keymobile.metadata.metadataRelation.respository.metadata;
import com.keymobile.metadata.metadataRelation.pojo.metadata.Neo4jETLSql;
import org.springframework.data.neo4j.repository.Neo4jRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface Neo4jETLSqlRepository extends Neo4jRepository<Neo4jETLSql, Long> {
Neo4jETLSql findNeo4jETLSqlByMetadataId(String metadataId);
}
...@@ -19,6 +19,11 @@ public interface Neo4jTableRepository extends Neo4jRepository<Neo4jTable,Long> { ...@@ -19,6 +19,11 @@ public interface Neo4jTableRepository extends Neo4jRepository<Neo4jTable,Long> {
@Query("match (n{metadataId:{tableId}} )-[r1:流向]->(m1)-[r:流向]->(m) return m ") @Query("match (n{metadataId:{tableId}} )-[r1:流向]->(m1)-[r:流向]->(m) return m ")
List<BaseNode> getTargetTable(@Param("tableId") String tableId); List<BaseNode> getTargetTable(@Param("tableId") String tableId);
@Query("match (n{metadataId:{startTable}} )<-[r1:流向]-(m1)<-[r:流向]-(m{metadataId:{entTable}}) return m1 ")
List<BaseNode> getSourceEtlScriptBetween2Table(@Param("startTable") String startTable,@Param("entTable") String entTable);
@Query("match (n{metadataId:{startTable}} )-[r1:流向]->(m1)-[r:流向]->(m{metadataId:{entTable}}) return m1 ")
List<BaseNode> getTargetEtlScriptBetween2Table(@Param("startTable") String startTable,@Param("entTable") String entTable);
@Query("match (n{metadataId:{tableId}} )<-[r1:流向]-(m1) <-[r:Composition]-(m) return m ") @Query("match (n{metadataId:{tableId}} )<-[r1:流向]-(m1) <-[r:Composition]-(m) return m ")
......
package com.keymobile.metadata.metadataRelation.service; package com.keymobile.metadata.metadataRelation.service;
import com.keymobile.metadata.metadataRelation.controller.MetaDataController;
import com.keymobile.metadata.metadataRelation.pojo.mongo.MongoData; import com.keymobile.metadata.metadataRelation.pojo.mongo.MongoData;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.RowMapper;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.lang.reflect.Method;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor;
@Service @Service
@EnableAsync @EnableAsync
...@@ -32,6 +26,28 @@ public class AsyncDataFromMongoToNeo4j { ...@@ -32,6 +26,28 @@ public class AsyncDataFromMongoToNeo4j {
@Autowired @Autowired
private JdbcTemplate jdbcTemplate; private JdbcTemplate jdbcTemplate;
//定时同步数据,
//查找所有的环境,
@Scheduled(cron="0 0 1 * * ?")//每天凌晨1点运行
public void executeFileSyncTask() {
String sql = "select domain_id, domain_name from auth_domain ";
List<MongoData> scopeList = jdbcTemplate.query(sql, new RowMapper<MongoData>() {
@Override
public MongoData mapRow(ResultSet resultSet, int i) throws SQLException {
MongoData user = new MongoData();
user.set_id(resultSet.getString("domain_id"));
user.setName(resultSet.getString("domain_name"));
user.setCnName(resultSet.getString("domain_name"));
user.set_class("Scope");
return user;
}
});
for(MongoData scope:scopeList){
// asyncDataFromMongoToNeo4j(scope.get_id());
}
}
@Async @Async
public void asyncDataFromMongoToNeo4j(String catalogName) { public void asyncDataFromMongoToNeo4j(String catalogName) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
......
...@@ -169,7 +169,7 @@ public class BaseRelationshipServiceImpl implements IBaseRelationshipService { ...@@ -169,7 +169,7 @@ public class BaseRelationshipServiceImpl implements IBaseRelationshipService {
} }
@Override @Override
public void saveRelation(String startId, String endId, String composition) { public void saveRelation(String startId, String endId, String relationType) {
Neo4jConfig neo4jConfig = new Neo4jConfig(); Neo4jConfig neo4jConfig = new Neo4jConfig();
Driver neo4jConnection = neo4jConfig.getNeo4jConnection(); Driver neo4jConnection = neo4jConfig.getNeo4jConnection();
Session session = neo4jConnection.session(); Session session = neo4jConnection.session();
...@@ -177,8 +177,8 @@ public class BaseRelationshipServiceImpl implements IBaseRelationshipService { ...@@ -177,8 +177,8 @@ public class BaseRelationshipServiceImpl implements IBaseRelationshipService {
// (m{metadataId:"Table=1=d644b631fa8c434e928bcd1f1665b060"}) // (m{metadataId:"Table=1=d644b631fa8c434e928bcd1f1665b060"})
// create (n)<-[r:Composition]-(m) // create (n)<-[r:Composition]-(m)
// return n,m,r // return n,m,r
String cypher = " match (n{metadataId:\""+startId+"\"}),(m{metadataId:\""+endId+"\"}) merge (n)-[r:"+composition+"]->(m) return n,m,r "; String cypher = " match (n{metadataId:\""+startId+"\"}),(m{metadataId:\""+endId+"\"}) merge (n)-[r:"+relationType+"]->(m) return n,m,r ";
session.run(cypher); session.run(cypher);
logger.info("运行保存关系完成:" + cypher); // logger.info("运行保存关系完成:" + cypher);
} }
} }
...@@ -83,6 +83,9 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -83,6 +83,9 @@ public class MetadataServiceImpl implements IMetadataService {
private Neo4jViewRepository neo4jViewRepository; private Neo4jViewRepository neo4jViewRepository;
@Autowired @Autowired
private Neo4jColumnRepository neo4jColumnRepository;
@Autowired
private Neo4jFunctionRepository neo4jFunctionRepository; private Neo4jFunctionRepository neo4jFunctionRepository;
@Autowired @Autowired
...@@ -95,6 +98,9 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -95,6 +98,9 @@ public class MetadataServiceImpl implements IMetadataService {
private Neo4jETLScriptRepository neo4jETLScriptRepository; private Neo4jETLScriptRepository neo4jETLScriptRepository;
@Autowired @Autowired
private Neo4jETLSqlRepository neo4jETLSqlRepository ;
@Autowired
private ISchemaService schemaService; private ISchemaService schemaService;
@Override @Override
...@@ -883,8 +889,6 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -883,8 +889,6 @@ public class MetadataServiceImpl implements IMetadataService {
} }
}else if(metadataId .startsWith("Column=")){ }else if(metadataId .startsWith("Column=")){
// match (n:MetaData ) where n.metadataId =~"Table=.*" return n // match (n:MetaData ) where n.metadataId =~"Table=.*" return n
String cypher = "match (n:MetaData{metadataId:\""+metadataId+"\"}) with n match p=(n)<-[r:Composition*0..10]-(m) return m,length(p) as dist "; String cypher = "match (n:MetaData{metadataId:\""+metadataId+"\"}) with n match p=(n)<-[r:Composition*0..10]-(m) return m,length(p) as dist ";
logger.info("cypher:" + cypher); logger.info("cypher:" + cypher);
...@@ -1262,12 +1266,10 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1262,12 +1266,10 @@ public class MetadataServiceImpl implements IMetadataService {
int schemaCount = 1; int schemaCount = 1;
int tableCount = 1; int tableCount = 1;
int columnCount = 1;
int viewCount = 1; int viewCount = 1;
int functionCount = 1; int functionCount = 1;
int procedureCount = 1; int procedureCount = 1;
String collectionName = "Metadata_"+catalogName;
List<String> layer = new ArrayList<>(); List<String> layer = new ArrayList<>();
layer.add("贴源数据层"); layer.add("贴源数据层");
...@@ -1280,7 +1282,7 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1280,7 +1282,7 @@ public class MetadataServiceImpl implements IMetadataService {
Random r = new Random(1); Random r = new Random(1);
List<MongoData> catalogDataList = mongoDbServiceImpl.findDatabaseByparentId("root",collectionName); List<MongoData> catalogDataList = mongoDbServiceImpl.findDatabaseByparentId("root",catalogName);
Map<String ,Neo4jSystem> systemDataMap = new HashMap<>(); Map<String ,Neo4jSystem> systemDataMap = new HashMap<>();
for(MongoData systemData:systemDataList){ for(MongoData systemData:systemDataList){
Neo4jSystem neo4jSystem = new Neo4jSystem(); Neo4jSystem neo4jSystem = new Neo4jSystem();
...@@ -1291,7 +1293,7 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1291,7 +1293,7 @@ public class MetadataServiceImpl implements IMetadataService {
String systemPath = catalogName+";"+systemData.getName(); String systemPath = catalogName+";"+systemData.getName();
neo4jSystem.setDataPath(systemPath); neo4jSystem.setDataPath(systemPath);
//用名称查找,如果存在,就不用创建 //用名称查找,如果存在,就不用创建
List<Neo4jSystem> neo4jSchemaList = neo4jSystemRepository.findNeo4jSystemByName(systemData.getName()); List<Neo4jSystem> neo4jSchemaList = neo4jSystemRepository.findNeo4jSystemByMetadataId(neo4jSystem.getMetadataId());
if(neo4jSchemaList==null || neo4jSchemaList.size()==0){ if(neo4jSchemaList==null || neo4jSchemaList.size()==0){
neo4jSystemRepository.save(neo4jSystem); neo4jSystemRepository.save(neo4jSystem);
} }
...@@ -1301,14 +1303,16 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1301,14 +1303,16 @@ public class MetadataServiceImpl implements IMetadataService {
int etlJobCount = 1; int etlJobCount = 1;
int etlScriptCount = 1; int etlScriptCount = 1;
int etlSqlCount = 1;
for(MongoData catalogData:catalogDataList){ for(MongoData catalogData:catalogDataList){
List<MongoData> dataBaseDataList= mongoDbServiceImpl.findDataByparentId(catalogData.get_id(),collectionName); List<MongoData> dataBaseDataList= mongoDbServiceImpl.findDataByparentId(catalogData.get_id(),catalogName);
for(MongoData dataBaseData:dataBaseDataList){ for(MongoData dataBaseData:dataBaseDataList){
String mongoId = dataBaseData.get_id(); String mongoId = dataBaseData.get_id();
if(mongoId.startsWith("Server=")){ if(mongoId.startsWith("Server=")){
//如果是server,就把etljob找出来 //如果是server,就把etljob找出来
List<MongoData> etlJobDataList= mongoDbServiceImpl.findDataByparentId(dataBaseData.get_id(),collectionName); List<MongoData> etlJobDataList= mongoDbServiceImpl.findDataByparentId(dataBaseData.get_id(),catalogName);
List<Neo4jETLJob> neo4jETLJobList = new ArrayList<>(); List<Neo4jETLJob> neo4jETLJobList = new ArrayList<>();
for(MongoData etlJobData:etlJobDataList){ for(MongoData etlJobData:etlJobDataList){
if(etlJobCount%200 == 0){ if(etlJobCount%200 == 0){
...@@ -1328,8 +1332,8 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1328,8 +1332,8 @@ public class MetadataServiceImpl implements IMetadataService {
} }
List<Neo4jETLScript> neo4jETLScriptList = new ArrayList<>(); List<Neo4jETLScript> neo4jETLScriptList = new ArrayList<>();
List<Neo4jETLSql> neo4jETLSqlArrayList = new ArrayList<>();
List<MongoData> etlScriptMongoList = mongoDbServiceImpl.findDataByparentId(etlJobData.get_id(),collectionName); List<MongoData> etlScriptMongoList = mongoDbServiceImpl.findDataByparentId(etlJobData.get_id(),catalogName);
for(MongoData etlScriptMongo:etlScriptMongoList){ for(MongoData etlScriptMongo:etlScriptMongoList){
if(etlScriptCount%200==0){ if(etlScriptCount%200==0){
neo4jETLScriptRepository.saveAll(neo4jETLScriptList); neo4jETLScriptRepository.saveAll(neo4jETLScriptList);
...@@ -1350,18 +1354,54 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1350,18 +1354,54 @@ public class MetadataServiceImpl implements IMetadataService {
compositionRelation.setEnd(neo4jETLScript); compositionRelation.setEnd(neo4jETLScript);
compositionRelation.setName("etlJob--Script"); compositionRelation.setName("etlJob--Script");
compositionRelationList.add(compositionRelation); compositionRelationList.add(compositionRelation);
} }
List<MongoData> eltSqlMongoList = mongoDbServiceImpl.findDataByparentId(etlScriptMongo.get_id(),catalogName);
for (MongoData eltSqlMongo:eltSqlMongoList){
String etlSqlId = eltSqlMongo.get_id();
Neo4jETLSql neo4jETLSql =null;// neo4jETLSqlRepository.findNeo4jETLSqlByMetadataId(etlSqlId);
if(neo4jETLSql == null){
neo4jETLSql = new Neo4jETLSql();
neo4jETLSql.setMetadataId(eltSqlMongo.get_id());
neo4jETLSql.setName(neo4jETLJob.getName());
neo4jETLSql.setCnName(neo4jETLJob.getCnName());
neo4jETLSqlArrayList.add(neo4jETLSql);
CompositionRelation compositionRelation = new CompositionRelation();
compositionRelation.setStart(neo4jETLScript);
compositionRelation.setEnd(neo4jETLSql);
compositionRelation.setName("Script--sql");
compositionRelationList.add(compositionRelation);
etlSqlCount++;
}
if(etlSqlCount%200==0){
neo4jETLSqlRepository.saveAll(neo4jETLSqlArrayList);
neo4jETLSqlArrayList.clear();
}
}
} }
if(neo4jETLScriptList !=null && neo4jETLScriptList.size()!=0 ){ if(neo4jETLScriptList !=null && neo4jETLScriptList.size()!=0 ){
neo4jETLScriptRepository.saveAll(neo4jETLScriptList); neo4jETLScriptRepository.saveAll(neo4jETLScriptList);
} }
if(neo4jETLSqlArrayList!=null && neo4jETLSqlArrayList.size()!=0){
neo4jETLSqlRepository.saveAll(neo4jETLSqlArrayList);
}
} }
if(neo4jETLJobList !=null && neo4jETLJobList.size()!=0 ){ if(neo4jETLJobList !=null && neo4jETLJobList.size()!=0 ){
neo4jETLJobRepository.saveAll(neo4jETLJobList); neo4jETLJobRepository.saveAll(neo4jETLJobList);
} }
}else if(mongoId.startsWith("Database=")){ }else if(mongoId.startsWith("Database=")){
List<MongoData> schemaDataList= mongoDbServiceImpl.findDataByparentId(dataBaseData.get_id(),collectionName); List<MongoData> schemaDataList= mongoDbServiceImpl.findDataByparentId(dataBaseData.get_id(),catalogName);
List<String> schemaIds = new ArrayList<>(); List<String> schemaIds = new ArrayList<>();
for(MongoData schemaData:schemaDataList){ for(MongoData schemaData:schemaDataList){
...@@ -1400,7 +1440,7 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1400,7 +1440,7 @@ public class MetadataServiceImpl implements IMetadataService {
neo4jSchema.setLabel(label); neo4jSchema.setLabel(label);
} }
//如果存在,就不用创建 //如果存在,就不用创建
List<Neo4jSchema> neo4jSchemaList = neo4jSchemaRepository.findNeo4jSchemaByMetadataId(neo4jSchema.getMetadataId()); List<Neo4jSchema> neo4jSchemaList = null;//neo4jSchemaRepository.findNeo4jSchemaByMetadataId(neo4jSchema.getMetadataId());
if(neo4jSchemaList==null || neo4jSchemaList.size()==0){ if(neo4jSchemaList==null || neo4jSchemaList.size()==0){
neo4jSchemaRepository.save(neo4jSchema); neo4jSchemaRepository.save(neo4jSchema);
schemaCount++; schemaCount++;
...@@ -1415,6 +1455,7 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1415,6 +1455,7 @@ public class MetadataServiceImpl implements IMetadataService {
List<Neo4jTable> neo4jTableList = new ArrayList<>(); List<Neo4jTable> neo4jTableList = new ArrayList<>();
List<Neo4jColumn> neo4jColumnList = new ArrayList<>();
List<Neo4jView> viewTableList = new ArrayList<>(); List<Neo4jView> viewTableList = new ArrayList<>();
List<Neo4jFunction> functionTableList = new ArrayList<>(); List<Neo4jFunction> functionTableList = new ArrayList<>();
List<Neo4jProcedure> procedureTableList = new ArrayList<>(); List<Neo4jProcedure> procedureTableList = new ArrayList<>();
...@@ -1438,7 +1479,7 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1438,7 +1479,7 @@ public class MetadataServiceImpl implements IMetadataService {
neo4jTableList.clear(); neo4jTableList.clear();
} }
List<Neo4jTable> neo4jTableListExist = neo4jTableRepository.findNeo4jTableByMetadataId(metadataId); List<Neo4jTable> neo4jTableListExist = null;//neo4jTableRepository.findNeo4jTableByMetadataId(metadataId);
if(neo4jTableListExist == null || neo4jTableListExist.size() == 0){ if(neo4jTableListExist == null || neo4jTableListExist.size() == 0){
Neo4jTable neo4jTable = new Neo4jTable(); Neo4jTable neo4jTable = new Neo4jTable();
neo4jTable.setMetadataId(metadataId); neo4jTable.setMetadataId(metadataId);
...@@ -1458,29 +1499,53 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1458,29 +1499,53 @@ public class MetadataServiceImpl implements IMetadataService {
CompositionRelation schem2Table = new CompositionRelation(); CompositionRelation schem2Table = new CompositionRelation();
schem2Table.setStart(neo4jSchema); schem2Table.setStart(neo4jSchema);
schem2Table.setEnd(neo4jTable); schem2Table.setEnd(neo4jTable);
schem2Table.setName("System--Schema"); schem2Table.setName("Schema--Table");
compositionRelationList.add(schem2Table); compositionRelationList.add(schem2Table);
tableCount++; tableCount++;
List<MongoData> columnMongoDataList = mongoDbServiceImpl.findDataByparentId(metadataId, catalogName);
for(MongoData columnMongoData:columnMongoDataList){
String columnId = columnMongoData.get_id();
Neo4jColumn neo4jColumnList1 = null;//neo4jColumnRepository.findNeo4jColumnByMetadataId(columnId);
if(neo4jColumnList1==null){
Neo4jColumn neo4jColumn = new Neo4jColumn();
neo4jColumn.setMetadataId(columnId);
neo4jColumn.setName(columnMongoData.getName());
neo4jColumn.setCnName(columnMongoData.getCnName());
neo4jColumnList.add(neo4jColumn);
CompositionRelation table2Column = new CompositionRelation();
table2Column.setStart(neo4jTable);
table2Column.setEnd(neo4jColumn);
table2Column.setName("Table--Column");
compositionRelationList.add(table2Column);
columnCount++;
}
if(columnCount %100==0){
logger.info("同步了:"+columnCount+"个字段。");
neo4jColumnRepository.saveAll(neo4jColumnList);
neo4jColumnList.clear();
}
}
} }
}else if(metadataId.startsWith("View=")){ }else if(metadataId.startsWith("View=")){
if(viewCount %100==0){ if(viewCount %100==0){
// break;
logger.info("同步了:"+viewCount+"个表。"); logger.info("同步了:"+viewCount+"个表。");
neo4jViewRepository.saveAll(viewTableList); neo4jViewRepository.saveAll(viewTableList);
viewTableList.clear(); viewTableList.clear();
} }
Neo4jView neo4jView = new Neo4jView(); List<Neo4jView> neo4jViewList = null;//neo4jViewRepository.findNeo4jViewByMetadataId(metadataId);
neo4jView.setMetadataId(metadataId);
neo4jView.setName(metadataName);
neo4jView.setCnName(metadataCnName);
neo4jView.setDataPath(tablePath);
List<Neo4jView> neo4jViewList = neo4jViewRepository.findNeo4jViewByMetadataId(metadataId);
if(neo4jViewList==null || neo4jViewList.size()==0){ if(neo4jViewList==null || neo4jViewList.size()==0){
neo4jViewList.add(neo4jView); Neo4jView neo4jView = new Neo4jView();
neo4jView.setMetadataId(metadataId);
neo4jView.setName(metadataName);
neo4jView.setCnName(metadataCnName);
neo4jView.setDataPath(tablePath);
viewTableList.add(neo4jView);
CompositionRelation schem2View = new CompositionRelation(); CompositionRelation schem2View = new CompositionRelation();
schem2View.setStart(neo4jSchema); schem2View.setStart(neo4jSchema);
...@@ -1489,11 +1554,8 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1489,11 +1554,8 @@ public class MetadataServiceImpl implements IMetadataService {
compositionRelationList.add(schem2View); compositionRelationList.add(schem2View);
} }
}else if(metadataId.startsWith("Function=")){ }else if(metadataId.startsWith("Function=")){
if(functionCount %100==0){ if(functionCount %100==0){
// break;
logger.info("同步了:"+functionCount+"个函数。"); logger.info("同步了:"+functionCount+"个函数。");
neo4jFunctionRepository.saveAll(functionTableList); neo4jFunctionRepository.saveAll(functionTableList);
functionTableList.clear(); functionTableList.clear();
...@@ -1505,12 +1567,9 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1505,12 +1567,9 @@ public class MetadataServiceImpl implements IMetadataService {
neo4jFunction.setCnName(metadataCnName); neo4jFunction.setCnName(metadataCnName);
neo4jFunction.setDataPath(tablePath); neo4jFunction.setDataPath(tablePath);
List<Neo4jFunction> neo4jFunctionList = neo4jFunctionRepository.findNeo4jFunctionByMetadataId(metadataId); List<Neo4jFunction> neo4jFunctionList = null;//neo4jFunctionRepository.findNeo4jFunctionByMetadataId(metadataId);
if(neo4jFunctionList==null || neo4jFunctionList.size()==0){ if(neo4jFunctionList==null || neo4jFunctionList.size()==0){
functionTableList.add(neo4jFunction); functionTableList.add(neo4jFunction);
// neo4jFunctionRepository.save(neo4jFunction);
CompositionRelation schem2Function = new CompositionRelation(); CompositionRelation schem2Function = new CompositionRelation();
schem2Function.setStart(neo4jSchema); schem2Function.setStart(neo4jSchema);
schem2Function.setEnd(neo4jFunction); schem2Function.setEnd(neo4jFunction);
...@@ -1521,7 +1580,6 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1521,7 +1580,6 @@ public class MetadataServiceImpl implements IMetadataService {
}else if(metadataId.startsWith("Procedure=")){ }else if(metadataId.startsWith("Procedure=")){
if(procedureCount %100==0){ if(procedureCount %100==0){
// break;
logger.info("同步了:"+procedureCount+"个函数。"); logger.info("同步了:"+procedureCount+"个函数。");
neo4jProcedureRepository.saveAll(procedureTableList); neo4jProcedureRepository.saveAll(procedureTableList);
procedureTableList.clear(); procedureTableList.clear();
...@@ -1531,12 +1589,11 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1531,12 +1589,11 @@ public class MetadataServiceImpl implements IMetadataService {
neo4jProcedure.setName(metadataName); neo4jProcedure.setName(metadataName);
neo4jProcedure.setCnName(metadataCnName); neo4jProcedure.setCnName(metadataCnName);
neo4jProcedure.setDataPath(tablePath); neo4jProcedure.setDataPath(tablePath);
List<Neo4jProcedure> neo4jProcedureList = neo4jProcedureRepository.findNeo4jProcedureByMetadataId(metadataId); List<Neo4jProcedure> neo4jProcedureList = null;//neo4jProcedureRepository.findNeo4jProcedureByMetadataId(metadataId);
if(neo4jProcedureList==null || neo4jProcedureList.size()==0){ if(neo4jProcedureList==null || neo4jProcedureList.size()==0){
neo4jProcedureList.add(neo4jProcedure); procedureTableList.add(neo4jProcedure);
// neo4jProcedureRepository.save(neo4jProcedure);
CompositionRelation schem2Procedure = new CompositionRelation(); CompositionRelation schem2Procedure = new CompositionRelation();
schem2Procedure.setStart(neo4jSchema); schem2Procedure.setStart(neo4jSchema);
schem2Procedure.setEnd(neo4jProcedure); schem2Procedure.setEnd(neo4jProcedure);
...@@ -1544,39 +1601,31 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1544,39 +1601,31 @@ public class MetadataServiceImpl implements IMetadataService {
compositionRelationList.add(schem2Procedure); compositionRelationList.add(schem2Procedure);
procedureCount++; procedureCount++;
} }
}else{ }else{
} }
} }
} }
int count=1;//获取前100个表测试 int count=1;//获取前100个表测试
if(neo4jTableList!=null && neo4jTableList.size()!=0){ if(neo4jTableList!=null && neo4jTableList.size()!=0){
neo4jTableRepository.saveAll(neo4jTableList); neo4jTableRepository.saveAll(neo4jTableList);
} }
if(neo4jColumnList!=null && neo4jColumnList.size()!=0){
neo4jColumnRepository.saveAll(neo4jColumnList);
}
if(viewTableList!=null && viewTableList.size()!=0){ if(viewTableList!=null && viewTableList.size()!=0){
neo4jViewRepository.saveAll(viewTableList); neo4jViewRepository.saveAll(viewTableList);
} }
if(functionTableList!=null && functionTableList.size()!=0){ if(functionTableList!=null && functionTableList.size()!=0){
neo4jFunctionRepository.saveAll(functionTableList); neo4jFunctionRepository.saveAll(functionTableList);
} }
if(procedureTableList != null && procedureTableList.size()!=0){ if(procedureTableList != null && procedureTableList.size()!=0){
neo4jProcedureRepository.saveAll(procedureTableList); neo4jProcedureRepository.saveAll(procedureTableList);
} }
} }
} }
} }
} }
long time1 = System.currentTimeMillis();
int count = 0; int count = 0;
List<CompositionRelation> newCompositionList= new ArrayList<>(); List<CompositionRelation> newCompositionList= new ArrayList<>();
for(CompositionRelation compositionRelation:compositionRelationList){ for(CompositionRelation compositionRelation:compositionRelationList){
...@@ -1593,30 +1642,35 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1593,30 +1642,35 @@ public class MetadataServiceImpl implements IMetadataService {
logger.info("创建的system数量:"+systemDataMap.size()); logger.info("创建的system数量:"+systemDataMap.size());
logger.info("创建的schema:"+schemaCount); logger.info("创建的schema:"+schemaCount);
logger.info("创建的table数量:"+tableCount); logger.info("创建的table数量:"+tableCount);
logger.info("创建的column数量:"+columnCount);
logger.info("创建的作业数量:"+etlJobCount); logger.info("创建的作业数量:"+etlJobCount);
logger.info("创建的脚本数量:"+etlScriptCount); logger.info("创建的脚本数量:"+etlScriptCount);
logger.info("创建的sql数量:"+etlSqlCount);
long time2 = System.currentTimeMillis();
logger.info("用时:"+(time2-time1));
} }
@Override @Override
public void syschroTable2EtlJobRelations(String catalogName) { public void syschroTable2EtlJobRelations(String catalogName) {
Map<String, String> relationMap = new HashMap<>(); Map<String, String> relationMap = new HashMap<>();
Map<String,String> edgeIdMap = new HashMap<>();
int page =0,pageSize = 300; int page =0,pageSize = 300;
long totalElement = mongoDbServiceImpl.countRelation(PageRequest.of(page,pageSize),catalogName); long totalElement = mongoDbServiceImpl.countRelation(PageRequest.of(page,pageSize),catalogName);
for(; page< totalElement;page+= pageSize){ for(; page< totalElement;page+= pageSize){
List<Document> relationList = mongoDbServiceImpl.findRelationByPage(PageRequest.of(page,pageSize,Sort.by("_id")),catalogName); List<Document> relationList = mongoDbServiceImpl.findRelationByPage(PageRequest.of(page,pageSize,Sort.by("_id")),catalogName);
for(Document relation :relationList){ for(Document relation :relationList){
String type = relation.getString("type"); String type = relation.getString("type");
if(!(type.equals("Input") || type.equals("Output"))){ if(!(type.equals("Input") || type.equals("Output"))){
continue; continue;
} }
String sourceId =relation.getString("source"); //relationMongo.getSource(); String sourceId =relation.getString("source"); //relationMongo.getSource();
String targetId =relation.getString("target"); // relationMongo.getTarget(); String targetId =relation.getString("target"); // relationMongo.getTarget();
//本身的字段级关系也要同步
edgeIdMap.put(sourceId+"_"+targetId,"");
Map<String, Object> sourceData = metadataRepoRemoteService.getMetadata(sourceId); Map<String, Object> sourceData = metadataRepoRemoteService.getMetadata(sourceId);
Map<String, Object> targetData = metadataRepoRemoteService.getMetadata(targetId); Map<String, Object> targetData = metadataRepoRemoteService.getMetadata(targetId);
if(sourceData==null || targetData==null){ if(sourceData==null || targetData==null){
...@@ -1629,6 +1683,8 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1629,6 +1683,8 @@ public class MetadataServiceImpl implements IMetadataService {
Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId); Map<String, Object> sourceParent = metadataRepoRemoteService.getParent(sourceId);
Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId); Map<String, Object> targetParent = metadataRepoRemoteService.getParent(targetId);
//要记录字段级别的关系
if (sourceParent == null ) { if (sourceParent == null ) {
logger.info("没有找到id:" + sourceId + "的父节点元数据。"); logger.info("没有找到id:" + sourceId + "的父节点元数据。");
continue; continue;
...@@ -1682,19 +1738,25 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1682,19 +1738,25 @@ public class MetadataServiceImpl implements IMetadataService {
startId = (String )sourceParent.get("_id"); startId = (String )sourceParent.get("_id");
endId = (String )targetParent.get("_id"); endId = (String )targetParent.get("_id");
} }
String relationId = startId+"_"+endId; String relationId = startId+"_"+endId;
if(!relationMap.containsKey(relationId)){ edgeIdMap.put(relationId,"");
if(relationMap.size()%200 == 0){ }
logger.info("创建第"+relationMap.size()+"多少关系:"); }
} long start = System.currentTimeMillis();
relationMap.put(relationId,""); int count=1;
relationshipService.saveRelation(startId,endId,"流向"); for(Object obj : edgeIdMap.keySet()){
} if(count%100 == 0){
logger.info("创建了"+count+"关系:");
} }
String edgeId = (String) obj;
String[] dataIds = edgeId.split("_");
String startId = dataIds[0];
String endId = dataIds[1];
relationshipService.saveRelation(startId,endId,"流向");
} }
long end = System.currentTimeMillis();
logger.info("创建了多少关系:"+edgeIdMap.size() + " ,用时:"+(end-start));
// List<RelationMongo> relationMongoList = mongoDbServiceImpl.findAllRelationByCatalog(catalogName); // List<RelationMongo> relationMongoList = mongoDbServiceImpl.findAllRelationByCatalog(catalogName);
// int size = 0; // int size = 0;
...@@ -1778,7 +1840,6 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -1778,7 +1840,6 @@ public class MetadataServiceImpl implements IMetadataService {
// } // }
// //
// } // }
logger.info("创建了多少关系:"+relationMap.size());
} }
...@@ -2242,14 +2303,22 @@ public class MetadataServiceImpl implements IMetadataService { ...@@ -2242,14 +2303,22 @@ public class MetadataServiceImpl implements IMetadataService {
Session session = neo4jConnection.session(); Session session = neo4jConnection.session();
String cypher = "match (n) where n.metadataId =~'.*="+catalogName+"=.*' detach delete n"; String cypher = "match (n) where n.metadataId =~'.*="+catalogName+"=.*' detach delete n";
session.run(cypher); StatementResult statementResult = session.run(cypher);
while (statementResult.hasNext()){
logger.info("statementResult.next():"+statementResult.next());
}
} }
@Override @Override
public void deleteTempNodeByCatalogName(String catalogName) { public void deleteTempNodeByCatalogName(String catalogName) {
tempNodeRepository.deleteAllRelation(catalogName); try{
tempNodeRepository.deleteTempNodesByCatalogName(catalogName); tempNodeRepository.deleteAllRelation(catalogName);
tempNodeRepository.deleteTempNodesByCatalogName(catalogName);
}catch(Exception e){
e.printStackTrace();
}
} }
......
...@@ -10,7 +10,6 @@ import org.springframework.data.domain.Pageable; ...@@ -10,7 +10,6 @@ import org.springframework.data.domain.Pageable;
import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
...@@ -36,7 +35,7 @@ public class MongoDbServiceImpl { ...@@ -36,7 +35,7 @@ public class MongoDbServiceImpl {
public List<RelationMongo> findAllRelationByCatalog(String catalogName) { public List<RelationMongo> findAllRelationByCatalog(String catalogName) {
logger.info("--------------------->[MongoDB find start]"); logger.info("--------------------->[MongoDB find start]");
try { try {
return mongoTemplate.findAll(RelationMongo.class, "md_relation_" + catalogName); return mongoTemplate.findAll(RelationMongo.class, PREFIX_MD_RELATION + catalogName);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
return null; return null;
...@@ -53,12 +52,12 @@ public class MongoDbServiceImpl { ...@@ -53,12 +52,12 @@ public class MongoDbServiceImpl {
* *
* @return * @return
*/ */
public List<MongoData> findDatabaseByparentId(String catalogName,String collectionName) { public List<MongoData> findDatabaseByparentId(String parentName,String catalogName) {
logger.info("开始从mongo获取元数据:"+ catalogName); logger.info("开始从mongo获取元数据:"+ parentName);
try { try {
Query query = new Query(); Query query = new Query();
query.addCriteria(Criteria.where("parentId").is(catalogName)); query.addCriteria(Criteria.where("parentId").is(parentName));
List<MongoData> mongoDataList = mongoTemplate.find(query, MongoData.class,collectionName); List<MongoData> mongoDataList = mongoTemplate.find(query, MongoData.class,PREFIX_METADATA_NODE+catalogName);
return mongoDataList; return mongoDataList;
} catch (Exception e) { } catch (Exception e) {
...@@ -74,12 +73,12 @@ public class MongoDbServiceImpl { ...@@ -74,12 +73,12 @@ public class MongoDbServiceImpl {
* *
* @return * @return
*/ */
public List<MongoData> findDataByparentId(String parentId,String collectionName) { public List<MongoData> findDataByparentId(String parentId,String catalogName) {
logger.info("开始从mongo获取元数据:"+ parentId); logger.info("开始从mongo获取元数据:"+ parentId);
try { try {
Query query = new Query(); Query query = new Query();
query.addCriteria(Criteria.where("parentId").is(parentId)); query.addCriteria(Criteria.where("parentId").is(parentId));
List<MongoData> mongoDataList = mongoTemplate.find(query, MongoData.class,collectionName); List<MongoData> mongoDataList = mongoTemplate.find(query, MongoData.class,PREFIX_METADATA_NODE+catalogName);
return mongoDataList;// mongoTemplate.findAll(MongoData.class, "md_relation_" + catalogName); return mongoDataList;// mongoTemplate.findAll(MongoData.class, "md_relation_" + catalogName);
} catch (Exception e) { } catch (Exception e) {
......
...@@ -298,11 +298,19 @@ public class TableServiceImpl implements ITableService { ...@@ -298,11 +298,19 @@ public class TableServiceImpl implements ITableService {
returnNode.setType("Table"); returnNode.setType("Table");
// 获取属性 // 获取属性
Map<String,String> attributeMap = getAttributeMap(metadataId); Map<String,String> attributeMap = getAttributeMap(metadataId);
List<BaseNode> sourceEtlScriptBetween2TableList = neo4jTableRepository.getTargetEtlScriptBetween2Table(tableId, metadataId);
Map<String,String> relationMap = new HashMap<>();
for(BaseNode sourceEtlScriptBetween2Table:sourceEtlScriptBetween2TableList){
relationMap.put("name",sourceEtlScriptBetween2Table.getName());
relationMap.put("metadataId",sourceEtlScriptBetween2Table.getMetadataId());
}
returnNode.setAttributeMaps(attributeMap);
if(!targetTables.contains(returnNode)){ if(!targetTables.contains(returnNode)){
targetTables.add(returnNode); targetTables.add(returnNode);
} }
returnNode.setRelationMaps(relationMap);
returnNode.setAttributeMaps(attributeMap);
}else { }else {
getTargetTable(metadataId,targetTables); getTargetTable(metadataId,targetTables);
} }
...@@ -318,13 +326,19 @@ public class TableServiceImpl implements ITableService { ...@@ -318,13 +326,19 @@ public class TableServiceImpl implements ITableService {
returnNode.setId(baseNode.getMetadataId()); returnNode.setId(baseNode.getMetadataId());
returnNode.setName(baseNode.getName()); returnNode.setName(baseNode.getName());
returnNode.setType("Table"); returnNode.setType("Table");
// 获取属性 // 获取属性
Map<String,String> attributeMap = getAttributeMap(metadataId); Map<String,String> attributeMap = getAttributeMap(metadataId);
List<BaseNode> sourceEtlScriptBetween2TableList = neo4jTableRepository.getSourceEtlScriptBetween2Table(tableId, metadataId);
Map<String,String> relationMap = new HashMap<>();
for(BaseNode sourceEtlScriptBetween2Table:sourceEtlScriptBetween2TableList){
relationMap.put("name",sourceEtlScriptBetween2Table.getName());
relationMap.put("metadataId",sourceEtlScriptBetween2Table.getMetadataId());
}
if(!sourceTables.contains(returnNode)){ if(!sourceTables.contains(returnNode)){
sourceTables.add(returnNode); sourceTables.add(returnNode);
} }
returnNode.setRelationMaps(relationMap);
returnNode.setAttributeMaps(attributeMap); returnNode.setAttributeMaps(attributeMap);
}else { }else {
getSourceTable(metadataId,sourceTables); getSourceTable(metadataId,sourceTables);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment