InfluxDB【InfluxDB数据库操作】

InfluxDB【InfluxDB数据库操作】InfluxDB数据库连接操作类 /* Copyright (c) 2010-2020. */ package com.sjasoft.cloud.admin.inflixdbconn; import org.influxdb.InfluxDB; import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.InfluxDBFactory; import org.influxdb.dto.*; import org.in…

大家好,欢迎来到IT知识分享网。

InfluxDB数据库连接操作类

/* * Copyright (c) 2010-2020. */

package com.sjasoft.cloud.admin.inflixdbconn;


import org.influxdb.InfluxDB;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.*;
import org.influxdb.dto.Point.Builder;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Thin wrapper around an influxdb-java {@link InfluxDB} client that is bound to one database and
 * one retention policy.
 *
 * <p>The client is created in the constructor and released by {@link #close()}.
 *
 * @author 大脑补丁
 */
public class InfluxDBConnection {

    // Fully qualified so that the file's import block does not need to change.
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(InfluxDBConnection.class.getName());

    // User name passed to InfluxDBFactory.connect.
    private final String username;
    // Password passed to InfluxDBFactory.connect.
    private final String password;
    // Server URL passed to InfluxDBFactory.connect.
    private final String openurl;
    // Database used by query(), insert() and the retention-policy statements.
    private final String database;
    // Retention policy used by insert(); never null or empty after construction.
    private final String retentionPolicy;

    private InfluxDB influxDB;

    /**
     * Stores the connection settings and immediately builds the underlying client.
     *
     * @param username        user name
     * @param password        password
     * @param openurl         server URL
     * @param database        database name
     * @param retentionPolicy retention policy name; {@code null} or {@code ""} falls back to
     *                        {@code "autogen"}
     */
    public InfluxDBConnection(String username, String password, String openurl, String database,
                              String retentionPolicy) {
        this.username = username;
        this.password = password;
        this.openurl = openurl;
        this.database = database;
        this.retentionPolicy =
                retentionPolicy == null || retentionPolicy.isEmpty() ? "autogen" : retentionPolicy;
        influxDbBuild();
    }

    /**
     * Creates a database.
     *
     * @param dbName name of the database to create
     */
    @SuppressWarnings("deprecation")
    public void createDB(String dbName) {
        influxDB.createDatabase(dbName);
    }

    /**
     * Deletes a database.
     *
     * @param dbName name of the database to delete
     */
    @SuppressWarnings("deprecation")
    public void deleteDB(String dbName) {
        influxDB.deleteDatabase(dbName);
    }

    /**
     * Checks whether the server answers a ping.
     *
     * @return {@code true} when the ping returned a non-null response; {@code false} when it
     *         returned {@code null} or threw
     */
    public boolean ping() {
        try {
            return influxDB.ping() != null;
        } catch (Exception e) {
            // A failed ping is an expected outcome here, so report it and answer "not connected".
            LOGGER.log(java.util.logging.Level.WARNING, "InfluxDB ping failed", e);
            return false;
        }
    }

    /**
     * Returns the underlying client, connecting first if no client exists yet, and sets its log
     * level to {@code NONE}.
     *
     * <p>This method does not create the database: it only connects.
     *
     * @return the shared {@link InfluxDB} client
     */
    public InfluxDB influxDbBuild() {
        if (influxDB == null) {
            influxDB = InfluxDBFactory.connect(openurl, username, password);
        }
        influxDB.setLogLevel(InfluxDB.LogLevel.NONE);
        return influxDB;
    }

    /**
     * Creates a retention policy on the configured database.
     *
     * @param policyName  policy name
     * @param duration    duration text inserted verbatim into the statement (for example
     *                    {@code "30d"})
     * @param replication replication factor
     * @param isDefault   whether to mark the policy as the default one; {@code null} is treated
     *                    as {@code false}
     */
    public void createRetentionPolicy(String policyName, String duration, int replication, Boolean isDefault) {
        String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,
                database, duration, replication);
        // Boolean.TRUE.equals avoids the NullPointerException that unboxing a null Boolean throws.
        if (Boolean.TRUE.equals(isDefault)) {
            sql = sql + " DEFAULT";
        }
        this.query(sql);
    }

    /**
     * Creates a retention policy named {@code default} on the configured database with a
     * duration of {@code 30d} and replication {@code 1}, and marks it as the default policy.
     */
    public void createDefaultRetentionPolicy() {
        String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",
                "default", database, "30d", 1);
        this.query(command);
    }

    /**
     * Runs a statement against the configured database.
     *
     * @param command statement text
     * @return the result returned by the client
     */
    public QueryResult query(String command) {
        return influxDB.query(new Query(command, database));
    }

    /**
     * Writes a single point to the configured database and retention policy.
     *
     * @param measurement measurement name
     * @param tags        tags of the point
     * @param fields      fields of the point
     * @param time        timestamp; {@code 0} means "do not set a timestamp on the point"
     * @param timeUnit    unit of {@code time}
     */
    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,
                       TimeUnit timeUnit) {
        Builder builder = Point.measurement(measurement);
        builder.tag(tags);
        builder.fields(fields);
        // NOTE(review): with time == 0 the point carries no explicit timestamp; presumably the
        // client or server assigns one - confirm.
        if (time != 0) {
            builder.time(time, timeUnit);
        }
        influxDB.write(database, retentionPolicy, builder.build());
    }

    /**
     * Writes a prepared batch of points.
     *
     * @param batchPoints batch to write
     */
    public void batchInsert(BatchPoints batchPoints) {
        influxDB.write(batchPoints);
    }

    /**
     * Writes a batch of records to an explicitly named database and retention policy.
     *
     * @param database        target database
     * @param retentionPolicy target retention policy
     * @param consistency     write consistency level
     * @param records         records to write; per the original note, one record can be obtained
     *                        from {@code BatchPoints.lineProtocol()}
     */
    public void batchInsert(final String database, final String retentionPolicy, final ConsistencyLevel consistency,
                            final List<String> records) {
        influxDB.write(database, retentionPolicy, consistency, records);
    }

    /**
     * Runs a delete statement against the configured database.
     *
     * @param command delete statement
     * @return the error text reported in the result, as returned by {@code QueryResult.getError()}
     */
    public String deleteMeasurementData(String command) {
        return this.query(command).getError();
    }

    /**
     * Closes the underlying client.
     */
    public void close() {
        influxDB.close();
    }

    /**
     * Builds a point without writing it.
     *
     * @param measurement measurement name
     * @param time        timestamp in milliseconds
     * @param tags        tags of the point
     * @param fields      fields of the point
     * @return the built point
     */
    public Point pointBuilder(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {
        return Point.measurement(measurement)
                .time(time, TimeUnit.MILLISECONDS)
                .tag(tags)
                .fields(fields)
                .build();
    }

}

