c# - Processing requests at a maximum rate -


i'm using rx ensure our backend obeys third-party api's request limits.

the implementation below utilizes simple subject<t> input queue tamed using james world's custom pace operator.

this works, long throttledrequests not observed on main-thread enforced observeon(taskpoolscheduler.default).

as comment out line (line 61), program behaves if pace operator not used @ , request again processed fast queued up. can explain behavior?

using system; using system.linq; using system.reactive.concurrency; using system.reactive.linq; using system.reactive.subjects; using system.reactive.threading.tasks; using system.threading; using system.threading.tasks;  namespace consoleapplication1 {     public static class observableextensions     {         /// <summary>         /// james world's pace operater (see https://stackoverflow.com/a/21589238/88513)         /// </summary>         public static iobservable<t> pace<t>(this iobservable<t> source, timespan interval)         {             return source.select(i => observable.empty<t>()                     .delay(interval)                     .startwith(i))                 .concat();         }     }      class program     {         isubject<int> requests;         iobservable<int> throttledrequests;          private task<t> queuerequest<t>(int work, func<int, task<t>> dowork)         {             var task = throttledrequests                 .where(x => x == work)                 .take(1)                 .selectmany(dowork)                 .totask();              // queue             requests.onnext(work);              return task;         }          private task<int> dorequest(int x)         {             console.writeline("{0:t}: dorequest({1}) on tid {2}", datetime.utcnow, x, thread.currentthread.managedthreadid);             return task.fromresult(x);         }          private void run()         {             // initialize request queue             requests = new subject<int>();              // create derived rate-limited queue             throttledrequests = requests                 .pace(timespan.frommilliseconds(1000))                 .publish()                 .refcount()                 .observeon(taskpoolscheduler.default);              console.writeline("main tid: {0}", thread.currentthread.managedthreadid);              int = 0;              while (true)             {                 // queue number of requests                 var tasks = enumerable.range(i * 10, 10)                     .select(x => queuerequest(x, dorequest))                     .toarray();                  task.waitall(tasks);                  console.readline();                 i++;             }         }          static void main(string[] args)         {             new program().run();         }     } } 

i cannot answer question in full (not sure why runs runs on threadpoolscheduler) i'll give thoughts , show how fix run expected or without threadpoolscheduler.

first might notice on threadpoolscheduler not work correctly - first 1-3 items processed without delay. why after start processing delay still not clear me though. reason. consider following sample code:

var result = observable.range(0, 10).delay(timespan.fromseconds(10)).startwith(1).take(1).totask().result; 

here, there no delay , task completed immediatly. why? because startwith immediatly injects "1" @ beginning of sequence , take(1) takes value , completes - there no reason continue sequence, delay never executed. if use take(2) example instead - delay 10 seconds before completion.

for exact same reason, code never enters delay (you can verify debugger selecting after delay , logging console example). fix, remove take(1) (or change take(2) example) - there 1 item each key anyway. when that, code run correctly or without threadpoolscheduler.


Comments

Popular posts from this blog

java - Date formats difference between yyyy-MM-dd'T'HH:mm:ss and yyyy-MM-dd'T'HH:mm:ssXXX -

c# - Get rid of xmlns attribute when adding node to existing xml -