Здесь показаны различия между двумя версиями данной страницы.
| Следующая версия | Предыдущая версия | ||
|
nodejs:steams:promise.all_restrict [2022/05/15 11:58] werwolf создано |
nodejs:steams:promise.all_restrict [2023/01/12 12:18] (текущий) |
||
|---|---|---|---|
| Строка 5: | Строка 5: | ||
| Сокращенная версия этого кода в настоящее время выглядит примерно так: | Сокращенная версия этого кода в настоящее время выглядит примерно так: | ||
| - | <code> | + | <code javascript> |
| function getCounts() { | function getCounts() { | ||
| return users.map(user => { | return users.map(user => { | ||
| Строка 26: | Строка 26: | ||
| - | ===== ОТВЕТЫ ===== | + | ====es6-promise-pool==== |
| + | |||
| + | Обратите внимание, что ''Promise.all()'' не запускает promises, чтобы начать свою работу, создавая само обещание. | ||
| + | |||
| + | Учитывая это, одним из решений было бы проверить, когда будет разрешено обещание, нужно ли начинать новое обещание или вы уже на пределе. | ||
| + | |||
| + | Однако нет необходимости изобретать колесо здесь. [[https://www.npmjs.com/package/es6-promise-pool|Одна библиотека, которую вы могли бы использовать для этой цели, - ]]''[[https://www.npmjs.com/package/es6-promise-pool|es6-promise-pool]]''. Из их примеров: | ||
| + | |||
| + | <code javascript> | ||
| + | // On the Web, leave out this line and use the script tag above instead. | ||
| + | var PromisePool = require('es6-promise-pool') | ||
| + | |||
| + | var promiseProducer = function () { | ||
| + | // Your code goes here. | ||
| + | // If there is work left to be done, return the next work item as a promise. | ||
| + | // Otherwise, return null to indicate that all promises have been created. | ||
| + | // Scroll down for an example. | ||
| + | } | ||
| + | |||
| + | // The number of promises to process simultaneously. | ||
| + | var concurrency = 3 | ||
| + | |||
| + | // Create a pool. | ||
| + | var pool = new PromisePool(promiseProducer, concurrency) | ||
| + | |||
| + | // Start the pool. | ||
| + | var poolPromise = pool.start() | ||
| + | |||
| + | // Wait for the pool to settle. | ||
| + | poolPromise.then(function () { | ||
| + | console.log('All promises fulfilled') | ||
| + | }, function (error) { | ||
| + | console.log('Some promise rejected: ' + error.message) | ||
| + | }) | ||
| + | </code> | ||
| + | |||
| + | ====p-limit==== | ||
| + | |||
| + | **С-Концевой** | ||
| + | |||
| + | Я сравнил ограничение параллелизма обещаний с пользовательским сценарием, bluebird, es6-promise-pool и p-limit. Я считаю, что у [[https://www.npmjs.com/package/p-limit|p-limit]] есть самая простая, урезанная реализация для этой потребности. [[https://www.npmjs.com/package/p-limit|Смотрите их документацию]]. | ||
| + | |||
| + | **Требования** | ||
| + | |||
| + | Быть совместимым с async в примере | ||
| + | |||
| + | * [[https://www.w3schools.com/js/js_versions.asp|ECMAScript 2017 (версия 8)]] | ||
| + | * Версия узла> [[https://node.green|8.2.1]] | ||
| + | |||
| + | **Мой пример** | ||
| + | |||
| + | В этом примере нам нужно запустить функцию для каждого URL в массиве (например, может быть, запрос API). Здесь это называется ''fetchData()''. Если бы у нас был массив тысяч элементов для обработки, параллелизм определенно был бы полезен для экономии ресурсов ЦП и памяти. | ||
| + | |||
| + | <code javascript> | ||
| + | const pLimit = require('p-limit'); | ||
| + | |||
| + | // Example Concurrency of 3 promise at once | ||
| + | const limit = pLimit(3); | ||
| + | |||
| + | let urls = [ | ||
| + | "http://www.exampleone.com/", | ||
| + | "http://www.exampletwo.com/", | ||
| + | "http://www.examplethree.com/", | ||
| + | "http://www.examplefour.com/", | ||
| + | ] | ||
| + | |||
| + | // Create an array of our promises using map (fetchData() returns a promise) | ||
| + | let promises = urls.map(url => { | ||
| + | |||
| + | // wrap the function we are calling in the limit function we defined above | ||
| + | return limit(() => fetchData(url)); | ||
| + | }); | ||
| + | |||
| + | (async () => { | ||
| + | // Only three promises are run at once (as defined above) | ||
| + | const result = await Promise.all(promises); | ||
| + | console.log(result); | ||
| + | })(); | ||
| + | </code> | ||
| + | |||
| + | Результат журнала консоли - это массив данных ответа ваших разрешенных обещаний. | ||
| + | |||
| + | ====bluebird==== | ||
| + | |||
| + | [[http://bluebirdjs.com/docs/api/promise.map.html|Bluebird Promise.map]] может использовать параметр параллелизма, чтобы контролировать, сколько обещаний должно выполняться параллельно. Иногда это проще, чем ''.all'' потому что вам не нужно создавать массив обещаний. | ||
| + | |||
| + | <code javascript> | ||
| + | const Promise = require('bluebird') | ||
| + | |||
| + | function getCounts() { | ||
| + | return Promise.map(users, user => { | ||
| + | return new Promise(resolve => { | ||
| + | remoteServer.getCount(user) // makes an HTTP request | ||
| + | .then(() => { | ||
| + | /* snip */ | ||
| + | resolve(); | ||
| + | }); | ||
| + | }); | ||
| + | }, {concurrency: 10}); // <---- at most 10 http requests at a time | ||
| + | } | ||
| + | </code> | ||
| + | |||
| + | ====http.Agent({maxSockets: 5})==== | ||
| + | |||
| + | Вместо использования обещаний для ограничения http-запросов используйте встроенный узел [[https://nodejs.org/api/http.html#http_agent_maxsockets%23http_agent_maxsockets|http.Agent.maxSockets]]. Это устраняет необходимость использования библиотеки или написания собственного кода пула и дает дополнительное преимущество, позволяющее лучше контролировать то, что вы ограничиваете. | ||
| + | |||
| + | > agent.maxSockets По умолчанию установлено бесконечность. Определяет, сколько одновременных сокетов агент может открыть для каждого источника. Источник - это либо комбинация "хост: порт", либо "хост: порт: локальный адрес". | ||
| + | |||
| + | Например: | ||
| + | |||
| + | <code javascript> | ||
| + | var http = require('http'); | ||
| + | var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin | ||
| + | var request = http.request({..., agent: agent}, ...); | ||
| + | |||
| + | </code> | ||
| + | |||
| + | Если вы делаете несколько запросов к одному и тому же источнику, вам также может быть полезно установить для ''keepAlive'' значение true (дополнительную информацию см. В документации выше). | ||
| + | |||
| + | ====iterator==== | ||
| + | |||
| + | Если вы знаете, как работают итераторы и как они используются, вам не понадобится дополнительная библиотека, так как создать собственный параллелизм очень просто. Позвольте мне продемонстрировать: | ||
| + | |||
| + | <code javascript> | ||
| + | /* [Symbol.iterator]() is equivalent to .values() | ||
| + | const iterator = [1,2,3][Symbol.iterator]() */ | ||
| + | const iterator = [1,2,3].values() | ||
| + | |||
| + | |||
| + | // loop over all items with for..of | ||
| + | for (const x of iterator) { | ||
| + | console.log('x:', x) | ||
| + | |||
| + | // notices how this loop continues the same iterator | ||
| + | // and consumes the rest of the iterator, making the | ||
| + | // outer loop not logging any more x's | ||
| + | for (const y of iterator) { | ||
| + | console.log('y:', y) | ||
| + | } | ||
| + | } | ||
| + | </code> | ||
| + | |||
| + | ====p-limit MongoDB==== | ||
| + | |||
| + | Вот основной пример для потоковой передачи и "p-limit". Это потоковое чтение HTTP потока в Монго БД. | ||
| + | |||
| + | <code javascript> | ||
| + | const stream = require('stream'); | ||
| + | const util = require('util'); | ||
| + | const pLimit = require('p-limit'); | ||
| + | const es = require('event-stream'); | ||
| + | const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB; | ||
| + | |||
| + | |||
| + | const pipeline = util.promisify(stream.pipeline) | ||
| + | |||
| + | const outputDBConfig = { | ||
| + | dbURL: 'yr-db-url', | ||
| + | collection: 'some-collection' | ||
| + | }; | ||
| + | const limit = pLimit(3); | ||
| + | |||
| + | async yrAsyncStreamingFunction(readStream) => { | ||
| + | const mongoWriteStream = streamToMongoDB(outputDBConfig); | ||
| + | const mapperStream = es.map((data, done) => { | ||
| + | let someDataPromise = limit(() => yr_async_call_to_somewhere()) | ||
| + | |||
| + | someDataPromise.then( | ||
| + | function handleResolve(someData) { | ||
| + | |||
| + | data.someData = someData; | ||
| + | done(null, data); | ||
| + | }, | ||
| + | function handleError(error) { | ||
| + | done(error) | ||
| + | } | ||
| + | ); | ||
| + | }) | ||
| + | |||
| + | await pipeline( | ||
| + | readStream, | ||
| + | JSONStream.parse('*'), | ||
| + | mapperStream, | ||
| + | mongoWriteStream | ||
| + | ); | ||
| + | } | ||
| + | </code> | ||
| + | |||
| + | ====рекурсия==== | ||
| + | |||
| + | Это может быть решено с помощью рекурсии. | ||
| + | |||
| + | Идея состоит в том, что изначально вы отправляете максимально допустимое количество запросов, и каждый из этих запросов должен рекурсивно продолжать отправлять сам себя после своего завершения. | ||
| + | |||
| + | <code javascript> | ||
| + | function batchFetch(urls, concurrentRequestsLimit) { | ||
| + | return new Promise(resolve => { | ||
| + | var documents = []; | ||
| + | var index = 0; | ||
| + | |||
| + | function recursiveFetch() { | ||
| + | if (index === urls.length) { | ||
| + | return; | ||
| + | } | ||
| + | fetch(urls[index++]).then(r => { | ||
| + | documents.push(r.text()); | ||
| + | if (documents.length === urls.length) { | ||
| + | resolve(documents); | ||
| + | } else { | ||
| + | recursiveFetch(); | ||
| + | } | ||
| + | }); | ||
| + | } | ||
| + | |||
| + | for (var i = 0; i < concurrentRequestsLimit; i++) { | ||
| + | recursiveFetch(); | ||
| + | } | ||
| + | }); | ||
| + | } | ||
| + | |||
| + | var sources = [ | ||
| + | 'http://www.example_1.com/', | ||
| + | 'http://www.example_2.com/', | ||
| + | 'http://www.example_3.com/', | ||
| + | ... | ||
| + | 'http://www.example_100.com/' | ||
| + | ]; | ||
| + | batchFetch(sources, 5).then(documents => { | ||
| + | console.log(documents); | ||
| + | }); | ||
| + | </code> | ||
| + | |||
| + | ====batch-promises==== | ||
| + | |||
| + | Поэтому я попытался заставить некоторые показанные примеры работать для моего кода, но так как это было только для сценария импорта, а не для производственного кода, использование [[https://www.npmjs.com/package/batch-promises|пакетных обещаний]] пакета npm, несомненно, было для меня самым простым путем | ||
| + | |||
| + | **ПРИМЕЧАНИЕ. Требуется время выполнения для поддержки Promise или его заполнения.** | ||
| + | |||
| + | **Api** batchPromises (int: batchSize, array: Collection, я => Promise: Iteratee) Promise: Iteratee будет вызываться после каждого пакета. | ||
| + | |||
| + | **Использование:** | ||
| + | |||
| + | <code javascript> | ||
| + | batch-promises | ||
| + | Easily batch promises | ||
| + | |||
| + | NOTE: Requires runtime to support Promise or to be polyfilled. | ||
| + | |||
| + | Api | ||
| + | batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee) | ||
| + | The Promise: Iteratee will be called after each batch. | ||
| + | |||
| + | Use: | ||
| + | import batchPromises from 'batch-promises'; | ||
| + | |||
| + | batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => { | ||
| + | |||
| + | // The iteratee will fire after each batch resulting in the following behaviour: | ||
| + | // @ 100ms resolve items 1 and 2 (first batch of 2) | ||
| + | // @ 200ms resolve items 3 and 4 (second batch of 2) | ||
| + | // @ 300ms resolve remaining item 5 (last remaining batch) | ||
| + | setTimeout(() => { | ||
| + | resolve(i); | ||
| + | }, 100); | ||
| + | })) | ||
| + | .then(results => { | ||
| + | console.log(results); // [1,2,3,4,5] | ||
| + | }); | ||
| + | </code> | ||
| + | |||
| + | ====Promise.race==== | ||
| + | |||
| + | Это то, что я сделал с помощью ''Promise.race'', внутри моего кода здесь | ||
| + | |||
| + | <code javascript> | ||
| + | const identifyTransactions = async function() { | ||
| + | let promises = [] | ||
| + | let concurrency = 0 | ||
| + | for (let tx of this.transactions) { | ||
| + | if (concurrency > 4) | ||
| + | await Promise.race(promises).then(r => { promises = []; concurrency = 0 }) | ||
| + | promises.push(tx.identifyTransaction()) | ||
| + | concurrency++ | ||
| + | } | ||
| + | if (promises.length > 0) | ||
| + | await Promise.race(promises) //resolve the rest | ||
| + | } | ||
| + | </code> | ||
| + | |||
| + | Если вы хотите увидеть пример: [[https://jsfiddle.net/thecodermarcelo/av2tp83o/5|https://jsfiddle.net/thecodermarcelo/av2tp83o/5/]] | ||
| + | |||
| + | ====рекурсия с внешними библиотеками==== | ||
| + | |||
| + | Рекурсия - это ответ, если вы не хотите использовать внешние библиотеки | ||
| + | |||
| + | <code javascript> | ||
| + | downloadAll(someArrayWithData){ | ||
| + | var self = this; | ||
| + | |||
| + | var tracker = function(next){ | ||
| + | return self.someExpensiveRequest(someArrayWithData[next]) | ||
| + | .then(function(){ | ||
| + | next++;//This updates the next in the tracker function parameter | ||
| + | if(next < someArrayWithData.length){//Did I finish processing all my data? | ||
| + | return tracker(next);//Go to the next promise | ||
| + | } | ||
| + | }); | ||
| + | } | ||
| + | |||
| + | return tracker(0); | ||
| + | } | ||
| + | </code> | ||
| + | |||
| + | ====async-pool==== | ||
| + | |||
| + | Я предлагаю библиотеку async-pool: [[https://github.com/rxaviers/async-pool|https://github.com/rxaviers/async-pool]] | ||
| + | |||
| + | > Выполнить несколько обещаний, возвращающих & асинхронные функции с ограниченным параллелизмом с использованием собственного ES6/ES7 asyncPool выполняет несколько обещающих возврат & асинхронные функции в ограниченном пуле параллелизма. Он отклоняет сразу же, как только одно из обещаний отклоняется. Это решает, когда все обещания завершаются. Он вызывает функцию итератора как можно скорее (при ограничении параллелизма). | ||
| + | |||
| + | Установка: | ||
| + | |||
| + | > npm установить крошечный-асинхронный пул | ||
| + | |||
| + | ====async/await==== | ||
| + | |||
| + | Это становится относительно тривиально с async/await, в зависимости от того, что вы хотите, это хорошо переносится на карту с задержкой или forEach, вот реализация карты. | ||
| + | |||
| + | <code javascript> | ||
| + | const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)) | ||
| + | |||
| + | const delayMap = async (ms, arr, f) => { | ||
| + | const results = [] | ||
| + | let i = 0 | ||
| + | for (const item of arr) { | ||
| + | results.push(await f(item, i++, arr)) | ||
| + | await sleep(ms) | ||
| + | } | ||
| + | return results | ||
| + | } | ||
| + | |||
| + | // Example use - delaying 1 second between each call | ||
| + | delayMap(1000, [ 1, 2, 3 ], id => | ||
| + | fetch(`https://jsonplaceholder.typicode.com/posts/${id}`) | ||
| + | ) | ||
| + | .then(posts => posts.map(post => post.json())) | ||
| + | .then(Promise.all.bind(Promise)) | ||
| + | .then(posts => console.log('Posts', posts)) | ||
| + | </code> | ||