[转帖]Influxdb 2.x 快速入门

influxdb,快速,入门 · 浏览次数 : 0

小编点评

1.x 和 2.x备份数据不兼容。以下为2.x的备份方法influx backup <back-path> -t <root-token># 例:influx backup ./back -t 58s6gl9hD8lk-AS_i6mUaYMMCGe6N1vIfVpJUo2xJ2HkWMlWx2yp7r7IKZsyF6h8vQdTPfIpGyHtbALayLgUQw==token 可以在管理界面Tokens处找到恢复influx 使用influx命令进行数据恢复influx restore <back-path>#例:influx restore ./back恢复指定 bucketinflux restore <back-path> --bucket exmple-bucket如果恢复的 bucket 名称已经在现有数据库中存在 则使用--new-bucket 为恢复的数据库指定一个新名称并将数据恢复到新名称的bucket中关于无法启动UI管理界面的问题问题描述这个不一定每个人都能遇到。笔者的电脑现在无法启动UI界面,但在虚拟机中是可以启动的。推测是我在第一次启动时在配置文件中手动指定了静态资源位置,而此时该位置是一个无效路径。解决办法UI界面在初始化过程中报错可自己下载UI项目手动构建后,由influx配置文件指定静态资源路径解决。UI项目Github地址(网页地址不要直接clone!!!)git clone https://github.com/influxdata/ui.gityarn build需要注意的是无论是build还是 start指令都要在linux或mac os 上进行,因为构建和启动指令包含对 export 指令的调用 windows使用set无法兼容。期间可能会有网络问题,可以根据报错手动下在所需文件放在根目录下即可。构建完成后在influxd启动目录创建 config.yml 文件并键入以下内容assets-path: 构建结果路径influxdb默认会读取启动目录下的config.yml文件,启动后前端访问的就是自构建的UI了其他更多API信息详见influxdb官方文档其他语言客户端库C#GolangJavaScript BrowserJavaScript Node.jsPython

正文

Influxdb 2.x 快速入门
https://www.jianshu.com/p/268fca65f10e

 

Influxdb是由Golang 构建的时序数据库,由于由Go语言构建使得其跨平台部署相对方便。用户只需下载其可执行文件到相应系统执行即可。

核心概念

示例数据(解释某些概念用)

_time_measurementlocationscientist_field_value
2019-08-18T00:00:00Z census klamath anderson bees 23
2019-08-18T00:00:00Z census portland mullen ants 30
2019-08-18T00:06:00Z census klamath anderson bees 28
2019-08-18T00:06:00Z census portland mullen ants 32

Organization

organization 是一组用户的工作空间,一个组下用户可以创建多个bucket

bucket

所有的 influxdb数据都存储在bucket中,bucket结合了数据库和保存期限(每条数据会被保留的时间)的概念,类似于RDMS的database的概念。bucket属于一个organization

Measurement

measurement是所有 tags fields 和时间的容器和RDMS的table的概念类似,是一个数据集的容器

Fields

数据属性包括field key 和 field value 分别存储在 _field和 _value当中, 并且一个measurement中必须包含至少一个filed

Field key

field key 是一个代表属性名称的字段,在示例数据中beesants就是field key

Field value

field value 是对应 field key 的值,在示例数据中在2019-08-18T00:00:00Z该时间点 bees的值为23,而ants的值为30

Field set

field set 表示在同一时间内 所有fields的集合

Tags

和Fields类似,Tags也有 key value。但与Fields不同的是,field key存储在_field列中 而tag key则是本省就是列

tag key 和 tag value

即tag 的 key 和 value 在Line Protocl中有更为直观的体现

timestamp

所有存储在influxdb中的数据都有一个_time列用来记录时间,在磁盘中以纳秒之间戳存储,但客户端查询时返回的是格式化的更易读的 RFC3339 UTC时间格式

对于Fields 和 Tags 简单来说他们都是一组键值对的集合。在存储形式上,field 的key 被存储在一个名为_field的列中,而tag 的key则是以列头的形式存在的,该列的内容即为tag value。可以从示例数据中直观的看出其区别。

另外值得注意的是,field 和 tag 都可以用来存储数据,但tag只能存储字符串类型数据,而filed既可以存储字符串类型又可以存储数值类型数据。

那么我是否可以在tag中存储字符串形式的数据呢?

