您需要将json属性从json对象中拉出,并将两个值(json对象和String groupId)作为双值元组传递。当您将流声明为拓扑规范逻辑的一部分时,您将为第二个字段指定名称“groupId”,并且事情应该正常工作。如果您不想修改Kafka喷口,则需要有一个中间螺栓,其唯一目的是将groupId拆分出json对象。中间螺栓还可以使用定向流(emitDirect()方法),将目标放在json对象中的groupId上。
这就是为什么我不重复使用Kafka喷口的一个原因 - 除了盲目地将数据写入流之外,我经常需要做其他事情,但这既不在这里也不在那里。