Azure Storm入门(一)——从一个例子开始

近几日刚刚进入Hadoop及其相关产品的世界。发觉从0到有的过程的确非常煎熬。项目中,需要用到一个实时搜索,最开始想用hadoop来弄,发现hadoop适合离线批处理。而对于搜索来说,希望能够完成一个实时处理。经多方打听,storm适合处理实时工作


关于storm的入门文档转载文章:http://www.kangry.net/blog/?type=article&article_id=374


这里纪录一下使用Azure HDInsight Storm的过程。


1、首先,得拥有一个Azue账户。进入Azure门户,创建HDInsight storm集群。如下所示。输入相关的信息,注意存储账户也应该指定,若没有,需要创建存储账户。HDInsight默认使用blob作为文件系统。后面我们也会看到,其相关日志也是放在存储账户中。点击“创建HDInsight集群”,后台自动创建集群。整个过程大概需要15分钟。

2、安装下列一个版本的Visual Studio

(1)Visual Studio 2012 Update 4

(2)Visual Studio 2013 Update 4 或 Visual Studio 2013 Community

(3)Visual Studio 2015 或 Visual Studio 2015 Community

果断选择安装2015啦

3、安装最新版的Azure SDK。官网下载链接:https://azure.microsoft.com/en-us/downloads/,注意与你的语言和开发工具对应。

安装好Azure SDK后,VS左侧的服务浏览器中应该有HDInsight了

4、vs中,选择“文件-》新建-》项目”。在新建项目窗口中,展开“已安装-》模板”,然后选择HDInsight。选择“Storm应用程序”,并输入应用名称

注意:这里的项目文件和源码文件最好放在同一个文件夹中,否则在以后的在提交的时候会出现问题。具体见:https://social.msdn.microsoft.com/Forums/en-US/c2463b9a-fc87-4886-8f0c-14f32b7f0cd2/failed-to-find-scpnet-package-for-current-storm-project?forum=hdinsight

5、创建项目后,包含以下文件:

(1)Program.cs:定义项目的拓扑。请注意,默认情况下会创建包含一个 Spout 和一个 Bolt 的默认拓扑。

(2)Spout.cs:发出随机数的示例 Spout。

(3)Bolt.cs:保留 Spout 所发出数字计数的示例 Bolt。

在创建项目过程中,将会从 NuGet 下载最新的SCP.NET 包。(倘若不是,后续过程会提示你的SCP需要更新,可以通过.NuGet管理包更新SCP包)

接下来的例子中,我们修改现有代码,实现一个单词计数的storm程序。建议自己重新输入代码,以期更好的理解执行过程。

6、打开Spout.cs。spout用于将外部数据源的数据读入拓扑。Spout含义为喷泉,相当于Storm系统中的数据产生器。其主要组件如下:

(1)NextTuple: 允许Spout发出新的Tuple时由Storm调用。大白话讲就是,这段代码由整个Storm调用,是程序的入口,而且是循环不停的调用

(2)Ack(仅限事务拓扑):针对从Spout发送的Tuple,处理拓扑中其他组件所发起的确认。确认Tuple可让Spout知道下游组件已经成功处理Tuple。也就是说,当Spout将任务发出去后,其实不知道任务是否已经做完,通过这个函数来验证(后续我会跟进这部分如何使用。本示例不会用到这部分的功能)

(3)Fail(仅限事务拓扑):处理其他组件无法处理的Tuple。这提供机会来重新发出Tuple,以重新处理Tuple。(同样,本示例不会用到这方面的功能)

7、将Spout文件内的内容替换为以下内容。若能全部手动输入更好,至少要通读一遍代码。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Threading;
using Microsoft.SCP;
using Microsoft.SCP.Rpc.Generated;

// 将外部源中的数据读入拓扑。
namespace StormTestWordCount
{
    public class Spout : ISCPSpout
    {
        private Context ctx;
        private Random r = new Random();

        string[] sentences = new string[] {
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature"
            };

        public Spout(Context ctx)
        {
            //set the instance context
            this.ctx = ctx;

            Context.Logger.Info("StormTestWordCount, Spout constructor called");

            //Declare output schema
            Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();

            //The schema for the default output stream is
            // a tuple that contains a string field
            outputSchema.Add("default", new List<Type>() { typeof(string) });
            this.ctx.DeclareComponentSchema(new ComponentStreamSchema(null, outputSchema));
        }

        // get an instance of the spout
        public static Spout Get(Context ctx, Dictionary<string, Object> parms)
        {
            return new Spout(ctx);
        }

