ArkFlow 高性能 Rust 流处理引擎 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
chenquan
V2EX    开源软件

ArkFlow 高性能 Rust 流处理引擎

  •  
  •   chenquan 209 天前 2076 次点击
    这是一个创建于 209 天前的主题,其中的信息可能已经有所发展或是发生改变。

    github: https://github.com/chenquan/arkflow

    高性能 Rust 流处理引擎,提供强大的数据流处理能力,支持多种输入输出源和处理器。

    特性

    • 高性能:基于 Rust 和 Tokio 异步运行时构建,提供卓越的性能和延迟
    • 多种数据源:支持 Kafka 、MQTT 、HTTP 、文件等多种输入输出源
    • 强大的处理能力:内置 SQL 查询、JSON 处理、Protobuf 编解码、批处理等多种处理器
    • 可扩展:模块化设计,易于扩展新的输入、输出和处理器组件

    安装

    从源码构建

    # 克隆仓库 git clone https://github.com/chenquan/arkflow.git cd arkflow # 构建项目 cargo build --release # 运行测试 cargo test 

    快速开始

    1. 创建配置文件 config.yaml
    logging: level: info streams: - input: type: "generate" context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }' interval: 1s batch_size: 10 pipeline: thread_num: 4 processors: - type: "json_to_arrow" - type: "sql" query: "SELECT * FROM flow WHERE value >= 10" - type: "arrow_to_json" output: type: "stdout" 
    1. 运行 ArkFlow:
    ./target/release/arkflow --config config.yaml 

    配置说明

    ArkFlow 使用 YAML 格式的配置文件,支持以下主要配置项:

    顶级配置

    logging: level: info # 日志级别:debug, info, warn, error streams: # 流定义列表 - input: # 输入配置 # ... pipeline: # 处理管道配置 # ... output: # 输出配置 # ... 

    输入组件

    ArkFlow 支持多种输入源:

    • Kafka:从 Kafka 主题读取数据
    • MQTT:从 MQTT 主题订阅消息
    • HTTP:通过 HTTP 接收数据
    • 文件:从文件读取数据
    • 生成器:生成测试数据
    • SQL:从数据库查询数据

    示例:

    input: type: kafka brokers: - localhost:9092 topics: - test-topic consumer_group: test-group client_id: arkflow start_from_latest: true 

    处理器

    ArkFlow 提供多种数据处理器:

    • JSON:JSON 数据处理和转换
    • SQL:使用 SQL 查询处理数据
    • Protobuf:Protobuf 编解码
    • 批处理:将消息批量处理

    示例:

    pipeline: thread_num: 4 processors: - type: json_to_arrow - type: sql query: "SELECT * FROM flow WHERE value >= 10" - type: arrow_to_json 

    输出组件

    ArkFlow 支持多种输出目标:

    • Kafka:将数据写入 Kafka 主题
    • MQTT:将消息发布到 MQTT 主题
    • HTTP:通过 HTTP 发送数据
    • 文件:将数据写入文件
    • 标准输出:将数据输出到控制台

    示例:

    output: type: kafka brokers: - localhost:9092 topic: output-topic client_id: arkflow-producer 

    示例

    Kafka 到 Kafka 的数据处理

    streams: - input: type: kafka brokers: - localhost:9092 topics: - test-topic consumer_group: test-group pipeline: thread_num: 4 processors: - type: json_to_arrow - type: sql query: "SELECT * FROM flow WHERE value > 100" - type: arrow_to_json output: type: kafka brokers: - localhost:9092 topic: processed-topic 

    生成测试数据并处理

    streams: - input: type: "generate" context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }' interval: 1ms batch_size: 10000 pipeline: thread_num: 4 processors: - type: "json_to_arrow" - type: "sql" query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor" - type: "arrow_to_json" output: type: "stdout" 
    第 1 条附言    209 天前
    欢迎大家讨论,一起完善它哈,
    感兴趣的朋友可以帮忙点个星星哦!
    11 条回复    2025-03-19 20:26:37 +08:00
    des
        2
    des  
       209 天前
    基于 yaml 配置文件配置的?感觉迟早是个麻烦的事,除非你一次配置好就不改了
    chenquan
        3
    chenquan  
    OP
       209 天前
    @des 是的,目前的设计中是采用 yaml 配置哈,不过今后会考虑使用动态更新 yaml 配置的方式来提升体验。如果有更好的方式欢迎随时交流哈
    3085570450tt
        4
    3085570450tt  
       209 天前
    已 star. 同 2 楼一样,yaml 配置可能会劝退一部分人;
    对比其他类似的流引擎,比如 risingwave/arroyo, 发现似乎轻量是它的优点?目前暂时没有 benchmark 可用参考的,同时其他两个工具支持的数据源格式更多;
    计划有没有其他客户端 sdk, 比如 node/python 等等,与引擎本身进行交互等
    如果暂时支持不了更多数据源,是否将数据源 input/output 这一块,抽离成扩展的方式,让用户自定义更好呢?
    chenquan
        5
    chenquan  
    OP
       209 天前
    @3085570450tt
    1. 目前其实 yaml 的配置也很简单的,但是也有一定的门槛(适合用于一定开发经验的人),这个在后期会进一步优化哈。
    2. input 、output 、processor 已经支持扩展( https://github.com/chenquan/arkflow/pull/75 ),会尽快完善这块的内容。
    chenquan
        6
    chenquan  
    OP
       209 天前
    @3085570450tt 欢迎继续关注 arkflow 、有什么想法可以畅所欲言。
    des
        7
    des  
       207 天前
    @chenquan 使用 yaml 两个不好的点,一个是不方便调试,另外一个是不方便修改
    des
        8
    des  
       207 天前
    @3085570450tt 轻量不是好听点的说法嘛 哈哈哈
    chenquan
        9
    chenquan  
    OP
       206 天前
    @des 有什么好的解法吗?
    des
        10
    des  
       206 天前
    @chenquan 我没有想法,要不就不会光提问题了,哈哈哈
    des
        11
    des   206 天前
    @chenquan 提供一个可能的做法供参考
    就是提供一个接口和页面可以用来在线编辑,以及从数据里面抽取一些数据用来调试
    想想这个感觉和 arroyo 有点像哎
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     891 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 27ms UTC 20:21 PVG 04:21 LAX 13:21 JFK 16:21
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86