当然可以但其不可以参与flux查询中的mean() max() 等聚合函数的计算。所以field 和 tag 还是有着本质上的区别的。

安装

手动安装

influxdb 安装比较简单直接下载可执行文件运行即可

Windows下载链接

Linux下载链接

解压后直接运行influxd即可,服务启动后访问 localhost:8086 进行首次配置。

docker

使用docker方式运行时可直接docker run访问IP:8086进行初始化,也可在启动容器时初始化。

若想启动时初始化可按如下命令启动

docker run -d -p 8086:8086 \
      -e DOCKER_INFLUXDB_INIT_MODE=setup \
      -e DOCKER_INFLUXDB_INIT_USERNAME=alming \
      -e DOCKER_INFLUXDB_INIT_PASSWORD=wuxianming \
      -e DOCKER_INFLUXDB_INIT_ORG=alming \
      -e DOCKER_INFLUXDB_INIT_BUCKET=alming \
      influxdb:2.0

-e DOCKER_INFLUXDB_INIT_MODE=setup 指其定为初始化模式。

-e DOCKER_INFLUXDB_INIT_USERNAME=alming 指定用户名

-e DOCKER_INFLUXDB_INIT_PASSWORD=wuxianming 指定密码

-e DOCKER_INFLUXDB_INIT_ORG=alming 创建初始org

-e DOCKER_INFLUXDB_INIT_BUCKET=alming 创建初始bucket

influx CLI

为了避免不必要的错误以下influx命令均手动传递token,也可以使用influx config命令将一些常用选项配置在配置文件中,则每次执行命令会默认带上influx config 指定的flag,例如

influx config create --active -t <your-token>
# 执行过后,所有需要该token的指令就不需要指定token了

用户管理

创建用户

influx user create -n <username> -p <password> -o <org-name> -t <your-token>

查看用户

influx user list -t <your-token>

删除用户

influx user delete -t <user-id> -t <your-token>

更新用户

influx user update -i <user-id> -n <new-username> -t <your-token>

修改密码

influx user password -n <username> -t <your-token>
# 指令执行后,CLI会引导密码修改

oganization管理

官方建议一个influxdb实例中建组不超过20个,因为influxdb支持大约20个bucket的写入或查询,超过这个值将会对influxdb性能造成影响

创建oganization

创建组有两种方式一种是使用UI管理界面创建,一种是使用influx命令。UI界面根据引导点击就行了

influx命令

influx org create <org-name> -t <your-token>

查看oganization

influx org list -t <your-token>

更新oganization

influx org update -i <org-id> -n <new-org-name>

删除oganization

influx org delete -i <org-id>

bucket管理

bucket内的数据是有保存期限的influxdb称之为retention-period-duration,bucket在创建之初就会指定

bucket创建

influx bucket create -n <bucket-name> -o <org-name> -r <retention-period-duration> -t <your-token>
# -o 指定organization名称,-r指定数据保留时间
# 数据保留时间单位在以下单位中取值:
#纳秒 (ns)
#微秒 (us or µs)
#毫秒 (ms)
#秒 (s)
#分 (m)
#时 (h)
#日 (d)
#周 (w)

#示例
influx bucket create -n test-bucket -o wxm -r 72h5m -t 58s6gl9hD8lk-AS_i6mUaYMMCGe6N1vIfVpJUo2xJ2HkWMlWx2yp7r7IKZsyF6h8vQdTPfIpGyHtbALayLgUQw==

bucket更新

更新bucket名称

influx bucket update -i <bucket-id> -n <new-bucket-name>

更新数据保存时间

influx bucket update -i <bucket-id> -r <retention period with units>

bucket查看

influx bucket list -o <org-name or org-id> -t <your-token>

bucket删除

通过名称删除

influx bucket delete -n <bucket-name> -o <org-name>

通过id删除

influx bucket delete -i <bucket-id>

Line Protocol(数据写入协议)

通过Post请求方式向influxdb中写入数据进行测试,可使用Postman 详细见 Postman Influx http api 调用 章节。

Line Protocol 语法

# 以下取自官方文档
# 语法
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]

# 示例
myMeasurement,tag1=value1,tag2=value2 fieldKey="fieldValue" 1556813561098000000

一个\n代表一条数据协议的结束,所以数据内容不支持\n

