大家好,欢迎来到IT知识分享网。
目前项目采用springboot + jpa (多数据源)+es开发,主要目的是实现每5分钟从es中统计各个扫描类型的统计信息。为方便测试,还特意留了一个controller方法进行测试。接口测试一切正常,原以为这样就万事大吉了,没想到定时任务触发的时候,却直接抛出了no transaction is in progress的异常。顿时懵逼!跟踪调试,发现主要在threadlocal中没有发现flush的相关key,所以报错。一顿google + 百度下来,终于发现原来国内外开发的同学都遇到了相同的问题。答案大多类似,但是很多根本解决不了我的问题。于是决定将我的解决方案分享出来,希望可以给遇到类似问题的同学一点帮助。
环境:springboot+jpa(多数据源)+es
问题描述:某个接口使用@Scheduled定时调度,内部方法中使用了jpa的saveAll() 和flush()方法,方法执行时报异常’no transaction in progress’。但是,通过controller发送接口请求的时候,却是正常的。
问题原因:在controller请求中,因为是在主线程的context上下文中,所以,是通过@autowired 注解是可以识别@Transaction注解的,但是,在@scheduled注解的定时任务中,spring是通过子线程来实现的。而在子线程中,它与主线程的context上下文不同,它无法识别@transaction注解,从而导致报错。
解决方案1:在@Scheduled注解的地方注入事务管理器。具体参考
https://stackoverflow.com/questions//spring4-scheduled-transaction-throws-no-transaction-is-in-progress-at-flush-fo
解决方案2:不要采用@autowired注解,而是通过构造函数的方式传递repository对象。
具体实现代码如下:
1、maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.20</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>18.0</version> </dependency>
2、目录结构及配置文件
spring: ##多数据源配置 外网轨迹相关数据源 datasource: primary: url: jdbc:mysql://xx.xx.xx.xxx:3403/db_waybill_center?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=true username: xxxxxx password: xxxxxx max-active: 10 max-idle: 5 min-idle: 0 driver-class-name: com.mysql.cj.jdbc.Driver type: com.alibaba.druid.pool.DruidDataSource #hibernate配置 second: url: jdbc:mysql://xx.xx.xx.xxx:3403/db_waybill_center_shipid_mgmt?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=true username: xxxxxx password: xxxxxx max-active: 10 max-idle: 5 min-idle: 0 driver-class-name: com.mysql.cj.jdbc.Driver type: com.alibaba.druid.pool.DruidDataSource #hibernate配置 jpa: properties: hibernate: hbm2ddl: auto: none dialect: org.hibernate.dialect.MySQL5InnoDBDialect show-sql: true ##es相关的配置 elasticsearch: rest: connection-timeout: 60000 read-timeout: 60000 uris: xx.xx.xx.xxx:1039,xx.xx.xx.xxx:1033,xx.xx.xx.xxx:1029 username: xxxxxx password: xxxxxx
3、多数据源配置类:
package com.yundasys.waybillcenter.waybillcenterstreamcalculation.conf.db; import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties; import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings; import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.transaction.annotation.EnableTransactionManagement; import javax.annotation.Resource; import javax.persistence.EntityManager; import javax.sql.DataSource; import java.util.Map; import java.util.Objects; /** * @author liumch * date 2020/11/4 17:29 * description jpa 多相关entity manager配置相关类 */ @Configuration @EnableTransactionManagement @EnableJpaRepositories( entityManagerFactoryRef="entityManagerFactoryPrimary", transactionManagerRef="transactionManagerPrimary", basePackages= { "com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.primary" }) public class PrimaryEntityManagerConfig { @Bean(name = "primaryDS") @Qualifier("primaryDS") @ConfigurationProperties(prefix = "spring.datasource.primary") @Primary public DataSource primaryDS(){ return DruidDataSourceBuilder.create().build(); } @Resource private JpaProperties jpaProperties; @Resource private HibernateProperties hibernateProperties; @Primary @Bean(name = "entityManagerPrimary") public EntityManager entityManager(EntityManagerFactoryBuilder builder) { return Objects.requireNonNull(entityManagerFactoryPrimary(builder).getObject()).createEntityManager(); } @Primary @Bean(name = "entityManagerFactoryPrimary") public LocalContainerEntityManagerFactoryBean entityManagerFactoryPrimary (EntityManagerFactoryBuilder builder) { // 解决2.1.x版本jpaProperties.getHibernateProperties(hibernateSettings);失效的问题 Map<String, Object> properties = hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings()); return builder.dataSource(primaryDS()) // .properties(getVendorProperties()) .properties(properties) // 实体类位置 .packages("com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity") // 持久性单元的名称。 .persistenceUnit("primaryPersistenceUnit") .build(); } // 会导致no transaction的错误 // @Primary // @Bean(name = "transactionManagerPrimary") // public PlatformTransactionManager transactionManagerPrimary(DataSource dataSource) // { // return new DataSourceTransactionManager(dataSource); // } /*** 这个地方很关键,如果写成PlatformTransactionManager 则会报错 ***/ /**** builder飘红没关系,忽略掉 **/ @Primary @Bean(name = "transactionManagerPrimary") public JpaTransactionManager transactionManager(EntityManagerFactoryBuilder builder) { JpaTransactionManager transactionManager = new JpaTransactionManager(); transactionManager.setEntityManagerFactory(entityManagerFactoryPrimary(builder).getObject()); return transactionManager; } }
package com.yundasys.waybillcenter.waybillcenterstreamcalculation.conf.db; import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties; import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings; import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.transaction.annotation.EnableTransactionManagement; import javax.annotation.Resource; import javax.persistence.EntityManager; import javax.sql.DataSource; import java.util.Map; import java.util.Objects; /** * @author liumch * date 2020/11/4 17:29 * description jpa 多相关entity manager配置相关类 */ @Configuration @EnableTransactionManagement @EnableJpaRepositories( entityManagerFactoryRef="entityManagerFactorySecond", transactionManagerRef="transactionManagerSecond", basePackages= { "com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.second" }) public class SecondEntityManagerConfig { @Bean(name = "SecondDS") @Qualifier("SecondDS") @ConfigurationProperties(prefix = "spring.datasource.second") public DataSource secondDS(){ return DruidDataSourceBuilder.create().build(); } @Resource private JpaProperties jpaProperties; @Resource private HibernateProperties hibernateProperties; @Bean(name = "entityManagerSecond") public EntityManager entityManager(EntityManagerFactoryBuilder builder) { return Objects.requireNonNull(entityManagerFactorySecond(builder).getObject()).createEntityManager(); } @Bean(name = "entityManagerFactorySecond") public LocalContainerEntityManagerFactoryBean entityManagerFactorySecond (EntityManagerFactoryBuilder builder) { // 解决2.1.x版本jpaProperties.getHibernateProperties(hibernateSettings);失效的问题 Map<String, Object> properties = hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings()); return builder.dataSource(secondDS()) // .properties(getVendorProperties()) .properties(properties) // 实体类位置 .packages("com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity") // 持久性单元的名称。 .persistenceUnit("SecondPersistenceUnit") .build(); } // @Bean(name = "transactionManagerSecond") // public PlatformTransactionManager transactionManagerYdServer(DataSource dataSource) // { // return new DataSourceTransactionManager(dataSource); // } @Bean(name = "transactionManagerSecond") public JpaTransactionManager transactionManager(EntityManagerFactoryBuilder builder) { JpaTransactionManager transactionManager = new JpaTransactionManager(); transactionManager.setEntityManagerFactory(entityManagerFactorySecond(builder).getObject()); return transactionManager; } }
4、repository接口
package com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.primary; import com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity.EsStaticScanTypeEntity; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; import org.springframework.stereotype.Repository; /** * @author liuyu */ @Repository public interface EsStaticScanTypeRepository extends JpaRepository<EsStaticScanTypeEntity, Long>, JpaSpecificationExecutor<EsStaticScanTypeEntity> { }
5、核心的计算服务(从es中聚合查询,并添加到数据库)
package com.yundasys.waybillcenter.waybillcenterstreamcalculation.service; import com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.dto.CountScanTypeDto; import com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity.EsStaticScanTypeEntity; import com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.primary.EsStaticScanTypeRepository; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.FastDateFormat; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Locale; import java.util.stream.Collectors; /** * @program: waybill-center-stream * @description: * @author: liumch * @create: 2021-07-09 16:03 */ @Service @Slf4j public class CalculateRuleService { @Resource RestHighLevelClient elasticsearchClient; final EsStaticScanTypeRepository repository; /** 这个地方很关键,如果写成@autowired 会报错**/ public CalculateRuleService(EsStaticScanTypeRepository repository) { this.repository = repository; } private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); /** * 索引前缀格式信息 */ private static final FastDateFormat INDEX_PREFIX_FORMATTER = FastDateFormat.getInstance("'smi_'yyyyMMdd", Locale.CHINA); /** * es统计 */ public void esCalc(long start, long end) { // start = 00L; // end = 00L; List<CountScanTypeDto> countScanTypeDtos = new ArrayList<>(20); final long[] scanTypes = new long[]{14L, 24L, 10L, 18L, 64L}; BoolQueryBuilder externalQueryBuilder = QueryBuilders.boolQuery(); externalQueryBuilder.filter( QueryBuilders.termsQuery("scanType", scanTypes)) .filter(QueryBuilders.rangeQuery("scanTime").gte(start).lt(end)) .queryName("queryByScanType"); //对于聚合函数,必须要有一个聚合的字段。相当于sql中的聚合函数必须要有 GroupBy TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("GroupByScanType").field("scanType").size(50) .subAggregation(AggregationBuilders.count("count").field("shipId.keyword")); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(externalQueryBuilder) .aggregation(aggregationBuilder) .size(0) .clearRescorers(); // String index = "smi_"; String index = INDEX_PREFIX_FORMATTER.format(new Date()); SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(index); searchRequest.source(sourceBuilder); log.info("请求语句:" + searchRequest.toString()); SearchResponse search = null; try { search = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT); } catch (Exception e) { log.error("获取es数据失败", e); return; } final Aggregations aggregations = search.getAggregations(); for (Aggregation agg : aggregations) { String name = agg.getName(); if (name.equals(aggregationBuilder.getName())) { for (long scanType : scanTypes) { MultiBucketsAggregation.Bucket bucket = ((ParsedLongTerms) agg).getBucketByKey("" + scanType); if (null != bucket) { countScanTypeDtos.add(new CountScanTypeDto(scanType, bucket.getDocCount())); } } } } log.info("返回结果" + countScanTypeDtos.toString()); saveDb(countScanTypeDtos); } @Transactional(rollbackFor = RuntimeException.class) public int saveDb(List<CountScanTypeDto> dtos) { final Date date = new Date(); final Integer day = Integer.parseInt(LocalDate.now().format(DATE_TIME_FORMATTER)); final List<EsStaticScanTypeEntity> entities = dtos.stream() .map(x -> new EsStaticScanTypeEntity(null, (int) x.getScanType(), (int) x.getCount(), day, date, date) ).collect(Collectors.toList()); final List<EsStaticScanTypeEntity> typeEntities = repository.saveAll(entities); repository.flush(); return typeEntities.size(); } }
6、Scheduled task任务
@Component @EnableScheduling @Slf4j public class OfflineCalcTask { private final CalculateRuleService calculateRuleService; public OfflineCalcTask(CalculateRuleService calculateRuleService) { this.calculateRuleService = calculateRuleService; } @Scheduled(cron = "0 0/5 * * * ?") public void calcByScanType() { TimeRangeDto timeRangeDto = CommonUtil.timeRangeByInterval(); calculateRuleService.esCalc(timeRangeDto.getStart(), timeRangeDto.getEnd()); } }
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/46408.html