大家好,欢迎来到IT知识分享网。
InfluxDB数据库连接操作类
/* * Copyright (c) 2010-2020. */
package com.sjasoft.cloud.admin.inflixdbconn;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.*;
import org.influxdb.dto.Point.Builder;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Thin wrapper around an {@link InfluxDB} client that is bound to one database
 * and one retention policy.
 *
 * <p>The client is created by the constructor (via {@link #influxDbBuild()}) and
 * must be released with {@link #close()} when no longer needed.
 *
 * @author 大脑补丁
 */
public class InfluxDBConnection {
    /** User name passed to the server when connecting. */
    private final String username;
    /** Password passed to the server when connecting. */
    private final String password;
    /** Server URL passed to {@link InfluxDBFactory#connect}. */
    private final String openurl;
    /** Database that queries and single-point writes are issued against. */
    private final String database;
    /** Retention policy used by {@link #insert}; never null or empty. */
    private final String retentionPolicy;
    /** Underlying client; assigned by {@link #influxDbBuild()}. */
    private InfluxDB influxDB;

    /**
     * Stores the connection settings and creates the underlying client.
     *
     * @param username        user name
     * @param password        password
     * @param openurl         server URL
     * @param database        database name
     * @param retentionPolicy retention policy name; {@code null} or empty falls back to {@code "autogen"}
     */
    public InfluxDBConnection(String username, String password, String openurl, String database,
            String retentionPolicy) {
        this.username = username;
        this.password = password;
        this.openurl = openurl;
        this.database = database;
        this.retentionPolicy = (retentionPolicy == null || retentionPolicy.isEmpty()) ? "autogen" : retentionPolicy;
        influxDbBuild();
    }

    /**
     * Creates a database.
     *
     * @param dbName name of the database to create
     */
    @SuppressWarnings("deprecation")
    public void createDB(String dbName) {
        influxDB.createDatabase(dbName);
    }

    /**
     * Deletes a database.
     *
     * @param dbName name of the database to delete
     */
    @SuppressWarnings("deprecation")
    public void deleteDB(String dbName) {
        influxDB.deleteDatabase(dbName);
    }

    /**
     * Tests whether the server answers a ping.
     *
     * @return {@code true} when the ping returned a non-null response; {@code false} when it
     *         returned null or threw
     */
    public boolean ping() {
        try {
            return influxDB.ping() != null;
        } catch (Exception e) {
            // Best effort: a failed ping is reported as "not connected" rather than propagated.
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Creates the underlying client on first use, silences its logging and returns it.
     *
     * <p>The configured database is deliberately not created here: the original author noted
     * that the server may sit behind a proxy that does not support creating databases.
     *
     * @return the underlying client
     */
    public InfluxDB influxDbBuild() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(openurl, username, password);
        }
        influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
        return influxDB;
    }

    /**
     * Creates a custom retention policy on the configured database.
     *
     * @param policyName  policy name
     * @param duration    retention duration, inserted verbatim into the statement (e.g. {@code "30d"})
     * @param replication replication factor
     * @param isDefault   whether to make this the default policy; {@code null} is treated as {@code false}
     */
    public void createRetentionPolicy(String policyName, String duration, int replication, Boolean isDefault) {
        String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,
                database, duration, replication);
        // Boolean.TRUE.equals avoids the NullPointerException that auto-unboxing a null Boolean would raise.
        if (Boolean.TRUE.equals(isDefault)) {
            sql = sql + " DEFAULT";
        }
        this.query(sql);
    }

    /**
     * Creates a retention policy named {@code default} on the configured database with a
     * duration of 30 days and a replication factor of 1, and marks it as the default policy.
     */
    public void createDefaultRetentionPolicy() {
        String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
                "default", database, "30d", 1);
        this.query(command);
    }

    /**
     * Runs a statement against the configured database.
     *
     * @param command statement text
     * @return the result returned by the client
     */
    public QueryResult query(String command) {
        return influxDB.query(new Query(command, database));
    }

    /**
     * Writes a single point to the configured database and retention policy.
     *
     * @param measurement measurement name
     * @param tags        tags; may be {@code null} when the point has no tags
     * @param fields      field values
     * @param time        timestamp; {@code 0} means no explicit timestamp is set on the point
     * @param timeUnit    unit of {@code time}
     */
    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,
            TimeUnit timeUnit) {
        Builder builder = Point.measurement(measurement);
        // Tags are optional; skip the call instead of failing on a null map.
        if (tags != null) {
            builder.tag(tags);
        }
        builder.fields(fields);
        if (0 != time) {
            builder.time(time, timeUnit);
        }
        influxDB.write(database, retentionPolicy, builder.build());
    }

    /**
     * Writes a prepared batch of points.
     *
     * @param batchPoints batch to write
     */
    public void batchInsert(BatchPoints batchPoints) {
        influxDB.write(batchPoints);
    }

    /**
     * Writes a batch of line-protocol records.
     *
     * @param database        target database
     * @param retentionPolicy target retention policy
     * @param consistency     write consistency level
     * @param records         records to write (the original author notes each record can be
     *                        obtained from {@code BatchPoints.lineProtocol()})
     */
    public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
            final List<String> records) {
        influxDB.write(database, retentionPolicy, consistency, records);
    }

    /**
     * Runs a delete statement against the configured database.
     *
     * @param command delete statement
     * @return the error reported in the query result, as returned by {@code QueryResult.getError()}
     */
    public String deleteMeasurementData(String command) {
        return query(command).getError();
    }

    /** Closes the underlying client. */
    public void close() {
        influxDB.close();
    }

    /**
     * Builds a point with a millisecond timestamp.
     *
     * @param measurement measurement name
     * @param time        timestamp in milliseconds
     * @param tags        tags; may be {@code null} when the point has no tags
     * @param fields      field values
     * @return the built point
     */
    public Point pointBuilder(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {
        Builder builder = Point.measurement(measurement).time(time, TimeUnit.MILLISECONDS);
        // Tags are optional; skip the call instead of failing on a null map.
        if (tags != null) {
            builder.tag(tags);
        }
        return builder.fields(fields).build();
    }
}
InfluxDBUtil【influxDb工具类(将数据库中的数据提取至influx监测类集合中)】
package com.sjasoft.cloud.admin.inflixdbconn;
import com.sjasoft.cloud.admin.dto.emsapp.EnvMonitorDataVo;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
public class InfluxDBUtil {