Line Protocol 支持部分特殊字符但需要进行转义

Flux查询语言

Flux 可归类为一门新的编程语言,这里仅介绍针对查询需要的一些常用函数和操作,了解更多Flux见Flux官方文档

大多数Flux查询遵循如下几个步骤

  • 指定数据源
  • 过滤数据
  • 对数据整形(重新组织数据结构)
  • 数据内容计算
//例
from(bucket: "example-bucket")            // ── Source
  |> range(start: -1d)                    // ── Filter on time
  |> filter(fn: (r) => r._field == "foo") // ── Filter on column values
  |> group(columns: ["sensorID"])         // ── Shape
  |> mean()                               // ── Process

flux查询语言看起来会像下面这样

from(bucket: "wxm-influx")
  |> range(start: 0)
  |> filter(fn:(r)=>(r.owner=="wxm"))
  |> window(every: 1s)
  |> mean()

其中 from,range,filter等以下称作查询函数。flux查询语言允许使用|>分割符将上一个函数的输出传递给下一个函数,类似于linux的管道

基础规定

每个查询语句必须包含数据源,时间区间和过滤器。对应关键字分别为 from,rangefilter。实际上不包含filter也是可以的,可以理解为filter过滤条件就是all

Flux 标准库(部分)

该部分列举常用flux标准库函数,根据用法可举一反三使用所有标准库函数

buckets

buckets()返回当前组中所有的桶

from

指定从那个bucket查询数据

from(bucket:"bucket名称")

range

指定时间区间,range接收两个参数,startstop 其中stop可以省略,缺省值是当前时间。start不能省略,否则会报语法错误。

startstop 既可以使用相对的时间区间,也可以使用绝对的时间戳。相对时间区间格式为 时间间隔h/m/s例如 -1h,-5m。时间戳格式2021-01-01T00:00:00Z

#示例1
from(bucket:"bucket名称")
  |> range(start: -1h)
#示例2
from(bucket:"bucket名称")
  |> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-01T12:00:00Z)

filter

对数据进行过滤,接收一个判断函数。类似于java中的lamda,它看起来像是下面这样

(r) => (r.recordProperty comparisonOperator comparisonExpression)

判断函数返回一个boolean类型值。只有符合条件的记录才会被返回。

#示例
from(bucket: "wxm-influx")
  |> range(start: -24h)
  |> filter(fn:(r)=>(r.owner=="wxm"))
#取最近24小时,含有owner标签且值为wxm的记录

当filter内含有多个条件时可用 and 或 or 连接各个条件

#示例
from(bucket: "wxm-influx")
  |> range(start: -24h)
  |> filter(fn:(r)=>(r.owner=="wxm" or r.key=="indoor"))
#取最近24小时,含有owner标签且值为wx或含有标签key且值为indoor的记录

contains

参数

value 需要比较的值

set被比较的集合

判断value是否包含在set中。

#示例
fiels = ["wxm","ali"]
from(bucket: "wxm-influx")
  |> range(start: 2022-03-12T08:14:04Z,stop:2022-03-13T08:19:05Z)
  |> filter(fn:(r)=>contains(value:r.owner,set:fiels))

window

window函数根据时间将数据分组.

参数

every:

every 指定每个时间窗口的时间,例如 every = 5 那么 时间窗口可以是 0m-5m,5m-10m,10m-15m。默认为 period 的值

period

period 明确在每个时间窗口中需要从时间窗口起始到多久的数据,例如时间窗口为10m-15m period = 3 则只取10m-13m的数据,13m-15m的数据会被抛弃。默认为 every 的值

offset

offset 指定每个时间窗口的时间偏移量,例如时间窗口为10m-15m offset = 3 那么会取 13m-15m的数据,10m-13m的数据会被抛弃,但注意当offset = every 时 offset不生效。且offset大于every 时 生效offset = offset % every

min

参数

column 求最小值的列,默认为_value

取每个分组中最小值

max

参数

column 求最大值的列,默认为_value

取每个分组最大值

mean

参数

column 求平均值的列,默认为_value

对每个分组的数据求平局值

bottom

参数

columns 按那些列进行排序

n 返回最上层 n 条数据

根据columns进行排序并返回最上层 n 条记录

count

参数

columns 按那列统计个数

统计数据个数

cumulativeSum

参数

columns 按那列进行累加

