Webflux - マルチパートでのファイルアップロード

この記事を読むとわかること

Webfluxサーバー上でのファイルアップロードとその処理方法について解説します。

以下に非常にシンプルなコードを記述しました。このコードに従えば、他の処理を行いながら複数のアップロードファイルを扱うことができます。

コード

fun uploadImage(serverRequest: ServerRequest): Mono<ServerResponse> {
    val monos = serverRequest.multipartData().flatMap { multipart ->
        // 1. マルチパートファイルを取得
        val files = multipart["file"] ?: listOf()
        // 2. Monoパブリッシャーを作成(flatMap内で記述しているため)
        return @flatMap Mono.just(files.map { part ->
            val filePart = part.cast<FilePart>()
            // 3. Flux<DataBuffer>からDataBufferを読み込み、SequenceInputStreamを使用してreduceでMonoに変換
            filePart.content().reduce(object : InputStream() {
                override fun read() = -1
            }) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream(true)) }
                    .flatMap { inputStream ->
                    // 4. FluxからInputStreamの読み込みが完了した後、InputStreamのバイト列を読み込む(** readBytes関数を実行するとストリームが閉じられるため、InputStreamは再利用できません)
                        val bytes = inputStream.readBytes() 

                        // 5. これでバイト列を自由に使用できます。この例では、MonoでラップしてファイルをS3バケットにアップロードしています。

                        return @flatMap Mono.just(s3Uploader.upload(filePart.filename(), filePart.headers().contentType.toString(), ByteArrayInputStream(bytes), bytes.size.toLong()))
                    }
        })
    }
    return monos.flatMap { responses ->
        // 6. すべてのファイルがアップロードされた後に応答するために Mono.zip を使用
        Mono.zip(responses, this::resultWithUpload).flatMap {
            // 7. すべてのファイルがS3にアップロードされた後、クライアントに応答
            ServerResponse.ok().body(BodyInserters.fromValue(it))
        }
    }
}

fun resultWithUpload(results: Array<Any>): List<s3UploadResponse> {
    return results.map { return @map it as s3UploadResponse }.toList()
}

結論

非常にシンプルなアップロードの例でした。これでWebfluxスタックでファイルを扱えるようになります。

ハングリーであれ。愚か者であれ。