Здесь показаны различия между двумя версиями данной страницы.
| Предыдущая версия справа и слева Предыдущая версия Следующая версия | Предыдущая версия | ||
|
nodejs:steams:promise.all_restrict [2022/05/15 12:01] werwolf |
nodejs:steams:promise.all_restrict [2023/01/12 12:18] (текущий) |
||
|---|---|---|---|
| Строка 6: | Строка 6: | ||
| <code javascript> | <code javascript> | ||
| - | function getCounts() { | + | function getCounts() { |
| - | return users.map(user => { | + | return users.map(user => { |
| return new Promise(resolve => { | return new Promise(resolve => { | ||
| - | remoteServer.getCount(user) // makes an HTTP request | + | remoteServer.getCount(user) // makes an HTTP request |
| - | .then(() => { | + | .then(() => { |
| - | /* snip */ | + | /* snip */ |
| - | resolve(); | + | resolve(); |
| + | }); | ||
| }); | }); | ||
| - | }); | + | }); |
| - | }); | + | } |
| - | } | + | |
| - | Promise.all(getCounts()).then(() => { /* snip */}); | + | Promise.all(getCounts()).then(() => { /* snip */}); |
| </code> | </code> | ||
| Строка 24: | Строка 24: | ||
| - | ===== ОТВЕТЫ ===== | ||
| - | === Ответ 1 === | + | |
| + | ====es6-promise-pool==== | ||
| Обратите внимание, что ''Promise.all()'' не запускает promises, чтобы начать свою работу, создавая само обещание. | Обратите внимание, что ''Promise.all()'' не запускает promises, чтобы начать свою работу, создавая само обещание. | ||
| Строка 35: | Строка 35: | ||
| <code javascript> | <code javascript> | ||
| - | // On the Web, leave out this line and use the script tag above instead. | + | // On the Web, leave out this line and use the script tag above instead. |
| - | var PromisePool = require('es6-promise-pool') | + | var PromisePool = require('es6-promise-pool') |
| - | var promiseProducer = function () { | + | var promiseProducer = function () { |
| - | // Your code goes here. | + | // Your code goes here. |
| - | // If there is work left to be done, return the next work item as a promise. | + | // If there is work left to be done, return the next work item as a promise. |
| - | // Otherwise, return null to indicate that all promises have been created. | + | // Otherwise, return null to indicate that all promises have been created. |
| - | // Scroll down for an example. | + | // Scroll down for an example. |
| - | } | + | } |
| - | // The number of promises to process simultaneously. | + | // The number of promises to process simultaneously. |
| - | var concurrency = 3 | + | var concurrency = 3 |
| - | // Create a pool. | + | // Create a pool. |
| - | var pool = new PromisePool(promiseProducer, concurrency) | + | var pool = new PromisePool(promiseProducer, concurrency) |
| - | // Start the pool. | + | // Start the pool. |
| - | var poolPromise = pool.start() | + | var poolPromise = pool.start() |
| - | // Wait for the pool to settle. | + | // Wait for the pool to settle. |
| - | poolPromise.then(function () { | + | poolPromise.then(function () { |
| - | console.log('All promises fulfilled') | + | console.log('All promises fulfilled') |
| - | }, function (error) { | + | }, function (error) { |
| - | console.log('Some promise rejected: ' + error.message) | + | console.log('Some promise rejected: ' + error.message) |
| - | }) | + | }) |
| </code> | </code> | ||
| - | === Ответ 2 === | + | ====p-limit==== |
| **С-Концевой** | **С-Концевой** | ||
| Строка 80: | Строка 80: | ||
| <code javascript> | <code javascript> | ||
| - | const pLimit = require('p-limit'); | + | const pLimit = require('p-limit'); |
| - | // Example Concurrency of 3 promise at once | + | // Example Concurrency of 3 promise at once |
| - | const limit = pLimit(3); | + | const limit = pLimit(3); |
| - | let urls = [ | + | let urls = [ |
| "http://www.exampleone.com/", | "http://www.exampleone.com/", | ||
| "http://www.exampletwo.com/", | "http://www.exampletwo.com/", | ||
| "http://www.examplethree.com/", | "http://www.examplethree.com/", | ||
| "http://www.examplefour.com/", | "http://www.examplefour.com/", | ||
| - | ] | + | ] |
| - | // Create an array of our promises using map (fetchData() returns a promise) | + | // Create an array of our promises using map (fetchData() returns a promise) |
| - | let promises = urls.map(url => { | + | let promises = urls.map(url => { |
| // wrap the function we are calling in the limit function we defined above | // wrap the function we are calling in the limit function we defined above | ||
| return limit(() => fetchData(url)); | return limit(() => fetchData(url)); | ||
| - | }); | + | }); |
| - | (async () => { | + | (async () => { |
| // Only three promises are run at once (as defined above) | // Only three promises are run at once (as defined above) | ||
| const result = await Promise.all(promises); | const result = await Promise.all(promises); | ||
| console.log(result); | console.log(result); | ||
| - | })(); | + | })(); |
| </code> | </code> | ||
| Результат журнала консоли - это массив данных ответа ваших разрешенных обещаний. | Результат журнала консоли - это массив данных ответа ваших разрешенных обещаний. | ||
| - | === Ответ 3 === | + | ====bluebird==== |
| [[http://bluebirdjs.com/docs/api/promise.map.html|Bluebird Promise.map]] может использовать параметр параллелизма, чтобы контролировать, сколько обещаний должно выполняться параллельно. Иногда это проще, чем ''.all'' потому что вам не нужно создавать массив обещаний. | [[http://bluebirdjs.com/docs/api/promise.map.html|Bluebird Promise.map]] может использовать параметр параллелизма, чтобы контролировать, сколько обещаний должно выполняться параллельно. Иногда это проще, чем ''.all'' потому что вам не нужно создавать массив обещаний. | ||
| <code javascript> | <code javascript> | ||
| - | const Promise = require('bluebird') | + | const Promise = require('bluebird') |
| - | function getCounts() { | + | function getCounts() { |
| - | return Promise.map(users, user => { | + | return Promise.map(users, user => { |
| return new Promise(resolve => { | return new Promise(resolve => { | ||
| - | remoteServer.getCount(user) // makes an HTTP request | + | remoteServer.getCount(user) // makes an HTTP request |
| - | .then(() => { | + | .then(() => { |
| - | /* snip */ | + | /* snip */ |
| - | resolve(); | + | resolve(); |
| + | }); | ||
| }); | }); | ||
| - | }); | + | }, {concurrency: 10}); // <---- at most 10 http requests at a time |
| - | }, {concurrency: 10}); // <---- at most 10 http requests at a time | + | } |
| - | } | + | |
| </code> | </code> | ||
| - | === Ответ 4 === | + | ====http.Agent({maxSockets: 5})==== |
| Вместо использования обещаний для ограничения http-запросов используйте встроенный узел [[https://nodejs.org/api/http.html#http_agent_maxsockets%23http_agent_maxsockets|http.Agent.maxSockets]]. Это устраняет необходимость использования библиотеки или написания собственного кода пула и дает дополнительное преимущество, позволяющее лучше контролировать то, что вы ограничиваете. | Вместо использования обещаний для ограничения http-запросов используйте встроенный узел [[https://nodejs.org/api/http.html#http_agent_maxsockets%23http_agent_maxsockets|http.Agent.maxSockets]]. Это устраняет необходимость использования библиотеки или написания собственного кода пула и дает дополнительное преимущество, позволяющее лучше контролировать то, что вы ограничиваете. | ||
| Строка 137: | Строка 137: | ||
| <code javascript> | <code javascript> | ||
| - | var http = require('http'); | + | var http = require('http'); |
| - | var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin | + | var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin |
| - | var request = http.request({..., agent: agent}, ...); | + | var request = http.request({..., agent: agent}, ...); |
| </code> | </code> | ||
| Если вы делаете несколько запросов к одному и тому же источнику, вам также может быть полезно установить для ''keepAlive'' значение true (дополнительную информацию см. В документации выше). | Если вы делаете несколько запросов к одному и тому же источнику, вам также может быть полезно установить для ''keepAlive'' значение true (дополнительную информацию см. В документации выше). | ||
| - | === Ответ 5 === | + | ====iterator==== |
| Если вы знаете, как работают итераторы и как они используются, вам не понадобится дополнительная библиотека, так как создать собственный параллелизм очень просто. Позвольте мне продемонстрировать: | Если вы знаете, как работают итераторы и как они используются, вам не понадобится дополнительная библиотека, так как создать собственный параллелизм очень просто. Позвольте мне продемонстрировать: | ||
| Строка 150: | Строка 151: | ||
| <code javascript> | <code javascript> | ||
| /* [Symbol.iterator]() is equivalent to .values() | /* [Symbol.iterator]() is equivalent to .values() | ||
| - | const iterator = [1,2,3][Symbol.iterator]() */ | + | const iterator = [1,2,3][Symbol.iterator]() */ |
| - | const iterator = [1,2,3].values() | + | const iterator = [1,2,3].values() |
| - | // loop over all items with for..of | + | // loop over all items with for..of |
| - | for (const x of iterator) { | + | for (const x of iterator) { |
| - | console.log('x:', x) | + | console.log('x:', x) |
| - | + | ||
| - | // notices how this loop continues the same iterator | + | // notices how this loop continues the same iterator |
| - | // and consumes the rest of the iterator, making the | + | // and consumes the rest of the iterator, making the |
| - | // outer loop not logging any more x's | + | // outer loop not logging any more x's |
| - | for (const y of iterator) { | + | for (const y of iterator) { |
| console.log('y:', y) | console.log('y:', y) | ||
| - | } | + | } |
| - | } | + | } |
| </code> | </code> | ||
| - | === Ответ 6 === | + | ====p-limit MongoDB==== |
| Вот основной пример для потоковой передачи и "p-limit". Это потоковое чтение HTTP потока в Монго БД. | Вот основной пример для потоковой передачи и "p-limit". Это потоковое чтение HTTP потока в Монго БД. | ||
| <code javascript> | <code javascript> | ||
| - | const stream = require('stream'); | + | const stream = require('stream'); |
| - | const util = require('util'); | + | const util = require('util'); |
| - | const pLimit = require('p-limit'); | + | const pLimit = require('p-limit'); |
| - | const es = require('event-stream'); | + | const es = require('event-stream'); |
| - | const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB; | + | const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB; |
| - | const pipeline = util.promisify(stream.pipeline) | + | const pipeline = util.promisify(stream.pipeline) |
| - | const outputDBConfig = { | + | const outputDBConfig = { |
| dbURL: 'yr-db-url', | dbURL: 'yr-db-url', | ||
| collection: 'some-collection' | collection: 'some-collection' | ||
| - | }; | + | }; |
| - | const limit = pLimit(3); | + | const limit = pLimit(3); |
| - | async yrAsyncStreamingFunction(readStream) => { | + | async yrAsyncStreamingFunction(readStream) => { |
| - | const mongoWriteStream = streamToMongoDB(outputDBConfig); | + | const mongoWriteStream = streamToMongoDB(outputDBConfig); |
| - | const mapperStream = es.map((data, done) => { | + | const mapperStream = es.map((data, done) => { |
| - | let someDataPromise = limit(() => yr_async_call_to_somewhere()) | + | let someDataPromise = limit(() => yr_async_call_to_somewhere()) |
| - | someDataPromise.then( | + | someDataPromise.then( |
| - | function handleResolve(someData) { | + | function handleResolve(someData) { |
| - | data.someData = someData; | + | data.someData = someData; |
| - | done(null, data); | + | done(null, data); |
| - | }, | + | }, |
| - | function handleError(error) { | + | function handleError(error) { |
| - | done(error) | + | done(error) |
| - | } | + | } |
| - | ); | + | ); |
| - | }) | + | }) |
| - | await pipeline( | + | await pipeline( |
| - | readStream, | + | readStream, |
| - | JSONStream.parse('*'), | + | JSONStream.parse('*'), |
| - | mapperStream, | + | mapperStream, |
| - | mongoWriteStream | + | mongoWriteStream |
| - | ); | + | ); |
| - | } | + | } |
| </code> | </code> | ||
| - | === Ответ 7 === | + | ====рекурсия==== |
| Это может быть решено с помощью рекурсии. | Это может быть решено с помощью рекурсии. | ||
| Строка 220: | Строка 221: | ||
| <code javascript> | <code javascript> | ||
| - | function batchFetch(urls, concurrentRequestsLimit) { | + | function batchFetch(urls, concurrentRequestsLimit) { |
| return new Promise(resolve => { | return new Promise(resolve => { | ||
| - | var documents = []; | + | var documents = []; |
| - | var index = 0; | + | var index = 0; |
| - | function recursiveFetch() { | + | function recursiveFetch() { |
| - | if (index === urls.length) { | + | if (index === urls.length) { |
| - | return; | + | return; |
| - | } | + | } |
| - | fetch(urls[index++]).then(r => { | + | fetch(urls[index++]).then(r => { |
| - | documents.push(r.text()); | + | documents.push(r.text()); |
| - | if (documents.length === urls.length) { | + | if (documents.length === urls.length) { |
| - | resolve(documents); | + | resolve(documents); |
| - | } else { | + | } else { |
| - | recursiveFetch(); | + | recursiveFetch(); |
| - | } | + | } |
| - | }); | + | }); |
| - | } | + | } |
| - | for (var i = 0; i < concurrentRequestsLimit; i++) { | + | for (var i = 0; i < concurrentRequestsLimit; i++) { |
| - | recursiveFetch(); | + | recursiveFetch(); |
| - | } | + | } |
| }); | }); | ||
| - | } | + | } |
| - | var sources = [ | + | var sources = [ |
| 'http://www.example_1.com/', | 'http://www.example_1.com/', | ||
| 'http://www.example_2.com/', | 'http://www.example_2.com/', | ||
| Строка 251: | Строка 252: | ||
| ... | ... | ||
| 'http://www.example_100.com/' | 'http://www.example_100.com/' | ||
| - | ]; | + | ]; |
| - | batchFetch(sources, 5).then(documents => { | + | batchFetch(sources, 5).then(documents => { |
| - | console.log(documents); | + | console.log(documents); |
| - | }); | + | }); |
| </code> | </code> | ||
| - | === Ответ 8 === | + | ====batch-promises==== |
| Поэтому я попытался заставить некоторые показанные примеры работать для моего кода, но так как это было только для сценария импорта, а не для производственного кода, использование [[https://www.npmjs.com/package/batch-promises|пакетных обещаний]] пакета npm, несомненно, было для меня самым простым путем | Поэтому я попытался заставить некоторые показанные примеры работать для моего кода, но так как это было только для сценария импорта, а не для производственного кода, использование [[https://www.npmjs.com/package/batch-promises|пакетных обещаний]] пакета npm, несомненно, было для меня самым простым путем | ||
| Строка 268: | Строка 269: | ||
| <code javascript> | <code javascript> | ||
| - | batch-promises | + | batch-promises |
| - | Easily batch promises | + | Easily batch promises |
| - | NOTE: Requires runtime to support Promise or to be polyfilled. | + | NOTE: Requires runtime to support Promise or to be polyfilled. |
| - | Api | + | Api |
| - | batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee) | + | batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee) |
| - | The Promise: Iteratee will be called after each batch. | + | The Promise: Iteratee will be called after each batch. |
| - | Use: | + | Use: |
| - | import batchPromises from 'batch-promises'; | + | import batchPromises from 'batch-promises'; |
| - | + | ||
| - | batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => { | + | batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => { |
| - | + | ||
| - | // The iteratee will fire after each batch resulting in the following behaviour: | + | // The iteratee will fire after each batch resulting in the following behaviour: |
| - | // @ 100ms resolve items 1 and 2 (first batch of 2) | + | // @ 100ms resolve items 1 and 2 (first batch of 2) |
| - | // @ 200ms resolve items 3 and 4 (second batch of 2) | + | // @ 200ms resolve items 3 and 4 (second batch of 2) |
| - | // @ 300ms resolve remaining item 5 (last remaining batch) | + | // @ 300ms resolve remaining item 5 (last remaining batch) |
| - | setTimeout(() => { | + | setTimeout(() => { |
| resolve(i); | resolve(i); | ||
| - | }, 100); | + | }, 100); |
| - | })) | + | })) |
| - | .then(results => { | + | .then(results => { |
| - | console.log(results); // [1,2,3,4,5] | + | console.log(results); // [1,2,3,4,5] |
| - | }); | + | }); |
| </code> | </code> | ||
| - | === Ответ 9 === | + | ====Promise.race==== |
| Это то, что я сделал с помощью ''Promise.race'', внутри моего кода здесь | Это то, что я сделал с помощью ''Promise.race'', внутри моего кода здесь | ||
| <code javascript> | <code javascript> | ||
| - | const identifyTransactions = async function() { | + | const identifyTransactions = async function() { |
| - | let promises = [] | + | let promises = [] |
| - | let concurrency = 0 | + | let concurrency = 0 |
| - | for (let tx of this.transactions) { | + | for (let tx of this.transactions) { |
| if (concurrency > 4) | if (concurrency > 4) | ||
| - | await Promise.race(promises).then(r => { promises = []; concurrency = 0 }) | + | await Promise.race(promises).then(r => { promises = []; concurrency = 0 }) |
| promises.push(tx.identifyTransaction()) | promises.push(tx.identifyTransaction()) | ||
| concurrency++ | concurrency++ | ||
| - | } | + | } |
| - | if (promises.length > 0) | + | if (promises.length > 0) |
| await Promise.race(promises) //resolve the rest | await Promise.race(promises) //resolve the rest | ||
| - | } | + | } |
| </code> | </code> | ||
| Если вы хотите увидеть пример: [[https://jsfiddle.net/thecodermarcelo/av2tp83o/5|https://jsfiddle.net/thecodermarcelo/av2tp83o/5/]] | Если вы хотите увидеть пример: [[https://jsfiddle.net/thecodermarcelo/av2tp83o/5|https://jsfiddle.net/thecodermarcelo/av2tp83o/5/]] | ||
| - | === Ответ 10 === | + | ====рекурсия с внешними библиотеками==== |
| Рекурсия - это ответ, если вы не хотите использовать внешние библиотеки | Рекурсия - это ответ, если вы не хотите использовать внешние библиотеки | ||
| <code javascript> | <code javascript> | ||
| - | downloadAll(someArrayWithData){ | + | downloadAll(someArrayWithData){ |
| - | var self = this; | + | var self = this; |
| - | var tracker = function(next){ | + | var tracker = function(next){ |
| return self.someExpensiveRequest(someArrayWithData[next]) | return self.someExpensiveRequest(someArrayWithData[next]) | ||
| .then(function(){ | .then(function(){ | ||
| - | next++;//This updates the next in the tracker function parameter | + | next++;//This updates the next in the tracker function parameter |
| - | if(next < someArrayWithData.length){//Did I finish processing all my data? | + | if(next < someArrayWithData.length){//Did I finish processing all my data? |
| - | return tracker(next);//Go to the next promise | + | return tracker(next);//Go to the next promise |
| - | } | + | } |
| }); | }); | ||
| - | } | + | } |
| - | return tracker(0); | + | return tracker(0); |
| - | } | + | } |
| </code> | </code> | ||
| - | === Ответ 11 === | + | ====async-pool==== |
| Я предлагаю библиотеку async-pool: [[https://github.com/rxaviers/async-pool|https://github.com/rxaviers/async-pool]] | Я предлагаю библиотеку async-pool: [[https://github.com/rxaviers/async-pool|https://github.com/rxaviers/async-pool]] | ||
| Строка 348: | Строка 349: | ||
| > npm установить крошечный-асинхронный пул | > npm установить крошечный-асинхронный пул | ||
| - | === Ответ 12 === | + | ====async/await==== |
| Это становится относительно тривиально с async/await, в зависимости от того, что вы хотите, это хорошо переносится на карту с задержкой или forEach, вот реализация карты. | Это становится относительно тривиально с async/await, в зависимости от того, что вы хотите, это хорошо переносится на карту с задержкой или forEach, вот реализация карты. | ||
| <code javascript> | <code javascript> | ||
| - | const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)) | + | const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)) |
| - | const delayMap = async (ms, arr, f) => { | + | const delayMap = async (ms, arr, f) => { |
| - | const results = [] | + | const results = [] |
| - | let i = 0 | + | let i = 0 |
| - | for (const item of arr) { | + | for (const item of arr) { |
| results.push(await f(item, i++, arr)) | results.push(await f(item, i++, arr)) | ||
| await sleep(ms) | await sleep(ms) | ||
| - | } | + | } |
| - | return results | + | return results |
| - | } | + | } |
| - | // Example use - delaying 1 second between each call | + | // Example use - delaying 1 second between each call |
| - | delayMap(1000, [ 1, 2, 3 ], id => | + | delayMap(1000, [ 1, 2, 3 ], id => |
| - | fetch(`https://jsonplaceholder.typicode.com/posts/${id}`) | + | fetch(`https://jsonplaceholder.typicode.com/posts/${id}`) |
| - | ) | + | ) |
| - | .then(posts => posts.map(post => post.json())) | + | .then(posts => posts.map(post => post.json())) |
| - | .then(Promise.all.bind(Promise)) | + | .then(Promise.all.bind(Promise)) |
| - | .then(posts => console.log('Posts', posts)) | + | .then(posts => console.log('Posts', posts)) |
| </code> | </code> | ||