根据给定列对该列数据进行累加,模拟输出忽略 _start, _stop 等列:

原始数据:

_measurement_field62
temperature wxm 62
temperature wxm 65
temperature wxm 81.5
temperature wxm 81.5

cumulativeSum函数执行后:

_measurement_field62
temperature wxm 62
temperature wxm 127
temperature wxm 208.5
temperature wxm 290

group

参数

columns 按那些列进行分组操作 默认值为[]

mode:

mode 可以是以下值

"by",按 columns 内指定的列进行分组

"expect",按除了columns 列中指定的列进行分组

columns

参数

column存放列标签名的列。

返回所有列标签名并存储在指定的列中默认为_value 通过column参数指定

derivative

参数

unit 求多长时间内的变化速率。变化速率=(下一个值-上一个值)/(下一个时间-上一个时间)* unit 默认值1s

nonNegative变化速率是否可以是负值,如果是负数influxdb会假定前一个值为0 默认值true

columns指定计算变化速率的列 默认值 ["_value"]

timeColumn手动指定时间列 默认值 "_time"

difference

参数

nonNegative是否允许差值为负数,如果是负数influxdb会假定前一个值为0 默认值false

计算相邻两行的差值(next-pre)

keepFirst是否保留第一行,默认值为false

distinct

参数

column 指定按那列去重

按指定列去重

influxdb-client-java

官方Client仓库地址

Spring boot 集成

引入influxdb-client-java依赖
maven

<dependency>
  <groupId>com.influxdb</groupId>
  <artifactId>influxdb-client-java</artifactId>
  <version>4.0.0</version>
</dependency>

gradle

dependencies {
    compile "com.influxdb:influxdb-client-java:4.0.0"
}

集成方式一

在配置文件中指定influxdb url,token,org,bucket信息

influx:
  url: http://172.21.220.141:8086
  token: 58s6gl9hD8lk-AS_i6mUaYMMCGe6N1vIfVpJUo2xJ2HkWMlWx2yp7r7IKZsyF6h8vQdTPfIpGyHtbALayLgUQw==
  bucket: wxm-influx
  org: wxm

添加influx配置类

//InfluxConfig.java
@Configuration
@ConfigurationProperties(prefix = "influx")
public class InfluxConfig {
    private String url;
    private String token;
    private String org;
    private String bucket;
    @Bean
    public InfluxDBClient influxDBClient(){
        return InfluxDBClientFactory.create(url, token.toCharArray(),org,bucket);
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setToken(String token) {
        this.token = token;
    }

    public void setOrg(String org) {
        this.org = org;
    }

    public void setBucket(String bucket) {
        this.bucket = bucket;
    }
}

随后便可以在其他位置注入并使用InfluxDBClient

集成方式二

再resource目录下创建名为influx2.properties的配置文件并添加如下配置

influx2.url=http://172.23.64.25:8086
influx2.org=wxm
influx2.token=58s6gl9hD8lk-AS_i6mUaYMMCGe6N1vIfVpJUo2xJ2HkWMlWx2yp7r7IKZsyF6h8vQdTPfIpGyHtbALayLgUQw==
influx2.bucket=wxm-influx

添加influx配置类

@Configuration
public class InfluxConfigProperties {
    @Bean
    public InfluxDBClient influxDBClient() {
        return InfluxDBClientFactory.create();
    }
}

个人推荐使用第二种方式,influx2.properties还可用于配置默认tag

Write Data

  • Write by Line Protocl
try(WriteApi writeApi = influxDBClient.makeWriteApi()) {
    String val = "temperature,location=north wxm=69.0";
    writeApi.writeRecord(WritePrecision.NS,val);
}
  • Write by POJO
//Temperature.java
@Measurement(name = "temperature")
public class Temperature {
    @Column(tag = true)
    private String location;
    @Column
    private Double value;
    @Column(timestamp = true)
    Instant time;

    getterAndSeter.....
}

try (WriteApi writeApi = influxDBClient.makeWriteApi()) {
    Temperature temperature = new Temperature();
    temperature.setLocation("east");
    temperature.setValue(106.2D);
    temperature.setTime(Instant.now());
    writeApi.writeMeasurement(WritePrecision.NS,temperature);
}
  • Write Data by Data Point
try (WriteApi writeApi = influxDBClient.makeWriteApi()) {
    Point point = Point.measurement("temperature")
            .addTag("location","south")
            .addTag("owner","wxm")
            .addField("wxm",230.8);
    writeApi.writePoint(point);
}

写数据时为所有数据指定默认标签
指定默认标签有两种方式,通过配置文件和通过编码配置

