Node.js: close the write stream at the end of the read stream

I am reading CSV files stored on AWS S3. The CSV records are evaluated by a function called filterLogic(). Records that fail the check also need to be written to an error report CSV file on AWS S3. For CSV parsing I am using fast-csv.

However, for the last record I get an "Error [ERR_STREAM_WRITE_AFTER_END]: write after end".

Where and how do I call csvwritestream.end() correctly?

const AWS = require('aws-sdk');
const utils = require('./utils');
const csv = require('fast-csv');
const s3 = new AWS.S3();

exports.handler = async (event) => {
    console.log("Incoming Event: ",JSON.stringify(event));
    const bucket = event.Records[0].s3.bucket.name;
    const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g,' '));
    const message = `File is uploaded in - ${bucket} -> ${filename}`;
    console.log(message);

    const splittedFilename = filename.split('.');
    const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
    const reportBucket = 'external.transactions.reports';

    const csvwritestream = csv.format({ headers: true });
    csvwritestream
        .pipe(utils.uploadFromStream(s3, reportBucket, reportFilename))
        .on('end', function () {
            console.log("Report written to S3 " + reportFilename);
        });

    var request = s3.getObject({ Bucket: bucket, Key: filename });
    var stream = request.createReadStream({ objectMode: true })
        .pipe(csv.parse({ headers: true }))
        .on('data', async function (data) {
            stream.pause();
            console.log("JSON: " + JSON.stringify(data));
            var response = await utils.filterLogic(data);
            if (response.statusCode !== 200) {
                await csvwritestream.write(data);
                console.log("Data: " + JSON.stringify(data) + " written.");
            }
            stream.resume();
        })
        .on('end', function () {
            csvwritestream.end();
        });
    return new Promise(resolve => {
        stream.on('close', async function () {
            csvwritestream.end();
            resolve();
        });
    });
};

Answer from x551857914h: Node.js: close the write stream at the end of the read stream

The reason for the error is simple. The main problem is that the Node.js event emitter does not await its 'data' listeners, so while you may expect the processing of each record to start in order, it does not finish in order. With that in mind, you can see that the 'end' event is fired right after the last 'data' event, so the writes for the last items still being processed only complete after the write stream has already been closed... Yes, I know you do pause the stream, but inside an async function you only do so after Node has already handled the synchronous events queued ahead of it, and 'end' is one of them.
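For illustration, here is a minimal sketch of one way to wait for that in-flight work while staying with fast-csv. It reuses the csvwritestream, request, csv and utils objects from the question; the pendingWrites name is purely illustrative, and note that this version does not pause the read stream, so all pending work is buffered in memory:

const pendingWrites = [];

request.createReadStream()
    .pipe(csv.parse({ headers: true }))
    .on('data', data => {
        // keep the listener synchronous and remember the async work instead
        pendingWrites.push(
            utils.filterLogic(data).then(response => {
                if (response.statusCode !== 200) {
                    csvwritestream.write(data);
                }
            })
        );
    })
    .on('end', async () => {
        // 'end' can fire before the last filterLogic() calls have resolved,
        // so wait for all of them before closing the write stream
        await Promise.all(pendingWrites);
        csvwritestream.end();
    });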

What you could do is implement a Transform stream (see the sketch below), but given your use case it would probably be more complicated than simply using another module, for example my scramjet, which lets you run asynchronous code inside a data filter.
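For reference, a rough sketch of what that Transform-stream variant could look like, again assuming the csv, utils, request and csvwritestream objects from the question; the asynchronous call happens inside transform(), so backpressure is respected and piping into the format stream ends it automatically:

const { Transform } = require('stream');

const filterTransform = new Transform({
    objectMode: true,
    transform(data, encoding, callback) {
        utils.filterLogic(data)
            .then(response => {
                if (response.statusCode !== 200) {
                    this.push(data); // keep only the records that failed the check
                }
                callback();
            })
            .catch(callback);
    }
});

request.createReadStream()
    .pipe(csv.parse({ headers: true }))
    .pipe(filterTransform)
    .pipe(csvwritestream); // pipe() calls csvwritestream.end() when parsing finishes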

const AWS = require('aws-sdk');
const utils = require('./utils');
const {StringStream} = require('scramjet');
const s3 = new AWS.S3();

exports.handler = async (event) => {
    console.log("Incoming Event: ",JSON.stringify(event));
    const bucket = event.Records[0].s3.bucket.name;
    const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g,' '));
    const message = `File is uploaded in - ${bucket} -> ${filename}`;
    console.log(message);

    const splittedFilename = filename.split('.');
    const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
    const reportBucket = 'external.transactions.reports';

    var request = s3.getObject({ Bucket: bucket, Key: filename });
    var stream = StringStream
        // create a StringStream from a scramjet stream
        .from(
            request.createReadStream()
        )
        // then just parse the data
        .CSVParse({headers: true})
        // then filter using asynchronous function as if it was an array
        .filter(async data => {
            var response = await utils.filterLogic(data);
            return response.statusCode === 200;
        })
        // then stringify
        .CSVStringify({headers: true})
        // then upload
        .pipe(
            utils.uploadFromStream(s3, reportBucket, reportFilename)
        );

    return new Promise(
        (res, rej) => stream
            .on("finish", res)
            .on("error", rej)
    );
};

Scramjet builds the stream pipeline from the chain above, so you don't need to pause/resume anything; all of that is taken care of for you.

You may want to read:

Oh, and scramjet adds only 3 dependencies, so your node_modules won't turn into that relativity joke. ;)