InfluxDBUtil【influxDb工具类(将数据库中的数据提取至influx监测类集合中)】

package com.sjasoft.cloud.admin.inflixdbconn;

import com.sjasoft.cloud.admin.dto.emsapp.EnvMonitorDataVo;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;

/**
 * Helper that converts raw InfluxDB result rows into {@link EnvMonitorDataVo} objects.
 */
public class InfluxDBUtil {

    /**
     * Maps each row to one {@link EnvMonitorDataVo}.
     *
     * <p>Columns are read by position: 0 time, 1 coordinate type, 2 humidity, 3 latitude,
     * 4 longitude, 5 noise, 6 PM10, 7 PM2.5, 8 pressure, 9 relay status, 10 TSP, 11 temperature,
     * 12 wind direction coefficient, 13 wind direction degree, 14 wind power, 15 wind speed,
     * 16 device id. A column that is {@code null} or missing from a short row yields {@code null}
     * for text properties and {@code 0} for numeric ones. {@code null} rows are skipped.
     *
     * @param valueList rows to convert; may be {@code null} or empty
     * @return one object per non-null row, in input order; never {@code null}
     * @throws NumberFormatException if a numeric column holds text that is not a valid float
     */
    public static List<EnvMonitorDataVo> getEnvMonitorData(List<List<Object>> valueList) {
        List<EnvMonitorDataVo> monitorDataList = new ArrayList<>();
        if (CollectionUtils.isEmpty(valueList)) {
            return monitorDataList;
        }
        for (List<Object> row : valueList) {
            if (row == null) {
                continue;
            }
            EnvMonitorDataVo vo = new EnvMonitorDataVo();
            // Time
            vo.setTime(textAt(row, 0));
            // Coordinate type
            vo.setCoordinateType(floatAt(row, 1));
            // Humidity
            vo.setHum(floatAt(row, 2));
            // Latitude
            vo.setLat(floatAt(row, 3));
            // Longitude
            vo.setLng(floatAt(row, 4));
            // Noise
            vo.setNoise(floatAt(row, 5));
            // PM10
            vo.setPm10(floatAt(row, 6));
            // PM2.5
            vo.setPm25(floatAt(row, 7));
            // Pressure
            vo.setPressure(floatAt(row, 8));
            // Relay status
            vo.setRelayStatus(textAt(row, 9));
            // TSP
            vo.setTsp(floatAt(row, 10));
            // Temperature
            vo.setTem(floatAt(row, 11));
            // Wind direction coefficient: 0 N, 1 NE, 2 E, 3 SE, 4 S, 5 SW, 6 W, 7 NW
            vo.setWindDirectionCoefficient(floatAt(row, 12));
            // Wind direction in degrees
            vo.setWindDirectionDegree(floatAt(row, 13));
            // Wind power
            vo.setWindPower(floatAt(row, 14));
            // Wind speed
            vo.setWindSpeed(floatAt(row, 15));
            // Device id
            vo.setDeviceId(textAt(row, 16));
            monitorDataList.add(vo);
        }
        return monitorDataList;
    }