  1. 通过配置文件

在influx2.properties中指定默认标签及标签值

# 自定义默认标签
influx2.tags.customer=ALMING
  1. 通过编码配置

首先编写配置

private InfluxDBClientOptions influxDBClientOptions() {
  return InfluxDBClientOptions.builder()
          .url("http://172.23.64.25:8086")
          .org("wxm")
          .bucket("wxm-influx")
          .authenticateToken("58s6gl9hD8lk-AS_i6mUaYMMCGe6N1vIfVpJUo2xJ2HkWMlWx2yp7r7IKZsyF6h8vQdTPfIpGyHtbALayLgUQw==".toCharArray())
          .addDefaultTag("id", "wxm-id")
          .build();
}

随后在创建客户端时指定配置

@Bean
public InfluxDBClient influxDBClient() {
  return InfluxDBClientFactory.create(influxDBClientOptions());
}

需要注意的是使用编码配置会覆盖默认从配置文件读取的InfluxDBClientOptions,所以使用编码方式指定配置文件会失效

  • 同步阻塞写入
    以上写入都是异步写入,influxdb同时支持同步阻塞写入。使用如下方式获取阻塞写入API
WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking();

Query Data

  • 查询返回FluxTable
String flux = "from(bucket: \"wxm-influx\")\n" +
       "  |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" +
       "  |> filter(fn:(r)=>(r.owner==\"wxm\"))";
List<FluxTable> tables = influxDBClient.getQueryApi().query(flux);
for (FluxTable fluxTable : tables) {
   List<FluxRecord> records = fluxTable.getRecords();
   for (FluxRecord fluxRecord : records) {
       System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));
   }
}
  • 查询结果影射成为POJO
String flux = "from(bucket: \"wxm-influx\")\n" +
        "  |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" +
        "  |> filter(fn:(r)=>(r.owner==\"wxm\"))";
List<Temperature> query = influxDBClient.getQueryApi().query(flux, Temperature.class);
query.forEach(System.out::println);
  • 异步查询
String flux = "from(bucket: \"wxm-influx\")\n" +
        "  |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" +
        "  |> filter(fn:(r)=>(r.owner==\"wxm\"))";
influxDBClient.getQueryApi().query(flux,
        (cancellable, fluxRecord) -> {
            System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value"));
        },
        throwable -> {
            //invoke when exception
            System.out.println("Error");
        },
        () -> {
            //complete
            System.out.println("complete");
        });
System.out.println("Query invoke done");
TimeUnit.SECONDS.sleep(3);
/**
 * 输出结果:
 * Query invoke done
 * 2022-03-12T08:18:16.543834Z: 62.0
 * 2022-03-12T08:19:44.833253500Z: 65.0
 * 2022-03-12T09:25:43.388488100Z: 81.5
 * complete
 */
  • 异步查询并映射成为POJO
String flux = "from(bucket: \"wxm-influx\")\n" +
        "  |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" +
        "  |> filter(fn:(r)=>(r.owner==\"wxm\"))";
influxDBClient.getQueryApi().query(flux,Temperature.class,
        (cancellable, fluxRecord) -> {
            System.out.println(fluxRecord);
        },
        throwable -> {
            //invoke when exception
            System.out.println("Error");
        },
        () -> {
            //complete
            System.out.println("complete");
        });
System.out.println("Query invoke done");
TimeUnit.SECONDS.sleep(3);
/**
 * 输出结果:
 * Query invoke done
 * Temperature(location=north, value=62.0, time=2022-03-12T08:18:16.543834Z)
 * Temperature(location=north, value=65.0, time=2022-03-12T08:19:44.833253500Z)
 * Temperature(location=north, value=81.5, time=2022-03-12T09:25:43.388488100Z)
 * complete
 */
  • 查询并返回源CSV格式数据
    返回一个字符串,其内容是以逗号为分隔的CSV格式。使用Postman进行测试时返回就是该格式。
String flux = "from(bucket: \"wxm-influx\")\n" +
        "  |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" +
        "  |> filter(fn:(r)=>(r.owner==\"wxm\"))";
