The PySpark streaming code below is not working. I am trying to consume a simple Kafka topic (called "test") as a stream in PySpark, but the code does not display any messages.
import os
import time
import sys
#import findspark
#findspark.init("/usr/lib/spark")
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="PythonSSKafka")
ssc = StreamingContext(sc,10)
print("ssc ================= {} {}")
kafkaStream = KafkaUtils.createStream(ssc, "test", "localhost:9092", {"imagetext":1})
print("contexts =================== {} {}")
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()
ssc.start()
time.sleep(10)
#ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)
I am getting the error below. Any help would be appreciated.
19/01/21 07:49:46 ERROR scheduler.TaskSetManager: Task 0 in stage 18.0 failed 1 times; aborting job
19/01/21 07:49:46 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 18.0, whose tasks have all completed, from pool
19/01/21 07:49:46 INFO scheduler.TaskSchedulerImpl: Cancelling stage 18
19/01/21 07:49:46 INFO scheduler.DAGScheduler: ResultStage 18 (start at NativeMethodAccessorImpl.java:-2) failed in 0.327 s due to Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 18, localhost, executor driver): kafka.common.InvalidConfigException: client.id localhost:9092 is illegal, contains a character other than ASCII alphanumerics, ".", "_" and "-"
at kafka.common.Config$class.validateChars(Config.scala:32)
at kafka.consumer.ConsumerConfig$.validateChars(ConsumerConfig.scala:25)
at kafka.consumer.ConsumerConfig$.validateClientId(ConsumerConfig.scala:65)
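For reference, the rule quoted in the exception can be reproduced with a small check. This is only a sketch of the wording in the error message, not Kafka's actual validation code; the helper name is_legal_client_id and the group name "test-group" are hypothetical. The positional arguments of KafkaUtils.createStream after ssc are the ZooKeeper quorum, the consumer group id, and the topics dictionary, so "localhost:9092" lands in the group id slot, which appears to be the value reported as client.id in the trace.

```python
import re

# Rule quoted in the exception text: only ASCII alphanumerics, ".", "_" and "-".
# This pattern is a sketch of that wording, not Kafka's own implementation.
LEGAL_ID = re.compile(r"[A-Za-z0-9._-]+")


def is_legal_client_id(value):
    """Return True if value uses only the characters the error message allows."""
    return LEGAL_ID.fullmatch(value) is not None


# The value reported in the trace contains ":", so it fails the check.
print(is_legal_client_id("localhost:9092"))  # False

# A hypothetical group name made of allowed characters passes.
print(is_legal_client_id("test-group"))  # True
```

Under that reading, a call whose second argument is the ZooKeeper address (for example "localhost:2181", assuming the default ZooKeeper port), whose third argument is a plain group name, and whose dictionary key is the topic "test" would at least pass this check. That variant has not been run here.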