Twisted offers an elegant API for spawning a child process, and a Protocol-style API, twisted.internet.protocol.ProcessProtocol, for sending and receiving messages to and from that child over pipes.
This article explains how to spawn a child process, communicate with it using stdin, stdout and stderr, and establish full-duplex communication between parent and child.
Use Case
Write a parent process that spawns a worker (child) process. Once the worker has been spawned successfully, the parent sends it a multiline text. The worker receives the text via its stdin and counts the number of lines, words and characters in it. After it has finished counting, the worker sends the result to the parent process via its stdout stream.
Approach
The use case is implemented as two programs. parent.py is executed first and spawns word_counter.py. The word_counter process becomes a child of the parent, receives the text via its stdin and sends the computed result back to the parent via its stdout.
Coding
Program parent.py
import logging
import sys

from twisted.internet import reactor, protocol
from twisted.python import failure

logging.basicConfig(
    format='%(asctime)s %(process)d - %(levelname)s : %(message)s',
    level=logging.DEBUG
)
log = logging.getLogger('parent')


class WorkerProcess(protocol.ProcessProtocol):
    def __init__(self, text):
        self.text = bytes(text, encoding='ascii')

    def connectionMade(self):
        log.info("Connected to Process")
        self.transport.write(self.text)
        # Closing stdin signals EOF to the child
        self.transport.closeStdin()

    def outReceived(self, data: bytes):
        log.info("Out Received from worker/child process: %s", data)
        self.printOutput(data)

    def errReceived(self, data: bytes):
        log.info("Err received from worker/child process: %s", data)

    def inConnectionLost(self):
        log.error("inConnectionLost! stdin of child is closed")

    def outConnectionLost(self):
        log.error("outConnectionLost! stdout of child is closed")

    def errConnectionLost(self):
        log.error("errConnectionLost! stderr of child is closed")

    def processExited(self, reason: failure.Failure):
        # Called when the process has ended normally or was terminated
        log.error("Child Process Exited with status: %s", reason.value.exitCode)

    def processEnded(self, reason: failure.Failure):
        """Called when all file descriptors of the child are closed.
        It is the last callback to be called."""
        log.error("Child Process Ended with status: %s", reason.value.exitCode)

    def printOutput(self, text):
        # The child sends three space-separated numbers
        lines, words, chars = text.split()
        log.info("Total Lines: %s, Words: %s and Characters: %s", lines, words, chars)


if __name__ == '__main__':
    text = ''' Python is great to use anywhere
Python combined with twisted is unstoppable '''
    process = WorkerProcess(text)
    # sys.executable runs the child with the same interpreter as the parent
    reactor.spawnProcess(process, sys.executable, [sys.executable, 'word_counter.py'])
    reactor.run()
Program word_counter.py
import sys
import logging

from twisted.internet import threads, reactor

logging.basicConfig(
    filename='worker.log',
    format='%(asctime)s %(process)d - %(levelname)s : %(message)s',
    level=logging.DEBUG
)
log = logging.getLogger('worker')
log.debug("Word Counter Started")


def count(text):
    log.debug("Text: %s", text)
    lines = text.split('\n')
    lines_count = len(lines)
    words = 0
    for line in lines:
        # split() without arguments ignores repeated spaces and blank lines
        words += len(line.split())
    chars = len(text)
    output = '{} {} {}'.format(lines_count, words, chars)
    log.info("Counts lines: %s, words: %s, chars: %s", lines_count, words, chars)
    return output


def readInput():
    # Runs in a worker thread so the blocking reads do not stall the reactor
    text = ''
    log.info("Reading Input....")
    while not sys.stdin.closed:
        c = sys.stdin.read(8)
        text += c
        # log.debug("read ....%s Empty: %s", c, c=='')
        if c == '':
            # An empty string means EOF: the parent closed our stdin
            break
    return text


def finishedReading(result):
    log.info("Input received: %s", result)
    log.info("is stdin closed %s", sys.stdin.closed)
    counts = count(result)
    sys.stdout.write(counts)
    # Flush so the parent receives the result right away
    sys.stdout.flush()
    reactor.callLater(2, reactor.stop)


# reactor.callInThread(readInput)
df = threads.deferToThread(readInput)
df.addCallback(finishedReading)
reactor.run()
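The exchange between the two programs comes down to a very small wire format: the child writes three space-separated integers to its stdout, and the parent splits them apart again. The sketch below isolates that step as plain functions so it can be tried without spawning anything. The names count_text and parse_counts are illustrative only and are not part of the programs above, and counting words with str.split() is an assumption about what should count as a word.

```python
def count_text(text: str) -> str:
    """Return 'lines words chars' for text, as the child would write to stdout."""
    lines = text.split("\n")
    # str.split() with no argument skips empty strings, so blank lines add 0 words
    words = sum(len(line.split()) for line in lines)
    return "{} {} {}".format(len(lines), words, len(text))


def parse_counts(data: bytes) -> tuple:
    """Parse the bytes the parent receives in outReceived into three ints."""
    lines, words, chars = (int(field) for field in data.split())
    return lines, words, chars


payload = count_text("Python is great\nTwisted too")
print(payload)                                   # 2 5 27
print(parse_counts(payload.encode("ascii")))     # (2, 5, 27)
```

Keeping the counting and parsing in pure functions like these makes the format easy to check separately from the process plumbing.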
Execution
First run the parent process, assuming it is saved as “parent.py”
python parent.py
Since the parent automatically spawns the child process, check the child's logs by tailing the “worker.log” file in the working directory
tail -f worker.log
[Figure: parent process output]
[Figure: child process logs from “tail -f worker.log”]
About Process Protocol
ProcessProtocol offers the same kind of API that we use to communicate with a TCP or UDP server, but with slightly different callbacks. Instead of dataReceived there are outReceived and errReceived. Instead of connectionLost there are inConnectionLost, outConnectionLost and errConnectionLost. When the child process ends, processExited and processEnded are called.
connectionMade | Called when the child process has started successfully |
outReceived | Called when data is received from the child process via its stdout stream |
errReceived | Called when data is received from the child process via its stderr stream |
inConnectionLost | Called if the connection to the child's stdin is closed |
outConnectionLost | Called if the stdout pipe of the child is closed |
errConnectionLost | Called if the stderr pipe of the child is closed |
processExited(status) | Called when the child process has either been terminated or ended normally |
processEnded(status) | The last callback to be called, once all pipes are closed |
childDataReceived(childFD, data) | Generic method called when data is received from the specified FD. It is most useful when extra FDs are passed using the childFDs parameter of spawnProcess |
childConnectionLost(childFD) | Generic method called when the connection to the specified FD is closed |
The following two methods are useful for writing to the child and closing file descriptors
self.transport.writeToChild(childFD, data)
: Puts data into the write end of the input pipe that has the file descriptor childFD, whereas .write simply writes to childFD=0.
self.transport.closeChildFD(childFD)
: Closes the child’s pipe for the specified file descriptor childFD. To indicate EOF to the child process we typically close the input pipe, whereas closing an output pipe is not recommended.
The two methods above are very generic. The following helper methods make use of them and provide a convenient API for dealing with stdin, stdout and stderr
self.transport.closeStdin
: Closes the stdin pipe.
self.transport.closeStdout
: Closes the stdout pipe. Not usually called: if you close the stdout pipe, any attempt by the child to write to stdout will cause a SIGPIPE error.
self.transport.closeStderr
: Closes the stderr pipe. Like closeStdout, it is not usually called.
Observations
While running these programs, the following was observed
- Even though self.transport.closeStdin() is called from the parent process, the sys.stdin.closed flag stays False in the child process. The flag only reports whether the child itself has closed its sys.stdin file object, not whether the parent has closed the other end of the pipe
- When the end of the input is reached, reading from stdin returns an empty string, which can be used to signal the end of data
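Both observations can be reproduced without Twisted using an ordinary OS pipe. In this small sketch the writer end stands in for the parent's side of the child's stdin and the reader end stands in for sys.stdin in the child. The helper name read_until_eof is illustrative only.

```python
import os


def read_until_eof(reader, chunk_size=8):
    """Read text in small chunks until read() returns '' (EOF)."""
    chunks = []
    while True:
        chunk = reader.read(chunk_size)
        if chunk == "":
            break  # the writing side has closed the pipe
        chunks.append(chunk)
    return "".join(chunks)


read_fd, write_fd = os.pipe()
reader = os.fdopen(read_fd, "r")
writer = os.fdopen(write_fd, "w")

writer.write("hello\nworld")
writer.close()  # plays the role of self.transport.closeStdin()

text = read_until_eof(reader)
closed_after_eof = reader.closed  # still False: nobody closed the reader object
reader.close()
```

The reader's closed flag only becomes True after reader.close() is called on that object, which is why the empty read is the signal to rely on for end of data.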
Download the source code: https://github.com/get-kt/twisted-multiprocessing-ProcessProtocol