String raw = influxDBClient.getQueryApi().queryRaw(flux);
System.out.println(raw);
  • 源数据异步查询
String flux = "from(bucket: \"wxm-influx\")\n" +
        "  |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" +
        "  |> filter(fn:(r)=>(r.owner==\"wxm\"))";
influxDBClient.getQueryApi().queryRaw(flux,
        (cancellable, fluxRecord) -> {
            System.out.println(fluxRecord);
        },
        throwable -> {
            //invoke when exception
            System.out.println("Error");
        },
        () -> {
            //complete
            System.out.println("complete");
        });
System.out.println("Query invoke done");
TimeUnit.SECONDS.sleep(3);
/**
 * 输出结果:
 * Query invoke done
 * ,result,table,_start,_stop,_time,_value,_field,_measurement,location,owner
 * ,_result,0,2022-03-12T08:18:04Z,2022-03-13T08:19:05Z,2022-03-12T08:18:16.543834Z,62,wxm,temperature,north,wxm
 * ,_result,0,2022-03-12T08:18:04Z,2022-03-13T08:19:05Z,2022-03-12T08:19:44.8332535Z,65,wxm,temperature,north,wxm
 * ,_result,0,2022-03-12T08:18:04Z,2022-03-13T08:19:05Z,2022-03-12T09:25:43.3884881Z,81.5,wxm,temperature,north,wxm
 *
 * complete
 */

Delete Data

OffsetDateTime start = OffsetDateTime.now().minus(24, ChronoUnit.HOURS);
OffsetDateTime stop = OffsetDateTime.now();
influxDBClient.getDeleteApi().delete(start,stop,"","wxm-influx","wxm");

flux-dsl

官方文档中有一章节是描写如何使用flux-dsl来构建flux语句,但在使用过程中发现官方使用的Flux类在当前情况下并不能被引入,实际influxdb-client-java这个依赖也确实不包含Flux。所以需要手动构建 flux-dsl module 并在项目中引用。

flux-dsl 模块构建

  1. clone 官方仓库地址,并在idea中引入。
  2. 进入命令行在项目根目录执行mvn install 如果时windows系统该指令会失败但不用担心,flux-dsl模块需要的module已经构建成功。 然后cd到flux-dsl目录下执行mvn clean install
  3. 成功执行后在本台电脑上其他项目中以如下GAV引入maven依赖。


     

图片截取自flux-dsl模块pom.xml,版本号(V)根据下载的项目版本进行填写
在其他项目中引用如下

<dependency>
   <groupId>com.influxdb</groupId>
   <artifactId>flux-dsl</artifactId>
   <version>3.5.0-SNAPSHOT</version>
</dependency>

gradle用户需先在repositories下添加 mavenLocal()然后在依赖中引入如下

repositories {
    ...
    mavenLocal()
}
dependencies {
    ...
    implementation 'com.influxdb:flux-dsl:4.3.0-SNAPSHOT'
}

flux-dsl 使用

  • 使用内建函数
Flux flux = Flux
    .from("wxm-influx")
    .window(15L, ChronoUnit.MINUTES, 20L, ChronoUnit.SECONDS)
    .sum();

/**
 * from(bucket:"wxm-influx")
 *  |> window(every:15m, period:20s)
 *  |> sum()
 */
  • 使用内建函数属性
SumFlux flux = Flux.from("wxm-influx")
                .window()
                .withEvery(15L, ChronoUnit.MINUTES)
                .withPeriod(20L, ChronoUnit.SECONDS)
                .sum();
/**
 * from(bucket:"wxm-influx")
 *  |> window(every:15m, period:20s)
 *  |> sum()
 */
  • 手动声明属性名称
SumFlux flux = Flux.from("wxm-influx")
                .window()
                .withPropertyValue("every", 15L, ChronoUnit.MINUTES)
                .withPropertyValue("period", 20L, ChronoUnit.SECONDS)
                .sum();
/**
 * from(bucket:"wxm-influx")
 *  |> window(every:15m, period:20s)
 *  |> sum()
 */               
  • 声明一个可重用的属性集合
Map<String, Object> properties = new HashMap<>();
properties.put("every", new TimeInterval(15L, ChronoUnit.MINUTES));
properties.put("period", new TimeInterval(20L, ChronoUnit.SECONDS));

