multiprocess.js 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. 'use strict';
  2. const debug = require('debug')('log4js:multiprocess');
  3. const net = require('net');
  4. const CircularJSON = require('circular-json');
  5. const END_MSG = '__LOG4JS__';
  6. /**
  7. * Creates a server, listening on config.loggerPort, config.loggerHost.
  8. * Output goes to config.actualAppender (config.appender is used to
  9. * set up that appender).
  10. */
  11. function logServer(config, actualAppender, levels) {
  12. /**
  13. * Takes a utf-8 string, returns an object with
  14. * the correct log properties.
  15. */
  16. function deserializeLoggingEvent(clientSocket, msg) {
  17. debug('deserialising log event');
  18. let loggingEvent;
  19. try {
  20. loggingEvent = CircularJSON.parse(msg);
  21. loggingEvent.startTime = new Date(loggingEvent.startTime);
  22. loggingEvent.level = levels.getLevel(loggingEvent.level.levelStr);
  23. } catch (e) {
  24. // JSON.parse failed, just log the contents probably a naughty.
  25. loggingEvent = {
  26. startTime: new Date(),
  27. categoryName: 'log4js',
  28. level: levels.ERROR,
  29. data: ['Unable to parse log:', msg]
  30. };
  31. }
  32. loggingEvent.remoteAddress = clientSocket.remoteAddress;
  33. loggingEvent.remotePort = clientSocket.remotePort;
  34. return loggingEvent;
  35. }
  36. /* eslint prefer-arrow-callback:0 */
  37. const server = net.createServer(function serverCreated(clientSocket) {
  38. clientSocket.setEncoding('utf8');
  39. let logMessage = '';
  40. function logTheMessage(msg) {
  41. if (logMessage.length > 0) {
  42. debug('deserialising log event and sending to actual appender');
  43. actualAppender(deserializeLoggingEvent(clientSocket, msg));
  44. }
  45. }
  46. function chunkReceived(chunk) {
  47. debug('chunk of data received');
  48. let event;
  49. logMessage += chunk || '';
  50. if (logMessage.indexOf(END_MSG) > -1) {
  51. event = logMessage.substring(0, logMessage.indexOf(END_MSG));
  52. logTheMessage(event);
  53. logMessage = logMessage.substring(event.length + END_MSG.length) || '';
  54. // check for more, maybe it was a big chunk
  55. chunkReceived();
  56. }
  57. }
  58. function handleError(error) {
  59. const loggingEvent = {
  60. startTime: new Date(),
  61. categoryName: 'log4js',
  62. level: levels.ERROR,
  63. data: ['A worker log process hung up unexpectedly', error],
  64. remoteAddress: clientSocket.remoteAddress,
  65. remotePort: clientSocket.remotePort
  66. };
  67. actualAppender(loggingEvent);
  68. }
  69. clientSocket.on('data', chunkReceived);
  70. clientSocket.on('end', chunkReceived);
  71. clientSocket.on('error', handleError);
  72. });
  73. server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost', function () {
  74. debug('master server listening');
  75. // allow the process to exit, if this is the only socket active
  76. server.unref();
  77. });
  78. function app(event) {
  79. debug('log event sent directly to actual appender (local event)');
  80. return actualAppender(event);
  81. }
  82. app.shutdown = function (cb) {
  83. debug('master shutdown called, closing server');
  84. server.close(cb);
  85. };
  86. return app;
  87. }
  88. function workerAppender(config) {
  89. let canWrite = false;
  90. const buffer = [];
  91. let socket;
  92. let shutdownAttempts = 3;
  93. function write(loggingEvent) {
  94. debug('Writing log event to socket');
  95. // JSON.stringify(new Error('test')) returns {}, which is not really useful for us.
  96. // The following allows us to serialize errors correctly.
  97. // Validate that we really are in this case
  98. const logData = loggingEvent.data.map((e) => {
  99. if (e && e.stack && CircularJSON.stringify(e) === '{}') {
  100. e = { stack: e.stack };
  101. }
  102. return e;
  103. });
  104. loggingEvent.data = logData;
  105. socket.write(CircularJSON.stringify(loggingEvent), 'utf8');
  106. socket.write(END_MSG, 'utf8');
  107. }
  108. function emptyBuffer() {
  109. let evt;
  110. debug('emptying worker buffer');
  111. /* eslint no-cond-assign:0 */
  112. while ((evt = buffer.shift())) {
  113. write(evt);
  114. }
  115. }
  116. function createSocket() {
  117. debug(`worker appender creating socket to ${config.loggerHost || 'localhost'}:${config.loggerPort || 5000}`);
  118. socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
  119. socket.on('connect', () => {
  120. debug('worker socket connected');
  121. emptyBuffer();
  122. canWrite = true;
  123. });
  124. socket.on('timeout', socket.end.bind(socket));
  125. // don't bother listening for 'error', 'close' gets called after that anyway
  126. socket.on('close', createSocket);
  127. }
  128. createSocket();
  129. function log(loggingEvent) {
  130. if (canWrite) {
  131. write(loggingEvent);
  132. } else {
  133. debug('worker buffering log event because it cannot write at the moment');
  134. buffer.push(loggingEvent);
  135. }
  136. }
  137. log.shutdown = function (cb) {
  138. debug('worker shutdown called');
  139. if (buffer.length && shutdownAttempts) {
  140. debug('worker buffer has items, waiting 100ms to empty');
  141. shutdownAttempts -= 1;
  142. setTimeout(() => {
  143. log.shutdown(cb);
  144. }, 100);
  145. } else {
  146. socket.removeAllListeners('close');
  147. socket.end(cb);
  148. }
  149. };
  150. return log;
  151. }
  152. function createAppender(config, appender, levels) {
  153. if (config.mode === 'master') {
  154. debug('Creating master appender');
  155. return logServer(config, appender, levels);
  156. }
  157. debug('Creating worker appender');
  158. return workerAppender(config);
  159. }
  160. function configure(config, layouts, findAppender, levels) {
  161. let appender;
  162. debug(`configure with mode = ${config.mode}`);
  163. if (config.mode === 'master') {
  164. if (!config.appender) {
  165. debug(`no appender found in config ${config}`);
  166. throw new Error('multiprocess master must have an "appender" defined');
  167. }
  168. debug(`actual appender is ${config.appender}`);
  169. appender = findAppender(config.appender);
  170. if (!appender) {
  171. debug(`actual appender "${config.appender}" not found`);
  172. throw new Error(`multiprocess master appender "${config.appender}" not defined`);
  173. }
  174. }
  175. return createAppender(config, appender, levels);
  176. }
  177. module.exports.configure = configure;