Параллельное программирование в shell

в 14:34, , рубрики: Песочница, метки: ,

Добрый день.
Представляю маленькую, кросс платформенную (тестировалось в Linux и NetBSD) библиотеку функций и пример ее использования.
Основная задача — облегчить разработку скриптов на shell, выполняющих несколько параллельных, независимых задач и обеспечить синхронизацию запущенных процессов.

Структура программы при использование данной библиотеки выглядит следующим образом:
Основной процесс – подготавливает задания и распределяет их по рабочим процессам.
Рабочие процессы — выполняют эти задания и сообщают о своей готовности принять новое основному процессу.
Основное преимущество данной схемы в том, что нет постоянного разветвления процессов (fork()).
Я не претендую на оригинальность идеи, да и реализации тоже, но я надеюсь, что это будет познавательно и интересно.

Библиотека состоит из трех функций.
1. pjobs_init ()
Принимает два параметра: имя функции (оно же станет Идентификатором задания) и количество запускаемых рабочих процессов.
На этом этапе выполняется вся подготовительная работа, создается управляющий канал один на всех. По этому каналу основной процесс будет отслеживать готовность рабочих процессов к выполнению.
Для каждого рабочего процесса создается свой канал, по которому он будет ждать команды от основного процесса.
2. pjobs_send ()
Принимает два параметра: идентификатор задания и строку – задание, которое будет передано одному из рабочих процессов по готовности.
3. pjobs_exit ()
Принимает параметр: идентификатором задания.
Дает команду и ждет завершения всех рабочих процессов.

Библиотека не отслеживает возможность рабочего процесса вернуть ошибку, это сделать несложно, но еще больше запутает код. Не заботится о безопасности, каналы создаются в /tmp беззаботно.

С помощью этой библиотеки мне удалось ускорить работу простой, программы разбора директории в 1.5 раза, если задача более сложная, можно ожидать большего.

Библиотека:

####!/bin/sh
####set -eu

# Used getfd()
MAXFD=256
MINFD=3

# Lookup a firstly free file descriptor.
# Name of the return value must be in $1.
getfd () {
  local Curfd=$MINFD
  while [ $Curfd -le $MAXFD ]; do
    eval "exec 1>&$Curfd &" 
    if wait $!
    then
      Curfd=$((Curfd + 1))
      continue
    else
      eval "$1=$Curfd $Curfd>&1"
      return 0
    fi
  done 1>/dev/null 2>&1
  echo Error: getfd have not found a free file descriptor.
  exit 1
}

# Parallel jobs control inner function.
# Wait command from (fd = $2), for execute func in $1,
# ready symbol must be in $4 and ready channel in (fd = $3).
pjobs_wait_cmd_for_job_ () {
  local Cmd
  while :; do
    eval "echo $4 >&$3"
    if eval "read Cmd <&$2"; then
      $1 "$Cmd"
    else
      break
    fi
  done
  exit 0
}

# Jobs control(jobs and function names are in $1).
# Init $2(1..N) parallel jobs with the executable 
# function in $1, called through the pjobs_send().
pjobs_init () {
  local PREF=${1}$$
  eval ${PREF}Jt=$2
  local Rfd
  local Cfd

  getfd Rfd
  eval ${PREF}Readyfd=$Rfd
  mkfifo /tmp/$PREF.rch
  eval "exec $Rfd<>/tmp/$PREF.rch"
  rm /tmp/$PREF.rch

  local Jn=1
  while [ $Jn -le $2 ]; do
    getfd Cfd
    eval ${PREF}${Jn}Cmdfd=$Cfd
    mkfifo /tmp/$PREF.cch
    eval "pjobs_wait_cmd_for_job_ $1 $Cfd $Rfd ${PREF}${Jn}Cmdfd $Cfd</tmp/$PREF.cch &"
    eval "exec $Cfd>/tmp/$PREF.cch"
    rm /tmp/$PREF.cch
    Jn=$((Jn + 1))
  done
  return 0  
}

# Send $2 toward a first ready job(jobs name in $1).
pjobs_send () {
  local PREF=${1}$$
  local Rfd
  eval Rfd=$${PREF}Readyfd
  local Rsym
  local Cfd
  eval "read Rsym <&$Rfd"
  eval Cfd=$$Rsym
  eval "echo "$2" >&$Cfd"
}

pjobs_exit () {
  local PREF=${1}$$
  local Jt
  eval Jt=$${PREF}Jt
  local Cfd
  local Jn=1
  while [ $Jn -le $Jt ]; do
    eval Cfd=$${PREF}${Jn}Cmdfd
    eval "exec $Cfd>&-"
    Jn=$((Jn + 1))
  done
  wait
}

Пример использования:

#!/bin/sh

# Reverse scan directory in $1 and save filenames:
# in "ShortList" if directory has less than 10 files
# in "BigList" if directory has more than 10 files
# Number of parallel jobs in $2.

set -eu

. ./functions.lib

skip_name() {
if [ ! -e "$1" -o "${1##*/}" = "." -o "${1##*/}" = ".." ]; then
  return 0
fi
return 1
}

exec 3>>ShortList 4>>BigList

dir_cmd () {
local Fn
for Fn in "$@"; do
  if ! skip_name "$Fn"; then
    if [ $# -lt 12 ]; then
      echo "$Fn" >&3
    else
      echo "$Fn" >&4
    fi
  fi
done
}

dir_get () {
  dir_cmd "$1"/* "$1"/.*
}

dir_parse () {
local Fn
pjobs_send dir_get "$1"
for Fn in "$1"/* "$1"/.*; do
  if skip_name "$Fn"; then
    continue
  fi
  if [ -d "$Fn" ]; then
    dir_parse "$Fn"
  fi
done
}


pjobs_init dir_get $2
dir_parse "${1%%/}"
pjobs_exit dir_get

Поделиться

* - обязательные к заполнению поля