Flux flux = Flux
        .from("telegraf")
        .window()
        .withPropertyNamed("every")
        .withPropertyNamed("period")
        .sum();
//使用时指定使用的属性集
System.out.println(flux.toString(properties));
/**
 * from(bucket:"telegraf")
 *  |> window(every:15m, period:20s)
 *  |> sum()
 */
  • 自定义表达式
Flux flux = Flux.from("wxm-influx")
                .expression("filter(fn:(r)=>(r.owner==\"wxm\"))")
                .expression("sum()");
/**
 * from(bucket:"wxm-influx")
 *  |> filter(fn:(r)=>(r.owner=="wxm"))
 *  |> sum()
 */

其余函数都比较类似,详见flux-dsl README

influxdb task

influxdb支持创建任务,可选周期执行或以cron表达式定义的时间执行。执行任务的内容为自定义的一段flux脚本。
通过UI创建一个简单任务
选择Task选项卡 > create task > 定义task执行规则,编写task脚本。

 

 

influxdb alerts

alerts中有三个核心概念 Checks,Notification Endpoint 和 Notification Rules。
Checks 负责检测报警并将检测结果分级。
Notification Endpoint 负责定义通知切点,即报警通知会发送到哪里
Notification Rules 负责绑定 Checks 和 Notification Endpoint

Checks

Checks 分为两种,一种是threshold Checks 一种是deadman Checks,Checks会周期执行一段指定查询脚本,并查看结果是否匹配其Checks规则,一旦匹配就会生成一条Checks记录。
threshold checks
threshold alert 为某个点指定阈值和比较规则(大于小于等),判断查询结果是否符合阈值表达式

 
threshold-check.png

 

deadman alert
deadman alert 在测试时并没有符合预期,按照官方的介绍应该时当某一个点在规定时间内没有上报数据就会报警。测试时未得到预期结果

Notification Endpoint

定义 Notification Endpoint

 
notification-endpint.png

 

Notification Ruls

定义 Notification Ruls

 
Notification Rul.png

 

定义绑定 Checks 和 Notification Endpoint 后当Check产生相应等级的报警后等待Notification Rules检测周期到即可将预先定义好的报警信息推送到通知节点

Postman Influx http api 调用

鉴权设置

进入Postman Collections 或单个请求的 Authorization 功能选项。

  1. Type 选择API Key
  2. Key 填入 Authorization
  3. Value 填入 Token YOUR_TOKEN
  4. Add to 选择Header

例:


 
postman.png

随后在访问influxdb的各项http API

Query API

鉴权后调用 /api/query 可使用查询语句进行查询。

如何使用

  • orgorgID通过url传参的形式添加到参数中

  • 添加或修改Accept请求头为 appliction/csv(Postman中不需要添加)

  • 添加或修改Content-Type请求头为 application/vnd.flux

  • 将查询语句放在请求体中

Write API

  • org,bucket 通过url传参形式添加到参数中
  • 添加或修改Accept请求头为 appliction/json(实测该接口不反回数据,可选填)
  • 添加或修改Content-Type请求头为 text/plain
  • 将Line Protocol的数据添加到请求体中

数据备份与恢复

备份

influxdb 使用 influx 命令进行数据和元数据的备份。1.x 和 2.x备份数据不兼容。以下为2.x的备份方法

influx backup <back-path> -t <root-token>

# 例:
influx backup ./back -t 58s6gl9hD8lk-AS_i6mUaYMMCGe6N1vIfVpJUo2xJ2HkWMlWx2yp7r7IKZsyF6h8vQdTPfIpGyHtbALayLgUQw==

token 可以在管理界面Tokens处找到

恢复

influx 使用influx命令进行数据恢复

influx restore <back-path>
#例:
influx restore ./back

恢复指定 bucket

influx restore <back-path> --bucket exmple-bucket

如果恢复的 bucket 名称已经在现有数据库中存在 则使用--new-bucket 为恢复的数据库指定一个新名称并将数据恢复到新名称的bucket中

关于无法启动UI管理界面的问题

问题描述

这个不一定每个人都能遇到。笔者的电脑现在无法启动UI界面,但在虚拟机中是可以启动的。推测是我在第一次启动时在配置文件中手动指定了静态资源位置,而此时该位置是一个无效路径。

