13
01/2016
Azure Blob(二)——块Blob上传、下载、删除(未完结)
这是Azure Blob系列文章。此前文章的列表为:
这一节,我们主要学习块Blob的相关操作。块Blob上传时,需要先把文件流划分成不大于4MB的块,待所有文件块都上传完毕之后,再调用PutBlockList方法确认,此前的块便会组织成一个Blob。若不提交,则上传的blob块一周之后自动删除。
下列的代码是上传BlockBlob的相关操作,其中包括多线程上传和单线程上传。文件上传取决于带宽,因此,单线程上传和多线程上传的效率差不多。对于小文件,单线程上传甚至比多线程上传更快。对于大文件,多线程上传比单线程上传略快一些,主要是因为多线程缓存了上传文件。
未完待续。。。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Blob;

namespace AzureBaseLib
{
    /// <summary>
    /// Block-blob helper built on <see cref="CloudBlobClient"/>: upload (single call or
    /// block-by-block with several concurrent tasks), existence check, delete and download.
    /// </summary>
    public class BlockBlobClient : IBlobClient
    {
        // Streams longer than this many bytes are sent through UploadMultiTask by Upload().
        private const long SingleUploadThreshold = 100 * 1024 * 1024;

        private readonly CloudBlobClient _blobClient;

        /// <summary>
        /// Creates a client for the storage account described by the connection string.
        /// </summary>
        /// <param name="connectionString">Azure storage connection string.</param>
        public BlockBlobClient(string connectionString)
        {
            _blobClient = CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient();
        }

        /***************************************
         * Single-call upload
         **************************************/
        #region Single-call upload

        /// <summary>
        /// Uploads a stream with a single SDK call; also usable for large files.
        /// </summary>
        /// <param name="containerName">Target container (created if missing).</param>
        /// <param name="blobName">Target blob name.</param>
        /// <param name="stream">Data to upload.</param>
        public void UploadFromStream(string containerName, string blobName, Stream stream)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            blockBlob.UploadFromStream(stream);
        }