    /**
     * Converts raw InfluxDB result rows into {@link EnvMonitorDataVo} objects.
     *
     * <p>Every row is read by position: index 0 is the time column, indices 1 to 15 are the
     * measurement fields in the order shown below, and index 16 is the device id. A null
     * cell becomes {@code null} for text properties and {@code 0.0f} for numeric ones.
     *
     * @param valueList rows to convert; a null or empty list yields an empty result
     * @return one object per row, in row order
     */
    public static List<EnvMonitorDataVo> getEnvMonitorData(List<List<Object>> valueList){
        List<EnvMonitorDataVo> converted = new ArrayList<>();
        if (CollectionUtils.isEmpty(valueList)) {
            return converted;
        }
        for (List<Object> row : valueList) {
            EnvMonitorDataVo vo = new EnvMonitorDataVo();
            // time
            vo.setTime(textAt(row, 0));
            // coordinate type
            vo.setCoordinateType(floatAt(row, 1));
            // humidity
            vo.setHum(floatAt(row, 2));
            // latitude
            vo.setLat(floatAt(row, 3));
            // longitude
            vo.setLng(floatAt(row, 4));
            // noise
            vo.setNoise(floatAt(row, 5));
            // PM10
            vo.setPm10(floatAt(row, 6));
            // PM2.5
            vo.setPm25(floatAt(row, 7));
            // air pressure
            vo.setPressure(floatAt(row, 8));
            // relay status
            vo.setRelayStatus(textAt(row, 9));
            // TSP
            vo.setTsp(floatAt(row, 10));
            // temperature
            vo.setTem(floatAt(row, 11));
            // wind direction coefficient: 0 N, 1 NE, 2 E, 3 SE, 4 S, 5 SW, 6 W, 7 NW
            vo.setWindDirectionCoefficient(floatAt(row, 12));
            // wind direction in degrees
            vo.setWindDirectionDegree(floatAt(row, 13));
            // wind force
            vo.setWindPower(floatAt(row, 14));
            // wind speed
            vo.setWindSpeed(floatAt(row, 15));
            // device id
            vo.setDeviceId(textAt(row, 16));
            converted.add(vo);
        }
        return converted;
    }

    /** Returns the cell at {@code index} as text, or {@code null} when the cell is null. */
    private static String textAt(List<Object> row, int index) {
        Object cell = row.get(index);
        if (cell == null) {
            return null;
        }
        return cell.toString();
    }

