понедельник, 1 июля 2013 г.

Python: примеры и тесты, часть 5 - процессы

Параллельные процессы.

В составе модулей Python декларируется весьма много средств для запуска нового процесса из кода, многие из которых - альтернатива друг другу:
  • отдельный модуль subprocess, реализующий класс Popen;
  • отдельный модуль popen2, реализующий класс Popen3;
  • функции из модуля os группы system(), popen(), popen2(), popen3(), popen4() - выполняющие командную строку в новом экземпляре командного интерпретатора;
  • многочисленные функции из модуля os групп exec*() и spawn*() - подменяющие текущий процесс на указанный;
  • клонирование процесса вызовом fork() из модуля os; 
Наверняка существует ещё множество альтернативных реализаций, представленных сторонними производителями, что обуславливается лёгкостью тиражирования программного обеспечения для Python.

Все доступные средства запуска новых процессов отчётливо разделяются на три категории: а). запуск экземпляра командного интерпретатора (bash), который запустит процесс (system(), popen(), ...), б). подмена кода  текущего адресного процесса на код нового процесса (exec*() и spawn*()), в). создание копии текущего адресного пространства выполняющегося процесса (fork()). Первые две возможности оперируют с двоичным исполнимым форматом в файле, и полностью перекладывают выполнение своей деятельности на функциональность операционной системы. Меня для тестирования больше всего интересовала именно последняя возможность - создание клона процесса вызовом fork() (о причине почему "больше всего" чуть позже). Вот пример подобной программы (файл fork.py):

#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import time
import sys
import getopt
from rdtsc import rdtsc
 
delay = 1
procnum = 2
debuglevel = 0
 
opts, args = getopt.getopt( sys.argv[1:], "p:d:v" )
for opt, arg in opts:  # опции (ключи) командной строки
    if 0 == cmp( opt[ 1: ], 'p' ): procnum = int( arg )
    if 0 == cmp( opt[ 1: ], 'd' ): delay = int( arg )
    if 0 == cmp( opt[ 1: ], 'v' ): debuglevel = debuglevel + 1
 
childs = []
if debuglevel : print "родительский процесс %i" % os.getpid()
for i in range( 0, procnum ) :
    tim = rdtsc();
    try :
        pid = os.fork();
    except :
        print "error: create child process"
        sys.exit( 33 )
    if pid == 0 :      # в коде дочернего процесса
        trun = rdtsc() - tim;
        if debuglevel :
            print "дочерний процесс %i - циклов процессора на запуск: %u" % \
                  ( os.getpid(), trun );
        time.sleep( delay )
        trun = rdtsc() - tim;
        if debuglevel :
            print "дочерний процесс %i - время завершения: %u" % \
                  ( os.getpid(), trun );
        sys.exit( 3 )
    if pid > 0 :
      # в коде родительского процесса
        childs.append( pid )
        if debuglevel :

            print "%i: создан новый дочерний процесс %i" % ( os.getpid(), pid )
 
print "ожидание завершения дочерних процессов ..."
for p in childs :
    pid, status = os.wait()
    if debuglevel :

        print "код завершения процесса %i = %i" % ( pid, os.WEXITSTATUS( status ) )
print "все порождённые процессы успешно завершены"


Вот как выполняется этот код с детализированным уровнем отладочного вывода:

$ ./fork.py -p3 -d2 -v
родительский процесс 17743
17743: создан новый дочерний процесс 17744
17743: создан новый дочерний процесс 17745
17743: создан новый дочерний процесс 17746
ожидание завершения дочерних процессов ...
дочерний процесс 17746 - циклов процессора на запуск: 1136190
дочерний процесс 17745 - циклов процессора на запуск: 2535930
дочерний процесс 17744 - циклов процессора на запуск: 4064020
дочерний процесс 17746 - время завершения: 3330095370
дочерний процесс 17745 - время завершения: 3334456990
дочерний процесс 17744 - время завершения: 3342243510
код завершения процесса 17746 = 3
код завершения процесса 17745 = 3
код завершения процесса 17744 = 3
все порождённые процессы успешно завершены


Здесь много чего интересного: и последовательность старта выполнения созданных процессов, и порядок их завершения работы, и то, что интервал времени до старта создаваемого процесса имеет в точности тот порядок величины времени, что и при запуске потока, а это значит, что основные затраты производительности ложатся не на системные API создания параллельных ветвей выполнения, а на накладные расходы исполняющей системы Python...

