BaseRollingFileStream.js 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. "use strict";
  2. var fs = require('fs')
  3. , zlib = require('zlib')
  4. , debug = require('debug')('streamroller:BaseRollingFileStream')
  5. , mkdirp = require('mkdirp')
  6. , path = require('path')
  7. , util = require('util')
  8. , stream = require('readable-stream');
  9. module.exports = BaseRollingFileStream;
  /**
   * Writable stream that forwards chunks to an underlying file stream and
   * tracks how many bytes have been written, so that subclasses can decide
   * when to roll the file (see `shouldRoll` / `roll`).
   *
   * @param {string} filename - path of the file to write to (required).
   * @param {Object} [options] - options handed to `fs.createWriteStream`.
   *   Missing `encoding`, `mode` and `flags` default to 'utf8', 0644 and 'a'.
   *   The object passed in is copied and never modified.
   * @throws {Error} when `filename` is missing or empty.
   */
  function BaseRollingFileStream(filename, options) {
    debug("In BaseRollingFileStream");

    // Validate first so that a bad call leaves no half-initialised state.
    if (!filename) {
      throw new Error("You must specify a filename");
    }

    // Shallow copy: defaults are filled in below, and writing them straight
    // into the caller's object would leak into the caller's code (and would
    // throw in strict mode if the caller passed a frozen object).
    function copyOptions(source) {
      var copy = {};
      if (source) {
        Object.keys(source).forEach(function(key) {
          copy[key] = source[key];
        });
      }
      return copy;
    }

    // Size of the file on disk, or 0 when it cannot be determined.
    function currentFileSize(file) {
      try {
        return fs.statSync(file).size;
      } catch (e) {
        // A missing file is the normal "start from empty" case; anything
        // else is still treated as size 0 but is worth a debug trace.
        if (e.code !== 'ENOENT') {
          debug("could not stat ", file, e);
        }
        return 0;
      }
    }

    this.filename = filename;
    this.options = copyOptions(options);
    this.options.encoding = this.options.encoding || 'utf8';
    this.options.mode = this.options.mode || parseInt('0644', 8);
    this.options.flags = this.options.flags || 'a';
    this.currentSize = 0;

    debug("Calling BaseRollingFileStream.super");
    BaseRollingFileStream.super_.call(this);
    this.openTheStream();
    this.currentSize = currentFileSize(this.filename);
  }
  38. util.inherits(BaseRollingFileStream, stream.Writable);
  /**
   * Hands one chunk to the underlying file stream and adds its length to
   * `currentSize`.
   *
   * `callback` is invoked on the next tick when the underlying stream accepted
   * the chunk, or on its next 'drain' event when `write` returned false. If
   * `write` throws, the error is passed to `callback` (when one was given).
   *
   * @param {Buffer|string} chunk - data to write.
   * @param {string} encoding - encoding forwarded to the underlying stream.
   * @param {Function} callback - completion callback, `callback(err)` on failure.
   */
  BaseRollingFileStream.prototype._writeTheChunk = function(chunk, encoding, callback) {
    debug("writing the chunk to the underlying stream");
    this.currentSize += chunk.length;

    try {
      var accepted = this.theStream.write(chunk, encoding);
      if (accepted) {
        process.nextTick(callback);
      } else {
        // Back-pressure: hold the callback until the buffer has drained.
        debug('waiting for drain event');
        this.theStream.once('drain', callback);
      }
      debug("chunk written");
    } catch (writeError) {
      debug(writeError);
      if (callback) {
        callback(writeError);
      }
    }
  };
  /**
   * Writable `_write` implementation: writes the chunk directly, or first
   * rolls the file when `shouldRoll()` says so and writes the chunk once
   * `roll` calls back.
   *
   * @param {Buffer|string} chunk - data to write.
   * @param {string} encoding - encoding of `chunk`.
   * @param {Function} callback - invoked when the chunk has been handled.
   */
  BaseRollingFileStream.prototype._write = function(chunk, encoding, callback) {
    debug("in _write");

    if (!this.shouldRoll()) {
      this._writeTheChunk(chunk, encoding, callback);
      return;
    }

    // Rolling starts a new file, so the byte counter restarts from zero.
    this.currentSize = 0;
    var writeAfterRoll = this._writeTheChunk.bind(this, chunk, encoding, callback);
    this.roll(this.filename, writeAfterRoll);
  };
  /**
   * Creates the directory for `this.filename` if needed and opens a new
   * underlying write stream on it, stored as `this.theStream`. Errors from
   * the underlying stream are re-emitted on this stream.
   *
   * @param {Function} [cb] - registered as a listener for the underlying
   *   stream's "open" event.
   */
  BaseRollingFileStream.prototype.openTheStream = function(cb) {
    debug("opening the underlying stream");
    var owner = this;
    var directory = path.dirname(this.filename);

    mkdirp.sync(directory);

    var underlying = fs.createWriteStream(this.filename, this.options);
    this.theStream = underlying;

    underlying.on('error', function(err) {
      owner.emit('error', err);
    });

    if (cb) {
      underlying.on("open", cb);
    }
  };
  /**
   * Ends the underlying file stream.
   *
   * @param {Function} [cb] - passed straight to the underlying stream's `end`.
   */
  BaseRollingFileStream.prototype.closeTheStream = function(cb) {
    debug("closing the underlying stream");
    var underlying = this.theStream;
    underlying.end(cb);
  };
  /**
   * Gzips `filename` into `filename + ".gz"` and, once the compressed file
   * has been fully written, deletes the original.
   *
   * @param {string} filename - path of the file to compress.
   * @param {Function} cb - called with the result of `fs.unlink` on success,
   *   or with the first stream error if reading, compressing or writing fails.
   */
  BaseRollingFileStream.prototype.compress = function(filename, cb) {
    debug('Compressing ', filename, ' -> ', filename, '.gz');
    var gzip = zlib.createGzip();
    var inp = fs.createReadStream(filename);
    var out = fs.createWriteStream(filename+".gz");
    var settled = false;

    // `pipe` does not forward errors, and a stream with no 'error' listener
    // throws when it emits one. Every stage therefore gets this handler,
    // which reports the first failure exactly once through `cb`.
    function fail(err) {
      if (settled) {
        return;
      }
      settled = true;
      debug('Compression of ', filename, ' failed: ', err);
      // Release the file handles; the original file is left in place.
      inp.destroy();
      out.destroy();
      if (typeof cb === 'function') {
        cb(err);
      }
    }

    inp.on('error', fail);
    gzip.on('error', fail);
    out.on('error', fail);

    out.on('finish', function() {
      if (settled) {
        return;
      }
      settled = true;
      debug('Removing original ', filename);
      fs.unlink(filename, cb);
    });

    inp.pipe(gzip).pipe(out);
  };
  /**
   * Tells `_write` whether the file must be rolled before the next chunk is
   * written. The base implementation never rolls.
   *
   * @returns {boolean} always `false` here.
   */
  BaseRollingFileStream.prototype.shouldRoll = function() {
    var rollNeeded = false;
    return rollNeeded;
  };
  /**
   * Called by `_write` when `shouldRoll()` returns true. The base
   * implementation does no rolling and calls back immediately.
   *
   * @param {string} filename - the file being written (unused here).
   * @param {Function} callback - invoked synchronously with no arguments.
   */
  BaseRollingFileStream.prototype.roll = function(filename, callback) {
    // Nothing to roll in the base class.
    callback();
  };
  /**
   * Ends this stream first and, from its end callback, ends the underlying
   * file stream. `chunk` and `encoding` are passed to the underlying
   * stream's `end`, not to this stream's own `end`.
   *
   * @param {Buffer|string} [chunk] - forwarded to the underlying `end`.
   * @param {string} [encoding] - forwarded to the underlying `end`.
   * @param {Function} [callback] - called from the underlying stream's end
   *   callback with whatever argument that callback received.
   */
  BaseRollingFileStream.prototype.end = function(chunk, encoding, callback) {
    var that = this;
    debug('end called - first close myself');

    function onUnderlyingClosed(err) {
      debug('underlying stream closed');
      if (callback) {
        callback(err);
      }
    }

    function onSelfEnded() {
      debug('writable end callback, now close underlying stream');
      that.theStream.end(chunk, encoding, onUnderlyingClosed);
    }

    stream.Writable.prototype.end.call(that, onSelfEnded);
  };