Azure Blob(二)——块Blob上传、下载、删除(未完结)

这是Azure Blob系列文章。此前文章的列表为:

Azure Blob(一)——分类


这一节,我们主要学习块Blob的相关操作。上传块Blob时,需要先把文件流划分成若干块,并用PutBlock逐块上传。本文代码沿用旧版服务的限制,每块不超过4MB;较新的服务版本允许更大的块。待所有块都上传完毕之后,再调用PutBlockList提交块列表,这些块便会按列表顺序组成一个Blob。若一直不提交,已上传但未提交的块会在一周后被自动删除。


下列代码是上传BlockBlob的相关操作,包括多线程上传和单线程上传。上传速度主要受网络带宽限制,因此单线程上传和多线程上传的效率差不多。对于小文件,单线程上传甚至比多线程上传更快;对于大文件,多线程上传比单线程上传略快一些。这主要是因为多线程方式会预先读取并缓存多个文件块,使读取文件与上传同时进行。


未完待续……


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Blob;
using System.IO;

namespace AzureBaseLib
{
    public class BlockBlobClient : IBlobClient
    {
        private readonly long THRESHOLD = 100 * 1024 * 1024;          //单线程小文件上传的上限
        
        CloudBlobClient _blobClient = null;
        public BlockBlobClient(string connectionString)
        {
            _blobClient = CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient();
        }

        /***************************************
         * 单线程上传
         **************************************/
        #region 单线程上传
        /// <summary>
        /// 从数据流中单线程上传,大文件也能上传
        /// </summary>
        /// <param name="containerName"></param>
        /// <param name="blobName"></param>
        /// <param name="stream"></param>
        public void UploadFromStream(string containerName, string blobName, Stream stream)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            blockBlob.UploadFromStream(stream);
        }

