200 元有偿求助,使用 Java 的 rsocket 上传文件 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
shuang
0.04D
V2EX    外包

200 元有偿求助,使用 Java 的 rsocket 上传文件

  •  
  •   shuang 123 天前 1784 次点击
    这是一个创建于 123 天前的主题,其中的信息可能已经有所发展或是发生改变。

    服务端是第三方的,我方需要按照接口文档上传文件。
    1 、分片上传,实体为 Flux<DataBuffer>
    2 、需要携带 header ,媒体类型为 application/json
    目前可以确认,服务端是 ok 的,问题出在客户端上传文件的代码。

    以下是客户端上传文件的代码:

    CompositeByteBuf compositeMetadata = ByteBufAllocator.DEFAULT.compositeBuffer(); // 1. 创建路由元数据 ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent( ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute())); compositeMetadata.addComponent(true, ByteBufAllocator.DEFAULT.buffer().writeBytes(routeMetadata)); // 2. 按文档要求添加请求 header Map<String, String> uploadFileHeader = new HashMap<>(); uploadFileHeader.put("token", token); uploadFileHeader.put("fileType", "jpg"); uploadFileHeader.put("fileName", "random-file-name"); ByteBuf customMetadata = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader)); CompositeMetadataCodec.encodeAndAddMetadata(compositeMetadata, ByteBufAllocator.DEFAULT, WellKnownMimeType.APPLICATION_JSON, customMetadata); // 读取本地文件 Flux<DataBuffer> dataBufferFlux = DataBufferUtils.read(Paths.get(filePath), new DefaultDataBufferFactory(), 1024 * 8); // 合并 Payloads 将每个 DataBuffer 转换为 Payload ,并附加 metadata Flux<Payload> requestPayloads = dataBufferFlux.map(dataBuffer -> { // 将每个 DataBuffer 转换为 Payload ,并附加 metadata return ByteBufPayload.create(Unpooled.wrappedBuffer(dataBuffer.asByteBuffer()), compositeMetadata); }); rsocket.requestChannel(requestPayloads) .doOnNext(payload -> log.error("=====> doOnNext")) .doOnError(error -> log.error("=====> doOnError", error)) .doOnComplete(() -> log.info("=====> doOnComplete")) .subscribe(); 

    注:由于对 rsocket 完全不熟,所以以上代码任何地方都有可能是错的。

    目前一直报错,缺少请求头:Missing header 'upload-file-header' for method parameter type , 从报错分析,已经请求到接口了,说明路由 metadata 没问题,但是请求头传递不正确。文件数据流是否传递正确还未知。

    希望寻求有过 rsocket 相关开发经验的人,帮忙看下代码哪里有问题。
    解决后发微信红包 200 元作为报酬。

    第 1 条附言    121 天前
    各位大神,如果觉得报酬低了,都好商量,重点在于解决问题。
    第 2 条附言    116 天前
    已解决,结帖。感谢各位大神的帮助,好几位给红包都不收,太实诚了,respect !
    17 条回复    2025-06-17 10:57:23 +08:00
    sioncheng
        1
    sioncheng  
       123 天前
    有点好奇,从报错信息来看,上传方是不是没有明确接收方需要的 upload-file-header 信息。还有,是不是可以先一般 java 代码去实现上传功能,确保明确了解了接收方的接口文档,然后再将一般 java 代码改为 rsocket 。
    lervard358
        2
    lervard358  
       123 天前
    我接了 怎么联系 加我  YWxwaGEtZW5naW5lZXJpbmc=
    larisboy
        3
    larisboy  
       122 天前
    uploadFileHeader 加上 upload-file-header 看看
    shuang
        4
    shuang  
    OP
       122 天前 via Android
    @larisboy 不知道怎么加,试了几种写法都不对
    shuang
        5
    shuang  
    OP
       122 天前 via Android
    @sioncheng 目前问题就在于如何用 rsocket 与服务端交互,不知道这个请求头该如何传递
    lbbdefy
        6
    lbbdefy  
       122 天前
    ByteBuf 大小端的问题要先确认
    sioncheng
        7
    sioncheng  
       121 天前
    @shuang 再探讨下。我意思是 rsocket 只是一个技术手段吧,rsocket 能做到的,其他 java 方式应该也能做到;理解清楚对方的接收协议才是本质,对方是标准的 multipart/form-data 协议还是自定义协议呢,这样才能对症解决问题吧。
    shuang
        8
    shuang  
    OP
       121 天前 via Android
    @sioncheng
    没太明白你的意思。
    对方的接口文档里就是要求按照 rsocket 的方式上传文件。你所说的 multipart/form-data ,应该是常规的 http 协议的文件上传,服务端是 tcp 协议的。两者技术手段不同。
    shuang
        9
    shuang  
    OP
       121 天前 via Android
    @lbbdefy 第一次听说这个词,我去搜一下什么是大小端
    sioncheng
        10
    sioncheng  
       121 天前
    @shuang 懂了,就是必须 rsocket ,并且对方也不是常见的 http multipart/form-data 协议。头疼,哈哈。
    shuang
        11
    shuang  
    OP
       121 天前
    ```
    // 元数据
    CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer();

    // 1. 创建路由元数据
    ByteBuf routeCOntent= TaggingMetadataCodec.createTaggingContent(
    ByteBufAllocator.DEFAULT,
    Collections.singletonList(platformConfig.getFileUploadRoute()));

    CompositeMetadataCodec.encodeAndAddMetadata(
    composite,
    ByteBufAllocator.DEFAULT,
    WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
    routeContent);

    // 2. 创建上传文件头
    ByteBuf headerCOntent= ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, getHeaderContent(token));
    CompositeMetadataCodec.encodeAndAddMetadata(
    composite,
    ByteBufAllocator.DEFAULT,
    WellKnownMimeType.fromString("message/x.upload-file-header"),
    headerContent);

    // 读取本地文件
    Flux<DataBuffer> dataBufferFlux = DataBufferUtils.read(Paths.get(filePath), new DefaultDataBufferFactory(), 1024 * 8);

    Flux<Payload> requestPayloads = dataBufferFlux.map(buf -> ByteBufPayload.create(
    Unpooled.wrappedBuffer(buf.asByteBuffer()),
    composite
    ));

    rsocket.requestChannel(requestPayloads)
    .doOnNext(payload -> log.error("=====> doOnNext"))
    .doOnError(error -> log.error("=====> doOnError", error))
    .doOnComplete(() -> log.info("=====> doOnComplete"))
    .subscribe();
    ```

    报错说 Missing header 'upload-file-header' ,我又换了种写法,还是不行
    skyyan
        12
    skyyan  
       121 天前
    第三方提供的 api 接口文档能发下不
    kvolongoto
        13
    kvolongoto  
       120 天前
    // 1. 创建路由元数据 (保持不变)
    ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(
    ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute()));
    compositeMetadata.addComponent(true, ByteBufAllocator.DEFAULT.buffer().writeBytes(routeMetadata));

    // 2. 添加服务器要求的 upload-file-header (新增)
    ByteBuf uploadFileHeaderBuf = ByteBufUtil.writeUtf8(
    ByteBufAllocator.DEFAULT, "your-header-value-here"); // 替换为实际值
    CompositeMetadataCodec.encodeAndAddMetadata(
    compositeMetadata,
    ByteBufAllocator.DEFAULT,
    "upload-file-header", // 必须与服务器注解名称一致
    uploadFileHeaderBuf
    );

    // 3. 添加其他元数据 (JSON 格式)
    Map<String, String> uploadFileHeader = new HashMap<>();
    uploadFileHeader.put("token", token);
    uploadFileHeader.put("fileType", "jpg");
    uploadFileHeader.put("fileName", "random-file-name");
    ByteBuf customMetadata = ByteBufUtil.writeUtf8(
    ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader));
    CompositeMetadataCodec.encodeAndAddMetadata(
    compositeMetadata,
    ByteBufAllocator.DEFAULT,
    WellKnownMimeType.APPLICATION_JSON,
    customMetadata
    );
    shuang
        14
    shuang  
    OP
       120 天前
    @kvolongoto
    感谢答复。
    没看明白,your-header-value-here 这里应该传什么
    shuang
        15
    shuang  
    OP
       120 天前
    @kvolongoto
    文档里说:请求 header 参数,使用键值对,媒体类型为:application/json 。参数有 token 、fileName 、fileType
    shuang
        16
    shuang  
    OP
       116 天前
    已解决。分享一下:

    方案一:使用底层的 rsocket ,更灵活,但代码比较繁琐,适合对 rsocket 原理和 api 比较熟悉的人。
    RSocketClientConfig.java 关键代码:
    @Bean
    public RSocket rsocket() {
    ClientTransport transport = TcpClientTransport.create(platformConfig.getServerHost(), platformConfig.getServerPort());

    RSocket rsocket = RSocketConnector.create()
    // 设置 metadata MIME Type ,方便服务端根据 MIME 类型确定 metadata 内容
    .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
    .dataMimeType(WellKnownMimeType.APPLICATION_JSON.getString())
    // 认证相关的参数
    .setupPayload(getSetupPayload())
    // 接收服务器发送的响应
    .acceptor(new SocketAcceptorImpl())
    // 设置重连策略
    .reconnect(Retry.backoff(2, Duration.ofMillis(500)))
    .connect(transport)
    .block();

    // 检查连接是否成功
    if (rsocket == null || rsocket.isDisposed()) {
    throw new IllegalStateException("RSocket 连接失败");
    }

    return rsocket;
    }

    上传附件的单元测试代码:
    @Test
    public void testFileSimpleUpload() {
    String token = getToken();

    // 复合元数据
    CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer();

    // 1. 创建路由元数据
    ByteBuf routeCOntent= TaggingMetadataCodec.createTaggingContent(
    ByteBufAllocator.DEFAULT,
    Collections.singletonList(platformConfig.getFileUploadRoute()));
    CompositeMetadataCodec.encodeAndAddMetadata(
    compositeByteBuf,
    ByteBufAllocator.DEFAULT,
    WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
    routeContent);

    // 2. 创建上传文件头
    Map<String, String> uploadFileHeader = new HashMap<>();
    uploadFileHeader.put("platformCode", platformConfig.getPlatformCode());
    uploadFileHeader.put("token", token);
    String fileName = fileResource.getFilename();
    uploadFileHeader.put("fileType", fileName.substring(fileName.lastIndexOf(".") + 1));
    uploadFileHeader.put("fileName", UUID.randomUUID().toString().replaceAll("-", ""));
    ByteBuf headerCOntent= ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader));
    CompositeMetadataCodec.encodeAndAddMetadata(
    compositeByteBuf,
    ByteBufAllocator.DEFAULT,
    WellKnownMimeType.APPLICATION_JSON,
    headerContent);

    // 2. 读取本地文件
    Flux<Payload> requestPayloads = DataBufferUtils
    .read(fileResource, new DefaultDataBufferFactory(), 1024 * 8)
    .map(buf -> ByteBufPayload.create(Unpooled.wrappedBuffer(buf.asByteBuffer())))
    .startWith(ByteBufPayload.create(Unpooled.EMPTY_BUFFER, compositeByteBuf));

    rsocket.requestChannel(requestPayloads)
    .doOnNext(payload -> log.info("=====> doOnNext {}", payload.getDataUtf8()))
    .doOnError(error -> log.error("=====> doOnError", error))
    .doOnComplete(() -> log.info("=====> doOnComplete"))
    .blockLast(Duration.ofSeconds(10));
    }
    shuang
        17
    shuang  
    OP
       116 天前
    方案二:使用 spring 封装的 RSocketRequester ,代码简洁易懂
    RSocketClientConfig.java 关键代码:
    @Bean
    public RSocketRequester rsocketRequester(RSocketRequester.Builder builder) {
    return RSocketRequester.wrap(
    rsocket(),
    MimeTypeUtils.APPLICATION_JSON,
    MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()),
    rsocketStrategies());
    }
    @Bean
    public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
    .encoders(encoders -> encoders.add(new Jackson2JsonEncoder()))
    .decoders(decoders -> decoders.add(new Jackson2JsonDecoder()))
    .build();
    }

    上传附件的单元测试代码:
    @Test
    public void uploadFile() {
    this.rSocketRequester
    .route(platformConfig.getFileUploadRoute())
    .metadata(spec -> spec.metadata(getUploadFileHeader(), MimeTypeUtils.APPLICATION_JSON))
    .data(DataBufferUtils.read(fileResource, new DefaultDataBufferFactory(), 1024 * 8))
    .retrieveFlux(String.class)
    .doOnNext(payload -> log.info("=====> doOnNext {}", payload))
    .doOnError(error -> log.error("=====> doOnError", error))
    .doOnComplete(() -> log.info("=====> doOnComplete"))
    .blockLast();
    }
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     870 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 26ms UTC 20:31 PVG 04:31 LAX 13:31 JFK 16:31
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86