服务端是第三方的,我方需要按照接口文档上传文件。
1 、分片上传,实体为 Flux<DataBuffer>
2 、需要携带 header ,媒体类型为 application/json
目前可以确认,服务端是 ok 的,问题出在客户端上传文件的代码。
以下是客户端上传文件的代码:
CompositeByteBuf compositeMetadata = ByteBufAllocator.DEFAULT.compositeBuffer(); // 1. 创建路由元数据 ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent( ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute())); compositeMetadata.addComponent(true, ByteBufAllocator.DEFAULT.buffer().writeBytes(routeMetadata)); // 2. 按文档要求添加请求 header Map<String, String> uploadFileHeader = new HashMap<>(); uploadFileHeader.put("token", token); uploadFileHeader.put("fileType", "jpg"); uploadFileHeader.put("fileName", "random-file-name"); ByteBuf customMetadata = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader)); CompositeMetadataCodec.encodeAndAddMetadata(compositeMetadata, ByteBufAllocator.DEFAULT, WellKnownMimeType.APPLICATION_JSON, customMetadata); // 读取本地文件 Flux<DataBuffer> dataBufferFlux = DataBufferUtils.read(Paths.get(filePath), new DefaultDataBufferFactory(), 1024 * 8); // 合并 Payloads 将每个 DataBuffer 转换为 Payload ,并附加 metadata Flux<Payload> requestPayloads = dataBufferFlux.map(dataBuffer -> { // 将每个 DataBuffer 转换为 Payload ,并附加 metadata return ByteBufPayload.create(Unpooled.wrappedBuffer(dataBuffer.asByteBuffer()), compositeMetadata); }); rsocket.requestChannel(requestPayloads) .doOnNext(payload -> log.error("=====> doOnNext")) .doOnError(error -> log.error("=====> doOnError", error)) .doOnComplete(() -> log.info("=====> doOnComplete")) .subscribe();
注:由于对 rsocket 完全不熟,所以以上代码任何地方都有可能是错的。
目前一直报错,缺少请求头:Missing header 'upload-file-header' for method parameter type , 从报错分析,已经请求到接口了,说明路由 metadata 没问题,但是请求头传递不正确。文件数据流是否传递正确还未知。
希望寻求有过 rsocket 相关开发经验的人,帮忙看下代码哪里有问题。
解决后发微信红包 200 元作为报酬。
1 sioncheng 123 天前 有点好奇,从报错信息来看,上传方是不是没有明确接收方需要的 upload-file-header 信息。还有,是不是可以先一般 java 代码去实现上传功能,确保明确了解了接收方的接口文档,然后再将一般 java 代码改为 rsocket 。 |
2 lervard358 123 天前 我接了 怎么联系 加我 YWxwaGEtZW5naW5lZXJpbmc= |
3 larisboy 122 天前 uploadFileHeader 加上 upload-file-header 看看 |
![]() | 6 lbbdefy 122 天前 ByteBuf 大小端的问题要先确认 |
7 sioncheng 121 天前 @shuang 再探讨下。我意思是 rsocket 只是一个技术手段吧,rsocket 能做到的,其他 java 方式应该也能做到;理解清楚对方的接收协议才是本质,对方是标准的 multipart/form-data 协议还是自定义协议呢,这样才能对症解决问题吧。 |
![]() | 8 shuang OP @sioncheng 没太明白你的意思。 对方的接口文档里就是要求按照 rsocket 的方式上传文件。你所说的 multipart/form-data ,应该是常规的 http 协议的文件上传,服务端是 tcp 协议的。两者技术手段不同。 |
![]() | 11 shuang OP ``` // 元数据 CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer(); // 1. 创建路由元数据 ByteBuf routeCOntent= TaggingMetadataCodec.createTaggingContent( ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute())); CompositeMetadataCodec.encodeAndAddMetadata( composite, ByteBufAllocator.DEFAULT, WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, routeContent); // 2. 创建上传文件头 ByteBuf headerCOntent= ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, getHeaderContent(token)); CompositeMetadataCodec.encodeAndAddMetadata( composite, ByteBufAllocator.DEFAULT, WellKnownMimeType.fromString("message/x.upload-file-header"), headerContent); // 读取本地文件 Flux<DataBuffer> dataBufferFlux = DataBufferUtils.read(Paths.get(filePath), new DefaultDataBufferFactory(), 1024 * 8); Flux<Payload> requestPayloads = dataBufferFlux.map(buf -> ByteBufPayload.create( Unpooled.wrappedBuffer(buf.asByteBuffer()), composite )); rsocket.requestChannel(requestPayloads) .doOnNext(payload -> log.error("=====> doOnNext")) .doOnError(error -> log.error("=====> doOnError", error)) .doOnComplete(() -> log.info("=====> doOnComplete")) .subscribe(); ``` 报错说 Missing header 'upload-file-header' ,我又换了种写法,还是不行 |
12 skyyan 121 天前 第三方提供的 api 接口文档能发下不 |
13 kvolongoto 120 天前 // 1. 创建路由元数据 (保持不变) ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent( ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute())); compositeMetadata.addComponent(true, ByteBufAllocator.DEFAULT.buffer().writeBytes(routeMetadata)); // 2. 添加服务器要求的 upload-file-header (新增) ByteBuf uploadFileHeaderBuf = ByteBufUtil.writeUtf8( ByteBufAllocator.DEFAULT, "your-header-value-here"); // 替换为实际值 CompositeMetadataCodec.encodeAndAddMetadata( compositeMetadata, ByteBufAllocator.DEFAULT, "upload-file-header", // 必须与服务器注解名称一致 uploadFileHeaderBuf ); // 3. 添加其他元数据 (JSON 格式) Map<String, String> uploadFileHeader = new HashMap<>(); uploadFileHeader.put("token", token); uploadFileHeader.put("fileType", "jpg"); uploadFileHeader.put("fileName", "random-file-name"); ByteBuf customMetadata = ByteBufUtil.writeUtf8( ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader)); CompositeMetadataCodec.encodeAndAddMetadata( compositeMetadata, ByteBufAllocator.DEFAULT, WellKnownMimeType.APPLICATION_JSON, customMetadata ); |
![]() | 14 shuang OP |
![]() | 15 shuang OP @kvolongoto 文档里说:请求 header 参数,使用键值对,媒体类型为:application/json 。参数有 token 、fileName 、fileType |
![]() | 16 shuang OP 已解决。分享一下: 方案一:使用底层的 rsocket ,更灵活,但代码比较繁琐,适合对 rsocket 原理和 api 比较熟悉的人。 RSocketClientConfig.java 关键代码: @Bean public RSocket rsocket() { ClientTransport transport = TcpClientTransport.create(platformConfig.getServerHost(), platformConfig.getServerPort()); RSocket rsocket = RSocketConnector.create() // 设置 metadata MIME Type ,方便服务端根据 MIME 类型确定 metadata 内容 .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()) .dataMimeType(WellKnownMimeType.APPLICATION_JSON.getString()) // 认证相关的参数 .setupPayload(getSetupPayload()) // 接收服务器发送的响应 .acceptor(new SocketAcceptorImpl()) // 设置重连策略 .reconnect(Retry.backoff(2, Duration.ofMillis(500))) .connect(transport) .block(); // 检查连接是否成功 if (rsocket == null || rsocket.isDisposed()) { throw new IllegalStateException("RSocket 连接失败"); } return rsocket; } 上传附件的单元测试代码: @Test public void testFileSimpleUpload() { String token = getToken(); // 复合元数据 CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(); // 1. 创建路由元数据 ByteBuf routeCOntent= TaggingMetadataCodec.createTaggingContent( ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute())); CompositeMetadataCodec.encodeAndAddMetadata( compositeByteBuf, ByteBufAllocator.DEFAULT, WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, routeContent); // 2. 创建上传文件头 Map<String, String> uploadFileHeader = new HashMap<>(); uploadFileHeader.put("platformCode", platformConfig.getPlatformCode()); uploadFileHeader.put("token", token); String fileName = fileResource.getFilename(); uploadFileHeader.put("fileType", fileName.substring(fileName.lastIndexOf(".") + 1)); uploadFileHeader.put("fileName", UUID.randomUUID().toString().replaceAll("-", "")); ByteBuf headerCOntent= ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader)); CompositeMetadataCodec.encodeAndAddMetadata( compositeByteBuf, ByteBufAllocator.DEFAULT, WellKnownMimeType.APPLICATION_JSON, headerContent); // 2. 读取本地文件 Flux<Payload> requestPayloads = DataBufferUtils .read(fileResource, new DefaultDataBufferFactory(), 1024 * 8) .map(buf -> ByteBufPayload.create(Unpooled.wrappedBuffer(buf.asByteBuffer()))) .startWith(ByteBufPayload.create(Unpooled.EMPTY_BUFFER, compositeByteBuf)); rsocket.requestChannel(requestPayloads) .doOnNext(payload -> log.info("=====> doOnNext {}", payload.getDataUtf8())) .doOnError(error -> log.error("=====> doOnError", error)) .doOnComplete(() -> log.info("=====> doOnComplete")) .blockLast(Duration.ofSeconds(10)); } |
![]() | 17 shuang OP 方案二:使用 spring 封装的 RSocketRequester ,代码简洁易懂 RSocketClientConfig.java 关键代码: @Bean public RSocketRequester rsocketRequester(RSocketRequester.Builder builder) { return RSocketRequester.wrap( rsocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()), rsocketStrategies()); } @Bean public RSocketStrategies rsocketStrategies() { return RSocketStrategies.builder() .encoders(encoders -> encoders.add(new Jackson2JsonEncoder())) .decoders(decoders -> decoders.add(new Jackson2JsonDecoder())) .build(); } 上传附件的单元测试代码: @Test public void uploadFile() { this.rSocketRequester .route(platformConfig.getFileUploadRoute()) .metadata(spec -> spec.metadata(getUploadFileHeader(), MimeTypeUtils.APPLICATION_JSON)) .data(DataBufferUtils.read(fileResource, new DefaultDataBufferFactory(), 1024 * 8)) .retrieveFlux(String.class) .doOnNext(payload -> log.info("=====> doOnNext {}", payload)) .doOnError(error -> log.error("=====> doOnError", error)) .doOnComplete(() -> log.info("=====> doOnComplete")) .blockLast(); } |