А вот его выполнение для очень большого числа порождённых процессов:

$ ./fork.py -p100
ожидание завершения дочерних процессов ...
все порождённые процессы успешно завершены
$ ./fork.py -p500
ожидание завершения дочерних процессов ...
все порождённые процессы успешно завершены
$ ./fork.py -p721
ожидание завершения дочерних процессов ...
все порождённые процессы успешно завершены
$ echo $?
0
$ ./fork.py -p722
error: create child process
$ echo $?
33

Теперь возвратимся к сказанному ранее, что этот случай клонирования процесса самый любопытный... Почему?
  • во-первых, потому, что fork() - это самое сердце серверных технологий UNIX на протяжении десятилетий...
  • во-вторых, потому, что интересен вопрос: в интерпретирующей (исполняющей) системе Python что клонируется, какой процесс?
Смотрим (одновременно в двух терминалах):

$ ./fork.py -p5 -d20
ожидание завершения дочерних процессов ...
все порождённые процессы успешно завершены
$ ps -A | grep 'fork.py'
6882 pts/1 00:00:00 fork.py
6883 pts/1 00:00:00 fork.py
6884 pts/1 00:00:00 fork.py
6885 pts/1 00:00:00 fork.py
6886 pts/1 00:00:00 fork.py
6887 pts/1 00:00:00 fork.py

Как и следовало ожидать, клонируются экзепляры виртуальной машины Python (исполняющей байт-код системы), внутри которых уже, в свою очередь, выполняются копии приложения fork.py.

Ну и, в завершение, другой, более элементарный пример того, как из кода Python запускается новый экземпляр командного интерпретатотора (bash), выполняющий, в свою очередь, заказанное ему приложение:

  • родительский процесс (файл parent.py), передающий дочерним процессам работу по поиску вхождений в файлы текстовых фрагментов, заданных переменной word:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import subprocess
import sys
 
child = os.path.join( os.path.dirname(__file__), "./child.py" )
word = 'word'
file = [ './parent.py', './child.py' ]
pipes = []
 
for i in range( 0, 2 ):
    command = [ sys.executable, child ]
    pipe = subprocess.Popen( command, stdin=subprocess.PIPE )
    pipes.append( pipe )
    pipe.stdin.write( word.encode( "utf8" ) + b"\n" )
    pipe.stdin.write( file[ i ].encode( "utf8" ) + b"\n" )
    pipe.stdin.close()
 
while pipes:
    pipe = pipes.pop()
    pipe.wait()
  • порождённый процесс (файл child.py), причём, и то что искать (сигнатуру) и то где искать (имя файла для поиска) этот процесс получает из входного потока от своего родительского процесса:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
 
word = sys.stdin.readline().rstrip()
filename = sys.stdin.readline().rstrip()
try:
    with open( filename, "rb" ) as fh:
    while True:
        current = fh.readline()
    if not current:
        break
    if ( word in current ):
        print( "find: {0} {1}".format( filename, word ) )
except :
    pass


И вот что, в итоге, мы увидим:

$ ./parent.py
find: ./parent.py word
find: ./parent.py word
find: ./child.py word
find: ./child.py word
find: ./child.py word



Python: примеры и тесты, часть 4 - потоки

Опции командной строки.

Это немаловажная деталь для построения консольных приложений в стиле UNIX: обработка опций в командной строке. Для этого предоставляется модуль getopt, который позволяет обрабатывать опции на манер того, как делает одноимённый вызов C. Обрабатываются как короткие опции (односимвольные с одним предшествующим '-'), так и длинные (многосимвольные с двумя предшествующими '--'). Можно задавать опции как с требуемым значением, так и без. В общем: все удовольствия.

Примеры использования будут показаны ниже , при запуске многопоточных приложений.

Потоки и синхронизация.

В описаниях Python везде упоминаются модуль реализации потоков низкого уровня - thread, и модуль реализации потоков высокого уровня - threading, надстроенный над модулем thread. Почти наверняка, доступно ещё много альтернативных реализаций от сторонних разработчиков, что обуславливается лёгкостью разработки тиражирования программного кода Python. Но все они, в конечном итоге, являются обёртками к pthread_t из POSIX API.

