stream_client.js

const DevNull = require('dev-null-stream')
const axios = require('axios');
const EventEmitter = require('events');
const TweetParser = require('./parse_stream');
const http = require('http');
const https = require('https');
const { pipeline } = require('stream/promises');
// Length of the rate-limit window in milliseconds (15 minutes); rateLimiting() never returns more than this.
const RATE_LIMIT_WINDOW = 15 * 60 * 1000;
// Default timeout in milliseconds (30 seconds) for both the axios client and the stream data timer.
const DATA_TIMEOUT = 30000;

/**
 * Tell whether an error represents a timeout: either axios aborting the
 * request (code ECONNABORTED) or the stream data timer flagging `isTimeout`.
 * @private
 * @param {Error} error Error raised while connecting or streaming
 * @returns {boolean|*} true for an axios timeout, otherwise the error's `isTimeout` value
 */
function isTimedout(error) {
  if (error.code === 'ECONNABORTED') {
    return true;
  }
  return error.isTimeout;
}

/**
 * Decide whether a failed response is worth retrying: rate limiting
 * (420, 429) or a server-side failure (500, 502, 503, 504).
 * @private
 * @param {Object} resp Response whose `status` is inspected
 * @returns {boolean} true when the status is retriable
 */
function retriableStatus(resp) {
  switch (resp.status) {
    case 420:
    case 429:
    case 500:
    case 502:
    case 503:
    case 504:
      return true;
    default:
      return false;
  }
}

/**
 * Substitute a fallback for NaN values.
 * @private
 * @param {*} num Candidate value (checked with the coercing global isNaN)
 * @param {*} def Value returned when `num` is NaN
 * @returns {*} `def` if `num` is NaN, otherwise `num` unchanged
 */
function defaultNaN(num, def) {
  return isNaN(num) ? def : num;
}

/**
 * Wait for the given number of milliseconds.
 * @private
 * @param {number} milliseconds Delay before the promise settles
 * @returns {Promise<void>} Promise fulfilled (with undefined) after the delay
 */
const sleep = (milliseconds) => new Promise((resolve) => {
  setTimeout(resolve, milliseconds);
});

/**
 * Calculate how long to wait before the next reconnect attempt.
 *
 * If the response signals an exhausted rate limit (x-rate-limit-remaining of 0, or
 * status 429/420), the wait lasts until x-rate-limit-reset, capped at one
 * RATE_LIMIT_WINDOW. Otherwise the wait equals the time since the previous attempt,
 * also capped at one window. Every result is at least 1000 ms.
 * @private
 * @param {Object} [resp] Response object; resp.status and resp.headers are consulted when present
 * @param {Date|number} last_retry Time of the previous attempt (Date or epoch milliseconds)
 * @returns {number} Backoff time in milliseconds, in the range [1000, RATE_LIMIT_WINDOW]
 */
function rateLimiting(resp, last_retry) {
  const MIN_BACKOFF = 1000;
  const now = Date.now();
  const fallback_reset = now + RATE_LIMIT_WINDOW;
  // -1 means "unknown". Only an explicit x-rate-limit-remaining of 0 signals exhaustion,
  // so a 5xx without rate-limit headers is not mistaken for a rate limit.
  let remaining = -1;
  let reset = fallback_reset;

  if (resp && resp.headers) {
    const headers = resp.headers;
    remaining = defaultNaN(parseInt(headers['x-rate-limit-remaining'], 10), -1);
    // x-rate-limit-reset is in epoch seconds; convert to milliseconds.
    reset = defaultNaN(parseInt(headers['x-rate-limit-reset'], 10) * 1000, fallback_reset);
  }

  const rate_limited = remaining === 0 ||
    Boolean(resp && (resp.status === 429 || resp.status === 420));

  let backoff;
  if (rate_limited) {
    // Wait until the window resets, but never longer than one full window.
    backoff = Math.min(reset - now, RATE_LIMIT_WINDOW);
  } else {
    // NOTE(review): this waits longer the longer ago the previous attempt was
    // (capped at one window). Confirm this is intended rather than the inverse.
    const elapsed = defaultNaN(now - last_retry, 0);
    backoff = Math.min(RATE_LIMIT_WINDOW, elapsed);
  }

  // A reset time in the past (clock skew, stale header) would otherwise give a
  // negative wait and an immediate retry storm.
  return Math.max(Math.floor(backoff), MIN_BACKOFF);
}