    /** Returns the cell at {@code index}, or {@code null} when the row is too short. */
    private static Object cellAt(List<Object> row, int index) {
        return index < row.size() ? row.get(index) : null;
    }

    /** Returns the cell's {@code toString()} value, or {@code null} for a null or missing cell. */
    private static String textAt(List<Object> row, int index) {
        Object cell = cellAt(row, index);
        return cell == null ? null : cell.toString();
    }

    /** Parses the cell's {@code toString()} value as a float; a null or missing cell yields 0. */
    private static float floatAt(List<Object> row, int index) {
        Object cell = cellAt(row, index);
        return cell == null ? 0.0f : Float.parseFloat(cell.toString());
    }

}

监测信息使用类

package com.sjasoft.cloud.admin.service.emsapp;

import com.alibaba.fastjson.JSON;
import com.sjasoft.cloud.admin.dto.emsapp.EnvMonitorDataVo;
import com.sjasoft.cloud.admin.dto.emsapp.EnvMonitorParam;
import com.sjasoft.cloud.admin.inflixdbconn.InfluxDBConnection;
import com.sjasoft.cloud.admin.inflixdbconn.InfluxDBUtil;
import org.influxdb.dto.QueryResult;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.lang.management.MonitorInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** * @Author lufei * @Description 环境监测 * @Date 2021/2/4 下午 05:19 **/
@Service
public class EnvMonitorService { 
   
    private static Logger logger = LoggerFactory.getLogger(EnvMonitorService.class);
    @Value("${InfluxDBConnection.url}")
    private  String url;
    @Value("${InfluxDBConnection.username}")
    private  String username;
    @Value("${InfluxDBConnection.password}")
    private  String password;
    @Value("${time.zone}")
    private String timeZone;
    @Value("${InfluxDBConnection.dataBaseName}")
    private String influxDbDataBaseNm;

