Простой DICOM клиент на GO с балансировщиком задач и веб-интерфейсом

от автора


Привет Хабр! В последнее время я очень сильно увлекся разработкой на языке GO. Изящный и выразительный язык программирования. Мне давно хотелось сделать что-нибудь полезное. По специфике своей работы мне приходится работать с медицинскими архивами DICOM-изображений PACS.

Я решил, что пришло время создать свой dicom-клиент с (блэкджеком..) веб-интерфейсом, который может выполнять следующие стандартные операции:

  • Dicom ping;
  • Cкачивание исследований;
  • Загрузка исследования,
  • А также поиск по реквизитам

(c-echo, c-move,c-store,c-find соотвествено).
В качестве dicom-библиотеки была выбрана библиотека GrassRoot SDK. Наш клиент будет распараллеливать задачи. Язык go для этого хорошо адаптирован

Похожий сценарий работы был описан habrahabr.ru/post/198150/.
Наш сценарий несколько отличается:
У нас есть некий балансировщик задач, который получает задания dicom-сервиса, проверяет возможность выполнения и асинхронно их выполняет. Для того чтобы не было ситуации, когда параллельно выполняется 1000 задач, мы реализуем очередь задач таким образом, чтобы были активные задачи и те, которые находятся в спящем состоянии. По умолчанию только 10 задач будут активными. В противном случае мы могли бы обойтись без баллансировщика вообще, тупо параллельно выполнить 1000 задач параллельно без какого либо контроля.
Весь код балансировщика находится в файле job_ballancer.go.

В начале идет описание интерфейсов обработчиков. В случае если работа была выполнена успешно, в случае если вернулась ошибка и сам процесс обработки задачи.

type JobDispatcher interface { 	Dispatch(interface{}) (interface{}, error) } type ErrDispatcher interface { 	DispatchError(FaJob) error } type CompDispatcher interface { 	DispatchSuccess(CompJob) error } 

Когда мы создаем экземпляр диспетчера, мы его инициализируем соответствующими обработчиками.

srv.jbBal.Init(&srv.dDisp, srv, srv)   //Сама структура балансировщика    <source lang="go"> type JobBallancer struct { 	jChan    chan interface{}	//канал в который мы передаем задания 	acJob    map[string]Job		//список активных работ 	slJob    map[string]Job		//список неактивных работ 	errDisp  ErrDispatcher		//обработчик работ завершившихся error 	jobDisp  JobDispatcher		//обработчик заданий 	compDisp CompDispatcher		//обработчик успешно выполненных работ 	JbDone   sync.WaitGroup		//ожидаем завершения всех работ 	aJobC    int			//количество параллельных (активных) }  //инициализация балансировщика func (jbal *JobBallancer) Init(jdis JobDispatcher, cmd CompDispatcher, erd ErrDispatcher) { 	jbal.errDisp = erd 	jbal.jobDisp = jdis 	jbal.compDisp = cmd 	jbal.acJob = make(map[string]Job) 	jbal.slJob = make(map[string]Job) 	jbal.aJobC = 10 	jbal.jChan = make(chan interface{}) 	go jbal.takeJob()		//запускаем поток в котором осуществляем 							//балансировку работ 	log.Println("info: job ballancer inited") } добавление новой работы в очередь. Операция асинхронная, т.е. работа отсылается в канал, а функция takeJob подбирает ее от туда. <source lang="go"> func (jbal *JobBallancer) PushJob(jdat interface{}) error { 	if jbal.checkInit() { 		return errors.New("error: JobChan is not inited") 	} 	uid := genUid() 	job := Job{JobId: uid, Data: jdat} 	jbal.jChan <- job 	return nil  }    func (jbal *JobBallancer) takeJob() { 	for { 		//извлекаем работу из канала 		recivedTask := <-jbal.jChan 		log.Println("info: job taken") 		switch job := recivedTask.(type) { 		case TermJob: 			//если мы получаем сигнал на завершение выходим из функции 			log.Println("info: recive terminate dispatch singal") 			return 		case Job: //обычная обработка (если все слоты активных работ заняты то работа записывается в список не активных работ) 			if len(jbal.acJob) < jbal.aJobC { 				jbal.JbDone.Add(1) 				jbal.addActiveJob(job) 				go jbal.startJob(job) 				log.Println("info: normal dispatch") 			} else { 				jbal.addSleepJob(job) 				jbal.JbDone.Add(1) 				log.Println("info: attend maximum active job") 			} 		case CompJob: 			//работа завершилась успехом 			if err := jbal.compDisp.DispatchSuccess(job); err != nil { 				log.Println("error: failed dispatch success" + job.Job.JobId) 			} 			//удачно завершившуюся работу можно удалить из списка 			jbal.removeJob(job.Job.JobId) 			jbal.JbDone.Done() 			jbal.resumeJobs() 		case FaJob: 			//работа завершилась ошибкой 			if err := jbal.errDisp.DispatchError(job); err != nil { 				log.Println("error: failed dispatch error" + job.Job.JobId) 			} 			//завершившуюся работу можно удалить из списка 			jbal.removeJob(job.Job.JobId) 			jbal.JbDone.Done() 			jbal.resumeJobs() 		default: 			log.Fatalln("error: unknown job type") 			jbal.JbDone.Done() 		} 	} } //функция удаления работы func (jbal *JobBallancer) removeJob(jid string) error { 	if _, isFind := jbal.acJob[jid]; isFind { 		delete(jbal.acJob, jid) 	} else { 		return errors.New("error: can't remove job because job with id not found") 	} 	return nil }   //функция позволяющая правильно завершить работу балансировщика, в случае если  есть работы которые не завершены, функция будет ожидать их завершения func (jbal *JobBallancer) TerminateTakeJob() error { 	if jbal.checkInit() { 		return errors.New("error: is not inited") 	} 	jbal.JbDone.Wait() 	jbal.jChan <- TermJob{} 	close(jbal.jChan) 	if len(jbal.acJob) > 0 { 		return errors.New("error: list job is not empty") 	} 	log.Println("info: greacefully terminate take job") 	return nil } 