解决办法

UI界面在初始化过程中报错可自己下载UI项目手动构建后,由influx配置文件指定静态资源路径解决。

UI项目Github地址(网页地址不要直接clone!!!)

git clone https://github.com/influxdata/ui.git

yarn build

需要注意的是无论是build还是 start指令都要在linux或mac os 上进行,因为构建和启动指令包含对 export 指令的调用 windows使用set无法兼容。期间可能会有网络问题,可以根据报错手动下在所需文件放在根目录下即可。

构建完成后在influxd启动目录创建 config.yml 文件并键入以下内容

assets-path: 构建结果路径

influxdb默认会读取启动目录下的config.yml文件,启动后前端访问的就是自构建的UI了

其他

更多API信息详见influxdb官方文档

其他语言客户端库

与[转帖]Influxdb 2.x 快速入门相似的内容:

[转帖]Influxdb 2.x 快速入门

Influxdb 2.x 快速入门 https://www.jianshu.com/p/268fca65f10e Influxdb是由Golang 构建的时序数据库,由于由Go语言构建使得其跨平台部署相对方便。用户只需下载其可执行文件到相应系统执行即可。 核心概念 示例数据(解释某些概念用) _ti

[转帖]grafana 连接 influxdb 1.x 和 2.x

文章目录 一、安装 influxdbⅠ、docker 安装 二、常用操作Ⅰ、influxdb 1.x版本添加用户认证Ⅱ、influxdb 2.x 使用命令行Ⅲ、CLI 配置tokenⅤ、CLI 查询测试 三、grafana 安装使用Ⅱ、docker 安装 grafana 四、grafana 连接 i

[转帖]Jmeter连接InfluxDB2.0.4

Jmeter连接InfluxDB2.0.4 问题描述:在用Jmeter+InfluxDB构建监控时,因为docker构建的InfluxDB的版本是2.0.4,按照网上的教程进行后端监听器的填写,但是一直出现错误提示401等问题。网上的教程大多是1.X版本的,怀疑是数据库版本不一致导致的数据无法写入,

[转帖]influxdb 2.0.3 tar.gz的安装与配置

下载地址:https://dl.influxdata.com/influxdb/releases/influxdb2-2.0.3_linux_amd64.tar.gz 安装influxdb ### 解压 [root@jyzbdb2 soft]# tar -zxf influxdb2-2.0.3_li

[转帖]JMeter InfluxDB v2.0 listener plugin

https://github.com/mderevyankoaqa/jmeter-influxdb2-listener-plugin Support my Ukrainian Family ❤️ Like what you see? 😍 Description The goal of the pr

[转帖]【InfluxDB V2.0】介绍与使用,flux查询、数据可视化

目录 一、关键概念 二、系统结构 三、配置文件 四、Flux查询语句 五、可视化数据 附录 一、关键概念 相比V1 移除了database 和 RP,增加了bucket。 V2具有以下几个概念: timestamp、field key、field value、field set、tag key、ta

[转帖]手摸手搭建简单的jmeter+influxdb+grafana性能监控平台

我安装的机器是阿里云的centos8机器,其他的系统暂未验证 1.安装influxdb influxdb 下载地址https://portal.influxdata.com/downloads/,也可以直接在服务器上执行以下命令下载 我的软件一般下载在/usr/local/soft文件夹下,soft

[转帖]

Linux ubuntu20.04 网络配置(图文教程) 因为我是刚装好的最小系统,所以很多东西都没有,在开始配置之前需要做下准备 环境准备 系统:ubuntu20.04网卡:双网卡 网卡一:供连接互联网使用网卡二:供连接内网使用(看情况,如果一张网卡足够,没必要做第二张网卡) 工具: net-to

[转帖]

https://cloud.tencent.com/developer/article/2168105?areaSource=104001.13&traceId=zcVNsKTUApF9rNJSkcCbB 前言 Redis作为高性能的内存数据库,在大数据量的情况下也会遇到性能瓶颈,日常开发中只有时刻

[转帖]ISV 、OSV、 SIG 概念

ISV 、OSV、 SIG 概念 2022-10-14 12:29530原创大杂烩 本文链接:https://www.cndba.cn/dave/article/108699 1. ISV: Independent Software Vendors “独立软件开发商”,特指专门从事软件的开发、生产、