fund_processes = []
Fund.all.each do |fund|
  # append each spawn id so the array has no nil gaps when passed to wait
  fund_processes << spawn do
    ... process_start(fund) ...
  end
end
wait(fund_processes)
This will not cause any real issues as long as you are happy to block until they all finish processing and there is a reasonable number of Funds in the system. The trouble comes when you do something stupid without thinking about it, such as requesting a fork for every user in a 150,000-user system. That is right, you just created 150,000 processes all competing for the same memory, CPU time, database, and IO as the rest of the system. A "think-o" of this magnitude can bring down your system. Trust me, I know. It is not a pretty sight when you lock yourself out of your local job box because the system (8 cores, 12 GB RAM) does not have enough free cycles to respond to an SSH request. All you can do is wait and hope, which is not something you ever want to do with a production system.
To address this concern, I was directed to write up some code that would allow you to pass Spawn a limit on the number of processes to create at any one time. This extension of Spawn accepts the same arguments as Spawn itself, plus one additional parameter, group_size. The new code can be found at my git repo.
each_in_parallel_groups_of splits the work into groups and processes at most group_size items at a time. Truly straightforward. One thing to keep in mind when using this is that each group waits for the longest-running process in the group to finish before moving on to the next X items to process.
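To make the waiting behavior concrete, here is a minimal sketch that uses plain Process.fork and Process.wait instead of Spawn (an assumption made so the snippet runs on its own; the sleep durations are made up). With groups of 2, the second group cannot start until the 0.3 second job finishes, even though the other slot was free after 0.1 seconds.

```ruby
# Sketch only: plain fork/wait stands in for Spawn's spawn/wait.
# Each number is a hypothetical job length in seconds.
durations = [0.3, 0.1, 0.1, 0.1]

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

durations.each_slice(2) do |group|
  # Start every job in the group at once, one child process per job.
  pids = group.map do |seconds|
    Process.fork do
      sleep seconds
      exit!(0) # leave the child right away, skipping at_exit handlers
    end
  end

  # Block until the whole group is done; the slowest job sets the pace.
  pids.each { |pid| Process.wait(pid) }
end

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts format("elapsed: %.2fs", elapsed)
```

The total comes out near 0.3 + 0.1 seconds plus fork overhead, so it pays to put jobs of similar length in the same group when you can.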
each_in_parallel_groups_of can be called on any Enumerable. This makes for a nice trick, for example:
Fund.all.each_in_parallel_groups_of(5) do |fund|
  ... process_start(fund) ...
end
This makes it easier to keep in mind the amount of resources that any one code block can absorb.
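As a rough illustration of the "any Enumerable" point, here is a simplified stand-in that you can run without Rails or Spawn. It is not the Spawn version: the method name each_in_fork_groups_of is hypothetical, it takes no spawn options, and it assumes a platform where Process.fork is available. The pipe is only there so the parent can see what the children did.

```ruby
# Simplified stand-in, NOT the Spawn implementation.
# each_in_fork_groups_of is a hypothetical name used for this sketch.
module Enumerable
  def each_in_fork_groups_of(group_size = 10)
    raise LocalJumpError, "no block given" unless block_given?

    each_slice(group_size) do |group|
      # One child process per item, at most group_size alive at once.
      pids = group.map do |item|
        Process.fork do
          yield item
          exit!(0) # skip at_exit handlers inherited from the parent
        end
      end
      pids.each { |pid| Process.wait(pid) }
    end
  end
end

# Children do not share memory with the parent, so report back over a pipe.
reader, writer = IO.pipe

# A Range is Enumerable too, so the same call works on it.
(1..5).each_in_fork_groups_of(2) do |n|
  writer.syswrite("#{n}\n") # unbuffered write, one short line per child
end

writer.close
handled = reader.read.lines.map(&:to_i).sort
reader.close
puts "handled: #{handled.inspect}"
```

Because the method lives on Enumerable, arrays, ranges, and anything else that defines each pick it up for free. Order inside a group is not guaranteed, which is why the sketch sorts the results before printing them.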
Here is the main code change in the Spawn fork. A request is in to get it pulled into mainline. I will keep you up to date on that.
# Spawns a limited number of threads / forks and waits for the entire group to finish
# accepts same spawn options as spawn
# Robert Meyer / Dean Radcliffe
def each_in_parallel_groups_of(group_size=10, spawn_opts={}, &block)
  # label the child process: explicit :argv wins, then :process_label, then a default
  spawn_opts[:argv] ||= spawn_opts[:process_label] || "default_ruby_fork_process"
  spawn_opts[:method] ||= :fork
  raise LocalJumpError unless block_given?
  self.each_slice(group_size) do |group_job|
    fork_ids = [] # reset for each loop
    # compact drops nil items, so nothing is spawned for them
    group_job.compact.each_with_index do |item, index|
      fork_ids[index] = spawn( spawn_opts ) do
        block.call(item)
      end
    end
    logger.info "Waiting for #{Process.pid}" if defined? logger
    # block until every spawn in this group has finished
    wait(fork_ids)
    logger.info "Done for #{Process.pid}" if defined? logger
  end
end