Quickwit获取Kafka数据源消息

news/2025/2/25 5:54:57

介绍

Quickwit可以将数据从一个或多个源插入到索引中。创建索引后,可以使用CLI 命令quickwit source create添加源,支持的源有:file、kafka、kinesis、pulsar。

本章讲解如何从Quickwit搜索引擎中创建Kafka源和获取Kafka源主题数据流,注意从Kafka流中读取数据流中的每条消息都必须包含一个JSON 对象,目前官方只支持数据源JSON格式数据导入。

官方关于Kafka源文档说明:Kafka | Quickwit

创建Quickwit索引

因此前已在docker服务主机上创建了包含quickwit服务的docker容器,本文不详述quickwit安装过程,可参见《Docker安装Quickwit搜索引擎》。

官方示例YMAL配置

#
# Index config file for gh-archive dataset.
#
version: 0.8

index_id: gh-archive

doc_mapping:
  field_mappings:
    - name: id
      type: text
      tokenizer: raw
    - name: type
      type: text
      fast: true
      tokenizer: raw
    - name: public
      type: bool
      fast: true
    - name: payload
      type: json
      tokenizer: default
    - name: org
      type: json
      tokenizer: default
    - name: repo
      type: json
      tokenizer: default
    - name: actor
      type: json
      tokenizer: default
    - name: other
      type: json
      tokenizer: default
    - name: created_at
      type: datetime
      fast: true
      input_formats:
        - rfc3339
      fast_precision: seconds
  timestamp_field: created_at

indexing_settings:
  commit_timeout_secs: 10

也可直接下载官方示例yaml文件,并通过quickwit服务命令创建索引

# Download GH Archive index config.
wget -O gh-archive.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/index-config.yaml

# Create index.
./quickwit index create --index-config gh-archive.yaml

Kafka服务安装

可自行百度,非本文核心,略过...

下载Kafka测试数据

测试数据来源于Quickwit官方,参见:Kafka | Quickwit

创建kafka主题,下载官方提供的json测试数据包,将数据推送到Kafka主题;

# Create a topic named `gh-archive` with 3 partitions.
bin/kafka-topics.sh --create --topic gh-archive --partitions 3 --bootstrap-server localhost:9092

# Download a few GH Archive files.
wget https://data.gharchive.org/2022-05-12-{10..15}.json.gz

# Load the events into Kafka topic.
gunzip -c 2022-05-12*.json.gz | \
bin/kafka-console-producer.sh --topic gh-archive --bootstrap-server localhost:9092

由于文件包太大,本地下载其中1天json数据包,执行命令如下

curl https://data.gharchive.org/2022-05-12-10.json.gz -o 2022-05-12-10.json.gz

创建Quickwit数据源

kafka数据源yaml配置示例

#
# Kafka source config file.
#
version: 0.8
source_id: kafka-source
source_type: kafka
num_pipelines: 2
params:
  topic: gh-archive
  client_params:
    bootstrap.servers: localhost:9092

也可直接下载官方提供的kafka-source.yaml文件,通过quickwit服务命令直接创建gh-archive索引对应的kafka源;

# Download Kafka source config.
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/kafka-source.yaml

# Create source.
./quickwit source create --index gh-archive --source-config kafka-source.yaml

完成上述操作后,Quickwit会创建索引器和搜索器,索引器将会边接到Kafka源指定topic主题上,并从对应group组主题分区上获取数据,通过流式传输到Quickwit中;

Java推送到Kafka主题

基于Java发送消息到Kafka示例代码,工具类可参见《Kafka消息服务之Java工具类》,本章不作详细讲述,通过代码片段进行演示;

java">//生产者发送消息
KafkaUtils.KafkaStreamServer kafkaStreamServer =  KafkaUtils.bulidServer().createKafkaStreamServer("192.168.1.3", 9092);
String topic = "gh-archive";
int n = 0;
List<String>lines = FileUtils.readLines(new File("D:\\test\\kafka\\2022-05-12-10.json"), "UTF-8");
for (String line : lines) {
    System.out.println("发送消息:" + line.substring(0,30) + " ...");
    //向kafka队列发送数据
    kafkaStreamServer.sendMsg(topic, line);
    if (n > 0 && n / 1000  == 0) {
        //线程休眠
        TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(1, 200));
    }
    n ++;
}
//共累计推送到kafka数量:156040条
System.out.println("共累计推送到kafka数量:" + n + "条");
kafkaStreamServer.close();

因官方测试数据比较大,本章节通过提前下载Json数据压缩包,只使用了其中一个Json压缩包:2022-05-12-10.json.gz

curl https://data.gharchive.org/2022-05-12-10.json.gz -o 2022-05-12-10.json.gz