    /** * 查看环境监测设备是否在线及在线时部门监测信息 * @param envMonitorParam * @return */
    public EnvMonitorDataVo getEnvMonitorIsOnline(@NotNull EnvMonitorParam envMonitorParam){ 
   
        InfluxDBConnection influxDBConnection = new InfluxDBConnection(username, password, url, influxDbDataBaseNm, "default");//'"+deviceId+"'
        EnvMonitorDataVo monitorDataVo = new EnvMonitorDataVo();
        try { 
   
            //String sql = "SELECT last(DATA_VALUE) from \"t_device_data\" WHERE device_id='"+envMonitorParam.getDeviceId()+"' AND time > now() - 2m GROUP BY DATA_TYPE tz( ' "+timeZone+" ')";
            String sqlIsOnline = "SELECT CoordinateType,Hum,Lat,Lng,Noise,Pm10,Pm25,Pressure,RelayStatus,Tsp,Tem,WindDirectionCoefficient,WindDirectionDegree,WindPower,WindSpeed,device_id from \"Renke-RS-ZSYC\" WHERE device_id='"+envMonitorParam.getDeviceId()+"' AND time > now() - 8h order by time desc limit 1";
            logger.info("=====查询是否在线执行的sql:{}==========",sqlIsOnline);
            /************************判断是否在线**************************/
            QueryResult queryIsOnline = influxDBConnection.query(sqlIsOnline);
            logger.info("=====influxDB====查询是否在线===成功====!!!!=====");
            QueryResult.Result isOnlineResult = queryIsOnline.getResults().get(0);
            if (CollectionUtils.isEmpty(isOnlineResult.getSeries())) { 
   
                logger.info("=====influxDB====【不在线】=======!!!!=====");
            }else { 
   
                List<List<Object>> valueList = isOnlineResult.getSeries().stream()
                        .map(QueryResult.Series::getValues).collect(Collectors.toList()).get(0);
                List<EnvMonitorDataVo> monitorDataList = InfluxDBUtil.getEnvMonitorData(valueList);
                if(!CollectionUtils.isEmpty(monitorDataList)){ 
   
                    BeanUtils.copyProperties(monitorDataList.get(0),monitorDataVo);
                    logger.info("==getEnvMonitorIsOnline==返回:={}",JSON.toJSONString(monitorDataVo));
                    monitorDataVo.setIsOnline("1");
                    logger.info("=====influxDB====在线=======!!!!=====");
                }
            }
        }catch (Exception e){ 
   
            logger.error("======================InfluxDB===调用失败!=======================");
            e.printStackTrace();
        }finally { 
   
            influxDBConnection.close();
        }
        return monitorDataVo;
    }



    /** * 环境监测实时数据查看 * @param envMonitorParam * @return */
    public EnvMonitorDataVo getEnvMonitorRealTimeData(@NotNull EnvMonitorParam envMonitorParam){ 
   
        InfluxDBConnection influxDBConnection = new InfluxDBConnection(username, password, url, influxDbDataBaseNm, "default");//'"+deviceId+"'
        EnvMonitorDataVo monitorDataVo = new EnvMonitorDataVo();
        try { 
   
            //String sql = "SELECT last(DATA_VALUE) from \"t_device_data\" WHERE device_id='"+envMonitorParam.getDeviceId()+"' AND time > now() - 2m GROUP BY DATA_TYPE tz( ' "+timeZone+" ')";
            String sql = "SELECT CoordinateType,Hum,Lat,Lng,Noise,Pm10,Pm25,Pressure,RelayStatus,Tsp,Tem,WindDirectionCoefficient,WindDirectionDegree,WindPower,WindSpeed,device_id from \"Renke-RS-ZSYC\" WHERE device_id='"+envMonitorParam.getDeviceId()+"' order by time desc limit 1";
            logger.info("=====查询监测信息执行的sql:{}==========",sql);
            QueryResult queryResult = influxDBConnection.query(sql);
            logger.info("=====influxDB====查询监测信息===成功====!!!!=====");
            QueryResult.Result oneResult = queryResult.getResults().get(0);
            if (!CollectionUtils.isEmpty(oneResult.getSeries())) { 
   
                List<List<Object>> valueList = oneResult.getSeries().stream()
                    .map(QueryResult.Series::getValues).collect(Collectors.toList()).get(0);
                List<EnvMonitorDataVo> monitorDataList = InfluxDBUtil.getEnvMonitorData(valueList);
                if(!CollectionUtils.isEmpty(monitorDataList)){ 
   
                    BeanUtils.copyProperties(monitorDataList.get(0),monitorDataVo);
                    logger.info("==getEnvMonitorIsOnline==返回:={}",JSON.toJSONString(monitorDataVo));
                }
            }
        }catch (Exception e){ 
   
            logger.error("======================InfluxDB===调用失败!=======================");
            e.printStackTrace();
        }finally { 
   
            influxDBConnection.close();
        }
        return monitorDataVo;
    }