        // 由storm调用
        public void NextTuple(Dictionary<string, Object> parms)
        {
            Context.Logger.Info("StormTestWordCount, Spout NextTuple enter");

            System.Threading.Thread.Sleep(1000 * 60);

            // The sentence to be emitted
            string sentence;

            // get a rondom sentence
            sentence = sentences[r.Next(0, sentences.Length - 1)];
            Context.Logger.Info("StormTestWordCount, Spout Emit: {0}", sentence);

            //Emit it
            ctx.Emit(new Values(sentence));

            Context.Logger.Info("StormTestWordCount, Spout NextTuple exit");
        }

        public void Ack(long seqId, Dictionary<string, Object> parms)
        {
            // only used for transactional topologies
        }

        public void Fail(long seqId, Dictionary<string, Object> parms)
        {
            // only used for transactional topologies
        }
    }
}

8、删除项目中的现有Bolt.cs文件,在“资源管理器”中,右键-》添加-》新建项。从列表中选择“Storm Bolt”,将名字改为“Splitter.cs”。同样,创建另外一个Bolt“Counter.cs”

(1)Splitter.cs:实现Bolt,获得从Spout发过来的句子,分割句子,并将分割后得到的单词发出。

(2)Counter.cs:实现Bolt,统计从Splitter中发过来的单词计数。

9、默认情况下,Splitter.cs中包含一个方法Execute,当获得Tuple时,调用此方法。将Splitter的内容替换如下:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Threading;
using Microsoft.SCP;
using Microsoft.SCP.Rpc.Generated;

// 实现Bolt,以将橘子分割成不同的单词并发出一串新单词
namespace StormTestWordCount
{
    public class Splitter : ISCPBolt
    {
        private Context ctx;
        public Splitter(Context ctx)
        {
            Context.Logger.Info("StormTestWordCount, Splitter constructor called");
            this.ctx = ctx;

            // declare input and output schemas

            Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
            // Input contains a tuple with a string field (the sentence)
            inputSchema.Add("default", new List<Type>() { typeof(string) });

            Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
            // output contains a tuple with a string field (the word)
            outputSchema.Add("default", new List<Type>() { typeof(string) });

            this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema));
        }

        // get a new instance of the bolt
        public static Splitter Get(Context ctx, Dictionary<string, Object> parms)
        {
            return new Splitter(ctx);
        }

        // 在Bolt收到要处理的Tuple时将调用此方法。此时,你可以读取和处理Tuple,以及发出传出Tuple
        // Called whe a new tuple is available
        public void Execute(SCPTuple tuple)
        {
            Context.Logger.Info("StormTestWordCount, Splitter Excute enter");

            // get the sentence from the tuple
            string sentence = tuple.GetString(0);
            // split at space characters
            foreach (var word in sentence.Split(' '))
            {
                Context.Logger.Info("StormTestWordCount, Splitter Emit: {0}", word);
                //Emit each word
                this.ctx.Emit(new Values(word));
            }

            Context.Logger.Info("StormTestWordCount, Splitter Execute exit");
        }
    }
}

10、Counter.cs中默认也只有Execute函数。将Counter.cs的内容替换如下:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Threading;
using Microsoft.SCP;
using Microsoft.SCP.Rpc.Generated;

// 实现Bolt,以统计每个单词的数目,并发出一串新单词和每个单词的计数
namespace StormTestWordCount
{
    public class Counter : ISCPBolt
    {
        private Context ctx;

        // Dictionary for holding words and counts
        private Dictionary<string, int> counts = new Dictionary<string, int>();

        // Constructor
        public Counter(Context ctx)
        {
            Context.Logger.Info("StormTestWordCount, Counter constructor called");

            // set instance context
            this.ctx = ctx;

            // Declare input and output schemas
            Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
            // a tuple containing a string field - the word
            inputSchema.Add("default", new List<Type>() { typeof(string) });

            Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
            // a tuple containing a string and integer field - the word and the word count
            outputSchema.Add("default", new List<Type>() { typeof(string), typeof(int) });

            this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema));
        }

        // get a new instance
        public static Counter Get(Context ctx, Dictionary<string, Object> parms)
        {
            return new Counter(ctx);
        }

        // called when a new tuple is availableContext.Logger.Info("StormTestWordCount, 
        public void Execute(SCPTuple tuple)
        {
            Context.Logger.Info("StormTestWordCount, Counter Execute enter");

            //get the word form the tuple
            string word = tuple.GetString(0);

            // calculate the count
            int count = counts.ContainsKey(word) ? counts[word] : 0;
            count++;
            counts[word] = count;

            Context.Logger.Info("StormTestWordCount, Counter Emit: {0}, count: {1}", word, count);
            // Emit the word and count information
            this.ctx.Emit(Constants.DEFAULT_STREAM_ID, new List<SCPTuple> { tuple }, new Values(word, count));

            Context.Logger.Info("StormTestWordCount, Counter Execute exit");
        }
    }
}

