Influx-stress原理(初学)

Influx-stress原理(初学)InfluxDB-stress原理InfluxDB-stress是用于InfluxDB写入压力测试的工具,最近在aliyun上配置了InfluxDB数据库,使用InfluxDB-stress工具来对其进行写入测试。我对其原理十分好奇,于是研究了一下它的源代码,总结如下InfluxDB-stress是采用vspf13/cobra构建的命令行工具。其目录如下influx-stress/cmd/influx-stress/main.goinsert.goroot.goline

大家好,欢迎来到IT知识分享网。Influx-stress原理(初学)"

InfluxDB-stress原理(初学)

InfluxDB-stress是用于InfluxDB写入压力测试的工具,最近在aliyun上配置了InfluxDB数据库,使用InfluxDB-stress工具来对其进行写入测试。我对其原理十分好奇,于是研究了一下它的源代码,总结如下

InfluxDB-stress是采用vspf13/cobra构建的命令行工具。其目录如下

influx-stress/

cmd/

influx-stress/

main.go

insert.go

root.go

lineprotocol/

point/

stress/

write/

由于vspf13/cobra构建的命令行程序其项目结构都是相同的,因此可以很轻易的找到入口文件cmd/influcdb-stress/main.go。main.go只是调用命令入口,可以不用管。每个vspf13/cobra都有一个根命令,在cmd/root.go文件中定义。在此程序中为influx-stress。

 var RootCmd = &cobra.Command{
     Use:   "influx-stress",
     Short: "Create artificial load on an InfluxDB instance",
     Long:  "",
 }

然后我们可以添加任意数目的子命令,比如在此程序中使用的insert命令,在cmd/insert.go中定义

 var insertCmd = &cobra.Command{
     Use:   "insert SERIES FIELDS",
     Short: "Insert data into InfluxDB", // better descriiption
     Long:  "",
     Run:   insertRun,
 }

其中Run为当调用此命令时所执行的函数。我们所要关注的就是这个函数。另外每个子命令文件中都有一个init函数,此函数是自动调用的,用于初始化子命令的各种flag

子命令(Command):就是需要执行的操作

参数(Arg):子命令的参数,即要操作的对象

选项(Flag):调整子命令的行为

influx-stress insert -r 30s

