C# - Processing requests at a maximum rate
I'm using Rx to ensure that our backend obeys a third-party API's request limits.
The implementation below uses a simple Subject<T> as an input queue, which is then tamed using James World's custom Pace operator.
This works, but only as long as throttledRequests is not observed on the main thread, which is enforced by ObserveOn(TaskPoolScheduler.Default).
As soon as I comment out that line (line 61 of the listing, the ObserveOn call), the program behaves as if the Pace operator were not used at all, and requests are again processed as fast as they are queued up. Can anyone explain this behavior?
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    public static class ObservableExtensions
    {
        /// <summary>
        /// James World's Pace operator (see https://stackoverflow.com/a/21589238/88513)
        /// </summary>
        public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
        {
            return source.Select(i => Observable.Empty<T>()
                                                .Delay(interval)
                                                .StartWith(i))
                         .Concat();
        }
    }

    class Program
    {
        ISubject<int> requests;
        IObservable<int> throttledRequests;

        private Task<T> QueueRequest<T>(int work, Func<int, Task<T>> doWork)
        {
            var task = throttledRequests
                .Where(x => x == work)
                .Take(1)
                .SelectMany(doWork)
                .ToTask();

            // queue it
            requests.OnNext(work);

            return task;
        }

        private Task<int> DoRequest(int x)
        {
            Console.WriteLine("{0:T}: DoRequest({1}) on TID {2}",
                DateTime.UtcNow, x, Thread.CurrentThread.ManagedThreadId);
            return Task.FromResult(x);
        }

        private void Run()
        {
            // initialize request queue
            requests = new Subject<int>();

            // create a derived rate-limited queue
            throttledRequests = requests
                .Pace(TimeSpan.FromMilliseconds(1000))
                .Publish()
                .RefCount()
                .ObserveOn(TaskPoolScheduler.Default);

            Console.WriteLine("Main TID: {0}", Thread.CurrentThread.ManagedThreadId);

            int i = 0;
            while (true)
            {
                // queue a number of requests
                var tasks = Enumerable.Range(i * 10, 10)
                    .Select(x => QueueRequest(x, DoRequest))
                    .ToArray();

                Task.WaitAll(tasks);
                Console.ReadLine();
                i++;
            }
        }

        static void Main(string[] args)
        {
            new Program().Run();
        }
    }
}
I cannot answer the question in full (I'm not sure why it runs the way it does on TaskPoolScheduler), but I'll give you my thoughts and show how to fix it so that it runs as expected with or without TaskPoolScheduler.
First, you might notice that even on TaskPoolScheduler it does not work correctly: the first 1-3 items are processed without any delay. Why the later items are then processed with a delay is still not clear to me, though. Now to the reason. Consider the following sample code:
var result = Observable.Range(0, 10).Delay(TimeSpan.FromSeconds(10)).StartWith(1).Take(1).ToTask().Result;
Here, there is no delay and the task completes immediately. Why? Because StartWith immediately injects "1" at the beginning of the sequence, and Take(1) then takes that value and completes. There is no reason to continue with the rest of the sequence, so the delay is never executed. If you use Take(2) instead, for example, it will delay for 10 seconds before completing.
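The difference between Take(1) and Take(2) can be measured with a small console program. This is only a sketch: it assumes the System.Reactive NuGet package is referenced, the class name TakeDelayDemo and the helper Measure are made up for illustration, and the delay is shortened from 10 seconds to 2 to keep the run short.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

public static class TakeDelayDemo
{
    // Hypothetical helper: runs the sample pipeline with the given Take count
    // and returns how long it took for the resulting task to complete.
    public static TimeSpan Measure(int takeCount)
    {
        var stopwatch = Stopwatch.StartNew();

        Observable.Range(0, 10)
            .Delay(TimeSpan.FromSeconds(2)) // shortened from the 10 s in the sample
            .StartWith(1)                   // emitted immediately on subscription
            .Take(takeCount)                // completes once takeCount items arrived
            .ToTask()
            .Wait();

        return stopwatch.Elapsed;
    }

    public static void Main()
    {
        // Take(1) is satisfied by the StartWith value alone.
        Console.WriteLine("Take(1): {0}", Measure(1));

        // Take(2) needs one delayed item from Range as well.
        Console.WriteLine("Take(2): {0}", Measure(2));
    }
}
```

On a typical machine the first measurement should be far below the delay and the second should be roughly equal to it, although the exact timings will vary.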
For the exact same reason, your code never enters the delay (you can verify that with a debugger, or by adding a Select after Delay and logging to the console, for example). To fix it, just remove Take(1) (or change it to Take(2), for example); there is only one item for each key anyway. When you do that, the code runs correctly with or without TaskPoolScheduler.
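One way to do the logging check mentioned above is sketched below. It again assumes the System.Reactive package; PaceLogged and PaceDiagnostics are hypothetical names, not part of the question's code. Because Observable.Empty produces no items, a Select after Delay would never run, so this variant logs the delayed completion through Do instead.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class PaceDiagnostics
{
    // Same shape as the Pace operator from the question, plus a log line that
    // fires only when the per-item delay actually runs to completion.
    public static IObservable<T> PaceLogged<T>(this IObservable<T> source, TimeSpan interval)
    {
        return source.Select(i =>
                Observable.Empty<T>()
                    .Delay(interval)
                    .Do(_ => { }, () => Console.WriteLine("delay elapsed after item {0}", i))
                    .StartWith(i))
            .Concat();
    }

    public static void Main()
    {
        var interval = TimeSpan.FromMilliseconds(200);

        // With Take(1): the subscription is disposed as soon as the item arrives.
        var withTake = new Subject<int>();
        using (withTake.PaceLogged(interval).Take(1)
                       .Subscribe(x => Console.WriteLine("Take(1) got {0}", x)))
        {
            withTake.OnNext(1);
            Thread.Sleep(500); // give the delay a chance to fire, if it is still alive
        }

        // Without Take: the subscription stays alive while the delay runs.
        var withoutTake = new Subject<int>();
        using (withoutTake.PaceLogged(interval)
                          .Subscribe(x => Console.WriteLine("no Take got {0}", x)))
        {
            withoutTake.OnNext(2);
            Thread.Sleep(500);
        }
    }
}
```

If the explanation above holds, the "delay elapsed" line should appear for item 2 but not for item 1, because Take(1) unsubscribes before the delay for item 1 can complete.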