11、定义拓扑。该示例的top图形如下:

句子从 Spout 发出,并分布到 Splitter Bolt 的实例。Splitter Bolt 将句子分割成多个单词,并将这些单词分布到 Counter Bolt。

因为字数会本地保留在 Counter 实例中,所以我们想要确保特定单词流向相同的 Counter Bolt 实例,因此只能有一个实例跟踪特定单词(即相同的单词计数保存在同一个Counter Bolt中)。但是,针对 Splitter Bolt,哪个 Bolt 收到哪个句子并不重要,因此,我们只想要将句子负载平衡到那些实例。

打开 Program.cs。重要的方法是 ITopologyBuilder,它用于定义提交到 Storm 的拓扑。将 ITopologyBuilder 的内容替换为以下代码,以实现上面所述的拓扑。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.SCP;
using Microsoft.SCP.Topology;

namespace StormTestWordCount
{
    [Active(true)]
    class Program : TopologyDescriptor
    {
        static void Main(string[] args)
        {
        }

        public ITopologyBuilder GetTopologyBuilder()
        {
            // Create a new topology
            TopologyBuilder topologyBuilder = new TopologyBuilder("StormTestWordCount" + DateTime.Now.ToString("yyyyMMddHHmmss"));

            //Context.Logger.Info("StormTestWordCount, Hello Strom!!");

            // add the spout to the topology.
            // name the component 'sentences'
            // name the field that is emitted as 'sentence'
            topologyBuilder.SetSpout(
                "sentences",
                Spout.Get,
                new Dictionary<string, List<string>>()
                {
                    {Constants.DEFAULT_STREAM_ID, new List<string>(){"sentence"}}
                },
                1);

            // add the splitter bolt to the topology.
            // name the component 'splitter'
            // name the field that is emitted 'word'
            // use sufflegrouping to distribute incoming tuples from the 'sentences' spout across instances of the splitter
            topologyBuilder.SetBolt(
                "splitter",
                Splitter.Get,
                new Dictionary<string, List<string>>()
                {
                    { Constants.DEFAULT_STREAM_ID, new List<string>() { "word"} }
                },
                1).shuffleGrouping("sentences");    //猜想,表示输入从sentences中来,负载均衡即可

            // add the counter bolt to the topology.
            // Use fieldGrouping to ensure that tuples are routed 
            //  to counter instances based on the contents of field position 0 (the word). 
            //  This could also have been List<string>(){"word"}
            topologyBuilder.SetBolt(
                "counter",
                Counter.Get,
                new Dictionary<string, List<string>>()
                {
                    { Constants.DEFAULT_STREAM_ID, new List<string>() { "word", "count"} }
                },
                1).fieldsGrouping("splitter", new List<int>() { 0 });    //猜想,表示输入从splitter来,同一个word分发到相同的Counter中

            // add topology config
            topologyBuilder.SetTopologyConfig(new Dictionary<string, string>()
            {
                {"topology.kryo.register", "[\"[B\"]" }                    //设置配置文件。详情请见:https://storm.apache.org/documentation/Serialization.html
            });


            return topologyBuilder;
        }
    }
}

12、提交拓扑。右键项目-》提交Storm到HDInsight,可能需要输入Azure账号。

13、提交一段时间后,在VS中会产生一个TOP结构。如图。此作业不会停下来,直到手动Deactivate或者Kill。

14、同样,在Azure门户的Storm仪表板中也能看到该作业正在运行。

15、如何查看运行情况呢?通过日志系统。在创建工程中,我们指定了一个存储账号。我们可以在存储账号中查看代码中Context.Logger.Info写入的信息。在VS中的服务浏览器中,选择HDInsight->storm集群名-》Hasoop Service Log-》双击下边的Table,在右侧会显示查询结构。由于Storm的其他的代码也会产生日志信息,因此需要通过过滤条件将我们的程序产生的log打印出来。结果如下所示:


接下来我会继续探索Strom的运行机制、事务处理部分、如何收集bolt的产生结果、如何通过消息机制获得数据。


参考链接:http://www.windowsazure.cn/documentation/articles/hdinsight-storm-develop-csharp-visual-studio-topology


0 条评论

    发表评论

    电子邮件地址不会被公开。 必填项已用 * 标注