我正在读取存储在AWS S3上的csv文件。这些csv记录由一个名为filterLogic()的函数进行校验。未通过校验的记录还需要写入AWS S3上的错误报告csv文件中。csv解析我使用的是fast-csv。
但是,对于最后一条记录,我得到一个"Error [ERR_STREAM_WRITE_AFTER_END]: write after end"
我应该在哪里、以及如何正确调用csvwritestream.end()?
const { once } = require('events');
const AWS = require('aws-sdk');
const csv = require('fast-csv');
const utils = require('./utils');
const s3 = new AWS.S3();
exports.handler = async (event) => {
console.log("Incoming Event: ",JSON.stringify(event));
const bucket = event.Records[0].s3.bucket.name;
const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g,' '));
const message = `File is uploaded in - ${bucket} -> ${filename}`;
console.log(message);
const splittedFilename = filename.split('.');
const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
const reportBucket = 'external.transactions.reports';
const csvwritestream = csv.format({ headers: true });
csvwritestream
.pipe(utils.uploadFromStream(s3,reportBucket,reportFilename))
.on('end',function () {
console.log("Report written to S3 " + reportFilename);
});
var request = s3.getObject({ Bucket: bucket,Key: filename });
var stream = request.createReadStream({ objectMode: true })
.pipe(csv.parse({ headers: true }))
.on('data',async function (data) {
stream.pause();
console.log("JSON: " + JSON.stringify(data));
var response = await utils.filterLogic(data);
if (response.statusCode !== 200) {
await csvwritestream.write(data);
console.log("Data: " + JSON.stringify(data) + " written.");
}
stream.resume();
})
.on('end',function(){
csvwritestream.end();
});
return new Promise(resolve => {
stream.on('close',async function () {
csvwritestream.end();
resolve();
});
});
};