MATLAB: How to send data on the fly to a worker when using parfeval

MATLAB, parfeval, pct

I'm writing an application that uses TCP/IP server sockets, and I want it to run in the background so that my client is not blocked. I want to send commands to it at runtime by some means other than TCP/IP. More specifically, I tried to send a command to stop listening for incoming requests, and then forced it to poll the queue (or whatever it is called) by sending a request over TCP. I don't want it to stop listening in response to an incoming TCP request, because then it could be shut down by something other than my client, and I'm generally paranoid. When I tried simply canceling the job from the parallel pool, the socket was not closed properly. I can't seem to send anything to it using labSend, and my search turned up only the dire news that it can't be done.
Is there a way to transfer information to a worker started with parfeval at runtime (not only when the job starts or ends)? Or is there something else that gives me similar functionality and the ability to send information between workers?
Oh, and I don't want to manage my communication using files.

Best Answer

I think it's possible to do this sort of thing by getting the workers to create a parallel.pool.PollableDataQueue, and getting the client to send messages to the worker. It's a bit awkward to set up, and in the example below, I'm going to use a parallel.pool.Constant to get the worker to hold onto the queue instance.
The general plan is this:
  1. Get the worker to construct a pollable dataqueue
  2. Send the dataqueue instance back to the client
  3. Set the worker off into a loop waiting for messages from the client
  4. Send messages from the client to get the worker to do stuff
  5. When done, get the client to send a "poison pill" which gets the worker to return the result.
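Note that the code below only works for a pool of size 1. With more workers, the two parfeval calls could run on different workers, so the queue returned to the client might not be the one that doStuff is polling. To run on a larger pool, you would need to use parfevalOnAll rather than parfeval, and you'd also need to manage the multiple queues from the workers.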
% First, create a parallel pool if necessary.
% gcp('nocreate') only queries for a pool; plain gcp() may auto-start a
% default pool with more than one worker.
if isempty(gcp('nocreate'))
    parpool('local', 1);
end
% Get the worker to construct a data queue on which it can receive
% messages from the client
workerQueueConstant = parallel.pool.Constant(@parallel.pool.PollableDataQueue);
% Get the worker to send the queue object back to the client
workerQueueClient = fetchOutputs(parfeval(@(x) x.Value, 1, workerQueueConstant));
% Get the worker to start waiting for messages
future = parfeval(@doStuff, 1, workerQueueConstant);
% Send a few messages to the worker
for idx = 1:10
    send(workerQueueClient, idx);
end
% Send [] as a "poison pill" to the worker to get it to stop
send(workerQueueClient, []);
% Get the result (the sum of the messages sent above)
fetchOutputs(future)

% This function gets the worker to keep processing messages from the client
function out = doStuff(qConstant)
    q = qConstant.Value;
    out = 0;
    while true
        % Wait (indefinitely) for a message
        data = poll(q, Inf);
        if isempty(data)
            % Empty message is the "poison pill": stop and return the total
            return
        else
            out = out + data;
        end
    end
end
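For the larger-pool case mentioned in the note above, one possible arrangement is sketched here. It is not part of the original answer, and I haven't tuned it: it assumes each worker can build its own PollableDataQueue inside the function and hand it back to the client through a second queue created on the client. The names workerLoop, clientQueue and workerQueues, and the 60-second timeout, are made up for illustration.

```matlab
% Sketch only: one command queue per worker.
pool = gcp('nocreate');
if isempty(pool)
    pool = parpool();   % assumption: the default profile is acceptable
end
numWorkers = pool.NumWorkers;

% Queue owned by the client; workers use it to send back their own queues
clientQueue = parallel.pool.PollableDataQueue;

% Start the message loop on every worker in the pool
future = parfevalOnAll(@workerLoop, 1, clientQueue);

% Collect one queue per worker (the 60 s timeout is an arbitrary choice)
workerQueues = cell(1, numWorkers);
for k = 1:numWorkers
    [workerQueues{k}, ok] = poll(clientQueue, 60);
    assert(ok, 'Timed out waiting for a worker queue');
end

% Send each worker a few messages, then [] as the "poison pill"
for k = 1:numWorkers
    for idx = 1:10
        send(workerQueues{k}, idx);
    end
    send(workerQueues{k}, []);
end

% Outputs from all workers are concatenated, one row per worker
totals = fetchOutputs(future);

function out = workerLoop(clientQueue)
    % Each worker builds its own queue and hands it to the client
    q = parallel.pool.PollableDataQueue;
    send(clientQueue, q);
    out = 0;
    while true
        data = poll(q, Inf);
        if isempty(data)
            return
        end
        out = out + data;
    end
end
```

The queues arrive at the client in whatever order the workers happen to start, so if you need to address a specific worker you would have to send some identifier along with each queue.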