Реализация низкого уровня - модуль thread. Модуль крайне скудно описан в документации и литературе, так что пример кода использования я нигде не встретил, и его пришлось писать "с нуля". В модуле thread очень бедные возможности синхронизации - один единственный примитив (LockType = class lock), нечто на манер самого простейшего бинарного мютекса, ограничивающего критическую секцию кода. Для того, чтобы дождаться окончания дочерних потоков (реализовать "барьер", один из наиболее частых случаев) нужно строить искусственную конструкцию, по типу счётного семафора (файл tlspeed.py):

#!/usr/bin/python -O
# -*- coding: utf-8 -*-
from rdtsc import rdtsc
from calibr import calibr, delay_in_cycle
import getopt
import sys
import thread
import time
import string
 
debuglevel = 0
threadnum = 2                     # заказанное число порождаемых потоков
delay = 1
active = 0
numt = 0                          # текущее число активных дочерних потоков
lock = thread.allocate_lock()     # блокировка доступа к числу активных дочерних потоков
wait = thread.allocate_lock()     # блокировка ожидания завершения всех дочерних потоков
barier = { 'numt' : numt, 'lock' : lock, 'wait' : wait }
 
def thrfun( delay, num, tstart ): # функция потока
    st = rdtsc() - tstart
    barier[ 'numt' ] = barier[ 'numt' ] + 1
    barier[ 'lock' ].release()
    ss = "\t%i : %i <= старт: %u" % ( num, id, st )
    if not active : time.sleep( delay )
    else : delay_in_cycle( delay )
    barier[ 'lock' ].acquire()
    barier[ 'numt' ] = barier[ 'numt' ] - 1
    st = rdtsc() - tstart
    print "%s - финиш: %u" % ( ss, st )
    if 0 == barier[ 'numt' ] :
        barier[ 'wait' ].release()
    barier[ 'lock' ].release()
    return
 
opts, args = getopt.getopt( sys.argv[1:], "vt:d:a" )
for opt, arg in opts:             # опции (ключи) командной строки (-v, -t, -d, -a)
    if 0 == cmp( opt[ 1: ], 'v' ): debuglevel = debuglevel + 1
    if 0 == cmp( opt[ 1: ], 't' ): threadnum = string.atoi( arg )
    if 0 == cmp( opt[ 1: ], 'd' ): delay = string.atoi( arg )
    if 0 == cmp( opt[ 1: ], 'a' ): active = 1
if debuglevel > 0 :
    print opts
    print args
    print debuglevel
    print threadnum
 
barier[ 'wait' ].acquire()        # захват блокировки завершения
for n in range( threadnum ):      # запуск threadnum потоков
    barier[ 'lock' ].acquire()
    id = thread.start_new_thread( thrfun, ( delay, n, rdtsc() ) )
    print "\t%i : %i =>" % ( n, id )
barier[ 'wait' ].acquire()        # ожидание завершения всех потоков
print "завершены все %i потоков, \

       завершается ожидавший главный поток" % threadnum

Вот как выглядит исполнение этого примера (3 дочерних потока, каждый из которых выполняется по 3 секунды):

$ ./tlspeed.py -t3 -d3
0 : -1219589312 =>
1 : -1229980864 =>
2 : -1240466624 =>
0 : -1219589312 <= старт: 316450 - финиш: 4993234920
2 : -1240466624 <= старт: 235850 - финиш: 4992812200
1 : -1229980864 <= старт: 162580 - финиш: 4993124550
завершены все 3 потоков, завершается ожидавший главный поток


Здесь довольно странные (для меня) отрицательный значения в качестве идентификаторов отдельных потоков, хотя в документации сказано, что это просто числовое значение ... и, наверное, такие значение тоже имеют право быть.

Альтернативный вариант, это то, что декларируется как реализация высокого уровня - модуль threading. Здесь всё гораздо проще (но в кое-чём и сложнее ... например в передаче параметров в потоковую функцию уже после создания объекта класса Thread, непосредственно перед запуском на выполнение). Но в этом модуле уже представлено достаточно много самых разных примитивов синхронизации:  Lock, RLock, Condition, Semaphore, Event, Queue - это сильно облегчает описание взаимодействий потоков. Пример, эквивалентный предыдущему, может выглядеть как-то так:

#!/usr/bin/python -O
# -*- coding: utf-8 -*-
from rdtsc import rdtsc
from calibr import calibr, delay_in_cycle
import getopt
import sys
import threading
import time
import string
 
debuglevel = 0
threadnum = 2
delay = 1
active = 0
 