/**
 * Connect to the Twitter API v2 sampled stream endpoint and emit events for processing<br/>
 * For additional information see
 * [Twitter Sampled Stream]{@link https://developer.twitter.com/en/docs/twitter-api/tweets/sampled-stream/introduction}
 * @extends EventEmitter
 * @fires StreamClient#tweet
 * @fires StreamClient#connected
 * @fires StreamClient#reconnect
 * @fires StreamClient#disconnected
 * @fires StreamClient#stream-error
 * @fires StreamClient#api-errors
 * @fires StreamClient#heartbeat
 * @fires StreamClient#other
 */
class StreamClient extends EventEmitter {
  /**
   * Initializes the client
   * @param {Object} config Configuration for client
   * @param {string} config.token Set [OAUTH Bearer token]{@link https://developer.twitter.com/en/docs/authentication/oauth-2-0} from developer account
   * @param {number} [config.timeout=30000] Timeout in milliseconds for the underlying axios client
   * @param {number} [config.stream_timeout=30000] Stream data timeout in milliseconds; when the timer
   * fires the stream is destroyed with an error flagged `isTimeout` (see buildStreamPromise)
   */
  constructor({token, timeout = DATA_TIMEOUT, stream_timeout = DATA_TIMEOUT}) {
    super();
    this.url = 'tweets/sample/stream';
    this.timeout = timeout;
    this.twitrClient = axios.create({
      baseURL: 'https://api.twitter.com/2',
      headers: { 'Authorization': `Bearer ${token}`},
      timeout: timeout
    });
    this.stream_timeout = stream_timeout;
    // Shared by every (re)connection instead of allocating a new pair per request.
    this.httpAgent = new http.Agent({ keepAlive: true });
    this.httpsAgent = new https.Agent({ keepAlive: true });
  }

  /**
   * Connect to twitter stream and emit events, reconnecting on retriable failures.
   * @param {Object} [config] Configuration for connection
   * @param {Object} [config.params] Query parameters for the stream request (filters, fields, etc.)
   * @param {number} [config.max_reconnects=-1] Max number of reconnects after the initial attempt; -1 means unlimited
   * @param {stream.Writable} [config.writeable_stream] Destination for the parser output; a DevNull sink is used when omitted
   * @returns {(Promise<void>)} Resolves once [disconnect]{@link StreamClient#disconnect} is called or the request is cancelled.
   * -- Rejects with the original error when it is not retriable (see retriableStatus / isTimedout).
   * -- Rejects with an Error exposing `.reconnects` and `.wrapped_error` (also `.cause`, the last
   * retriable error) when the reconnect budget is exhausted.
   * @see retriableStatus
   */
  async connect({ params = {}, max_reconnects = -1, writeable_stream = null} = {}) {
    let reconnects = -1;
    let last_try = Date.now();
    let last_error;

    this.params = params;
    this.max_reconnects = max_reconnects;
    // Set by disconnect(); lets a disconnect during streaming or backoff end this loop.
    this.disconnect_requested = false;

    // `reconnects` is 0 for the initial attempt, so `<` allows exactly max_reconnects retries.
    while(max_reconnects === -1 || reconnects < max_reconnects)
    {
      reconnects += 1;
      try {
        await this.buildConnection(this.params, writeable_stream);
      } catch(request_error) {
        if(axios.isCancel(request_error) || this.disconnect_requested) {
          return;
        }
        const resp = request_error.response;
        if(!(isTimedout(request_error) || (resp && retriableStatus(resp)))) {
          throw request_error;
        }
        last_error = request_error;
        const timeout_wait = rateLimiting(resp, last_try);
        this.emit('reconnect', request_error, timeout_wait);
        if(!this.skip_sleep) {
          await sleep(timeout_wait);
        }
      }
      if(this.disconnect_requested) {
        return;
      }
      last_try = Date.now();
    }

    const reject_error = new Error(`Max reconnects exceeded (${reconnects})`, { cause: last_error });
    reject_error.reconnects = reconnects;
    reject_error.wrapped_error = last_error;
    throw reject_error;
  }