    /** * 根据设备编号查看环境监测一天内的某项采样所有数值 * 此次的一天不可直接推日期【使用24小时计算】 * @param envMonitorParam * @return */
    public List<EnvMonitorDataVo> getEnvMonitorOneDaySampInfo(@NotNull EnvMonitorParam envMonitorParam){ 
   
        InfluxDBConnection influxDBConnection = new InfluxDBConnection(username, password, url, influxDbDataBaseNm, "default");//'"+deviceId+"'
        List<EnvMonitorDataVo> monitorDataList = new ArrayList<>();
        EnvMonitorDataVo monitorDataVo = null;
        try { 
   
            //String sql = "SELECT last(DATA_VALUE) from \"t_device_data\" WHERE device_id='"+envMonitorParam.getDeviceId()+"' AND time > now() - 2m GROUP BY DATA_TYPE tz( ' "+timeZone+" ')";
            String sql = "SELECT CoordinateType,Hum,Lat,Lng,Noise,Pm10,Pm25,Pressure,RelayStatus,Tsp,Tem,WindDirectionCoefficient,WindDirectionDegree,WindPower,WindSpeed,device_id from \"Renke-RS-ZSYC\" WHERE time > now() - 40h AND device_id='"+envMonitorParam.getDeviceId()+"' order by time desc";
            logger.info("=====查询监测信息执行的sql:{}==========",sql);
            QueryResult queryResult = influxDBConnection.query(sql);
            logger.info("=====influxDB====查询监测信息===成功====!!!!=====");
            //QueryResult.Result oneResult = queryResult.getResults().get(0);
            //获取一天内数据集合
            QueryResult.Result resultList = queryResult.getResults().get(0);
            if(!CollectionUtils.isEmpty(resultList.getSeries())){ 
   
                List<List<Object>> valueList = resultList.getSeries().stream()
                        .map(QueryResult.Series::getValues).collect(Collectors.toList()).get(0);
                monitorDataList = InfluxDBUtil.getEnvMonitorData(valueList);
            }
            if(CollectionUtils.isEmpty(monitorDataList)){ 
   
                logger.info("=====InfluxDB===:=======未查到数据=========");
            }else { 
   
                logger.info("=====InfluxDB===:=======查到{}条数据=========",monitorDataList.size());
            }
        }catch (Exception e){ 
   
            logger.error("======================InfluxDB===调用失败!=======================");
            e.printStackTrace();
        }finally { 
   
            influxDBConnection.close();
        }
        return monitorDataList;
    }


}

application.properties简单配置


# 亚洲时区
time.zone=Asia/Shanghai
# 服务器的时序数据库
InfluxDBConnection.url=http://192.168.2.50:8086
InfluxDBConnection.username=csems
InfluxDBConnection.password=qaz123/*-+
# 数据库名
InfluxDBConnection.dataBaseName = eam-buildingsite
# 组织表中图片前缀
orgImage.imagePrefix = E:\\360Downloads\\testImage

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/23411.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信