原创|使用教程|编辑:陈俊吉|2016-07-13 10:17:20.000|阅读 420 次
概述:IBM InfoSphere Streams是一个高级计算平台,帮助用户开发的应用程序快速摄取、分析和关联来自数千个实时源的信息。该解决方案可处理非常高的数据吞吐率,最高可达每秒数百万个事件或消息。
# 慧都年终大促·界面/图表报表/文档/IDE等千款热门软控件火热促销中 >>
相关链接:
是一个高级计算平台,帮助用户开发的应用程序快速摄取、分析和关联来自数千个实时源的信息。该解决方案可处理非常高的数据吞吐率,最高可达每秒数百万个事件或消息。该平台支持流数据的实时处理,支持不断更新持续查询的结果,可在仍在移动的数据流中检测洞察。Streams旨在从一个几分钟到几小时的窗口中的移动信息(数据流)中揭示有意义的模式。该平台能够获取低延迟洞察,并为注重时效的应用程序(比如欺诈检测或网络管理)获取更好的成果,从而提供业务价值。流处理的演示如下图所示:
Streams 的主要设计目的是:
提供了一种编程模型和 IDE 来定义数据来源,还提供了已融合到处理执行单元中的称为运算符的软件分析模块。它还提供了基础架构来支持从这些组件合成可扩展的流处理应用程序。主要平台组件包括:
Streams Processing Language (SPL),Streams 的编程语言,是一种分布式数据流合成语言。它是一种类似 C++ 或 Java™ 的可扩展且全功能的语言,支持用户定义的数据类型。您可以使用 SPL 或原生语言(C++ 或Java)编写自定义函数。也可以使用 C++ 或 Java 编写用户定义的运算符。
Streams 通过SPL将应用程序会描述一个导向图,该图由各个互联且处理多个数据流的运算符组成。数据流可来自系统外部,或者在应用程序内部生成。SPL 程序的基本构建块包括:
Java 作为面向对象的高级编程语言,以其使用简单、完全面象对象、平台可移植性、健壮的沙盒安全机制、动态性,以及大量可用的开发包等一系列优势,在互联网分布式环境下得到了极其广泛的应用,具有广泛的用户基础。为了Streams用户重用已有的Java开发技能、保护已有的Java资产,IBM Streams平台提供了使用 Java 编程语言来构建 Streams 应用程序的框架,具体包括 Java 运算符模型描述文件以及 Java 运算符 API(JavaOp)两种方式。这两种方式在一定程度上让开发人员集成Java功能模块。
虽然Streams所提供的Java运算符模型描述文件以及Java运算符API(JavaOp)方式支持了Java代码调用,但是,传统的Java是面向对象的编程语言,它只能帮助开发人员实现业务逻辑或重用Java代码,但它无法以“流处理”的思维,直接进行类似SPL的流应用开发。
streamsx.topology开源项目的出现,丰富了Streams的开发方式,为流应用的开发者提供更多的语言选择。streamsx.topology项目提供Java Application API,面向流处理应用的将Java封装成一套类库,使得开发者完全使用Java和Scala语言并按照“流处理”的思维创建IBM Streams流处理应用。
streamsx.topology开源项目参考网址:
//ibmstreams.github.io/streamsx.topology/
1. 从www.ibm.com/software/data/infosphere/stream-computing/trials.html下载“IBM InfoSphere Streams 4.0 Java API BetaQuickStart VM Image”。Streams Quick StartEdition 是 InfoSphere Streams 的一个免费的、可下载的非生产版本,它没有数据或时间限制,支持您在自己的独特环境中试验流计算,构建一个强大的分析平台。该平台能够处理难以置信的高数据吞吐量,高达每秒数百万个事件或消息。InfoSphere Streams QuickStart Edition 没有提供支持选项,仅适用于非生产用途。要获得相应的支持,请购买 InfoSphereStreams。
2.解压VM镜像,并在VMPlayer启动VM。
该VM已经安装com.ibm.streamsx.topology工具箱,工具箱位于/home/streamsadmin/streamx.topology/streamsx.topology,包含:
1)在桌面双击InfoSphere Streams Studio (Eclipse)图标启动Streams Studio.
2)指定workspace为:/home/streamsadmin/Workspaces/topology/
3) 运行"Hello World" 示例程序:在Project Explorer标签, 打开src->simple->HelloWorld->HelloWorld.java,代码如下:
package simple; import com.ibm.streamsx.topology.TStream; import com.ibm.streamsx.topology.Topology; import com.ibm.streamsx.topology.context.StreamsContextFactory; publicclass HelloWorld { publicstaticvoid main(String[] args) throws Exception { /* * Create the container for the topology that will * hold the streams of tuples. */ Topology topology = new Topology("HelloWorld"); /* * Declare a source stream (hw) with String tuples containing two tuples, * "Hello" and "World!". */ TStream<String> hw = topology.strings("Hello", "World!"); /* * Sink hw by printing each of its tuples to System.out. */ hw.print(); if (args.length == 0) StreamsContextFactory.getEmbedded().submit(topology).get(); else StreamsContextFactory.getStreamsContext(args[0]).submit(topology) .get(); } } |
4) 运行"Hello World" 示例程序:右击HelloWorld.java,选择Run As-> Run Configurations. 在Run Configurations 'Main' 标签页面,确保Main class填 simple.HelloWorld. 在 arguments标签页面, 设置Program arguments为EMBEDDED (EMBEDDED表示程序独立编译并嵌入到JVM运行,而不依赖Streams运行时环境)。
5) 设置必要参数后,运行该应用您会看到以下的输出:
Hello
world!
我们创建一个名叫MyGrep的Sample应用,用于指导关键字搜索某个文件夹下的文件,搜索到则显示相应内容所在的行数和内容。具体步骤如下:
1)创建Java项目: File->New->Project->Java->JavaProject,点击Next,在Create a Java Project填写MySamples,点击Next。
2)在Libraies标签页:
点击External Jar按钮,选择com.ibm.streams.topology.jar
点击Add Library按钮,选择IBM InfoSphere Streams
点击Next和Finish完成项目的创建。新创建项目视图如下图所示:
3)创建命名空间:右击src->New->Package->JavaPackage的Name填写:mysapce
4)创建Java主类:src->右击myspace->New->Class,在Name填写:mysapce,确保勾选“public static void main(String[]args)”。确定后生成MyGrep.java。
5)创建Java类:src->右击myspace->New->Class,在Name填写:GrepInfo,不要勾选“public static void main(String[]args)”,确定后生成GrepInfo.java。
6)MyGrep.java和GrepInfo.java的代码内容如下:
MyGrep.java
package myspace; import java.io.ObjectStreamException; import java.util.Arrays; import java.util.concurrent.Future; import com.ibm.streamsx.topology.TStream; import com.ibm.streamsx.topology.Topology; import com.ibm.streamsx.topology.context.StreamsContextFactory; import com.ibm.streamsx.topology.file.FileStreams; import com.ibm.streamsx.topology.function7.Function; publicclass MyGrep { publicstaticvoid main(String[] args) throws Exception { String contextType = args[0]; String directory = args[1]; final String term = args[2]; Topology topology = new Topology("MyGrep"); TStream<String> filePaths = FileStreams.directoryWatcher(topology, directory); TStream<String> lines = FileStreams.textFileReader(filePaths); TStream<GrepInfo> grepInfo = lines.multiTransform( new Function<String, Iterable<GrepInfo>>() { privatestaticfinallongserialVersionUID = 1L; privateintlineNum = 0; @Override public Iterable<GrepInfo> apply(String line) { ++lineNum; if(line.contains(term)){ return Arrays.asList(new GrepInfo(lineNum, line)); } else returnnull; } private Object readResolve() throws ObjectStreamException { returnthis; } }, GrepInfo.class); grepInfo.print(); Future<?> future = StreamsContextFactory.getStreamsContext(contextType) .submit(topology); Thread.sleep(30 * 1000); future.cancel(true); } } |
GrepInfo.java
package myspace; import java.io.Serializable; import com.ibm.streamsx.topology.tuple.Keyable; publicclass GrepInfo implements Keyable<GrepInfo>, Serializable { privatestaticfinallongserialVersionUID = 1L; intlineNum; String lineStr; public GrepInfo(int ln, String ls) { this.lineNum = ln; this.lineStr = ls; } @Override public String toString() { return"Line Num " + lineNum + " : " + lineStr; } @Override public GrepInfo getKey() { // TODO Auto-generated method stub returnnull; } } |
7)运行MyGrep之前,请确保Streams Instance已经启动,并在/home/streamsadmin/test创建一个文本文件并写如若干内容。
8)运行程序:右击MyGrep.java,选择Run As -> RunConfigurations. 在Run Configurations 'Main' 标签页面,确保Project填写MySamples和Main class填 myspace.MyGrep。
在 arguments标签页面, 设置Program arguments为DISTRIBUTED /home/streamsadmin/test China (DISTRIBUTED 表示程序部署到Streams运行时环境,/home/streamsadmin/test是程序搜索关键的目录;China是搜索关键字)。
9)查看结果:
在Streams Exploere -> StreamsInstances ->右击default:<instance>@<Domain>,选择Show Instance Graph
在Instance Graph窗口,我们能看到MyGrep最终运行图。右击最后的Print PE->Show Log->Show PEConsole
在Console将会显现MyGrep运行的结果
streams.topology开源项目所提供的Java Application API使得Streams开发者对流应用的编程语言有了新的选择,它能帮助开发者重用Java编程能力,并按照“流处理”的思路简化流应用的开发过程,让开发者更专注于业务的处理逻辑而不是流处理的框架。然而,该项目还处于早期阶段,很多功能和接口尚未实现;对比成熟的、完善的SPL,Java Application API的功能和成熟性还有很大差距。相信在不久的将来,streams.topology将会逐渐完善并成为IBM Streams平台的一个重要补充。
详情请咨询!
客服热线:023-66090381
本站文章除注明转载外,均为本站原创或翻译。欢迎任何形式的转载,但请务必注明出处、不得修改原文相关链接,如果存在内容上的异议请邮件反馈至chenjj@cahobeh.cn