final case class SparkStreamletTestkit(session: SparkSession, config: Config = ConfigFactory.empty, maxDuration: Duration = 30.seconds) extends Product with Serializable
Testkit for testing Spark streamlets.
The steps to write a test using the testkit are:
- Create the test class and extend it with SparkScalaTestSupport
- Create the Spark streamlet testkit instance
- Create the Spark streamlet
- Set up an inlet tap on the inlet port
- Set up an outlet tap on the outlet port
- Build input data and send to inlet tap
- Run the test
- Get data from outlet and assert
// 1. Create the test class and extend it with `SparkScalaTestSupport`
class MySparkStreamletSpec extends SparkScalaTestSupport {

  "SparkProcessor" should {
    "process streaming data" in {

      // 2. Create the Spark streamlet testkit instance
      val testKit = SparkStreamletTestkit(session)

      // 3. Create the Spark streamlet
      val processor = new SparkProcessor[Data, Simple] {
        override def createLogic(): ProcessorLogic[Data, Simple] =
          new ProcessorLogic[Data, Simple](OutputMode.Append) {
            override def process(inDataset: Dataset[Data]): Dataset[Simple] =
              inDataset.select($"name").as[Simple]
          }
      }

      // 4. Set up inlet tap(s) on inlet port(s); inletAsTap is a member of the testkit
      val in: SparkInletTap[Data] = testKit.inletAsTap[Data](processor.shape.inlet)

      // 5. Set up outlet tap(s) on outlet port(s); outletAsTap is a member of the testkit
      val out: SparkOutletTap[Simple] = testKit.outletAsTap[Simple](processor.shape.outlet)

      // 6. Prepare input data and send it to the inlet tap(s)
      val data = (1 to 10).map(i => Data(i, s"name$i"))
      in.addData(data)

      // 7. Run the test
      testKit.run(processor, Seq(in), Seq(out))

      // 8. Get data from outlet tap(s) and assert
      val results = out.asCollection(session)
      results should contain(Simple("name1"))
    }
  }
}
Note: Every test is executed against a SparkSession, which is created and removed by the test lifecycle methods.
Instance Constructors
- new SparkStreamletTestkit(session: SparkSession, config: Config = ConfigFactory.empty, maxDuration: Duration = 30.seconds)
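The constructor's two optional arguments can be overridden by name. The following is a minimal sketch, not taken from the library's documentation: it assumes the testkit lives in the cloudflow.spark.testkit package, and the TestkitFactory object and the my-app.batch-size key are hypothetical names used only for illustration.

```scala
import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession

// Assumed package name; adjust to match your Cloudflow version.
import cloudflow.spark.testkit.SparkStreamletTestkit

// Hypothetical helper, not part of the testkit.
object TestkitFactory {

  // Uses the documented defaults: ConfigFactory.empty and 30.seconds.
  def default(session: SparkSession): SparkStreamletTestkit =
    SparkStreamletTestkit(session)

  // Overrides both defaults by name. The config key is made up for this sketch.
  def custom(session: SparkSession): SparkStreamletTestkit =
    SparkStreamletTestkit(
      session,
      config = ConfigFactory.parseString("my-app.batch-size = 5"),
      maxDuration = 10.seconds
    )
}
```

Inside a test class that extends SparkScalaTestSupport, the session passed in would normally be the one provided by the test support.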
Type Members
- implicit class InletTapOps[T] extends AnyRef
- implicit class OutletTapOps[T] extends AnyRef
Value Members
- final def !=(arg0: Any): Boolean
  Definition Classes: AnyRef → Any
- final def ##(): Int
  Definition Classes: AnyRef → Any
- final def ==(arg0: Any): Boolean
  Definition Classes: AnyRef → Any
- val TestStreamletName: String
- final def asInstanceOf[T0]: T0
  Definition Classes: Any
- def clone(): AnyRef
  Attributes: protected[lang]
  Definition Classes: AnyRef
  Annotations: @throws( ... ) @native() @HotSpotIntrinsicCandidate()
- val config: Config
- final def eq(arg0: AnyRef): Boolean
  Definition Classes: AnyRef
- final def getClass(): Class[_]
  Definition Classes: AnyRef → Any
  Annotations: @native() @HotSpotIntrinsicCandidate()
- def inletAsTap[In](in: CodecInlet[In])(implicit arg0: Encoder[In]): SparkInletTap[In]
- final def isInstanceOf[T0]: Boolean
  Definition Classes: Any
- val maxDuration: Duration
- final def ne(arg0: AnyRef): Boolean
  Definition Classes: AnyRef
- final def notify(): Unit
  Definition Classes: AnyRef
  Annotations: @native() @HotSpotIntrinsicCandidate()
- final def notifyAll(): Unit
  Definition Classes: AnyRef
  Annotations: @native() @HotSpotIntrinsicCandidate()
- def outletAsTap[Out](out: CodecOutlet[Out])(implicit arg0: Encoder[Out]): SparkOutletTap[Out]
- def run(sparkStreamlet: SparkStreamlet, inletTap: SparkInletTap[_], outletTaps: Seq[SparkOutletTap[_]]): ExecutionReport
  Runs the sparkStreamlet using inletTap as the source and outletTaps as the sinks. The inletTap abstracts a MemoryStream and an inlet port, where the test input data gets added. Each outletTap provides a port and a query name, which gives a handle to the Spark StreamingQuery that gets executed.
  - sparkStreamlet: the Spark streamlet to run
  - inletTap: the inlet stream and port
  - outletTaps: the collection of outlet query names and ports
  - returns: an ExecutionReport for the run
- def run(sparkStreamlet: SparkStreamlet, inletTaps: Seq[SparkInletTap[_]], outletTap: SparkOutletTap[_]): ExecutionReport
  Runs the sparkStreamlet using inletTaps as the sources and outletTap as the sink. Each inletTap abstracts a MemoryStream and an inlet port, where the test input data gets added. The outletTap provides a port and a query name, which gives a handle to the Spark StreamingQuery that gets executed.
  - sparkStreamlet: the Spark streamlet to run
  - inletTaps: the collection of inlet streams and ports
  - outletTap: the outlet query name and port
  - returns: an ExecutionReport for the run
- def run(sparkStreamlet: SparkStreamlet, inletTap: SparkInletTap[_], outletTap: SparkOutletTap[_]): ExecutionReport
  Runs the sparkStreamlet using inletTap as the source and outletTap as the sink. The inletTap abstracts a MemoryStream and an inlet port, where the test input data gets added. The outletTap provides a port and a query name, which gives a handle to the Spark StreamingQuery that gets executed.
  - sparkStreamlet: the Spark streamlet to run
  - inletTap: the inlet stream and port
  - outletTap: the outlet query name and port
  - returns: an ExecutionReport for the run
- def run(sparkStreamlet: SparkStreamlet, inletTaps: Seq[SparkInletTap[_]], outletTaps: Seq[SparkOutletTap[_]]): ExecutionReport
  Runs the sparkStreamlet using inletTaps as the sources and outletTaps as the sinks. Each inletTap abstracts a MemoryStream and an inlet port, where the test input data gets added. Each outletTap provides a port and a query name, which gives a handle to the Spark StreamingQuery that gets executed.
  - sparkStreamlet: the Spark streamlet to run
  - inletTaps: the collection of inlet streams and ports
  - outletTaps: the collection of outlet query names and ports
  - returns: an ExecutionReport for the run
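The four run overloads differ only in whether the inlet and outlet taps are passed singly or as a Seq. The sketch below maps one helper to each overload and keeps the returned ExecutionReport. It assumes the package names cloudflow.spark and cloudflow.spark.testkit; the RunOverloads object and its method names are hypothetical, and the report is only printed because its fields are not described on this page.

```scala
// Assumed package names; adjust to match your Cloudflow version.
import cloudflow.spark.SparkStreamlet
import cloudflow.spark.testkit.{ SparkInletTap, SparkOutletTap, SparkStreamletTestkit }

// Hypothetical helpers, one per documented run overload.
object RunOverloads {

  // One inlet, one outlet: pass the taps directly and keep the report.
  def oneToOne(kit: SparkStreamletTestkit, streamlet: SparkStreamlet,
               in: SparkInletTap[_], out: SparkOutletTap[_]) = {
    val report = kit.run(streamlet, in, out)
    println(s"Execution report: $report")
    report
  }

  // Several inlets feeding a single outlet.
  def manyToOne(kit: SparkStreamletTestkit, streamlet: SparkStreamlet,
                ins: Seq[SparkInletTap[_]], out: SparkOutletTap[_]) =
    kit.run(streamlet, ins, out)

  // A single inlet fanning out to several outlets.
  def oneToMany(kit: SparkStreamletTestkit, streamlet: SparkStreamlet,
                in: SparkInletTap[_], outs: Seq[SparkOutletTap[_]]) =
    kit.run(streamlet, in, outs)

  // The general form: any number of inlets and outlets.
  def manyToMany(kit: SparkStreamletTestkit, streamlet: SparkStreamlet,
                 ins: Seq[SparkInletTap[_]], outs: Seq[SparkOutletTap[_]]) =
    kit.run(streamlet, ins, outs)
}
```

The single-tap overloads save wrapping one tap in a Seq; the Seq-based form covers every case.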
- val session: SparkSession
- implicit lazy val sqlCtx: SQLContext
- final def synchronized[T0](arg0: ⇒ T0): T0
  Definition Classes: AnyRef
- final def wait(arg0: Long, arg1: Int): Unit
  Definition Classes: AnyRef
  Annotations: @throws( ... )
- final def wait(arg0: Long): Unit
  Definition Classes: AnyRef
  Annotations: @throws( ... ) @native()
- final def wait(): Unit
  Definition Classes: AnyRef
  Annotations: @throws( ... )
- def withConfigParameterValues(configParameterValues: ConfigParameterValue*): SparkStreamletTestkit
  Adds configuration parameters and their values to the configuration used in the test. Each ConfigParameterValue takes a ConfigParameter and a string containing the value of the parameter.
  Annotations: @varargs()
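A configured testkit can be built by chaining withConfigParameterValues onto the constructor. This is a minimal sketch under several assumptions: the package names are assumed, and so is the StringConfigParameter constructor taking a key, a description, and an optional default. The name-prefix parameter and the ConfiguredTestkit object are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumed package names; adjust to match your Cloudflow version.
import cloudflow.spark.testkit.SparkStreamletTestkit
import cloudflow.streamlets.{ ConfigParameterValue, StringConfigParameter }

// Hypothetical helper, not part of the testkit.
object ConfiguredTestkit {

  // Hypothetical parameter. In a real test, reuse the ConfigParameter
  // that the streamlet under test declares.
  val NamePrefix: StringConfigParameter =
    StringConfigParameter("name-prefix", "Prefix added to every name.", Some("user-"))

  // withConfigParameterValues returns a SparkStreamletTestkit, so the
  // result can be used wherever a plain testkit would be.
  def apply(session: SparkSession): SparkStreamletTestkit =
    SparkStreamletTestkit(session)
      .withConfigParameterValues(ConfigParameterValue(NamePrefix, "test-"))
}
```

The value is always passed as a string, as the description above states, regardless of the parameter's type.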
Deprecated Value Members
- def finalize(): Unit
  Attributes: protected[lang]
  Definition Classes: AnyRef
  Annotations: @throws( classOf[java.lang.Throwable] ) @Deprecated
  Deprecated