1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
var through = require('through');
var bz2 = require('./lib/bzip2');
var bitIterator = require('./lib/bit_iterator');
module.exports = unbzip2Stream;
function unbzip2Stream() {
var bufferQueue = [];
var hasBytes = 0;
var blockSize = 0;
var broken = false;
var done = false;
var bitReader = null;
var streamCRC = null;
function decompressBlock(push){
if(!blockSize){
blockSize = bz2.header(bitReader);
//console.error("got header of", blockSize);
streamCRC = 0;
return true;
}else{
var bufsize = 100000 * blockSize;
var buf = new Int32Array(bufsize);
var chunk = [];
var f = function(b) {
chunk.push(b);
};
streamCRC = bz2.decompress(bitReader, f, buf, bufsize, streamCRC);
if (streamCRC === null) {
// reset for next bzip2 header
blockSize = 0;
return false;
}else{
//console.error('decompressed', chunk.length,'bytes');
push(Buffer.from(chunk));
return true;
}
}
}
var outlength = 0;
function decompressAndQueue(stream) {
if (broken) return;
try {
return decompressBlock(function(d) {
stream.queue(d);
if (d !== null) {
//console.error('write at', outlength.toString(16));
outlength += d.length;
} else {
//console.error('written EOS');
}
});
} catch(e) {
//console.error(e);
stream.emit('error', e);
broken = true;
return false;
}
}
return through(
function write(data) {
//console.error('received', data.length,'bytes in', typeof data);
bufferQueue.push(data);
hasBytes += data.length;
if (bitReader === null) {
bitReader = bitIterator(function() {
return bufferQueue.shift();
});
}
while (!broken && hasBytes - bitReader.bytesRead + 1 >= ((25000 + 100000 * blockSize) || 4)){
//console.error('decompressing with', hasBytes - bitReader.bytesRead + 1, 'bytes in buffer');
decompressAndQueue(this);
}
},
function end(x) {
//console.error(x,'last compressing with', hasBytes, 'bytes in buffer');
while (!broken && bitReader && hasBytes > bitReader.bytesRead){
decompressAndQueue(this);
}
if (!broken) {
if (streamCRC !== null)
this.emit('error', new Error("input stream ended prematurely"));
this.queue(null);
}
}
);
}
|