Azure Storm入门(一)——从一个例子开始
近几日刚刚进入Hadoop及其相关产品的世界。发觉从0到有的过程的确非常煎熬。项目中,需要用到一个实时搜索,最开始想用hadoop来弄,发现hadoop适合离线批处理。而对于搜索来说,希望能够完成一个实时处理。经多方打听,storm适合处理实时工作。
关于storm的入门文档转载文章:http://www.kangry.net/blog/?type=article&article_id=374
这里纪录一下使用Azure HDInsight Storm的过程。
1、首先,得拥有一个Azue账户。进入Azure门户,创建HDInsight storm集群。如下所示。输入相关的信息,注意存储账户也应该指定,若没有,需要创建存储账户。HDInsight默认使用blob作为文件系统。后面我们也会看到,其相关日志也是放在存储账户中。点击“创建HDInsight集群”,后台自动创建集群。整个过程大概需要15分钟。
2、安装下列一个版本的Visual Studio
(1)Visual Studio 2012 Update 4
(2)Visual Studio 2013 Update 4 或 Visual Studio 2013 Community
(3)Visual Studio 2015 或 Visual Studio 2015 Community
果断选择安装2015啦
3、安装最新版的Azure SDK。官网下载链接:https://azure.microsoft.com/en-us/downloads/,注意与你的语言和开发工具对应。
安装好Azure SDK后,VS左侧的服务浏览器中应该有HDInsight了
4、vs中,选择“文件-》新建-》项目”。在新建项目窗口中,展开“已安装-》模板”,然后选择HDInsight。选择“Storm应用程序”,并输入应用名称
注意:这里的项目文件和源码文件最好放在同一个文件夹中,否则在以后的在提交的时候会出现问题。具体见:https://social.msdn.microsoft.com/Forums/en-US/c2463b9a-fc87-4886-8f0c-14f32b7f0cd2/failed-to-find-scpnet-package-for-current-storm-project?forum=hdinsight
5、创建项目后,包含以下文件:
(1)Program.cs:定义项目的拓扑。请注意,默认情况下会创建包含一个 Spout 和一个 Bolt 的默认拓扑。
(2)Spout.cs:发出随机数的示例 Spout。
(3)Bolt.cs:保留 Spout 所发出数字计数的示例 Bolt。
在创建项目过程中,将会从 NuGet 下载最新的SCP.NET 包。(倘若不是,后续过程会提示你的SCP需要更新,可以通过.NuGet管理包更新SCP包)
接下来的例子中,我们修改现有代码,实现一个单词计数的storm程序。建议自己重新输入代码,以期更好的理解执行过程。
6、打开Spout.cs。spout用于将外部数据源的数据读入拓扑。Spout含义为喷泉,相当于Storm系统中的数据产生器。其主要组件如下:
(1)NextTuple: 允许Spout发出新的Tuple时由Storm调用。大白话讲就是,这段代码由整个Storm调用,是程序的入口,而且是循环不停的调用。
(2)Ack(仅限事务拓扑):针对从Spout发送的Tuple,处理拓扑中其他组件所发起的确认。确认Tuple可让Spout知道下游组件已经成功处理Tuple。也就是说,当Spout将任务发出去后,其实不知道任务是否已经做完,通过这个函数来验证(后续我会跟进这部分如何使用。本示例不会用到这部分的功能)
(3)Fail(仅限事务拓扑):处理其他组件无法处理的Tuple。这提供机会来重新发出Tuple,以重新处理Tuple。(同样,本示例不会用到这方面的功能)
7、将Spout文件内的内容替换为以下内容。若能全部手动输入更好,至少要通读一遍代码。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; using System.Threading; using Microsoft.SCP; using Microsoft.SCP.Rpc.Generated; // 将外部源中的数据读入拓扑。 namespace StormTestWordCount { public class Spout : ISCPSpout { private Context ctx; private Random r = new Random(); string[] sentences = new string[] { "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; public Spout(Context ctx) { //set the instance context this.ctx = ctx; Context.Logger.Info("StormTestWordCount, Spout constructor called"); //Declare output schema Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>(); //The schema for the default output stream is // a tuple that contains a string field outputSchema.Add("default", new List<Type>() { typeof(string) }); this.ctx.DeclareComponentSchema(new ComponentStreamSchema(null, outputSchema)); } // get an instance of the spout public static Spout Get(Context ctx, Dictionary<string, Object> parms) { return new Spout(ctx); } // 由storm调用 public void NextTuple(Dictionary<string, Object> parms) { Context.Logger.Info("StormTestWordCount, Spout NextTuple enter"); System.Threading.Thread.Sleep(1000 * 60); // The sentence to be emitted string sentence; // get a rondom sentence sentence = sentences[r.Next(0, sentences.Length - 1)]; Context.Logger.Info("StormTestWordCount, Spout Emit: {0}", sentence); //Emit it ctx.Emit(new Values(sentence)); Context.Logger.Info("StormTestWordCount, Spout NextTuple exit"); } public void Ack(long seqId, Dictionary<string, Object> parms) { // only used for transactional topologies } public void Fail(long seqId, Dictionary<string, Object> parms) { // only used for transactional topologies } } }
8、删除项目中的现有Bolt.cs文件,在“资源管理器”中,右键-》添加-》新建项。从列表中选择“Storm Bolt”,将名字改为“Splitter.cs”。同样,创建另外一个Bolt“Counter.cs”
(1)Splitter.cs:实现Bolt,获得从Spout发过来的句子,分割句子,并将分割后得到的单词发出。
(2)Counter.cs:实现Bolt,统计从Splitter中发过来的单词计数。
9、默认情况下,Splitter.cs中包含一个方法Execute,当获得Tuple时,调用此方法。将Splitter的内容替换如下:
using System; using System.Collections; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; using System.Threading; using Microsoft.SCP; using Microsoft.SCP.Rpc.Generated; // 实现Bolt,以将橘子分割成不同的单词并发出一串新单词 namespace StormTestWordCount { public class Splitter : ISCPBolt { private Context ctx; public Splitter(Context ctx) { Context.Logger.Info("StormTestWordCount, Splitter constructor called"); this.ctx = ctx; // declare input and output schemas Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>(); // Input contains a tuple with a string field (the sentence) inputSchema.Add("default", new List<Type>() { typeof(string) }); Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>(); // output contains a tuple with a string field (the word) outputSchema.Add("default", new List<Type>() { typeof(string) }); this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema)); } // get a new instance of the bolt public static Splitter Get(Context ctx, Dictionary<string, Object> parms) { return new Splitter(ctx); } // 在Bolt收到要处理的Tuple时将调用此方法。此时,你可以读取和处理Tuple,以及发出传出Tuple // Called whe a new tuple is available public void Execute(SCPTuple tuple) { Context.Logger.Info("StormTestWordCount, Splitter Excute enter"); // get the sentence from the tuple string sentence = tuple.GetString(0); // split at space characters foreach (var word in sentence.Split(' ')) { Context.Logger.Info("StormTestWordCount, Splitter Emit: {0}", word); //Emit each word this.ctx.Emit(new Values(word)); } Context.Logger.Info("StormTestWordCount, Splitter Execute exit"); } } }
10、Counter.cs中默认也只有Execute函数。将Counter.cs的内容替换如下:
using System; using System.Collections; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; using System.Threading; using Microsoft.SCP; using Microsoft.SCP.Rpc.Generated; // 实现Bolt,以统计每个单词的数目,并发出一串新单词和每个单词的计数 namespace StormTestWordCount { public class Counter : ISCPBolt { private Context ctx; // Dictionary for holding words and counts private Dictionary<string, int> counts = new Dictionary<string, int>(); // Constructor public Counter(Context ctx) { Context.Logger.Info("StormTestWordCount, Counter constructor called"); // set instance context this.ctx = ctx; // Declare input and output schemas Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>(); // a tuple containing a string field - the word inputSchema.Add("default", new List<Type>() { typeof(string) }); Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>(); // a tuple containing a string and integer field - the word and the word count outputSchema.Add("default", new List<Type>() { typeof(string), typeof(int) }); this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema)); } // get a new instance public static Counter Get(Context ctx, Dictionary<string, Object> parms) { return new Counter(ctx); } // called when a new tuple is availableContext.Logger.Info("StormTestWordCount, public void Execute(SCPTuple tuple) { Context.Logger.Info("StormTestWordCount, Counter Execute enter"); //get the word form the tuple string word = tuple.GetString(0); // calculate the count int count = counts.ContainsKey(word) ? counts[word] : 0; count++; counts[word] = count; Context.Logger.Info("StormTestWordCount, Counter Emit: {0}, count: {1}", word, count); // Emit the word and count information this.ctx.Emit(Constants.DEFAULT_STREAM_ID, new List<SCPTuple> { tuple }, new Values(word, count)); Context.Logger.Info("StormTestWordCount, Counter Execute exit"); } } }
11、定义拓扑。该示例的top图形如下:
句子从 Spout 发出,并分布到 Splitter Bolt 的实例。Splitter Bolt 将句子分割成多个单词,并将这些单词分布到 Counter Bolt。
因为字数会本地保留在 Counter 实例中,所以我们想要确保特定单词流向相同的 Counter Bolt 实例,因此只能有一个实例跟踪特定单词(即相同的单词计数保存在同一个Counter Bolt中)。但是,针对 Splitter Bolt,哪个 Bolt 收到哪个句子并不重要,因此,我们只想要将句子负载平衡到那些实例。
打开 Program.cs。重要的方法是 ITopologyBuilder,它用于定义提交到 Storm 的拓扑。将 ITopologyBuilder 的内容替换为以下代码,以实现上面所述的拓扑。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Microsoft.SCP; using Microsoft.SCP.Topology; namespace StormTestWordCount { [Active(true)] class Program : TopologyDescriptor { static void Main(string[] args) { } public ITopologyBuilder GetTopologyBuilder() { // Create a new topology TopologyBuilder topologyBuilder = new TopologyBuilder("StormTestWordCount" + DateTime.Now.ToString("yyyyMMddHHmmss")); //Context.Logger.Info("StormTestWordCount, Hello Strom!!"); // add the spout to the topology. // name the component 'sentences' // name the field that is emitted as 'sentence' topologyBuilder.SetSpout( "sentences", Spout.Get, new Dictionary<string, List<string>>() { {Constants.DEFAULT_STREAM_ID, new List<string>(){"sentence"}} }, 1); // add the splitter bolt to the topology. // name the component 'splitter' // name the field that is emitted 'word' // use sufflegrouping to distribute incoming tuples from the 'sentences' spout across instances of the splitter topologyBuilder.SetBolt( "splitter", Splitter.Get, new Dictionary<string, List<string>>() { { Constants.DEFAULT_STREAM_ID, new List<string>() { "word"} } }, 1).shuffleGrouping("sentences"); //猜想,表示输入从sentences中来,负载均衡即可 // add the counter bolt to the topology. // Use fieldGrouping to ensure that tuples are routed // to counter instances based on the contents of field position 0 (the word). // This could also have been List<string>(){"word"} topologyBuilder.SetBolt( "counter", Counter.Get, new Dictionary<string, List<string>>() { { Constants.DEFAULT_STREAM_ID, new List<string>() { "word", "count"} } }, 1).fieldsGrouping("splitter", new List<int>() { 0 }); //猜想,表示输入从splitter来,同一个word分发到相同的Counter中 // add topology config topologyBuilder.SetTopologyConfig(new Dictionary<string, string>() { {"topology.kryo.register", "[\"[B\"]" } //设置配置文件。详情请见:https://storm.apache.org/documentation/Serialization.html }); return topologyBuilder; } } }
12、提交拓扑。右键项目-》提交Storm到HDInsight,可能需要输入Azure账号。
13、提交一段时间后,在VS中会产生一个TOP结构。如图。此作业不会停下来,直到手动Deactivate或者Kill。
14、同样,在Azure门户的Storm仪表板中也能看到该作业正在运行。
15、如何查看运行情况呢?通过日志系统。在创建工程中,我们指定了一个存储账号。我们可以在存储账号中查看代码中Context.Logger.Info写入的信息。在VS中的服务浏览器中,选择HDInsight->storm集群名-》Hasoop Service Log-》双击下边的Table,在右侧会显示查询结构。由于Storm的其他的代码也会产生日志信息,因此需要通过过滤条件将我们的程序产生的log打印出来。结果如下所示:
接下来我会继续探索Strom的运行机制、事务处理部分、如何收集bolt的产生结果、如何通过消息机制获得数据。
0 条评论