Python/Twisted Example for Parent and Child Process Communication using Pipes

neotam


Twisted offers an elegant API for spawning child processes and a Protocol-style API, twisted.internet.protocol.ProcessProtocol, for sending messages to and receiving messages from the child process via pipes.

This article explains how to spawn a child process, how to communicate with it using stdin, stdout and stderr, and how to establish full-duplex communication between parent and child.

Use Case

Write a parent process that spawns a worker (child) process. Once the worker process has been spawned successfully, the parent sends it a multiline text. The worker receives the text via its stdin and counts the number of lines, words and characters in it. After counting, it sends the result back to the parent process via its stdout stream.

Approach

The use case is implemented with two programs: parent.py is executed first and spawns word_counter.py. The word_counter process becomes the child of the parent process; it receives the text via its stdin and sends the computed result back to the parent via its stdout.
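The result travels back to the parent as bytes on a pipe, and a pipe has no message boundaries: a single outReceived call may carry part of a message or several messages at once. One common remedy, sketched here under the assumption that every message ends with a newline (the original word_counter.py does not append one), is to buffer incoming bytes until a complete line is available. LineBuffer is a hypothetical helper, not a Twisted API.

```python
class LineBuffer:
    """Hypothetical helper: turns arbitrary byte chunks into complete lines."""

    def __init__(self):
        self._pending = b''  # bytes received after the last newline

    def feed(self, data):
        """Add a chunk and return the complete lines it finished (without b'\\n')."""
        self._pending += data
        # Everything before the last b'\n' is complete; the tail is kept for later
        *lines, self._pending = self._pending.split(b'\n')
        return lines


buf = LineBuffer()
print(buf.feed(b'3 12'))      # [] because no newline has arrived yet
print(buf.feed(b' 88\n4 2'))  # [b'3 12 88'], while b'4 2' stays buffered
```

Inside a ProcessProtocol, outReceived would call feed(data) and handle each returned line.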

Coding

Program parent.py

import logging
import sys

from twisted.internet import reactor, protocol
from twisted.python import failure

logging.basicConfig(
    format='%(asctime)s %(process)d - %(levelname)s : %(message)s',
    level=logging.DEBUG
)
log = logging.getLogger('parent')

class WorkerProcess(protocol.ProcessProtocol):

    def __init__(self, text):
        self.text = text.encode('utf-8')
        self.output = b''  # stdout can arrive in several chunks, so buffer it

    def connectionMade(self):
        log.info("Connected to Process")
        self.transport.write(self.text)
        # Closing stdin sends EOF, telling the child that all input has been sent
        self.transport.closeStdin()

    def outReceived(self, data: bytes):
        log.info("Out Received from worker/child process: %s", data)
        self.output += data

    def errReceived(self, data: bytes):
        log.info("Err received from worker/child process: %s", data)

    def inConnectionLost(self):
        log.debug("inConnectionLost! stdin of child is closed")

    def outConnectionLost(self):
        log.debug("outConnectionLost! stdout of child is closed")
        # The child can no longer write, so the buffered output is complete
        self.printOutput(self.output)

    def errConnectionLost(self):
        log.debug("errConnectionLost! stderr of child is closed")

    def processExited(self, reason: failure.Failure):
        """ Called when the process ended normally or was terminated """
        log.info("Child Process Exited with status: %s", reason.value.exitCode)

    def processEnded(self, reason: failure.Failure):
        """ Called when all file descriptors of the child are closed;
            it is the last callback to be called """
        log.info("Child Process Ended with status: %s", reason.value.exitCode)
        reactor.stop()  # nothing left to do, so let reactor.run() return

    def printOutput(self, text: bytes):
        if not text:
            log.warning("No output received from worker/child process")
            return
        lines, words, chars = text.decode('utf-8').split()
        log.info("Total Lines: %s, Words: %s and Characters: %s", lines, words, chars)

if __name__ == '__main__':
    text = ('Python is great to use anywhere\n'
            'Python combined with twisted is unstoppable\n')
    process = WorkerProcess(text)
    # sys.executable avoids relying on a "python" command being found;
    # env=None passes the parent's environment (on POSIX the default is empty)
    reactor.spawnProcess(process, sys.executable,
                         [sys.executable, 'word_counter.py'], env=None)
    reactor.run()

Program word_counter.py

import sys
import logging

from twisted.internet import threads, reactor

logging.basicConfig(
    filename='worker.log',
    format='%(asctime)s %(process)d - %(levelname)s : %(message)s',
    level=logging.DEBUG
)
log = logging.getLogger('worker')
log.debug("Word Counter Started")


def count(text):
    log.debug("Text: %s", text)
    # splitlines() does not count an extra empty line after a trailing newline
    lines_count = len(text.splitlines())
    # split() without arguments ignores leading, trailing and repeated
    # whitespace, so blank lines and double spaces are not counted as words
    words = len(text.split())
    chars = len(text)
    output = '{} {} {}'.format(lines_count, words, chars)
    log.info("Counts lines: %s, words: %s, chars: %s", lines_count, words, chars)
    return output

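def readInput():
    chunks = []
    log.info("Reading Input....")
    while True:
        c = sys.stdin.read(8)
        # read() returns '' only at end of file, i.e. after the parent has
        # called closeStdin(); sys.stdin.closed stays False in the child,
        # so it cannot be used as the loop condition
        if c == '':
            break
        chunks.append(c)
    return ''.join(chunks)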


def finishedReading(result):
    log.info("Input received: %s", result)
    log.info("is stdin closed %s", sys.stdin.closed)
    counts = count(result)
    sys.stdout.write(counts)
    sys.stdout.flush()  # send the result to the parent right away
    reactor.callLater(2, reactor.stop)

# Read stdin in a thread so that the blocking read does not block the reactor
df = threads.deferToThread(readInput)
df.addCallback(finishedReading)
reactor.run()
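Strictly speaking, the child does not need a reactor or a thread: it reads everything, computes once, and exits. The following is a minimal reactor-free alternative, assuming blocking I/O is acceptable in the child; it writes the same "lines words chars" format that parent.py parses, but it is a sketch rather than the author's version.

```python
import sys


def count(text):
    """Return 'lines words chars', the format word_counter.py sends to the parent."""
    return '{} {} {}'.format(len(text.splitlines()), len(text.split()), len(text))


def main():
    text = sys.stdin.read()  # blocks until EOF, i.e. until the parent closes stdin
    sys.stdout.write(count(text))
    sys.stdout.flush()       # hand the result to the parent immediately


if __name__ == '__main__':
    main()
```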

Execution

First, run the parent process (assuming it is saved as “parent.py”) from the directory that contains word_counter.py:

python parent.py 

Since the parent automatically spawns the child process, check the child's logs by tailing the “worker.log” file in the working directory:

tail -f worker.log 

(Screenshot: output of the parent process)

(Screenshot: child process logs from “tail -f worker.log”)

About ProcessProtocol

ProcessProtocol offers the same kind of API that is used to communicate with a TCP or UDP server, but with slightly different callbacks: instead of dataReceived there are outReceived and errReceived, and instead of connectionLost there are inConnectionLost, outConnectionLost and errConnectionLost. When the child process ends, processExited and processEnded are called.

  • connectionMade(): Called when the child process has been started successfully.
  • outReceived(data): Called when data is received from the child process via its stdout stream.
  • errReceived(data): Called when data is received from the child process via its stderr stream.
  • inConnectionLost(): Called when the pipe to the child's stdin is closed.
  • outConnectionLost(): Called when the child's stdout pipe is closed.
  • errConnectionLost(): Called when the child's stderr pipe is closed.
  • processExited(reason): Called when the child process exits, either normally or because it was terminated.
  • processEnded(reason): Called last, once the process has exited and all of its pipes are closed.
  • childConnectionLost(childFD): Generic method called when the pipe for a specific child FD is closed; by default it dispatches FDs 0, 1 and 2 to inConnectionLost, outConnectionLost and errConnectionLost. It is most useful when extra FDs are passed using the childFDs parameter of spawnProcess.
  • childDataReceived(childFD, data): Generic method called when data is received from the child on a specific FD; by default it dispatches FD 1 to outReceived and FD 2 to errReceived.

The following two methods are useful for writing to the child and closing its file descriptors:

  • self.transport.writeToChild(childFD, data): Writes data to the pipe connected to the child's file descriptor childFD, whereas .write(data) always writes to childFD 0 (stdin).
  • self.transport.closeChildFD(childFD): Closes the pipe connected to the child's file descriptor childFD. To signal EOF to the child process, the input pipe is typically closed; closing an output pipe is not recommended.

The two methods above are generic; the following helper methods build on them and provide a convenient API for stdin, stdout and stderr:

  • self.transport.closeStdin(): Closes the stdin pipe.
  • self.transport.closeStdout(): Closes the stdout pipe. This is not usually called: once the stdout pipe is closed, any attempt by the child to write to stdout will cause a SIGPIPE error.
  • self.transport.closeStderr(): Closes the stderr pipe. Like closeStdout, it is not usually called.
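The SIGPIPE case can be reproduced with a plain os.pipe standing in for the child's stdout pipe (an assumption for illustration, POSIX only): once the reading end is closed, a write fails. CPython ignores SIGPIPE by default, so in a Python child the failure surfaces as BrokenPipeError rather than the process being killed by the signal.

```python
import os

# Stand-in for the child's stdout pipe: the parent owns the read end
read_fd, write_fd = os.pipe()
os.close(read_fd)  # like the parent calling transport.closeStdout()

broken = False
try:
    os.write(write_fd, b'3 12 88')  # the child trying to report its result
except BrokenPipeError:
    # With SIGPIPE ignored, the EPIPE error is raised as an exception
    broken = True
finally:
    os.close(write_fd)

print("write failed:", broken)
```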

Observations

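While running these programs, the following things were observed:

  • Even though self.transport.closeStdin() is called in the parent process, the sys.stdin.closed flag is False in the child process. This is expected: closed only reports whether the child's own Python file object has been closed, whereas closing the parent's end of the pipe shows up in the child as end of file.
  • When end of file is reached, reading from stdin returns an empty string, which can be used to detect the end of the data.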
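Both observations can be reproduced without Twisted, using an os.pipe as a stand-in for the child's stdin (an illustrative assumption, not the article's setup): closing the write end makes read() return an empty string, while the reading file object still reports closed as False until it is closed explicitly.

```python
import os

# Stand-in for the parent/child pipe: the child's stdin is the read end, and
# the parent's transport.closeStdin() corresponds to closing the write end.
read_fd, write_fd = os.pipe()
child_stdin = os.fdopen(read_fd, 'r')

os.write(write_fd, b'one two\nthree\n')
os.close(write_fd)  # what closeStdin() does on the parent side

chunks = []
while True:
    chunk = child_stdin.read(8)
    if chunk == '':  # empty string means EOF: the writer has closed its end
        break
    chunks.append(chunk)

text = ''.join(chunks)
closed_after_eof = child_stdin.closed  # still False: EOF does not close the object
child_stdin.close()

print(repr(text), closed_after_eof, child_stdin.closed)
```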

Download the source code: https://github.com/get-kt/twisted-multiprocessing-ProcessProtocol