Остальные вспомогательные функции мы не будем рассматривать. полный код можно посмотреть
github.com/Loafter/dtools/blob/master/dcmjsser/job_ballancer.go

Не смотря, что код не сложный и я долго обдумывал его. Но все равно для проверки надежности я реализовал нагрузочный тест на десятки задач:

testJobDispatcher := TestJobDispatcher{} 	testErrorDispatcher := TestErrorDispatcher{} 	testSuccessDispatcher := TestCompletedDispatcher{} 	jobBallancer := JobBallancer{} 	jobBallancer.Init(&testJobDispatcher, &testSuccessDispatcher, 		&testErrorDispatcher) 	for i := 0; i < 40; i++ { 		jobBallancer.PushJob("data: " + strconv.Itoa(i)) 	} 	jobBallancer.TerminateTakeJob() 

Он отработал нормально. Все задачи были выполнены, а функция TerminateTakeJob завершилась тогда, когда все задачи были выполненны. Для контроля отработаных задач используется объект синхронизации sync.WaitGroup JbDone, который ведет подсчет количества выполненных работ. Как я уже отмечал выше, код балансировщика является универсальным и для того чтобы наш балансировщик работал по-другому, нам достаточно проинстанцировать его соотвествующими обработчиками.

Как и в прошлой своей поделке) (http://habrahabr.ru/post/247727/) интерефейс приложения я реализовал в виде веб-интерфейса.

Для теста я использовал публичный dicom-архив 213.165.94.158:11112. С него можно скачивать исследования, если есть прямой айпи и если на стороне клиента открыт порт 11112. Так же я проверил работу на свободном dicom-архиве dcm4che sourceforge.net/projects/cdmedicpacsweb/files/latest/download?source=files.
Мне удалось собрать рабочую версию для Linux, к сожалению собрать под Widows мне не удалось. Библиотека grassroot успешно собралась, но ошибка возникает при линковке самого приложения.
cmd/ld: Malformed PE file: Unexpected flags for PE section.

Об этой ошибке много написано тут: github.com/golang/go/issues/4069.
К сожалению я не настолько знаком с тонкостями сборки и поэтому получилась версия только под Linux. Может «хабра-эфект» сдвинет с мертвой точки и эту проблему. Для пользователей Windows, которые хотят проверить и посмотреть как это работает, я подготовил виртуальную машину на базе CoreOS (https://yadi.sk/d/y81KC-tyfar6A). В демо машине наш dicom-клиент работает как systemd-сервис.
При наличии желания, можно например реализовать сервис, который выкачивает исследования с различных dicom-узлов, и выкладывает в zip-архиве для скачивания. Для управления сервисом можно использовать json-сообщения, так же, как делает наш GUI.
А можно поступить, как поступил я: прикрутить в наше приложение какой-нибудь веб-просмотровщик на базе html5.


Github: github.com/Loafter/dtools
Версия Linux-amd64: github.com/Loafter/dtools/releases/download/1.0/dcmjsser

ссылка на оригинал статьи http://habrahabr.ru/post/254581/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *