用法
const CreateWriteStream = require('./CreateWriteStream')
const ws = new CreateWriteStream('./test/2.md', {
flags: 'w', // 默认读取
encoding: 'utf8', // 默认utf8
fd: null, // 取拿一个文件,默认null,createReadStream自己处理得到
mode: 0o666, // 默认值可读可写不可操作
highWaterMark: 1,
autoClose: true // 默认 ture,读取完毕后关闭文件
})
let i = 9
function write () {
let flag = true
while (i && flag) {
flag = ws.write(i-- + '')
console.log(flag)
}
}
write()
ws.on('drain', () => {
console.log('drain')
write()
})
ws.on('open', () => {
console.log('open')
})
ws.on('error', () => {
console.log('error')
})
实现
const fs = require('fs')
const EventEmitter = require('../EventEmitter/eventEmitter')
class CreateReadStream extends EventEmitter {
constructor (path, options = {}) { // 这里注意添加默认空对象
super()
this.path = path
this.flags = options.flags || 'w' // 默认写入
this.encoding = options.encoding || 'utf8' // 默认'utf8'
this.fd = options.fd || null // 取拿一个文件,默认null,createReadStream自己处理得到
this.mode = options.mode || 0o666 // 默认值可读可写不可操作
this.autoClose = options.autoClose || true // 默认 ture,读取完毕后关闭文件
this.highWaterMark = options.highWaterMark || 16 * 1024
this.open()
this.pos = this.start || 0
this.len = 0
this.writing = false
this.cache = []
}
open () {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) return this.emit('error', err)
this.fd = fd
this.emit('open', fd)
})
}
/**
*
*
* @param {*} chunk
* @param {*} [encoding=this.encoding]
* @param {*} callback
* @returns true|false
* @description 返回的布尔值标识当前要写入文件的大小大于highWaterMark值,
* 也就是说写入速度慢于读取速度。
* @memberof CreateReadStream
*/
write (chunk, encoding = this.encoding, callback) {
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)
this.len += buffer.length
// console.log(this.len)
if (this.len >= this.highWaterMark) {
this.needDrain = true
}
if (!this.writing) {
this._write(chunk, encoding, () => {
typeof callback === 'function' && callback()
this.clearCache()
})
} else {
this.cache.push({
chunk,
encoding,
callback
})
}
return !this.needDrain
}
clearCache () {
const task = this.cache.pop()
if (task) {
const {chunk, encoding, callback} = task
this._write(chunk, encoding, () => {
typeof callback === 'function' && callback()
this.clearCache()
})
} else {
if (this.needDrain) {
this.needDrain = false
this.writing = false
// drain: 耗尽
// drain事件的作用是告知使用方
// 当前任务队列或当前写入任务中的buffer长度已经超过highWaterMark,
// 最好等待队列中的写入任务处理完成,再进行写入操作。
this.emit('drain')
}
}
}
_write (chunk, encoding, callback) {
if (typeof this.fd !== 'number') {
return this.once('open', () => this._write(chunk, encoding, callback))
// return this.once('open', this._write)
// 如果这样写的话,就是直将一个函数作为this.once的参数传入,相当于将该函数赋值给一个局部变量
// 所以该函数中的this指向为全局,严格模式下为undefined。
}
fs.write(this.fd, chunk, this.pos, encoding, (err, written) => {
if (err) return this.emit('error', err)
this.len -= written
this.pos += written
typeof callback === 'function' && callback()
})
}
}
module.exports = CreateReadStream
问题
如果第一次写入的chunk体积很大,那也会造成内存溢出的问题,该如何处理?