Posts Tagged ‘IPC’

Child process keep alive: fork and CLD Signal

November 28th, 2007

One advantage of a process over a thread is that when a child process dies, its parent gets a signal telling it so and can recover. For any long-running process, it's always very important to be able to recover from unexpected disasters. I'm working on a server that receives a lot of large files and spends a fair bit of time processing them, so I needed to make sure that if some file input caused the server to go down, I could quickly recover. Because I communicate with this server through a pipe rather than a socket, it's not as simple to recover using something like monit: the pipe only exists between my process and the child it forked, so a server restarted by an outside tool would have no way to reconnect to it.

The CLD signal (SIGCHLD; Ruby accepts the name "CHLD" everywhere and "CLD" as an alias on systems such as Linux that define it) is sent to a parent process when one of its children exits or is stopped. This is great! All I need to do is trap that signal and start my process back up. My main concern is whether I'll get stuck in an infinite loop: if some condition kills the child every time it starts, I'd keep forking new processes and consume all the resources on my system.
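A minimal, standalone sketch of trapping the signal, assuming a Unix-like system and using the portable name "CHLD": the handler reaps every exited child with Process.wait2 and Process::WNOHANG, so it never blocks, and records each exit status. The exited array and the deliberate exit!(3) are illustrative only.

```ruby
# Records [pid, exit status] for each child the handler reaps (illustration only).
exited = []

Signal.trap("CHLD") do
  begin
    # several exits can be coalesced into one signal, so reap until none are left
    loop do
      pid, status = Process.wait2(-1, Process::WNOHANG)
      break if pid.nil?            # children remain, but none has exited yet
      exited << [pid, status.exitstatus]
    end
  rescue Errno::ECHILD
    # no children left at all
  end
end

child = fork { exit!(3) }          # simulate a child that dies unexpectedly

# give the handler up to ~2 seconds to run
20.times do
  break unless exited.empty?
  sleep 0.1
end

Signal.trap("CHLD", "DEFAULT")     # restore the default disposition
puts "reaped: #{exited.inspect}"
```

Reaping in a loop matters because standard signals are not queued: if two children exit before the handler runs, the parent may see only one SIGCHLD for both.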

Here’s my solution so far:

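require 'logger'

class UploadServer
  def initialize(options = {})
    @upload_read, @upload_write = IO.pipe
    @start_up_threads = (options[:start_up_threads] || 2)
    @max_read = (options[:max_read] || 1024)
    @logger = (options[:logger] || Logger.new(STDOUT))
    @stopping = false
  end

  # starts up the upload server
  def start
    @pid = fork do
      # a restarted child inherits the parent's EXIT/CHLD handlers; drop them
      Signal.trap("EXIT", "DEFAULT")
      Signal.trap("CHLD", "DEFAULT")
      @upload_write.close # the child only reads from the pipe
      initialize_server   # defined elsewhere in the server (not shown)
      # keep the child alive; a bare `select` with no arguments raises ArgumentError
      loop { sleep }
    end
    Signal.trap("EXIT") do
      # tell the child process to die, and don't restart it when it does
      @stopping = true
      begin
        Process.kill("TERM", @pid)
      rescue Errno::ESRCH
        # the child has already exited
      end
    end
    # "CHLD" is SIGCHLD; "CLD" is an alias for it on Linux
    Signal.trap("CHLD") do
      # reap the child so it doesn't linger as a <defunct> process;
      # WNOHANG keeps the handler from blocking if @pid hasn't actually exited
      reaped = begin
        Process.wait(@pid, Process::WNOHANG)
      rescue Errno::ECHILD
        nil
      end
      next if reaped.nil? || @stopping
      # something extremely unexpected happened and the child process died
      # (on Ruby 2.0+, Logger may fail to write from inside a trap handler)
      @logger.error("It appears the upload background process has died... Attempting a restart...")
      # this is all a little risky since someone could have been in the middle of an upload;
      # they'll be cut off anyway since the process died...
      # close down the old pipe and create a new one
      @upload_write.close
      @upload_read, @upload_write = IO.pipe
      # start it back up
      start
    end
    @logger.debug("Upload Process started up on #{@pid}")
    # close the read end on the main process
    @upload_read.close
    @pid
  end
end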

The Process.wait(@pid) call is very important; otherwise we're left with a lot of <defunct> (zombie) processes. It doesn't throttle the restarts by itself, but at the very least it means we'll never have more than one child process per server. The only other thing I can imagine adding is some kind of timer to throttle restarts in the case where the child process dies very quickly…
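One way to build that timer, as a hypothetical sketch (the RestartThrottle name and its limits are illustrative, not part of the server above): allow at most max_restarts restarts within a sliding window of window seconds, measured on the monotonic clock, and refuse any beyond that. The clock is injectable so the behavior can be checked without actually waiting.

```ruby
# Hypothetical restart throttle: permits at most max_restarts restarts
# within any sliding window of `window` seconds.
class RestartThrottle
  def initialize(max_restarts: 5, window: 60.0,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @max_restarts = max_restarts
    @window = window
    @clock = clock   # injectable so tests can control time
    @restarts = []   # timestamps of recent restarts
  end

  # Returns true and records a restart if one is allowed right now;
  # returns false if the child has already been restarted too often.
  def allow?
    now = @clock.call
    # forget restarts that happened longer ago than the window
    @restarts.reject! { |t| now - t > @window }
    return false if @restarts.size >= @max_restarts
    @restarts << now
    true
  end
end
```

In the CHLD handler, the final start would then become something like start if @throttle.allow?, leaving the child down (and logging why) once it has crashed too often, instead of forking forever.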

Software

IO.pipe for interprocess communication

November 19th, 2007

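Pipes are a really simple way to send messages from one process to another. IO.pipe returns a connected read end and write end; after a fork, the parent and the child each hold both ends, so each process should close the end it doesn't use.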
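A minimal sketch of the basic pattern, assuming a Unix-like system where fork is available: the child writes one message and the parent reads it. The parent in particular must close its own copy of the write end, or rd.read would never see end-of-file.

```ruby
rd, wd = IO.pipe

pid = fork do
  rd.close                         # the child only writes
  wd.write("hello from #{Process.pid}\n")
  wd.close                         # closing the write end lets the parent see EOF
  exit!(0)                         # skip at_exit handlers inherited from the parent
end

wd.close                           # the parent only reads
message = rd.read                  # blocks until every write end is closed
rd.close
Process.wait(pid)                  # reap the child
puts message
```

The example that follows goes the other way: the parent writes a stream of messages separated by blank lines, and the child reads them in small chunks and splits them back apart.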

MAX_READ = 4

rd, wd = IO.pipe

pid = fork do
  wd.close # the child only reads from the pipe

  msg_buffer = +""
  c = 0

  loop do
    # select on the read end of the pipe, waking up at least once a second
    ready = IO.select([rd], nil, nil, 1)
    next unless ready

    ready[0].each do |io|
      begin
        msg_buffer << io.read_nonblock(MAX_READ)
      rescue IO::WaitReadable
        next # nothing to read after all; wait for the next select
      rescue EOFError
        # the parent closed its write end, so no more messages will arrive
        puts "the pipe was closed after #{c} messages"
        exit
      end

      # messages are separated by a blank line; end_with? checks the very end of
      # the buffer, whereas /\n\n$/ can also match just before a later newline
      last_is_complete = msg_buffer.end_with?("\n\n")
      messages = msg_buffer.split("\n\n")

      # keep an incomplete last message in the buffer for the next read
      msg_buffer = last_is_complete ? +"" : messages.pop.to_s

      messages.each do |msg|
        puts msg
        c += 1
      end

      puts c
    end
  end
end

rd.close # the parent only writes

10.times do
  wd.write("hello\n\nhello\n\nhello\n\n")
  wd.write("hello\n\n")
  wd.write("hello\n\nhello\n\nhello\n\n")
  wd.write("hello\n\n")
  wd.write("hello\n\n")
  sleep 0.1
end
wd.close
# wait for the child to drain the pipe and exit
Process.wait(pid)
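The split-and-pop framing can also be pulled out of the read loop into a small class, which makes it testable on its own. This is a hypothetical MessageBuffer sketch rather than the code above: instead of splitting the whole buffer on every read, it peels complete messages off the front with String#index and String#slice!, so whatever remains is always the incomplete tail.

```ruby
# Hypothetical helper: accumulates raw pipe reads and returns complete messages,
# where each message is terminated by a blank line ("\n\n").
class MessageBuffer
  DELIMITER = "\n\n"

  def initialize
    @buffer = +""
  end

  # Append a chunk of any size and return the messages it completed (maybe none).
  def feed(chunk)
    @buffer << chunk
    messages = []
    # remove every complete message from the front of the buffer
    while (i = @buffer.index(DELIMITER))
      frame = @buffer.slice!(0, i + DELIMITER.size)
      messages << frame[0, i]      # drop the delimiter itself
    end
    messages
  end

  # Whatever is left over: an incomplete trailing message, if any.
  def pending
    @buffer
  end
end
```

Inside the select loop, the body would then reduce to buffer.feed(io.read_nonblock(MAX_READ)).each { |msg| puts msg }, and chunk boundaries that fall in the middle of a message or of the delimiter are handled without any special cases.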

Software