Инструменты пользователя

Инструменты сайта


nodejs:steams:promise.all_restrict

Различия

Здесь показаны различия между двумя версиями данной страницы.

Ссылка на это сравнение

Следующая версия
Предыдущая версия
nodejs:steams:promise.all_restrict [2022/05/15 11:58]
werwolf создано
nodejs:steams:promise.all_restrict [2023/01/12 12:18] (текущий)
Строка 5: Строка 5:
 Сокращенная версия этого кода в настоящее время выглядит примерно так: Сокращенная версия этого кода в настоящее время выглядит примерно так:
  
-<​code>​+<​code ​javascript>
 function getCounts() { function getCounts() {
   return users.map(user => {   return users.map(user => {
Строка 26: Строка 26:
  
  
-===== ОТВЕТЫ =====+====es6-promise-pool==== 
 + 
 +Обратите внимание,​ что ''​Promise.all()''​ не запускает promises, чтобы начать свою работу,​ создавая само обещание. 
 + 
 +Учитывая это, одним из решений было бы проверить,​ когда будет разрешено обещание,​ нужно ли начинать новое обещание или вы уже на пределе. 
 + 
 +Однако нет необходимости изобретать колесо здесь. [[https://​www.npmjs.com/​package/​es6-promise-pool|Одна библиотека,​ которую вы могли бы использовать для этой цели, - ]]''​[[https://​www.npmjs.com/​package/​es6-promise-pool|es6-promise-pool]]''​. Из их примеров:​ 
 + 
 +<code javascript>​ 
 +  // On the Web, leave out this line and use the script tag above instead.  
 +var PromisePool = require('​es6-promise-pool'​) 
 + 
 +var promiseProducer = function () { 
 +  // Your code goes here.  
 +  // If there is work left to be done, return the next work item as a promise.  
 +  // Otherwise, return null to indicate that all promises have been created.  
 +  // Scroll down for an example.  
 +
 + 
 +// The number of promises to process simultaneously.  
 +var concurrency = 3 
 + 
 +// Create a pool.  
 +var pool = new PromisePool(promiseProducer,​ concurrency) 
 + 
 +// Start the pool.  
 +var poolPromise = pool.start() 
 + 
 +// Wait for the pool to settle.  
 +poolPromise.then(function () { 
 +  console.log('​All promises fulfilled'​) 
 +}, function (error) { 
 +  console.log('​Some promise rejected: ' + error.message) 
 +}) 
 +</​code>​ 
 + 
 +====p-limit==== 
 + 
 +**С-Концевой** 
 + 
 +Я сравнил ограничение параллелизма обещаний с пользовательским сценарием,​ bluebird, es6-promise-pool и p-limit. Я считаю,​ что у [[https://​www.npmjs.com/​package/​p-limit|p-limit]] есть самая простая,​ урезанная реализация для этой потребности. [[https://​www.npmjs.com/​package/​p-limit|Смотрите их документацию]]. 
 + 
 +**Требования** 
 + 
 +Быть совместимым с async в примере 
 + 
 +* [[https://​www.w3schools.com/​js/​js_versions.asp|ECMAScript 2017 (версия 8)]] 
 +Версия узла>​ [[https://​node.green|8.2.1]] 
 + 
 +**Мой пример** 
 + 
 +В этом примере нам нужно запустить функцию для каждого URL в массиве (например,​ может быть, запрос API). Здесь это называется ''​fetchData()''​. ​Если бы у нас был массив тысяч элементов для обработки,​ параллелизм определенно был бы полезен для экономии ресурсов ЦП и памяти. 
 + 
 +<code javascript>​ 
 +const pLimit = require('​p-limit'​);​ 
 + 
 +// Example Concurrency of 3 promise at once 
 +const limit = pLimit(3);​ 
 + 
 +let urls = [ 
 +    "​http://​www.exampleone.com/",​ 
 +    "​http://​www.exampletwo.com/",​ 
 +    "​http://​www.examplethree.com/",​ 
 +    "​http://​www.examplefour.com/",​ 
 +
 + 
 +// Create an array of our promises using map (fetchData() returns a promise) 
 +let promises = urls.map(url => { 
 + 
 +    // wrap the function we are calling in the limit function we defined above 
 +    return limit(() => fetchData(url));​ 
 +}); 
 + 
 +(async () => { 
 +    // Only three promises are run at once (as defined above) 
 +    const result = await Promise.all(promises);​ 
 +    console.log(result);​ 
 +})(); 
 +</​code>​ 
 + 
 +Результат журнала консоли - это массив данных ответа ваших разрешенных обещаний. 
 + 
 +====bluebird==== 
 + 
 +[[http://​bluebirdjs.com/​docs/​api/​promise.map.html|Bluebird Promise.map]] может использовать параметр параллелизма,​ чтобы контролировать,​ сколько обещаний должно выполняться параллельно. Иногда это проще, чем ''​.all''​ потому что вам не нужно создавать массив обещаний. 
 + 
 +<code javascript>​ 
 +const Promise = require('​bluebird'​) 
 + 
 +function getCounts() { 
 +  return Promise.map(users,​ user => { 
 +    return new Promise(resolve => { 
 +      remoteServer.getCount(user) // makes an HTTP request 
 +      .then(() => { 
 +        /* snip */ 
 +        resolve();​ 
 +       }); 
 +    }); 
 +  }, {concurrency:​ 10}); // <---- at most 10 http requests at a time 
 +
 +</​code>​ 
 + 
 +====http.Agent({maxSockets:​ 5})==== 
 + 
 +Вместо использования обещаний для ограничения http-запросов используйте встроенный узел [[https://​nodejs.org/​api/​http.html#​http_agent_maxsockets%23http_agent_maxsockets|http.Agent.maxSockets]]. Это устраняет необходимость использования библиотеки или написания собственного кода пула и дает дополнительное преимущество,​ позволяющее лучше контролировать то, что вы ограничиваете. 
 + 
 +> agent.maxSockets По умолчанию установлено бесконечность. Определяет,​ сколько одновременных сокетов агент может открыть для каждого источника. Источник - это либо комбинация "​хост:​ порт",​ либо "​хост:​ порт: локальный адрес"​. 
 + 
 +Например:​ 
 + 
 +<code javascript>​ 
 +var http = require('​http'​);​ 
 +var agent = new http.Agent({maxSockets:​ 5}); // 5 concurrent connections per origin 
 +var request = http.request({...,​ agent: agent}, ...); 
 + 
 +</​code>​ 
 + 
 +Если вы делаете несколько запросов к одному и тому же источнику,​ вам также может быть полезно установить для ''​keepAlive''​ значение true (дополнительную информацию см. В документации выше). 
 + 
 +====iterator==== 
 + 
 +Если вы знаете,​ как работают итераторы и как они используются,​ вам не понадобится дополнительная библиотека,​ так как создать собственный параллелизм очень просто. Позвольте мне продемонстрировать:​ 
 + 
 +<code javascript>​ 
 +    /* [Symbol.iterator]() is equivalent to .values() 
 +const iterator = [1,​2,​3][Symbol.iterator]() */ 
 +const iterator = [1,​2,​3].values() 
 + 
 + 
 +// loop over all items with for..of 
 +for (const x of iterator) { 
 +  console.log('​x:',​ x) 
 +   
 +  // notices how this loop continues the same iterator 
 +  // and consumes the rest of the iterator, making the 
 +  // outer loop not logging any more x's 
 +  for (const y of iterator) { 
 +    console.log('​y:',​ y) 
 +  } 
 +
 +</​code>​ 
 + 
 +====p-limit MongoDB==== 
 + 
 +Вот основной пример для потоковой передачи и "​p-limit"​. Это потоковое чтение HTTP потока в Монго БД. 
 + 
 +<code javascript>​ 
 +const stream = require('​stream'​);​ 
 +const util = require('​util'​);​ 
 +const pLimit = require('​p-limit'​);​ 
 +const es = require('​event-stream'​);​ 
 +const streamToMongoDB = require('​stream-to-mongo-db'​).streamToMongoDB;​ 
 + 
 + 
 +const pipeline = util.promisify(stream.pipeline) 
 + 
 +const outputDBConfig = { 
 +    dbURL: '​yr-db-url',​ 
 +    collection: '​some-collection'​ 
 +}; 
 +const limit = pLimit(3);​ 
 + 
 +async yrAsyncStreamingFunction(readStream) => { 
 +        const mongoWriteStream = streamToMongoDB(outputDBConfig);​ 
 +        const mapperStream = es.map((data,​ done) => { 
 +                let someDataPromise = limit(() => yr_async_call_to_somewhere()) 
 + 
 +                    someDataPromise.then( 
 +                        function handleResolve(someData) { 
 + 
 +                            data.someData = someData; ​    
 +                            done(null, data); 
 +                        }, 
 +                        function handleError(error) { 
 +                            done(error) 
 +                        } 
 +                    ); 
 +                }) 
 + 
 +            await pipeline( 
 +                readStream,​ 
 +                JSONStream.parse('​*'​),​ 
 +                mapperStream,​ 
 +                mongoWriteStream 
 +            ); 
 +        } 
 +</​code>​ 
 + 
 +====рекурсия==== 
 + 
 +Это может быть решено с помощью рекурсии. 
 + 
 +Идея состоит в том, что изначально вы отправляете максимально допустимое количество запросов,​ и каждый из этих запросов должен рекурсивно продолжать отправлять сам себя после своего завершения. 
 + 
 +<code javascript>​ 
 +function batchFetch(urls,​ concurrentRequestsLimit) { 
 +    return new Promise(resolve => { 
 +        var documents = []; 
 +        var index = 0; 
 + 
 +        function recursiveFetch() { 
 +            if (index === urls.length) { 
 +                return; 
 +            } 
 +            fetch(urls[index++]).then(r => { 
 +                documents.push(r.text());​ 
 +                if (documents.length === urls.length) { 
 +                    resolve(documents);​ 
 +                } else { 
 +                    recursiveFetch();​ 
 +                } 
 +            }); 
 +        } 
 + 
 +        for (var i = 0; i < concurrentRequestsLimit;​ i++) { 
 +            recursiveFetch();​ 
 +        } 
 +    }); 
 +
 + 
 +var sources = [ 
 +    '​http://​www.example_1.com/',​ 
 +    '​http://​www.example_2.com/',​ 
 +    '​http://​www.example_3.com/',​ 
 +    ... 
 +    '​http://​www.example_100.com/'​ 
 +]; 
 +batchFetch(sources,​ 5).then(documents => { 
 +   ​console.log(documents);​ 
 +}); 
 +</​code>​ 
 + 
 +====batch-promises==== 
 + 
 +Поэтому я попытался заставить некоторые показанные примеры работать для моего кода, но так как это было только для сценария импорта,​ а не для производственного кода, использование [[https://​www.npmjs.com/​package/​batch-promises|пакетных обещаний]] пакета npm, несомненно,​ было для меня самым простым путем 
 + 
 +**ПРИМЕЧАНИЕ. ​Требуется время выполнения для поддержки Promise или его заполнения.** 
 + 
 +**Api** batchPromises (int: batchSize, array: Collection, я => Promise: Iteratee) Promise: Iteratee будет вызываться после каждого пакета. 
 + 
 +**Использование:​** 
 + 
 +<code javascript>​ 
 +batch-promises 
 +Easily batch promises 
 + 
 +NOTE: Requires runtime to support Promise or to be polyfilled. 
 + 
 +Api 
 +batchPromises(int:​ batchSize, array: Collection, i => Promise: Iteratee) 
 +The Promise: Iteratee will be called after each batch. 
 + 
 +Use: 
 +import batchPromises from '​batch-promises';​ 
 +  
 +batchPromises(2,​ [1,​2,​3,​4,​5],​ i => new Promise((resolve,​ reject) => { 
 +  
 +  // The iteratee will fire after each batch resulting in the following behaviour:​ 
 +  // @ 100ms resolve items 1 and 2 (first batch of 2) 
 +  // @ 200ms resolve items 3 and 4 (second batch of 2) 
 +  // @ 300ms resolve remaining item 5 (last remaining batch) 
 +  setTimeout(() => { 
 +    resolve(i);​ 
 +  }, 100); 
 +})) 
 +.then(results => { 
 +  console.log(results);​ // [1,​2,​3,​4,​5] 
 +}); 
 +</​code>​ 
 + 
 +====Promise.race==== 
 + 
 +Это то, что я сделал с помощью ''​Promise.race'',​ внутри моего кода здесь 
 + 
 +<code javascript>​ 
 +const identifyTransactions = async function() { 
 +  let promises = [] 
 +  let concurrency = 0 
 +  for (let tx of this.transactions) { 
 +    if (concurrency > 4) 
 +      await Promise.race(promises).then(r => { promises = []; concurrency = 0 }) 
 +    promises.push(tx.identifyTransaction()) 
 +    concurrency++ 
 +  } 
 +  if (promises.length > 0) 
 +    await Promise.race(promises) //resolve the rest 
 +
 +</​code>​ 
 + 
 +Если вы хотите увидеть пример:​ [[https://​jsfiddle.net/​thecodermarcelo/​av2tp83o/​5|https://​jsfiddle.net/​thecodermarcelo/​av2tp83o/​5/​]] 
 + 
 +====рекурсия с внешними библиотеками==== 
 + 
 +Рекурсия - это ответ, если вы не хотите использовать внешние библиотеки 
 + 
 +<code javascript>​ 
 +downloadAll(someArrayWithData){ 
 +  var self = this; 
 + 
 +  var tracker = function(next){ 
 +    return self.someExpensiveRequest(someArrayWithData[next]) 
 +    .then(function(){ 
 +      next++;//​This updates the next in the tracker function parameter 
 +      if(next < someArrayWithData.length){//​Did I finish processing all my data? 
 +        return tracker(next);//​Go to the next promise 
 +      } 
 +    }); 
 +  } 
 + 
 +  return tracker(0);  
 +
 +</​code>​ 
 + 
 +====async-pool==== 
 + 
 +Я предлагаю библиотеку async-pool: [[https://​github.com/​rxaviers/​async-pool|https://​github.com/​rxaviers/​async-pool]] 
 + 
 +> Выполнить несколько обещаний,​ возвращающих & асинхронные функции с ограниченным параллелизмом с использованием собственного ES6/ES7 asyncPool выполняет несколько обещающих возврат & асинхронные функции в ограниченном пуле параллелизма. Он отклоняет сразу же, как только одно из обещаний отклоняется. Это решает,​ когда все обещания завершаются. Он вызывает функцию итератора как можно скорее (при ограничении параллелизма). 
 + 
 +Установка:​ 
 + 
 +> npm установить крошечный-асинхронный пул 
 + 
 +====async/​await==== 
 + 
 +Это становится относительно тривиально с async/​await,​ в зависимости от того, что вы хотите,​ это хорошо переносится на карту с задержкой или forEach, вот реализация карты. 
 + 
 +<code javascript>​ 
 +const sleep = ms => new Promise(resolve => setTimeout(resolve,​ ms)) 
 + 
 +const delayMap = async (ms, arr, f) => { 
 +  const results = [] 
 +  let i 
 +  for (const item of arr) { 
 +    results.push(await f(item, i++, arr)) 
 +    await sleep(ms) 
 +  } 
 +  return results 
 +
 + 
 +// Example use - delaying 1 second between each call 
 +delayMap(1000,​ [ 1, 2, 3 ], id =>  
 +  fetch(`https://​jsonplaceholder.typicode.com/​posts/​${id}`) 
 +
 +  .then(posts ​=> posts.map(post ​=> post.json())) 
 +  .then(Promise.all.bind(Promise)) 
 +  .then(posts ​=> console.log('​Posts',​ posts)) 
 +</​code>​
nodejs/steams/promise.all_restrict.1652605125.txt.gz · Последние изменения: 2023/01/12 12:17 (внешнее изменение)