用法按照文档中的,比较好理解。
const CreateReadStream = require('./CreateReadStream')
const rs = new CreateReadStream('./test/1.md', {
flags: 'r', // 默认读取
encoding: null, // 默认null
fd: null, // 取拿一个文件,默认null,createReadStream自己处理得到
mode: 0o666, // 默认值可读可写不可操作
autoClose: true, // 默认 ture,读取完毕后关闭文件
start: 0, // 开始读取的位置
end: 6666, // 默认 Infinity无限读取,start和end都包含在内
highWaterMark: 2 // 默认64kb 也就是1024字节 * 64
})
// 如果是每次读取两个字节,就会造成乱码,为什么当highWaterMark为非3倍数的时候,chunk.toString()乱码数量和字节数匹配不上
// let str = ''
let buffers = []
rs.on('data', (chunk) => {
// str += chunk
buffers.push(chunk)
if (buffers.length === 1) {
rs.pause()
setTimeout(() => {
rs.resume()
}, 3000)
}
})
rs.on('end', () => {
console.log(Buffer.concat(buffers).toString())
})
实现
const fs = require('fs')
const EventEmitter = require('../EventEmitter/eventEmitter')
class CreateReadStream extends EventEmitter {
constructor (path, options = {}) { // 这里注意添加默认空对象
super()
this.path = path
this.flags = options.flags || 'r' // 默认读取
this.encoding = options.encoding || null // 默认null
this.fd = options.fs || null // 取拿一个文件,默认null,createReadStream自己处理得到
this.mode = options.mode || 0o666 // 默认值可读可写不可操作
this.autoClose = options.autoClose || true // 默认 ture,读取完毕后关闭文件
this.start = options.start || 0 // 开始读取的位置
this.end = options.end || Infinity // 默认 Infinity无限读取
this.highWaterMark = options.highWaterMark || 64 * 1024 // 默认64kb 也就是1024字节 * 64
this.open()
this.on('newListener', (name) => {
if (name === 'data') {
this.flowing = true
this.read()
}
})
this.pos = this.start
this.flowing = false
}
open () {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) return this.emit('error', err)
this.fd = fd
this.emit('open', fd)
})
}
read () {
if (typeof this.fd !== 'number') {
return this.once('open', () => this.read())
}
const {end, highWaterMark, pos, fd, encoding} = this
// start和end都包含在内
const howManyToRead = this.end ? Math.min(end - this.pos + 1, highWaterMark) : highWaterMark
const buffer = Buffer.alloc(howManyToRead)
fs.read(fd, buffer, 0, howManyToRead, pos, (err, bytesRead) => {
if (err) return this.emit('error', err)
if (bytesRead > 0) {
this.pos += bytesRead
this.emit('data', encoding ? buffer.toString(encoding) : buffer)
if (this.flowing) {
this.read()
}
} else {
this.emit('end')
if (this.autoClose) {
fs.close(this.fd, (err) => {
if (err) return this.emit('error', err)
this.emit('close')
this.flowing = false
})
}
}
})
}
pause () {
this.flowing = false
}
resume () {
this.flowing = true
this.read()
}
}
module.exports = CreateReadStream
遗留问题
乱码个数