Java创建Quickwit索引和Kafka数据源

Quickwit提供了丰富的REST API,因此支持通过HTTP请求创建、维护索引、索引查询以及数据源维护等;以下直接使用Java程序通过Http演示示例;

QuickwitKafkaSourceTest.java

java">package com.example;

import org.junit.jupiter.api.Test;

public class QuickwitKafkaSourceTest {
    private final static String QUICKWIT_URL = "http://192.168.1.3:7280/";
    /**
     * 创建索引
     * @throws Exception
     */
    @Test
    public void createIndex() throws Exception {
        String indexConf = """
                {
                    "version": "0.8",
                    "index_id": "gh-archive",
                    "doc_mapping": {
                        "field_mappings": [
                            {
                                "name": "id",
                                "type": "text",
                                "tokenizer": "raw"
                            },
                            {
                                "name": "type",
                                "type": "text",
                                "fast": true,
                                "tokenizer": "raw"
                            },
                            {
                                "name": "public",
                                "type": "bool",
                                "fast": true
                            },
                            {
                                "name": "payload",
                                "type": "json",
                                "tokenizer": "default"
                            },
                            {
                                "name": "org",
                                "type": "json",
                                "tokenizer": "default"
                            },
                            {
                                "name": "repo",
                                "type": "json",
                                "tokenizer": "default"
                            },
                            {
                                "name": "actor",
                                "type": "json",
                                "tokenizer": "default"
                            },
                            {
                                "name": "other",
                                "type": "json",
                                "tokenizer": "default"
                            },
                            {
                                "name": "created_at",
                                "type": "datetime",
                                "fast": true,
                                "input_formats": ["rfc3339"],
                                "fast_precision": "seconds"
                            }
                        ],
                        "timestamp_field": "created_at"
                    },
                    "indexing_settings": {
                        "commit_timeout_secs": 10
                    }
                }
                """;
        System.out.println("响应:" + HttpUtils.build().httpPost(QUICKWIT_URL + "api/v1/indexes", indexConf));
    }

    /**
     * 创建数据源, 源支持:(kafka,kinesis,file)
     * @throws Exception
     */
    @Test
    public void createSources() throws Exception {
        String indexId = "gh-archive";
        String jsonDate = """
                {
                    "version": "0.8",
                    "source_id": "kafka-source",
                    "source_type": "kafka",
                    "num_pipelines": 1,
                    "input_format": "json",
                    "params": {
                        "topic": "gh-archive",
                        "client_params": {
                            "auto.offset.reset": "earliest",
                            "bootstrap.servers": "192.168.1.5:9092"
                        }
                    }
                }
                """;
        System.out.println("响应:" + HttpUtils.build().httpPost(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources", jsonDate));
    }

    /**
     * 将索引切换到一个新的源上
     * @throws Exception
     */
    @Test
    public void indexesSourceToggle() throws Exception {
        String indexId = "gh-archive";
        String sourceId = "kafka-source";
        System.out.println("响应:" + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId + "/toggle", "PUT", ""));
    }
    
    /**
     * 重置索引绑定的源
     * @throws Exception
     */
    @Test
    public void indexesSourceReset() throws Exception {
        String indexId = "gh-archive";
        String sourceId = "kafka-source";
        System.out.println("响应:" + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId + "/reset-checkpoint", "PUT", ""));
    }

    /**
     * 删除索引绑定的源
     * @throws Exception
     */
    @Test
    public void indexesSourceDelete() throws Exception {
        String indexId = "gh-archive";
        String sourceId = "my-kafka-source";
        System.out.println("响应:" + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId, "DELETE", null));
    }

    /**
     * 获取索引描述信息
     * @throws Exception
     */
    @Test
    public void getIndexesDescribe() throws Exception {
        String indexId = "gh-archive";
        System.out.println("响应:" + HttpUtils.build().httpGet(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/describe"));
    }

    /**
     * 删除索引数据和所有元数据
     * @throws Exception
     */
    @Test
    public void deleteIndexes() throws Exception {
        String indexId = "gh-archive";
        System.out.println("响应:" + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId , "DELETE", null));
    }

}

HttpUtils.java

