Apache Spark Kinesis示例不起作用


问题内容

我正在尝试运行JavaKinesisWordCountASL示例。

该示例似乎连接到我的Kinesis
Stream并从该流中获取数据(如下面的日志所示)。但是,在示例中,Sparks不会调用传递给unionStreams.flatMap方法的调用函数,并且不会打印任何单词计数。

我尝试同时使用Java 8和Java 7运行。我正在ubuntu实例上运行它。相同的示例适用于我的Macbook。

ec2.internal / 10.80.91.13:39149],有1条消息正在等待15/11/14 01:59:42
INFO网络。ConnectionManager:接受来自[ip-10-80-91-13.ec2.internal /
10.80.91.13的连接:56700] 15/11/15 01:59:42 WARN
storage.BlockManager:块输入-0-1416016781800已存在于此计算机上;不重新添加它14/11/15 01:59:42
INFO接收器.BlockGenerator:推入块输入-0-1416016781800 14/11/15 01:59:43
INFO存储器.MemoryStore:确保用curMem =调用的FreeSpace(256) 3776,maxMem = 938244833
14/11/15 01:59:43信息存储.MemoryStore:块input-0-1416016782800作为值存储在内存中(估计大小256.0
B,可用894.8 MB)14/11/15 01:59: 43 INFO
storage.BlockManagerInfo:在ip-10-80-91-13.ec2.internal:39149(大小:256.0
B,可用空间:894.8 MB)的内存中添加了input-0-1416016782800:15/11/15 01:59:
43信息存储.BlockManagerMaster:块输入0-1416016782800的更新信息14-15年15月15日01:59:43
WARN存储。BlockManager:块输入0-1416016782800已存在于这台机器上。不重新添加它14/11/15 01:59:43
INFO接收器.BlockGenerator:推入块输入-0-1416016782800 14/11/15 01:59:44
INFO调度器.ReceiverTracker:流0收到2个块14/11 / 15 01:59:44
INFO调度程序.ReceiverTracker:流1收到0个块14/11/15 01:59:44
INFO调度程序.JobScheduler:添加了时间为1416016784000 ms的作业14/11/15 01:59:46
INFO调度程序.ReceiverTracker:流0收到0块14/11/15 01:59:46
INFO调度程序.ReceiverTracker:流1收到0块14/11/15 01:59:46
INFO调度程序。JobScheduler:添加了时间为1416016786000 ms的作业14 / 11/15
01:59:46信息impl.CWPublisherRunnable:成功发布了17个基准。2015年11月14日01:59:46信息存储。内存存储:确保用curMem
= 4032,maxMem = 938244833 14/11/15 01:59:46
INFO存储调用的FreeFreeSpace(248).MemoryStore:阻止输入1-1416016786000作为值存储在内存中(估计大小248.0
B,可用894.8 MB)14 / 11/15
01:59:46信息存储.BlockManagerInfo:在IP-10-80-91-13.ec2上的内存中添加了input-1-1416016786000.internal:39149(大小:248.0
B,可用空间:894.8 MB)14 / 11/15
01:59:46信息存储。BlockManagerMaster:更新了块输入-1-1416016786000的信息。14/11/15 01:59:46
WARN存储。BlockManager:块输入-1-1416016786000已存在于此机器上;不重新添加它14/11/15 01:59:46
INFO接收器。BlockGenerator:推入块输入-1-1416016786000 14/11/15 01:59:46 INFO
impl.CWPublisherRunnable:已成功发布14个基准。14/11/15 01:59:48
INFO调度程序.ReceiverTracker:流0收到0个块14/11/15 01:59:48 INFO存储。MemoryStore:确保以curMem
= 4280,maxMem = 938244833调用的FreeSpace(264)14/11/15 01:59:48
INFO调度程序.ReceiverTracker:流1接收了1个块14/11/15 01:59:48
INFO存储器。块输入0-116016787800作为值存储在内存中(估计大小264.0 B,可用894.8 MB)14/11/15
01:59:48信息存储.BlockManagerInfo:在IP-10-80的内存中添加了输入0-116016787800
-91-13.ec2.internal:39149(大小:264.0 B,免费:894.8 MB)14/11/15
01:59:48信息存储.BlockManagerMaster:更新了块输入的信息-0-1416016787800 14/11/15 01:59:48
INFO调度程序。JobScheduler:为时间添加了作业1416016788000 ms 14/11/15 01:59:48
WARN存储。BlockManager:块输入-0-1416016787800已存在于此计算机上。不重新添加它14/11/15 01:59:48
INFO接收器.BlockGenerator:推送块输入-0-1416016787800 14/11/15 01:59:50
INFO调度程序.ReceiverTracker:流0接收到1个块14/11/15 01:59:50
INFO调度程序.ReceiverTracker:流1接收到0个块14/11/15 01:59:50
INFO调度程序.JobScheduler:添加了时间作业1416016790000 ms 14/11/15 01:59:51
INFO存储.MemoryStore:确保用curMem = 4544,maxMem =
938244833调用的FreeSpace(264)14/11/15 01:59:51 INFO存储.MemoryStore:块输入-0-
1416016790800作为值存储在内存中(估计大小为264.0 B,可用空间894.8 MB)14/11/15 01:59:51
INFO存储.BlockManagerInfo:在ip-10-80-91-13的内存中添加了input-0-1416016790800。
ec2.internal:39149(大小:264.0 B,免费:894.8 MB)14/11/15
01:59:51信息存储.BlockManagerMaster:更新了块输入的信息-0-1416016790800 14/11/15 01:59:51
WARN storage.BlockManager:该机器上已经存在块输入0-1416016790800。不重新添加它14/11/15 01:59:


问题答案:

这可能与您拥有多少个工作者线程有关。使用–master local
[2]运行应用程序时,我遇到了同样的问题。我花了很多时间寻找答案,却一无所获。出于好奇,我改用–master local
[4]并成功了。我不知道根本原因。也许更熟悉Spark的人可以启发我们。

注意:就我而言,我的Kinesis流有两个分片。因此,该应用创建了两个输入流,每个分片一个。