        /// <summary>
        /// 从数据流中异步上传,大文件也能上传
        /// </summary>
        /// <param name="containerName"></param>
        /// <param name="blobName"></param>
        /// <param name="stream"></param>
        /// <returns></returns>
        public Task UploadFromStreamAsync(string containerName, string blobName, Stream stream)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            return blockBlob.UploadFromStreamAsync(stream);            
        }

        /// <summary>
        /// 指定文件路径,上传小文件
        /// </summary>
        /// <param name="containerName"></param>
        /// <param name="blobName"></param>
        /// <param name="fileName"></param>
        public void UploadFromFile(string containerName, string blobName, string fileName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            blockBlob.UploadFromFile(fileName, FileMode.Open);
        }

        /// <summary>
        /// 指定文件路径,异步上传小文件
        /// </summary>
        /// <param name="containerName"></param>
        /// <param name="blobName"></param>
        /// <param name="fileName"></param>
        /// <returns></returns>
        public Task UploadFromFileAsync(string containerName, string blobName, string fileName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            return blockBlob.UploadFromFileAsync(fileName, FileMode.Open);
        }

        /// <summary>
        /// 上传字符串
        /// </summary>
        /// <param name="containerName"></param>
        /// <param name="blobName"></param>
        /// <param name="str"></param>
        public void UploadFromString(string containerName, string blobName, string str)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            blockBlob.UploadText(str);
        }

        /// <summary>
        /// 异步上传字符串
        /// </summary>
        /// <param name="containerName"></param>
        /// <param name="blobName"></param>
        /// <param name="str"></param>
        /// <returns></returns>
        public Task UploadFromStringAsync(string containerName, string blobName, string str)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            return blockBlob.UploadTextAsync(str);
        }
        #endregion


        /***************************************
         * 多线程上传
         **************************************/
        /// <summary>
        /// 自动判断上传的文件方式
        /// </summary>
        /// <param name="containerName"></param>
        /// <param name="blobName"></param>
        /// <param name="stream"></param>
        public void Upload(string containerName, string blobName, Stream stream)
        {
            if(stream.Length > THRESHOLD)
            {
                UploadMultiTask(containerName, blobName, stream);
            }
            else
            {
                UploadFromStream(containerName, blobName, stream);
            }
        }

        /// <summary>
        /// 上传大文件
        /// </summary>
        /// <param name="containerName"></param>
        /// <param name="blobName"></param>
        /// <param name="stream"></param>
        /// <param name="blockSize"></param>
        /// <param name="thread"></param>
        /// <returns></returns>
        public void UploadMultiTask(string containerName, string blobName, Stream stream, long? blockSize = null, int? thread = null)
        {
            var blob = getBlockBlob(containerName, blobName);
            BlockBlobUploadWorker worker = new BlockBlobUploadWorker(blob, stream, thread, blockSize);
            worker.Run();
        }

        #region
        /***************************************
         * 验证是否存在
         **************************************/
        #region 验证是否存在
        public bool CheckExist(string containerName, string blobName)
        {
            // Retrieve a reference to a container. 
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
            if (!container.Exists())
            {
                return false;
            }
            return container.GetBlockBlobReference(blobName).Exists();
        }

        public Task<bool> CheckExistAsync(string containerName, string blobName)
        {
            // Retrieve a reference to a container. 
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
            Task<bool> containerExist = container.ExistsAsync();
            if (!containerExist.Result)
            {
                return Task.Factory.StartNew<bool>(()=> { return false; });
            }
            return container.GetBlockBlobReference(blobName).ExistsAsync();
        }
        #endregion


        /***************************************
         * 删除Container
         **************************************/
        #region 删除Container
        public void DeleteContainer(string containerName)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
            container.DeleteIfExists();
        }

        public Task DeleteContainerAsync(string containerName)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
            return container.DeleteIfExistsAsync();
        }
        #endregion

        /***************************************
         * 删除Blob
         **************************************/
        #region 删除blob
        public void Delete(string containerName, string blobName)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
            if (!container.Exists())
            {
                return;
            }
            CloudBlockBlob blockBlob = container.GetBlockBlobReference(blobName);
            blockBlob.Delete();
        }

        public Task DeleteAsync(string containerName, string blobName)
        {
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
            Task<bool> containerExist = container.ExistsAsync();
            if (!containerExist.Result)
            {
                return Task.Factory.StartNew(() => { });
            }
            CloudBlockBlob blockBlob = container.GetBlockBlobReference(blobName);
            return blockBlob.DeleteAsync();
        }
        #endregion

        /***************************************
         * 小文件下载
         **************************************/
        #region 下载Blob
        public Stream DownloadToStream(string containerName, string blobName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            MemoryStream memStream = new MemoryStream();
            blockBlob.DownloadToStream(memStream);
            memStream.Position = 0;
            return memStream;
        }

        public Task<Stream> DownloadToStreamAsync(string containerName, string blobName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            MemoryStream memStream = new MemoryStream();
            Task t = blockBlob.DownloadToStreamAsync(memStream);
            return Task.Factory.StartNew<Stream>(()=> {
                t.Wait();
                memStream.Position = 0;
                return memStream;
            });
        }

        public string DownLoadToString(string containerName, string blobName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            return blockBlob.DownloadText();
        }

        public Task<string> DownLoadToStringAsync(string containerName, string blobName)
        {
            var blockBlob = getBlockBlob(containerName, blobName);
            return blockBlob.DownloadTextAsync();
        }
        #endregion

        #region own function
        private CloudBlockBlob getBlockBlob(string containerName, string blobName)
        {
            // Retrieve a reference to a container. 
            CloudBlobContainer container = _blobClient.GetContainerReference(containerName);

            // Create the container if it doesn't already exist.
            container.CreateIfNotExists();

            CloudBlockBlob blockBlob = container.GetBlockBlobReference(blobName);

            return blockBlob;
        }

        
        #endregion
        #endregion
    }

    class BlockBlobUploadWorker
    {
        private readonly long DEFAULT_BLOCK_SIZE = 2 * 1024 * 1024;  //默认每块的大小
        private readonly int DEFAULT_THREAD = 20;                    //默认上传和下载的线程数
        private readonly long MAX_BLOCK_SIZE = 4 * 1024 * 1024;     //每个块最大的大小
        private readonly int TRY_NUMBER = 5;                         //单个块上传失败的次数

        private object _syncRoot = new object();     //for multi task sync
        private int _blockId = 0;
        List<string> _blockIdList = new List<string>();

        private int _runningTaskNumber = 0;
        private object _syncRunningNumebr = new object();

        private bool isCancel = false;
        private object _syncCancel = new object();

        private long _byteCountLeft;                // left byte count
        private Stream _stream;
        private int _totalThread;
        private long _blockSize;
        
        private CloudBlockBlob _blob;

        AutoResetEvent _sigEvent = new AutoResetEvent(false);

        public BlockBlobUploadWorker(CloudBlockBlob blob, Stream stream, int? totalThread, long? blockSize)
        {
            _blob = blob;
            _stream = stream;
            _byteCountLeft = stream.Length;

            if (blockSize == null || blockSize > MAX_BLOCK_SIZE)
            {
                blockSize = DEFAULT_BLOCK_SIZE;
            }
            if (totalThread == null || totalThread < 0)
            {
                totalThread = DEFAULT_THREAD;
            }
            _totalThread = (int)totalThread;
            _blockSize = (long)blockSize;
        }

        public void Run()
        {
            //first we queue the task
            List<Func<Task>> taskList = new List<Func<Task>>();
            while(taskList.Count < _totalThread)
            {
                var task = GetTask();
                if (task == null)
                {
                    break;
                }
                taskList.Add(task);
            }
            //let's run it
            foreach(var task in taskList)
            {
                runTask(task);
            }
            WaitForFinished();
            PutBlockList();
        }

        private void runTask(Func<Task> func)
        {
            lock (_syncCancel)
            {
                if (isCancel)
                {
                    return;
                }
            }
            lock (_syncRunningNumebr)
            {
                _runningTaskNumber++;
            }
            var task = func();
            task.ContinueWith((T) => {
                lock (_syncRunningNumebr)
                {
                    _runningTaskNumber--;
                }
                var t = GetTask();
                if (t != null)
                {
                    runTask(t);
                }
                SignifyIfOK();
            });
        }

        private void PutBlockList()
        {
            int tryNumber = 0;
            while (true)
            {
                try
                {
                    _blob.PutBlockList(_blockIdList);
                    break;
                }
                catch
                {
                    tryNumber++;
                    if (tryNumber > TRY_NUMBER)
                    {
                        throw;
                    }
                }
            } 
        }

        private void WaitForFinished()
        {
            _sigEvent.WaitOne();
        }

        private void SignifyIfOK()
        {
            lock (_syncRunningNumebr)
            {
                if (_runningTaskNumber == 0)
                {
                    _sigEvent.Set();
                }
            }
        }

        private Func<Task> GetTask()
        {
            lock (_syncRoot)
            {
                if (_byteCountLeft > 0)
                {
                    long bufferSize = Math.Min(_byteCountLeft, _blockSize);
                    _byteCountLeft -= bufferSize;
                    byte[] buffer = new byte[bufferSize];
                    Task t = _stream.ReadAsync(buffer, 0, (int)bufferSize); //get buffer

                    string blockIdBase64 = Convert.ToBase64String(BitConverter.GetBytes(_blockId));
                    Func<Task> fun = async () =>
                    {
                        await PutBlockAsync(blockIdBase64, buffer);
                    };

                    _blockIdList.Add(blockIdBase64);
                    _blockId++;

                    t.Wait();
                    return fun;
                }
                else
                {
                    return null;
                }
            }
        }

        private async Task PutBlockAsync(string blockId, byte[] buffer)
        {
            using (MemoryStream stream = new MemoryStream(buffer))
            {
                int tryNumber = 0;
                while(true)
                {
                    try
                    {
                        await _blob.PutBlockAsync(blockId, stream, null);
                        break;
                    }
                    catch
                    {
                        tryNumber++;
                        if (tryNumber > TRY_NUMBER)
                        {
                            lock (_syncCancel)
                            {
                                isCancel = true;
                            }
                            throw;
                        }
                    }
                }
            }
        }
    } 
}


单线程的下载测试代码:

BlockBlobClient blobClient = new BlockBlobClient("你的连接字符串");
// FileMode.Create truncates an existing file. OpenOrCreate would keep stale trailing bytes
// whenever the existing file is longer than the downloaded blob.
using (Stream stream = blobClient.DownloadToStream("test1", "测试.rar"))
using (FileStream fs = new FileStream(@"d:\data\测试.rar", FileMode.Create, FileAccess.Write))
{
    // CopyTo reads until the source is exhausted, whereas one Read call may return fewer bytes.
    stream.CopyTo(fs);
}
Console.WriteLine("Finished!");
Console.ReadKey();


多线程上传测试代码:

static void Main(string[] args)
{
    BlockBlobClient blobClient = new BlockBlobClient("你的连接字符串");
    using (FileStream stream = new FileStream(@"D:\Data\FJData.rar", FileMode.Open, FileAccess.Read))
    {
        // BlockBlobClient has no UploadBigFile method; UploadMultiTask is its multi-threaded upload entry point.
        blobClient.UploadMultiTask("test1", "测试1.rar", stream);
    }
    Console.WriteLine("Finished!");
    Console.ReadKey();
}


0 条评论

    发表评论

    电子邮件地址不会被公开。 必填项已用 * 标注