        /// <summary>
        /// Asynchronously uploads a stream with a single SDK call; also usable for large files.
        /// </summary>
        /// <param name="containerName">Target container (created if missing).</param>
        /// <param name="blobName">Target blob name.</param>
        /// <param name="stream">Data to upload.</param>
        /// <returns>A task that completes when the upload finishes.</returns>
        public Task UploadFromStreamAsync(string containerName, string blobName, Stream stream)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            return blockBlob.UploadFromStreamAsync(stream);
        }

        /// <summary>
        /// Uploads a (small) file given its path.
        /// </summary>
        /// <param name="containerName">Target container (created if missing).</param>
        /// <param name="blobName">Target blob name.</param>
        /// <param name="fileName">Path of the local file to upload.</param>
        public void UploadFromFile(string containerName, string blobName, string fileName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            blockBlob.UploadFromFile(fileName, FileMode.Open);
        }

        /// <summary>
        /// Asynchronously uploads a (small) file given its path.
        /// </summary>
        /// <param name="containerName">Target container (created if missing).</param>
        /// <param name="blobName">Target blob name.</param>
        /// <param name="fileName">Path of the local file to upload.</param>
        /// <returns>A task that completes when the upload finishes.</returns>
        public Task UploadFromFileAsync(string containerName, string blobName, string fileName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            return blockBlob.UploadFromFileAsync(fileName, FileMode.Open);
        }

        /// <summary>
        /// Uploads a string as the blob content.
        /// </summary>
        /// <param name="containerName">Target container (created if missing).</param>
        /// <param name="blobName">Target blob name.</param>
        /// <param name="str">Text to upload.</param>
        public void UploadFromString(string containerName, string blobName, string str)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            blockBlob.UploadText(str);
        }

        /// <summary>
        /// Asynchronously uploads a string as the blob content.
        /// </summary>
        /// <param name="containerName">Target container (created if missing).</param>
        /// <param name="blobName">Target blob name.</param>
        /// <param name="str">Text to upload.</param>
        /// <returns>A task that completes when the upload finishes.</returns>
        public Task UploadFromStringAsync(string containerName, string blobName, string str)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            return blockBlob.UploadTextAsync(str);
        }

        #endregion

        /***************************************
         * Multi-task upload
         **************************************/

        /// <summary>
        /// Picks the upload strategy from the stream length: streams longer than the
        /// threshold go through <see cref="UploadMultiTask"/>, the rest through
        /// <see cref="UploadFromStream"/>.
        /// </summary>
        /// <param name="containerName">Target container (created if missing).</param>
        /// <param name="blobName">Target blob name.</param>
        /// <param name="stream">Data to upload; must support <c>Length</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
        public void Upload(string containerName, string blobName, Stream stream)
        {
            if (stream == null)
            {
                throw new ArgumentNullException("stream");
            }

            if (stream.Length > SingleUploadThreshold)
            {
                UploadMultiTask(containerName, blobName, stream);
            }
            else
            {
                UploadFromStream(containerName, blobName, stream);
            }
        }

        /// <summary>
        /// Uploads a large stream block by block using several concurrent upload tasks,
        /// then commits the block list. Blocks until the upload has finished.
        /// </summary>
        /// <param name="containerName">Target container (created if missing).</param>
        /// <param name="blobName">Target blob name.</param>
        /// <param name="stream">Data to upload; must support <c>Length</c>.</param>
        /// <param name="blockSize">Block size in bytes; null or out-of-range values fall back to the worker default.</param>
        /// <param name="thread">Number of concurrent upload tasks; null or non-positive values fall back to the worker default.</param>
        public void UploadMultiTask(string containerName, string blobName, Stream stream, long? blockSize = null, int? thread = null)
        {
            var blob = getBlockBlob(containerName, blobName);
            BlockBlobUploadWorker worker = new BlockBlobUploadWorker(blob, stream, thread, blockSize);
            worker.Run();
        }

        #region
        /***************************************
         * Existence check
         **************************************/
        #region Existence check

        /// <summary>
        /// Returns true only when both the container and the blob exist.
        /// </summary>
        public bool CheckExist(string containerName, string blobName)
        {
            // Retrieve a reference to a container.
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
            if (!container.Exists())
            {
                return false;
            }
            return container.GetBlockBlobReference(blobName).Exists();
        }

        /// <summary>
        /// Asynchronously returns true only when both the container and the blob exist.
        /// </summary>
        public async Task<bool> CheckExistAsync(string containerName, string blobName)
        {
            // Retrieve a reference to a container.
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);

            // Awaited instead of blocking on .Result so the caller's thread is never parked.
            if (!await container.ExistsAsync().ConfigureAwait(false))
            {
                return false;
            }
            return await container.GetBlockBlobReference(blobName).ExistsAsync().ConfigureAwait(false);
        }

        #endregion

        /***************************************
         * Delete container
         **************************************/
        #region Delete container

        /// <summary>
        /// Deletes the container if it exists.
        /// </summary>
        public void DeleteContainer(string containerName)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
            container.DeleteIfExists();
        }

        /// <summary>
        /// Asynchronously deletes the container if it exists.
        /// </summary>
        public Task DeleteContainerAsync(string containerName)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
            return container.DeleteIfExistsAsync();
        }

        #endregion

        /***************************************
         * Delete blob
         **************************************/
        #region Delete blob

        /// <summary>
        /// Deletes the blob. Does nothing when the container does not exist.
        /// </summary>
        public void Delete(string containerName, string blobName)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
            if (!container.Exists())
            {
                return;
            }
            CloudBlockBlob blockBlob = container.GetBlockBlobReference(blobName);
            blockBlob.Delete();
        }

        /// <summary>
        /// Asynchronously deletes the blob. Does nothing when the container does not exist.
        /// </summary>
        public async Task DeleteAsync(string containerName, string blobName)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);

            // Awaited instead of blocking on .Result so the caller's thread is never parked.
            if (!await container.ExistsAsync().ConfigureAwait(false))
            {
                return;
            }
            CloudBlockBlob blockBlob = container.GetBlockBlobReference(blobName);
            await blockBlob.DeleteAsync().ConfigureAwait(false);
        }

        #endregion

        /***************************************
         * Small-file download
         **************************************/
        #region Download blob

        /// <summary>
        /// Downloads the whole blob into memory and returns a stream positioned at the start.
        /// </summary>
        public Stream DownloadToStream(string containerName, string blobName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            MemoryStream memStream = new MemoryStream();
            blockBlob.DownloadToStream(memStream);
            memStream.Position = 0;
            return memStream;
        }

        /// <summary>
        /// Asynchronously downloads the whole blob into memory and returns a stream
        /// positioned at the start.
        /// </summary>
        public async Task<Stream> DownloadToStreamAsync(string containerName, string blobName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            MemoryStream memStream = new MemoryStream();

            // Awaited directly; no extra thread-pool thread is blocked waiting for the download.
            await blockBlob.DownloadToStreamAsync(memStream).ConfigureAwait(false);
            memStream.Position = 0;
            return memStream;
        }

        /// <summary>
        /// Downloads the blob content as text.
        /// </summary>
        public string DownLoadToString(string containerName, string blobName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            return blockBlob.DownloadText();
        }

        /// <summary>
        /// Asynchronously downloads the blob content as text.
        /// </summary>
        public Task<string> DownLoadToStringAsync(string containerName, string blobName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            return blockBlob.DownloadTextAsync();
        }

        #endregion

        #region own function

        /// <summary>
        /// Returns a block-blob reference, creating the container first when it is missing.
        /// Note that the download methods also go through here, so they create the
        /// container as a side effect too.
        /// </summary>
        private CloudBlockBlob getBlockBlob(string containerName, string blobName)
        {
            // Retrieve a reference to a container.
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);

            // Create the container if it doesn't already exist.
            container.CreateIfNotExists();

            CloudBlockBlob blockBlob = container.GetBlockBlobReference(blobName);
            return blockBlob;
        }

        #endregion
        #endregion
    }

    /// <summary>
    /// Uploads one stream to one block blob: the stream is cut into blocks, the blocks are
    /// uploaded by a fixed number of concurrent tasks, and the block list is committed once
    /// every block has been uploaded. An instance is meant for a single <see cref="Run"/> call.
    /// </summary>
    class BlockBlobUploadWorker
    {
        private const long DefaultBlockSize = 2 * 1024 * 1024;  // block size used when none (or an invalid one) is given
        private const int DefaultThreadCount = 20;              // concurrent upload tasks used when none (or an invalid one) is given
        private const long MaxBlockSize = 4 * 1024 * 1024;      // largest block size accepted by this worker
        private const int MaxFailures = 5;                      // failures tolerated per storage call before giving up

        // Guards _stream, _byteCountLeft, _blockId and _blockIdList.
        private readonly object _syncRoot = new object();
        private readonly List<string> _blockIdList = new List<string>();
        private readonly CloudBlockBlob _blob;
        private readonly Stream _stream;
        private readonly int _totalThread;
        private readonly long _blockSize;

        private int _blockId;
        private long _byteCountLeft;      // bytes not yet handed out as blocks
        private volatile bool _isCancel;  // set once any upload task has failed for good

        /// <summary>
        /// Prepares an upload of <paramref name="stream"/> to <paramref name="blob"/>.
        /// </summary>
        /// <param name="blob">Destination block blob.</param>
        /// <param name="stream">Source data, read from its current position; must support <c>Length</c>.</param>
        /// <param name="totalThread">Number of concurrent upload tasks; null or non-positive selects the default.</param>
        /// <param name="blockSize">Block size in bytes; null, non-positive or above the maximum selects the default.</param>
        /// <exception cref="ArgumentNullException"><paramref name="blob"/> or <paramref name="stream"/> is null.</exception>
        public BlockBlobUploadWorker(CloudBlockBlob blob, Stream stream, int? totalThread, long? blockSize)
        {
            if (blob == null)
            {
                throw new ArgumentNullException("blob");
            }
            if (stream == null)
            {
                throw new ArgumentNullException("stream");
            }

            _blob = blob;
            _stream = stream;

            // Only the bytes from the current position onward can actually be read.
            long remaining = stream.Length - stream.Position;
            _byteCountLeft = remaining > 0 ? remaining : 0;

            // A non-positive block size would never make progress, so it is rejected too.
            bool blockSizeValid = blockSize.HasValue && blockSize.Value > 0 && blockSize.Value <= MaxBlockSize;
            _blockSize = blockSizeValid ? blockSize.Value : DefaultBlockSize;

            // Zero tasks would never upload anything, so zero is rejected as well as negatives.
            bool threadValid = totalThread.HasValue && totalThread.Value > 0;
            _totalThread = threadValid ? totalThread.Value : DefaultThreadCount;
        }

        /// <summary>
        /// Uploads every block and then commits the block list. Blocks the calling thread
        /// until done. If a block cannot be uploaded, the first failure is rethrown and the
        /// block list is NOT committed.
        /// </summary>
        public void Run()
        {
            long blockCount = (_byteCountLeft + _blockSize - 1) / _blockSize;
            int workerCount = (int)Math.Min(_totalThread, blockCount);

            // Each worker repeatedly takes the next block and uploads it. Task.Run keeps the
            // loops off any caller synchronization context, so the blocking wait below cannot
            // deadlock against their continuations.
            Task[] workers = new Task[workerCount];
            for (int i = 0; i < workerCount; i++)
            {
                workers[i] = Task.Run(() => UploadLoopAsync());
            }

            // Completes only when every worker has finished (an empty array completes at
            // once, so an empty stream no longer hangs). GetResult rethrows the first
            // failure without the AggregateException wrapper.
            Task.WhenAll(workers).GetAwaiter().GetResult();

            PutBlockList();
        }

        /// <summary>
        /// Worker loop: uploads blocks until the stream is exhausted or another worker failed.
        /// </summary>
        private async Task UploadLoopAsync()
        {
            try
            {
                while (true)
                {
                    UploadBlock block = ReadNextBlock();
                    if (block == null)
                    {
                        return;
                    }
                    await PutBlockAsync(block.Id, block.Data).ConfigureAwait(false);
                }
            }
            catch
            {
                // Tell the other workers to stop taking new blocks, then surface the error.
                _isCancel = true;
                throw;
            }
        }

        /// <summary>
        /// Commits the uploaded blocks, tolerating up to <see cref="MaxFailures"/> failures.
        /// </summary>
        private void PutBlockList()
        {
            int failures = 0;
            while (true)
            {
                try
                {
                    _blob.PutBlockList(_blockIdList);
                    break;
                }
                catch
                {
                    failures++;
                    if (failures > MaxFailures)
                    {
                        throw;
                    }
                }
            }
        }

        /// <summary>
        /// Reads the next block from the stream and registers its ID in upload order.
        /// Returns null when there is nothing left to read or the upload was cancelled.
        /// </summary>
        private UploadBlock ReadNextBlock()
        {
            lock (_syncRoot)
            {
                if (_isCancel || _byteCountLeft <= 0)
                {
                    return null;
                }

                int wanted = (int)Math.Min(_byteCountLeft, _blockSize);
                byte[] buffer = new byte[wanted];

                // Stream.Read may return fewer bytes than requested; keep reading until the
                // block is full or the stream ends.
                int filled = 0;
                while (filled < wanted)
                {
                    int read = _stream.Read(buffer, filled, wanted - filled);
                    if (read <= 0)
                    {
                        break;
                    }
                    filled += read;
                }

                if (filled < wanted)
                {
                    // The stream ended earlier than its Length promised: upload what was
                    // actually read instead of a zero-padded block, and stop afterwards.
                    _byteCountLeft = 0;
                    if (filled == 0)
                    {
                        return null;
                    }
                    Array.Resize(ref buffer, filled);
                }
                else
                {
                    _byteCountLeft -= filled;
                }

                // Every ID encodes a 4-byte counter, so all IDs have the same length.
                string blockIdBase64 = Convert.ToBase64String(BitConverter.GetBytes(_blockId));
                _blockId++;
                _blockIdList.Add(blockIdBase64);

                return new UploadBlock(blockIdBase64, buffer);
            }
        }

        /// <summary>
        /// Uploads one block, tolerating up to <see cref="MaxFailures"/> failures.
        /// </summary>
        private async Task PutBlockAsync(string blockId, byte[] buffer)
        {
            using (MemoryStream stream = new MemoryStream(buffer))
            {
                int failures = 0;
                while (true)
                {
                    try
                    {
                        // A failed attempt may have consumed part of the stream; rewind so
                        // each attempt sends the complete block.
                        stream.Position = 0;
                        await _blob.PutBlockAsync(blockId, stream, null).ConfigureAwait(false);
                        break;
                    }
                    catch
                    {
                        failures++;
                        if (failures > MaxFailures)
                        {
                            throw;
                        }
                    }
                }
            }
        }

        /// <summary>
        /// A block ID together with the bytes to upload for it.
        /// </summary>
        private sealed class UploadBlock
        {
            public readonly string Id;
            public readonly byte[] Data;

            public UploadBlock(string id, byte[] data)
            {
                Id = id;
                Data = data;
            }
        }
    }
}
单线程的下载测试代码:
// Downloads the blob "测试.rar" from container "test1" and saves it to a local file.
BlockBlobClient blobClient = new BlockBlobClient("你的连接字符串");
using (Stream stream = blobClient.DownloadToStream("test1", "测试.rar"))
// FileMode.Create truncates an existing file; OpenOrCreate would leave stale bytes
// behind whenever the previous file was longer than the downloaded blob.
using (FileStream fs = new FileStream(@"d:\data\测试.rar", FileMode.Create, FileAccess.Write))
{
    // CopyTo streams the data across without a second whole-file buffer.
    stream.CopyTo(fs);
}
Console.WriteLine("Finished!");
Console.ReadKey();
多线程上传测试代码:
/// <summary>
/// Sample entry point: uploads a local file block by block with concurrent upload tasks.
/// </summary>
static void Main(string[] args)
{
    BlockBlobClient blobClient = new BlockBlobClient("你的连接字符串");
    using (FileStream stream = new FileStream(@"D:\Data\FJData.rar", FileMode.Open, FileAccess.Read))
    {
        // BlockBlobClient exposes the multi-task upload as UploadMultiTask; it has no
        // UploadBigFile method.
        blobClient.UploadMultiTask("test1", "测试1.rar", stream);
    }
    Console.WriteLine("Finished!");
    Console.ReadKey();
}
0 条评论