Хочу поделиться своим опытом в разработке автономного ftp клиента.
В наличии имеется ftp-сервер, на котором периодически появляются данные в виде графических изображений и текстовых файлов, их размер варьируется от десятков килобайт до пары гигабайт. Доступ в интернет может быть через провод, а может быть через GSM-свисток или вообще по спутнику, то есть стабильным и нестабильным соответственно. Во втором случае резко повышается вероятность потери соединения из-за погодных условий, географического положения и т.п.
Итак, требования к клиенту следующие:
- Опрос ftp-сервера на наличие новых файлов и их последующая загрузка.
- В случае внезапной остановки загрузки (будь то обрыв соединения, или упадёт система, на которой стоит мой ftp-клиент), закачка должна продолжиться при первой возможности.
- Ограничение скорости загрузки (это связано со стоимостью трафика по GSM).
Если интересен мой способ решения задачи, прошу под кат!
Для удобства можно разбить всю статью на ключевые этапы работы клиента, с примерами кода и более подробным описанием тонкостей работы.
Постановка задачи
Подумав, я решил написать клиент, который работает по следующей схеме:
- Стучимся на сервер, получаем список файлов.
- Смотрим в свою историю загрузок, если файл отсутствует в истории, то добавляем файл в очередь загрузки.
- Если файл по каким-то причинам не удалось скачать, он отправляется в конец очереди загрузки.
- Успешно загруженный файл добавляется в историю.
И некоторые особенности:
- История хранится runtime и дублируется в xml-файл, откуда можно будет восстановить историю
- Клиент поддерживает загрузку нескольких файлов одновременно в разных потоках
Периодический опрос сервера и получение списка файлов
Решение периодического опроса сервера приходит на ум почти сразу — запустить таймер, в котором будет заключён метод получения списка файлов. Однако, сервер имеет немного своеобразную структуру каталогов. Если говорить коротко, то на сервере есть две папки — notify и files. В папке files хранятся сами данные, которые требуется скачать, и все они имеют уникальные имена по типу FILE_ID_xxx, где х — любая цифра. Папка notify содержит xml-файлы с описанием файлов из папки files, в т. ч. их настоящее имя, дату размещения на сервере и размер.
Прочитав все xml из папки notify, я формирую коллекцию из нехитрого FileItem:
public class FileItem { [XmlAttribute(AttributeName = "RemoteUri")] public string RemoteUri; [XmlAttribute(AttributeName = "SavePath")] public string SavePath; [XmlAttribute(AttributeName = "Date")] public string Date; [XmlAttribute(AttributeName = "RefId")] public string RefId; [XmlAttribute(AttributeName = "Name")] public string Name; [XmlAttribute(AttributeName = "Extention")] public string Extention; [XmlAttribute(AttributeName = "Size")] public long Size; }
А далее, пробегаясь по коллекции, проверяем, присутствует ли файл в истории загрузок, и не загружается ли он в данный момент
foreach (var df in dataFiles) { if (!FileHistory.FileExists(df) && !client.AlreadyInProgress(df)) { client.DownloadFile(df); } }
Вот и всё. Опрос сервера и поиск новых файлов закончен. О том, кто такие FileHistory и client — напишу далее.
Загрузка файлов в несколько потоков
"client" в коде выше — это экземпляр класса FTPClient, занимающегося только загрузкой файлов с сервера. И по факту FTPClient — моя обертка FtpWebRequest.
FTPClient имеет в себе потокобезопасную очередь, называемую «очередью загрузки»:
private ConcurrentQueue<FileItem> downloadQueue;
Итак, что происходит при вызове метода DownloadFile:
public void DownloadFile(FileItem file) { downloadQueue.Enqueue(file); StartDownloadTask(); }
Всё довольно просто — файл добавляется в очередь на загрузку, и вслед за этим вызывается метод, который создает задачу по загрузке файла, используя TPL. Вот как это выглядит:
private void StartDownloadTask() { if (currentActiveDownloads <= Settings.MaximumDownloadThreads) { FileItem file; if (!downloadQueue.IsEmpty && downloadQueue.TryDequeue(out file)) { Task t; if (File.Exists(file.SavePath)) { FileInfo info = new FileInfo(file.SavePath); var currentSize = info.Length; t = new Task(() => DownloadTask(file, currentSize)); } else { t = new Task(() => DownloadTask(file, 0)); } t.ContinueWith(OnTaskComplete); t.Start(); Interlocked.Increment(ref currentActiveDownloads); lock (inProgressLock) { inProgress.Add(file); } } }
Говоря русским языком, сначала проверим сколько уже работает тасков по загрузке файла, и если есть возможность пропихнуть ещё один. Затем пытаемся получить FileItem из очереди загрузки, если очередь не пуста. Потом определяем, присутствует ли файл уже локально, или нет. А локально присутствовать файл может в том случае, если загрузка неожиданно прервалась. Всё, что мы успели скачать, остаётся на диске. Так вот, в этом случае мы просто начнём загрузку с того места, на котором остановились.
Метод OnTaskComplete, который вызовется после завершения DownloadTask:
private void OnTaskComplete(Task t) { Interlocked.Decrement(ref currentActiveDownloads); StartDownloadTask(); }
То есть уменьшим счетчик активных загрузок, и попробуем начать новый таск загрузки. То есть, получается что новый таск на загрузку будет создаваться при добавлении нового файла в очередь загрузки и при завершении текущего таска на загрузку.
Теперь метод, занимающийся непосредственно загрузкой файла с сервера:
private void DownloadTask(FileItem file, long offset) { // Перед началом загрузки поставим поток на паузу. В случае, если файл не доступен по какой-то причине, то мы не будем спамить на сервер в попытках достучаться до него Thread.Sleep(10 * 1000); Log.Info(string.Format("Загружается файл {0}", file.Name)); try { if (offset == file.Size) { Log.Info(string.Format("Файл {0} уже полностью скачан.", file.Name)); FileHistory.AddToDownloadHistory(file); return; } using (var readStream = GetResponseStreamFromServer(file.RemoteUri, WebRequestMethods.Ftp.DownloadFile, offset)) { using (var writeStream = new FileStream(file.SavePath, FileMode.Append, FileAccess.Write)) { var bufferSize = 1024; var buffer = new byte[bufferSize]; int second = 1000; int timePassed = 0; var stopWatch = new Stopwatch(); var readCount = readStream.Read(buffer, 0, bufferSize); int downloadedBytes = readCount; while(readCount > 0) { // Считаем данные потока и засечём сколько на это ушло времени stopWatch.Start(); writeStream.Write(buffer, 0, readCount); readCount = readStream.Read(buffer, 0, bufferSize); stopWatch.Stop(); // Если скорость ограничена (0 считается за отсутствие ограничения) if (Settings.MaximumDownloadSpeed > 0) { var downloadLimit = (Settings.MaximumDownloadSpeed * 1024 / 8) / currentActiveDownloads; downloadedBytes += readCount; timePassed += (int)stopWatch.ElapsedMilliseconds; if (downloadedBytes >= downloadLimit) { var pause = second - timePassed; if (pause > 0) Thread.Sleep(pause); timePassed = 0; downloadedBytes = 0; stopWatch.Reset(); } if (timePassed > second) { stopWatch.Reset(); timePassed = 0; downloadedBytes = 0; } } } } } lock (inProgressLock) { inProgress.Remove(file); } FileHistory.AddToDownloadHistory(file); Log.Info(string.Format("Файл загружен - {0}", file.Name)); Interlocked.Add(ref currentLoadedSize, -file.Size); } catch (WebException e) { Log.Error(e); downloadQueue.Enqueue(file); } catch (Exception e) { Log.Error(e); } }
И метод, который формирует запрос к серверу и возвращает ответ:
private Stream GetResponseStreamFromServer(string uri, string method, long offset) { var request = (FtpWebRequest)WebRequest.Create(uri); request.UseBinary = true; request.Credentials = new NetworkCredential(Settings.Login, Settings.Password); request.Method = method; request.Proxy = null; request.KeepAlive = false; request.ContentOffset = offset; var response = request.GetResponse(); return response.GetResponseStream(); }
То есть, чтобы начать чтение потока не с начала, используется строка при формировании запроса: ё
request.ContentOffset = offset;
А ограничение скорости работает следующим образом: первым делом расчитаем downloadLimit, сколько байт может загрузить текущий поток. Учитывается общее ограничение скорости и количество активных потоков загрузки. Затем читаем поток 1024 байта. Засекли, сколько времени это заняло (timePassed). Общее количество считанных байт записывается в downloadedBytes.
При превышении лимита ставим поток на паузу на оставшееся время до конца секунды:
var pause = second - timePassed; if (pause > 0) Thread.Sleep(pause);
По истечению секунды счётчики обнуляются.
И в случае WebExeption файл снова добавится в очередь загрузки. А в историю файл попадёт только после успешного завершения.
История загрузок
Хранение истории загрузок в файле пригодится на тот случай, если приложение вдруг перезапустится, и история, хранимая runtime будет потеряна.
Внутри класс FileHistory имеет коллекцию, хранящую в себе FileItem, которые мы уже успешно скачали:
private static List<FileItem> downloadHistory;
Добавление файла происходит очень просто — мы добавляем файл в коллекцию и сразу записываем изменения в xml:
public static void AddToDownloadHistory(FileItem file) { lock (historyLock) { XmlSerializer serializer = new XmlSerializer(typeof(List<FileItem>)); using (var writer = GetXml()) { downloadHistory.Add(file); serializer.Serialize(writer, downloadHistory); } } }
И вот что происходит, когда мы хотим проверить наличие файла в истории:
public static bool FileExists(FileItem file) { lock (historyLock) { if (downloadHistory.Count == 0) { if (!TryRestoreHistoryFromXml()) { return false; } } return downloadHistory.Any(f => f.RefId == file.RefId); } }
Поясню — вот вызывается метод проверки. И записей в нашей коллекции ноль. Вероятно, приложение падало, и история утерялась. На этот случай попытаемся восстановить историю из xml. Если это не удается (файл отсутствует или повреждён) — считаем, что мы этот файл ещё не загружали.
Завершение
Я расчитываю, что эта статья поможет тем, кому тоже придётся писать свой ftp-клиент в первый раз, как и мне. Не претендую на то, что решение идеально. И это мой первый опыт написания статей на хабр, поэтому я открыт для критики и комментариев.
ссылка на оригинал статьи https://habrahabr.ru/post/282600/
Добавить комментарий