  /**
   * Disconnects an active request if any, and stops a running [connect]{@link StreamClient#connect} loop
   * from reconnecting.
   * @returns {boolean} Returns true if there is a request to disconnect
   */
  disconnect() {
    this.disconnect_requested = true;
    return this.cancelActiveRequest();
  }

  /**
   * Emit 'disconnected' and cancel the in-flight request, if any, without marking the
   * disconnect as user-requested (used internally before opening a new connection).
   * @private
   * @returns {boolean} true if there was a request to cancel
   */
  cancelActiveRequest() {
    this.emit('disconnected');
    if(!this.cancelToken) {
      return false;
    }
    this.cancelToken.cancel('Disconnecting stream');
    this.cancelToken = null;
    return true;
  }

  /**
   * Build Promises for handling data stream in [.buildConnection]{@link StreamClient#buildConnection}
   * @private
   * @param {Object} resp axios response whose `data` is the readable HTTP stream
   * @param {stream.Writable} [writable_stream] Final pipeline stage; a DevNull sink when omitted
   * @returns {Promise} Promise for the whole pipeline; it settles after the timer is cleared and the
   * 'disconnected' listener is removed
   */
  buildStreamPromise(resp, writable_stream) {
    const hose = resp.data;
    const timer = setTimeout(() => {
      const e = new Error(`Timed out after ${this.stream_timeout / 1000} seconds of no data`);
      e.isTimeout = true;
      hose.destroy(e);
      // Clearing the fired timer keeps a later timer.refresh() from re-arming it.
      clearTimeout(timer);
    }, this.stream_timeout);
    // The parser receives the timer; presumably it refreshes it as data arrives (TweetParser is not shown here).
    const jsonStream = new TweetParser({
      emitter: this,
      timer: timer
    });
    const sink = writable_stream || new DevNull();
    const listener = () => {
      hose.end();
    };

    this.once('disconnected', listener);
    // Return the promise from .finally() itself. A separate, unreturned .finally() chain
    // would become an unhandled rejection whenever the pipeline fails.
    const streamPromise = pipeline(hose, jsonStream, sink).finally(() => {
      clearTimeout(timer);
      // Drop the listener when the stream ends on its own so reconnects do not accumulate listeners.
      this.removeListener('disconnected', listener);
    });
    this.emit('connected');
    return streamPromise;
  }

  /**
   * Connect to twitter stream and emit events. Errors unhandled.
   * @private
   * @param {Object} params Query parameters for the stream request
   * @param {stream.Writable} [writable_stream] Final pipeline stage
   * @returns {Promise} Promise that settles when the stream pipeline finishes or fails
   * -- the Promise chain is unhandled so consider using [.connect]{@link StreamClient#connect}
   */
  buildConnection(params, writable_stream) {
    if(this.cancelToken) {
      // Tear down the previous request before opening a new one.
      this.cancelActiveRequest();
    }
    this.cancelToken = axios.CancelToken.source();

    const streamReq = this.twitrClient.get(this.url, {
      responseType: 'stream',
      cancelToken: this.cancelToken.token,
      params: params,
      decompress: true,
      httpAgent: this.httpAgent,
      httpsAgent: this.httpsAgent,
      // The stream is long-lived; data timeouts are handled by stream_timeout in buildStreamPromise.
      timeout: 0,
      maxContentLength: Infinity,
    });

    return streamReq.then((resp) => this.buildStreamPromise(resp, writable_stream));
  }
}

module.exports = StreamClient;