def thrfun( *args ):     
   # функция потока
    st = rdtsc() - args[ 2 ] # время старта потока
    ss = "\t%i : %s <= старт: %u" % \
         ( args[ 1 ], threading.currentThread().getName(), st )
    if not active : time.sleep( args[ 0 ] )
    else : delay_in_cycle( args[ 0 ] )
    st = rdtsc() - args[ 2 ] # время завершения потока
    print "%s - финиш: %u" % ( ss, st )
    return
 
opts, args = getopt.getopt( sys.argv[1:], "vt:d:a" )
for opt, arg in opts:        # опции (ключи) командной строки (-v, -t, -d, -a)
    if 0 == cmp( opt[ 1: ], 'v' ): debuglevel = debuglevel + 1
    if 0 == cmp( opt[ 1: ], 't' ): threadnum = string.atoi( arg )
    if 0 == cmp( opt[ 1: ], 'd' ): delay = string.atoi( arg )
    if 0 == cmp( opt[ 1: ], 'a' ): active = 1
if debuglevel > 0 :
    print opts
    print args
    print debuglevel
    print threadnum
 
threads = []
for n in range( threadnum ): # создание и запуск потоков
    parm = [ delay, n, 0 ]
    t = threading.Thread( target=thrfun, args=parm )
    threads.append( t )
    t.setDaemon( 1 )
    print "\t%i : %s =>" % ( n, t.getName() )
    parm[ 2 ] = rdtsc()
    t.start()
for n in range( threadnum ): # ожидание завершения всех потоков
    threads[ n ].join()
print "завершены все %i потоков, \

       завершается ожидавший главный поток" % threadnum

Вот как выглядит выполнение такого варианта (4 потока, выполняющиеся по 2 секунды), теперь вместо численных идентификаторов потока для их различения возникают имена потоков:

$ ./thspeed.py -t4 -d2
0 : Thread-1 =>
1 : Thread-2 =>
2 : Thread-3 =>
3 : Thread-4 =>
0 : Thread-1 <= старт: 558390 - финиш: 3329127080
2 : Thread-3 <= старт: 247300 - финиш: 3328592890
3 : Thread-4 <= старт: 211740 - финиш: 3328294390
1 : Thread-2 <= старт: 312410 - финиш: 3328573940
завершены все 4 потоков, завершается ожидавший главный поток

Обратите внимание на порядок старта потоков (временные метки) и на порядок их завершения. И в примере с низкоуровневым модулем thread, и здесь, в примере с высокоуровневым модулем threading, код допускает в качестве паузы выполнения потока использовать не пассивное ожидание (time.sleep()), а активное ожидание выполнением пустых циклов. При этом картина достаточно радикально меняется, что всегда возникает в параллелизмах (но это я оставлю для анализа любознательному уму заинтересованных читателей):

$ ./thspeed.py -t4 -d2 -a
0 : Thread-1 =>
1 : Thread-2 =>
2 : Thread-3 =>
3 : Thread-4 =>
1 : Thread-2 <= старт: 12039350 - финиш: 3339395210
0 : Thread-1 <= старт: 683100 - финиш: 3340604010
2 : Thread-3 <= старт: 21162120 - финиш: 3346219540
3 : Thread-4 <= старт: 41049890 - финиш: 3366116170
завершены все 4 потоков, завершается ожидавший главный поток


Функция активной задержки (delay_in_cycle()) реализована в собственном модуле calibr.py (он упоминался раньше) как:

def delay_in_cycle( delay = 1 ):
    t = time.time()
    while time.time() - t < delay :
        time.localtime()


Реализация потоков в Python вызывает ряд вопросов:

  • Является ли сама исполняющая система Python (виртуальная машина, исполняющая байт-код) многопоточной? То есть, распараллеливается ли многопоточное Python-приложение по числу доступных на оборудовании процессоров (ядер)? Мои наблюдения в системном мониторе показывают, что, похоже, это так и есть. 
  • Переключение между потоками Python-приложение должно происходить не по системному таймеру (как это происходит с потоками POSIX), а по полному завершению выполнения очередного байт-кода в потоке (об этом сказано и не в одном месте в документации). Более того, частота, с которой интерпретатор проверяет и переключает потоки, может быть переустановлена вызовом sys.setcheckinterval(). По умолчанию (утверждается) интерпретатор проверяет переключение потоков каждые 10 команд байт-кода (чем запросить эту величину из интерпретатора я не нашёл). Имеет ли эта модель какие-то последствия?