    /** Parses the text form of the cell at {@code index} as a float, or returns {@code 0.0f} when the cell is null. */
    private static float floatAt(List<Object> row, int index) {
        Object cell = row.get(index);
        if (cell == null) {
            return 0.0f;
        }
        return Float.parseFloat(cell.toString());
    }
}
监测信息使用类
package com.sjasoft.cloud.admin.service.emsapp;
import com.alibaba.fastjson.JSON;
import com.sjasoft.cloud.admin.dto.emsapp.EnvMonitorDataVo;
import com.sjasoft.cloud.admin.dto.emsapp.EnvMonitorParam;
import com.sjasoft.cloud.admin.inflixdbconn.InfluxDBConnection;
import com.sjasoft.cloud.admin.inflixdbconn.InfluxDBUtil;
import org.influxdb.dto.QueryResult;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.lang.management.MonitorInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/** * @Author lufei * @Description 环境监测 * @Date 2021/2/4 下午 05:19 **/
@Service
public class EnvMonitorService {
private static Logger logger = LoggerFactory.getLogger(EnvMonitorService.class);
@Value("${InfluxDBConnection.url}")
private String url;
@Value("${InfluxDBConnection.username}")
private String username;
@Value("${InfluxDBConnection.password}")
private String password;
@Value("${time.zone}")
private String timeZone;
@Value("${InfluxDBConnection.dataBaseName}")
private String influxDbDataBaseNm;
/** * 查看环境监测设备是否在线及在线时部门监测信息 * @param envMonitorParam * @return */
public EnvMonitorDataVo getEnvMonitorIsOnline(@NotNull EnvMonitorParam envMonitorParam){
InfluxDBConnection influxDBConnection = new InfluxDBConnection(username, password, url, influxDbDataBaseNm, "default");//'"+deviceId+"'
EnvMonitorDataVo monitorDataVo = new EnvMonitorDataVo();
try {
//String sql = "SELECT last(DATA_VALUE) from \"t_device_data\" WHERE device_id='"+envMonitorParam.getDeviceId()+"' AND time > now() - 2m GROUP BY DATA_TYPE tz( ' "+timeZone+" ')";
String sqlIsOnline = "SELECT CoordinateType,Hum,Lat,Lng,Noise,Pm10,Pm25,Pressure,RelayStatus,Tsp,Tem,WindDirectionCoefficient,WindDirectionDegree,WindPower,WindSpeed,device_id from \"Renke-RS-ZSYC\" WHERE device_id='"+envMonitorParam.getDeviceId()+"' AND time > now() - 8h order by time desc limit 1";
logger.info("=====查询是否在线执行的sql:{}==========",sqlIsOnline);
/************************判断是否在线**************************/
QueryResult queryIsOnline = influxDBConnection.query(sqlIsOnline);
logger.info("=====influxDB====查询是否在线===成功====!!!!=====");
QueryResult.Result isOnlineResult = queryIsOnline.getResults().get(0);
if (CollectionUtils.isEmpty(isOnlineResult.getSeries())) {
logger.info("=====influxDB====【不在线】=======!!!!=====");
}else {
List<List<Object>> valueList = isOnlineResult.getSeries().stream()
.map(QueryResult.Series::getValues).collect(Collectors.toList()).get(0);
List<EnvMonitorDataVo> monitorDataList = InfluxDBUtil.getEnvMonitorData(valueList);
if(!CollectionUtils.isEmpty(monitorDataList)){
BeanUtils.copyProperties(monitorDataList.get(0),monitorDataVo);
logger.info("==getEnvMonitorIsOnline==返回:={}",JSON.toJSONString(monitorDataVo));
monitorDataVo.setIsOnline("1");
logger.info("=====influxDB====在线=======!!!!=====");
}
}
}catch (Exception e){
logger.error("======================InfluxDB===调用失败!=======================");
e.printStackTrace();
}finally {
influxDBConnection.close();
}
return monitorDataVo;
}
/** * 环境监测实时数据查看 * @param envMonitorParam * @return */
public EnvMonitorDataVo getEnvMonitorRealTimeData(@NotNull EnvMonitorParam envMonitorParam){
InfluxDBConnection influxDBConnection = new InfluxDBConnection(username, password, url, influxDbDataBaseNm, "default");//'"+deviceId+"'
EnvMonitorDataVo monitorDataVo = new EnvMonitorDataVo();
try {
//String sql = "SELECT last(DATA_VALUE) from \"t_device_data\" WHERE device_id='"+envMonitorParam.getDeviceId()+"' AND time > now() - 2m GROUP BY DATA_TYPE tz( ' "+timeZone+" ')";
String sql = "SELECT CoordinateType,Hum,Lat,Lng,Noise,Pm10,Pm25,Pressure,RelayStatus,Tsp,Tem,WindDirectionCoefficient,WindDirectionDegree,WindPower,WindSpeed,device_id from \"Renke-RS-ZSYC\" WHERE device_id='"+envMonitorParam.getDeviceId()+"' order by time desc limit 1";
logger.info("=====查询监测信息执行的sql:{}==========",sql);
QueryResult queryResult = influxDBConnection.query(sql);
logger.info("=====influxDB====查询监测信息===成功====!!!!=====");
QueryResult.Result oneResult = queryResult.getResults().get(0);
if (!CollectionUtils.isEmpty(oneResult.getSeries())) {
List<List<Object>> valueList = oneResult.getSeries().stream()
.map(QueryResult.Series::getValues).collect(Collectors.toList()).get(0);
List<EnvMonitorDataVo> monitorDataList = InfluxDBUtil.getEnvMonitorData(valueList);
if(!CollectionUtils.isEmpty(monitorDataList)){
BeanUtils.copyProperties(monitorDataList.get(0),monitorDataVo);
logger.info("==getEnvMonitorIsOnline==返回:={}",JSON.toJSONString(monitorDataVo));
}
}
}catch (Exception e){
logger.error("======================InfluxDB===调用失败!=======================");
e.printStackTrace();
}finally {
influxDBConnection.close();
}
return monitorDataVo;
}
/** * 根据设备编号查看环境监测一天内的某项采样所有数值 * 此次的一天不可直接推日期【使用24小时计算】 * @param envMonitorParam * @return */
public List<EnvMonitorDataVo> getEnvMonitorOneDaySampInfo(@NotNull EnvMonitorParam envMonitorParam){
InfluxDBConnection influxDBConnection = new InfluxDBConnection(username, password, url, influxDbDataBaseNm, "default");//'"+deviceId+"'
List<EnvMonitorDataVo> monitorDataList = new ArrayList<>();
EnvMonitorDataVo monitorDataVo = null;
try {
//String sql = "SELECT last(DATA_VALUE) from \"t_device_data\" WHERE device_id='"+envMonitorParam.getDeviceId()+"' AND time > now() - 2m GROUP BY DATA_TYPE tz( ' "+timeZone+" ')";
String sql = "SELECT CoordinateType,Hum,Lat,Lng,Noise,Pm10,Pm25,Pressure,RelayStatus,Tsp,Tem,WindDirectionCoefficient,WindDirectionDegree,WindPower,WindSpeed,device_id from \"Renke-RS-ZSYC\" WHERE time > now() - 40h AND device_id='"+envMonitorParam.getDeviceId()+"' order by time desc";
logger.info("=====查询监测信息执行的sql:{}==========",sql);
QueryResult queryResult = influxDBConnection.query(sql);
logger.info("=====influxDB====查询监测信息===成功====!!!!=====");
//QueryResult.Result oneResult = queryResult.getResults().get(0);
//获取一天内数据集合
QueryResult.Result resultList = queryResult.getResults().get(0);
if(!CollectionUtils.isEmpty(resultList.getSeries())){
List<List<Object>> valueList = resultList.getSeries().stream()
.map(QueryResult.Series::getValues).collect(Collectors.toList()).get(0);
monitorDataList = InfluxDBUtil.getEnvMonitorData(valueList);
}
if(CollectionUtils.isEmpty(monitorDataList)){
logger.info("=====InfluxDB===:=======未查到数据=========");
}else {
logger.info("=====InfluxDB===:=======查到{}条数据=========",monitorDataList.size());
}
}catch (Exception e){
logger.error("======================InfluxDB===调用失败!=======================");
e.printStackTrace();
}finally {
influxDBConnection.close();
}
return monitorDataList;
}
}
application.properties简单配置
# 亚洲时区
time.zone=Asia/Shanghai
# 服务器的时序数据库
InfluxDBConnection.url=http://192.168.2.50:8086
InfluxDBConnection.username=csems
InfluxDBConnection.password=qaz123/*-+
# 数据库名
InfluxDBConnection.dataBaseName = eam-buildingsite
# 组织表中图片前缀
orgImage.imagePrefix = E:\\360Downloads\\testImage
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/23411.html