大家好,欢迎来到IT知识分享网。
InfluxDB-stress原理(初学)
InfluxDB-stress是用于InfluxDB写入压力测试的工具,最近在aliyun上配置了InfluxDB数据库,使用InfluxDB-stress工具来对其进行写入测试。我对其原理十分好奇,于是研究了一下它的源代码,总结如下
InfluxDB-stress是采用vspf13/cobra构建的命令行工具。其目录如下
influx-stress/
cmd/
influx-stress/
main.go
insert.go
root.go
lineprotocol/
…
point/
…
stress/
…
write/
…
由于vspf13/cobra构建的命令行程序其项目结构都是相同的,因此可以很轻易的找到入口文件cmd/influcdb-stress/main.go。main.go只是调用命令入口,可以不用管。每个vspf13/cobra都有一个根命令,在cmd/root.go文件中定义。在此程序中为influx-stress。
var RootCmd = &cobra.Command{
Use: "influx-stress",
Short: "Create artificial load on an InfluxDB instance",
Long: "",
}
然后我们可以添加任意数目的子命令,比如在此程序中使用的insert命令,在cmd/insert.go中定义
var insertCmd = &cobra.Command{
Use: "insert SERIES FIELDS",
Short: "Insert data into InfluxDB", // better descriiption
Long: "",
Run: insertRun,
}
其中Run为当调用此命令时所执行的函数。我们所要关注的就是这个函数。另外每个子命令文件中都有一个init函数,此函数是自动调用的,用于初始化子命令的各种flag
子命令(Command):就是需要执行的操作
参数(Arg):子命令的参数,即要操作的对象
选项(Flag):调整子命令的行为
influx-stress insert -r 30s
insert就是一个子命令,-r就是选项(这里没有用到参数)
func init() {
//添加命令
RootCmd.AddCommand(insertCmd)
//设置命令选项
insertCmd.Flags().StringVarP(&statsHost, "stats-host", "", "http://localhost:8086", "Address of InfluxDB instance where runtime statistics will be recorded")
insertCmd.Flags().StringVarP(&statsDB, "stats-db", "", "stress_stats", "Database that statistics will be written to")
insertCmd.Flags().BoolVarP(&recordStats, "stats", "", false, "Record runtime statistics")
insertCmd.Flags().StringVarP(&host, "host", "", "http://localhost:8086", "Address of InfluxDB instance")
...
当使用InfluxDB-stress工具,时,我们输入
influx-stress insert -r 30s --pps 10000 --host http://localhost:8086
此时就会调用insertRun函数执行
在insertRun函数中根据参数pps和batchSize的值将写入分为几个批次
concurrency := pps / batchSize
// PPS takes precedence over batchSize.
// Adjust accordingly.
if pps < batchSize {
batchSize = pps
concurrency = 1
}
通过
pts := point.NewPoints(seriesKey, fieldStr, seriesN, lineprotocol.Nanosecond)
拼接好要写入的数据,保存在pts中,启动多个线程来同时写入数据
//wg相当于一个线程计数器,先设置一个初始值,为要启动的子线程个数,每个子线程完成时让wg的值减1,当wg的值为0时唤醒主线程
var wg sync.WaitGroup
wg.Add(int(concurrency))
var totalWritten uint64
start := time.Now()
for i := uint64(0); i < concurrency; i++ {
go func(startSplit, endSplit int) {
tick := time.Tick(tick)
if fast {
tick = time.Tick(time.Nanosecond)
}
cfg := stress.WriteConfig{
BatchSize: batchSize,
MaxPoints: pointsN / concurrency, // divide by concurreny
GzipLevel: gzip,
Deadline: time.Now().Add(runtime),
Tick: tick,
Results: sink.Chan(),
}
// Ignore duration from a single call to Write.
pointsWritten, _ := stress.Write(pts[startSplit:endSplit], c, cfg)
//使用原子操作来记录写入point总数,原子操作可以避免数据竞争
atomic.AddUint64(&totalWritten, pointsWritten)
wg.Done()
}(startSplit, endSplit)
startSplit = endSplit
endSplit += inc
}
wg.Wait() //使主线程一直阻塞直到所有子线程完成
采用sync/waitgroup来使主线程一直阻塞直到所有子线程完成。之后就是输出各种写入统计信息
totalTime := time.Since(start) //记录运行时间
if err := c.Close(); err != nil {
fmt.Fprintf(os.Stderr, "Error closing client: %v\n", err.Error())
}
sink.Close()
throughput := int(float64(totalWritten) / totalTime.Seconds())
if quiet {
fmt.Println(throughput)
} else {
fmt.Println("Write Throughput:", throughput)
fmt.Println("Points Written:", totalWritten)
}
在每个写入子线程中,调用stress.Write来写入数据。
pointsWritten, _ := stress.Write(pts[startSplit:endSplit], c, cfg)
该函数首先将整个该批次的数据写入到一个缓冲区中,可以选择对数据进行压缩,压缩采用的是compress/gzip。
buf := bytes.NewBuffer(nil)
var w io.Writer = buf
//如果cfg.GzipLevel不为0则进行压缩,有多种压缩水平
//NoCompression = 0
//BestSpeed = 1
//BestCompression = 2
//DefaultCompression = 3
//HuffmanOnly = 4
doGzip := cfg.GzipLevel != 0
var gzw *gzip.Writer
if doGzip {
var err error
gzw, err = gzip.NewWriterLevel(w, cfg.GzipLevel) //进行压缩
if err != nil {
// Should only happen with an invalid gzip level?
panic(err)
}
w = gzw
}
//将一个批次的数据汇总一起写入
for _, pt := range pts {
pointCount++
pt.SetTime(t)
lineprotocol.WritePoint(w, pt) //将一行数据写入w
if pointCount%cfg.BatchSize == 0 { //一个批次
if doGzip {
// Must Close, not Flush, to write full gzip content to underlying bytes buffer.
if err := gzw.Close(); err != nil {
panic(err)
}
}
sendBatch(c, buf, cfg.Results) //此处才是真正的写入
if doGzip {
// sendBatch already reset the bytes buffer.
// Reset the gzip writer to start clean.
gzw.Reset(buf)
}
t = <-cfg.Tick
if t.After(cfg.Deadline) {
break WRITE_BATCHES
}
if pointCount >= cfg.MaxPoints {
break
}
}
// Update increments the value of all of the Int and Float
// fields by 1.
pt.Update()
}
汇总后调用sendBatch来写入数据
func sendBatch(c write.Client, buf *bytes.Buffer, ch chan<- WriteResult) {
lat, status, body, err := c.Send(buf.Bytes()) //发送数据到influxdb
buf.Reset() //清空缓存,但并不释放
select {
case ch <- WriteResult{LatNs: lat, StatusCode: status, Body: body, Err: err, Timestamp: time.Now().UnixNano()}:
default:
}
}
在sendBatch中调用Send把数据发送到InfluxDB,采用的是http post方法。不过并没有使用net/http,而是使用了更高性能的valyala/fasthttp(号称比net/http快10倍)。
func (c *client) Send(b []byte) (latNs int64, statusCode int, body string, err error) {
//采用http post的方式向influxdb写入数据
req := fasthttp.AcquireRequest()
req.Header.SetContentTypeBytes([]byte("text/plain"))
req.Header.SetMethodBytes([]byte("POST"))
req.Header.SetRequestURIBytes(c.url)
if c.cfg.Gzip {
req.Header.SetBytesKV([]byte("Content-Encoding"), []byte("gzip"))
}
req.Header.SetContentLength(len(b))
req.SetBody(b) //http 请求实体
resp := fasthttp.AcquireResponse()
start := time.Now()
do := fasthttp.Do
if c.httpClient != nil {
do = c.httpClient.Do
}
err = do(req, resp) //发起请求
latNs = time.Since(start).Nanoseconds()
statusCode = resp.StatusCode()
// Save the body.
if statusCode != http.StatusNoContent {
body = string(resp.Body())
}
//释放连接
fasthttp.ReleaseResponse(resp)
fasthttp.ReleaseRequest(req)
return
}
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/14636.html