How to use pipe in levelup (node.js)?

Leonardo

The LevelUP documentation says that pipe() can be used (https://github.com/rvagg/node-levelup/#pipes-and-node-stream-compatibility).

I've tried the following code:

db.createValueStream().pipe(response)

But it didn't work; I got this error:

events.js:72
        throw er; // Unhandled 'error' event
              ^
TypeError: Invalid non-string/buffer chunk
    at validChunk (_stream_writable.js:150:14)
    at Writable.write (_stream_writable.js:179:12)
    at write (_stream_readable.js:573:24)
    at flow (_stream_readable.js:582:7)
    at ReadStream.pipeOnReadable (_stream_readable.js:614:5)
    at ReadStream.EventEmitter.emit (events.js:92:17)
    at emitReadable_ (_stream_readable.js:408:10)
    at emitReadable (_stream_readable.js:404:5)
    at readableAddChunk (_stream_readable.js:165:9)
    at ReadStream.Readable.push (_stream_readable.js:127:10)

The actual problem is memory usage when I use the 'data' event. That is why I was trying to build a stream.Transform and use pipe() to do what I need, since a memory leak in the event emitter is a known problem: Memory leak when using streams in Node.js?

UPDATE

I've tried @paul-mougel's answer without success. The error event handlers are not called, and it still crashes. This is a piece of the code:

    var rs = db.createValueStream();

    request.on('close', function(){
        rs.destroy();
        response.end();
    });

    rs.on('end', function(){
        response.end();
    });
    rs.on('error', function(err){
        console.error('READ STREAM ERROR:',err.message);
        response.end();
        rs.destroy();
    });

    response.on('error', function(err){
        console.log('RESPONSE ERROR:',err);
        rs.destroy();
    });

    rs.pipe(stringifier).pipe(response);

Paul Mougel

There are multiple things to consider.

First, you get this exception because you don't listen to the error event. With streams, always listen for it: doing so lets you log the issue and keeps an unhandled 'error' event from crashing the program.

var valueStream = db.createValueStream();
// Attach the handlers before piping so no 'error' event goes unhandled
valueStream.on('error', function (err) {
  console.error('valueStream.on error ' + err.message);
});
response.on('error', function (err) {
  console.error('response error ' + err.message);
});
valueStream.pipe(response);
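The crash follows from how Node's EventEmitter treats 'error': when nothing listens for it, emit() throws the error instead of delivering it. A minimal sketch of that behaviour, using a bare EventEmitter as a stand-in for the stream (emitWithoutListener and emitWithListener are hypothetical helper names, not part of LevelUP):

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: emit 'error' on an emitter that has no 'error' listener.
// emit() throws in that case, so the error is caught here and reported.
function emitWithoutListener() {
  var emitter = new EventEmitter();
  try {
    emitter.emit('error', new Error('boom'));
    return 'not thrown';
  } catch (err) {
    return 'thrown: ' + err.message;
  }
}

// Hypothetical helper: same emit, but with a listener attached first.
// The listener receives the error and nothing is thrown.
function emitWithListener() {
  var emitter = new EventEmitter();
  var result = 'not handled';
  emitter.on('error', function (err) {
    result = 'handled: ' + err.message;
  });
  emitter.emit('error', new Error('boom'));
  return result;
}

console.log(emitWithoutListener()); // prints "thrown: boom"
console.log(emitWithListener());    // prints "handled: boom"
```

Note that pipe() does not forward errors from one stream to the next, which is why each stream in the chain gets its own listener.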

Second, db.createValueStream() creates a readable stream in object mode (see source): it outputs JavaScript objects. Your response, on the other hand, is a writable stream in byte mode: it accepts only strings or buffers as input, hence the "Invalid non-string/buffer chunk" error. What you can do is create a Transform stream that takes JavaScript objects as input and outputs their stringified version:

var stream = require('stream');
var stringifier = new stream.Transform();
// Accept objects on the writable side; the readable side still emits bytes
stringifier._writableState.objectMode = true;
stringifier._transform = function (data, encoding, done) {
    // Emit one JSON document per line
    this.push(JSON.stringify(data));
    this.push('\n');
    done();
};

valueStream.pipe(stringifier).pipe(response);

Please note that we create a transform stream that takes objects as input and outputs bytes. See the documentation for more information.
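On Node versions newer than the v0.10 discussed here, the same object-to-text transform can be set up without touching the private _writableState field, by passing writableObjectMode to the constructor. A sketch under that assumption; createStringifier is a hypothetical helper name, and an in-memory PassThrough stands in for db.createValueStream():

```javascript
var stream = require('stream');

// Hypothetical helper: builds a Transform that accepts objects on its
// writable side and emits newline-delimited JSON text on its readable side.
function createStringifier() {
  return new stream.Transform({
    writableObjectMode: true,
    transform: function (data, encoding, done) {
      // Passing a value to done() pushes it downstream
      done(null, JSON.stringify(data) + '\n');
    }
  });
}

// In-memory stand-in for db.createValueStream() (assumption for this sketch)
var source = new stream.PassThrough({ objectMode: true });
var stringifier = createStringifier();

// Collect the text output; in the real case this would be .pipe(response)
var output = '';
stringifier.setEncoding('utf8');
stringifier.on('data', function (chunk) { output += chunk; });
stringifier.on('end', function () { process.stdout.write(output); });

source.pipe(stringifier);
source.write({ name: 'a' });
source.end({ name: 'b' });
```

Each object written to the source comes out as one line of JSON, which a byte-mode stream such as an HTTP response can accept.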

However, you'll have to tell us more about the specific problem you're trying to solve by piping the LevelUP stream into a response: the above isn't a very good solution.

Third, you didn't encounter a memory leak when using .on('data'). Adding this listener switches the stream into flowing mode, which means it outputs data as fast as possible, whether or not the consumer can keep up. You can always use the .pause() and .resume() methods to stop and restart the stream. But the new v0.10 stream interface (aka streams2) helps you deal with this issue, as the Readable, Writable and Transform classes take care of all of that for you.
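What pipe() does on your behalf can be written out by hand with .pause() and .resume(): stop reading when the destination's write() returns false, and continue once it emits 'drain'. A rough sketch, assuming an in-memory source and a deliberately slow destination (copyWithBackpressure and both test streams are hypothetical stand-ins, not LevelUP APIs):

```javascript
var stream = require('stream');

// Hypothetical helper: copy src to dest using 'data' events while
// honouring backpressure. Calls done(pauseCount) once dest has finished.
function copyWithBackpressure(src, dest, done) {
  var pauses = 0;
  src.on('data', function (chunk) {
    // write() returns false when dest's internal buffer is full
    if (!dest.write(chunk)) {
      pauses++;
      src.pause();
      dest.once('drain', function () { src.resume(); });
    }
  });
  src.on('end', function () { dest.end(); });
  dest.on('finish', function () { done(pauses); });
}

// In-memory stand-ins for the database stream and the HTTP response
var src = new stream.PassThrough();
var received = [];
var slowDest = new stream.Writable({
  highWaterMark: 1, // tiny buffer so write() reports "full" right away
  write: function (chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 5); // simulate a slow consumer
  }
});

copyWithBackpressure(src, slowDest, function (pauses) {
  console.log('received "' + received.join('') + '", paused ' + pauses + ' time(s)');
});

['a', 'b', 'c', 'd'].forEach(function (value) { src.write(value); });
src.end();
```

Without the pause/resume pair, the 'data' handler would keep calling write() and the unsent chunks would pile up in the destination's buffer, which is the memory growth that can look like a leak.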