insert就是一个子命令,-r就是选项(这里没有用到参数)

 func init() {
     //添加命令
     RootCmd.AddCommand(insertCmd)
     //设置命令选项
     insertCmd.Flags().StringVarP(&statsHost, "stats-host", "", "http://localhost:8086", "Address of InfluxDB instance where runtime statistics will be recorded")
     insertCmd.Flags().StringVarP(&statsDB, "stats-db", "", "stress_stats", "Database that statistics will be written to")
     insertCmd.Flags().BoolVarP(&recordStats, "stats", "", false, "Record runtime statistics")
     insertCmd.Flags().StringVarP(&host, "host", "", "http://localhost:8086", "Address of InfluxDB instance")
     ...

 

当使用InfluxDB-stress工具,时,我们输入

 influx-stress insert -r 30s --pps 10000 --host http://localhost:8086

此时就会调用insertRun函数执行

在insertRun函数中根据参数pps和batchSize的值将写入分为几个批次

 concurrency := pps / batchSize
 // PPS takes precedence over batchSize.
 // Adjust accordingly.
 if pps < batchSize {
     batchSize = pps
     concurrency = 1
 }

通过

 pts := point.NewPoints(seriesKey, fieldStr, seriesN, lineprotocol.Nanosecond)

拼接好要写入的数据,保存在pts中,启动多个线程来同时写入数据

 //wg相当于一个线程计数器,先设置一个初始值,为要启动的子线程个数,每个子线程完成时让wg的值减1,当wg的值为0时唤醒主线程
 var wg sync.WaitGroup
 wg.Add(int(concurrency))
 ​
 var totalWritten uint64
 ​
 start := time.Now()
 for i := uint64(0); i < concurrency; i++ {
     go func(startSplit, endSplit int) {
         tick := time.Tick(tick)
         if fast {
             tick = time.Tick(time.Nanosecond)
         }
         cfg := stress.WriteConfig{
             BatchSize: batchSize,
             MaxPoints: pointsN / concurrency, // divide by concurreny
             GzipLevel: gzip,
             Deadline:  time.Now().Add(runtime),
             Tick:      tick,
             Results:   sink.Chan(),
         }
         // Ignore duration from a single call to Write.
         pointsWritten, _ := stress.Write(pts[startSplit:endSplit], c, cfg)
         //使用原子操作来记录写入point总数,原子操作可以避免数据竞争
         atomic.AddUint64(&totalWritten, pointsWritten)
         wg.Done()
     }(startSplit, endSplit)
     startSplit = endSplit
     endSplit += inc
 }
 wg.Wait()   //使主线程一直阻塞直到所有子线程完成

采用sync/waitgroup来使主线程一直阻塞直到所有子线程完成。之后就是输出各种写入统计信息

 totalTime := time.Since(start)           //记录运行时间
 if err := c.Close(); err != nil {
     fmt.Fprintf(os.Stderr, "Error closing client: %v\n", err.Error())
 }
 ​
 sink.Close()
 throughput := int(float64(totalWritten) / totalTime.Seconds())
 if quiet {
     fmt.Println(throughput)
 } else {
     fmt.Println("Write Throughput:", throughput)
     fmt.Println("Points Written:", totalWritten)
 }

在每个写入子线程中,调用stress.Write来写入数据。

 pointsWritten, _ := stress.Write(pts[startSplit:endSplit], c, cfg)

该函数首先将整个该批次的数据写入到一个缓冲区中,可以选择对数据进行压缩,压缩采用的是compress/gzip。

 buf := bytes.NewBuffer(nil)
 var w io.Writer = buf
 //如果cfg.GzipLevel不为0则进行压缩,有多种压缩水平
 //NoCompression      = 0
 //BestSpeed          = 1
 //BestCompression    = 2
 //DefaultCompression = 3
 //HuffmanOnly        = 4
 doGzip := cfg.GzipLevel != 0
 var gzw *gzip.Writer
 if doGzip {
     var err error
     gzw, err = gzip.NewWriterLevel(w, cfg.GzipLevel)  //进行压缩
     if err != nil {
         // Should only happen with an invalid gzip level?
         panic(err)
     }
     w = gzw
 }
 //将一个批次的数据汇总一起写入
 for _, pt := range pts {
     pointCount++
     pt.SetTime(t)
     lineprotocol.WritePoint(w, pt)  //将一行数据写入w
     if pointCount%cfg.BatchSize == 0 {        //一个批次
         if doGzip {
             // Must Close, not Flush, to write full gzip content to underlying bytes buffer.
             if err := gzw.Close(); err != nil {
                 panic(err)
             }
         }
         sendBatch(c, buf, cfg.Results)    //此处才是真正的写入
         if doGzip {
             // sendBatch already reset the bytes buffer.
             // Reset the gzip writer to start clean.
             gzw.Reset(buf)
         }
 ​
         t = <-cfg.Tick
         if t.After(cfg.Deadline) {
             break WRITE_BATCHES
         }
 ​
         if pointCount >= cfg.MaxPoints {
             break
         }
 ​
     }
     // Update increments the value of all of the Int and Float
     // fields by 1.
     pt.Update()
 }

汇总后调用sendBatch来写入数据

 func sendBatch(c write.Client, buf *bytes.Buffer, ch chan<- WriteResult) {
    lat, status, body, err := c.Send(buf.Bytes())   //发送数据到influxdb
    buf.Reset()   //清空缓存,但并不释放
    select {
    case ch <- WriteResult{LatNs: lat, StatusCode: status, Body: body, Err: err, Timestamp: time.Now().UnixNano()}:
    default:
    }
 }

在sendBatch中调用Send把数据发送到InfluxDB,采用的是http post方法。不过并没有使用net/http,而是使用了更高性能的valyala/fasthttp(号称比net/http快10倍)。

 func (c *client) Send(b []byte) (latNs int64, statusCode int, body string, err error) {
    //采用http post的方式向influxdb写入数据
    req := fasthttp.AcquireRequest()
    req.Header.SetContentTypeBytes([]byte("text/plain"))
    req.Header.SetMethodBytes([]byte("POST"))
    req.Header.SetRequestURIBytes(c.url)
    if c.cfg.Gzip {
       req.Header.SetBytesKV([]byte("Content-Encoding"), []byte("gzip"))
    }
    req.Header.SetContentLength(len(b))
    req.SetBody(b)    //http 请求实体
    resp := fasthttp.AcquireResponse()
    start := time.Now()
    do := fasthttp.Do
    if c.httpClient != nil {
       do = c.httpClient.Do
    }
    err = do(req, resp)  //发起请求
    latNs = time.Since(start).Nanoseconds()
    statusCode = resp.StatusCode()
    // Save the body.
    if statusCode != http.StatusNoContent {
       body = string(resp.Body())
    }
    //释放连接
    fasthttp.ReleaseResponse(resp)
    fasthttp.ReleaseRequest(req)
    return
 }

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/14636.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信