java">package com.example;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class HttpUtils {
    private HttpClient httpClient;

    public static HttpUtils build(){
        return new HttpUtils();
    }

    private HttpUtils() {
        httpClient = HttpClient.newHttpClient();
    }

    public String httpPost(String url, String jsonData) throws Exception {
        return http(url, "POST", jsonData);
    }

    public String httpGet(String url) throws Exception {
        return http(url, "GET", null);
    }

    public String http(String url, String method, String jsonData) throws Exception {
        System.out.println("请求:" + url);
        HttpRequest request = null;
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(new URI( url))
                .header("Content-Type", "application/json; charset=UTF-8" )
                .header("Timeout", "5000");
        if ("POST".equals(method)) {
            request = builder.POST(HttpRequest.BodyPublishers.ofString(jsonData, StandardCharsets.UTF_8)).build();
        } else if ("PUT".equals(method)) {
            request = builder.PUT(HttpRequest.BodyPublishers.ofString(jsonData, StandardCharsets.UTF_8)).build();
        } else if ("DELETE".equals(method)) {
            request = builder.DELETE().build();
        } else {
            request = builder.GET().build();
        }
        HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("状态码:" + response.statusCode());
        return response.body();
    }
}

在Java开发工具中分别执行createIndex()和createSources()测试方法后,即可创建索引、绑定Kafka数据源;

检索Kafka数据

在推送Kafka数据后,通过Redpanda控制台工具(此工具本文不做详述,参见官方《redpanda-console-kafka-ui》)查看Kfaka主题大小约438M;并且Json数据格式中有多层json嵌套;

在浏览器上输入Quickwit UI访问地址:http://127.0.0.1:7280,在Quickwit UI 》 Indexs 中查看已创建的gh-archive索引;

点击gh-archive索引,进入到SOURCES中查看已配Kafka源;

Quickwit索引器从Kafka源中获取数据流后写入到指定索引中,在Query editor功能中可以查询到已存储的索引数据;

参考:

REST API | Quickwit


http://www.niftyadmin.cn/n/5865060.html

相关文章

【NLP 38、激活函数 ④ GELU激活函数】

别盲目&#xff0c;别着急&#xff0c;慢慢走&#xff0c;没事的 —— 25.2.24 一、定义与数学表达式 GELU&#xff08;Gaussian Error Linear Unit&#xff0c;高斯误差线性单元&#xff09;是一种结合概率分布的非线性激活函数&#xff0c;其核心思想是通过输入值服从标准正…

牛客周赛 Round 82(思维、差分、树状数组、大根堆、前后缀、递归)

文章目录 牛客周赛 Round 82&#xff08;思维、差分、树状数组、大根堆、前后缀、递归&#xff09;A. 夹心饼干B. C. 食堂大作战&#xff08;思维&#xff09;D. 小苯的排列计数(差分、树状数组)E. 和和&#xff08;大根堆&#xff0c;前缀和&#xff09;F. 怎么写线性SPJ &…

小程序高度问题背景scss

不同的机型&#xff0c;他的比例啥的都会不一样&#xff0c;同样的rpx也会有不同的效果。所以这里选择了取消高度。 <view class"box-border" :style"{padding-top: ${navHeight}px,}"><!-- 已登录 --><view v-if"userStore.userInfo&…

Image Collections操作

在Google Earth Engine&#xff08;GEE&#xff09;中处理影像集合&#xff08;Image Collections&#xff09;是遥感数据分析的核心操作。以下是详细的步骤和示例代码&#xff0c;涵盖影像集合的常见操作&#xff1a; 1. 影像集合基础 影像集合是GEE中存储多幅影像的数据结构…

文件上传-Windows点空格点绕过

[题目信息]&#xff1a; 题目名称题目难度文件上传-Windows点空格点绕过1 [题目考点]&#xff1a; Windowsw文件特性考察[Flag格式]: SangFor{UDOaJfziTs4c-dceIyGxa53-Ybrg9dtF}[环境部署]&#xff1a; docker-compose.yml文件或者docker tar原始文件。 docker-compose u…

数据同步的中间件

以下是10个支持MySQL、HBase、ClickHouse、HDFS等不同数据库之间数据同步的GitHub项目推荐&#xff1a; 项目名称语言主要特点支持的数据库GitHub链接DataXPython阿里巴巴开源的数据同步工具&#xff0c;支持多种数据库和文件系统。MySQL、ClickHouse、HDFS等GitHub链接Apache…

STM32-智能小车项目

项目框图 ST-link接线 实物图&#xff1a; 正面&#xff1a; 反面&#xff1a; 相关内容 使用L9110S电机模块 电机驱动模块L9110S详解 | 良许嵌入式 一、让小车动起来 新建文件夹智能小车项目 在里面复制19-串口打印功能 重命名为01-让小车动起来 新建文件夹motor&…

Spring Cloud Gateway 网关的使用

在之前的学习中&#xff0c;所有的微服务接口都是对外开放的&#xff0c;这就意味着用户可以直接访问&#xff0c;为了保证对外服务的安全性&#xff0c;服务端实现的微服务接口都带有一定的权限校验机制&#xff0c;但是由于使用了微服务&#xff0c;就需要每一个服务都进行一…