final case class SparkStreamletTestkit(session: SparkSession, config: Config = ConfigFactory.empty, maxDuration: Duration = 30.seconds) extends Product with Serializable
Testkit for testing Spark streamlets.
The steps to write a test using the testkit are:
- Create the test class and extend it with SparkScalaTestSupport
- Create the Spark streamlet testkit instance
- Create the Spark streamlet
- Set up an inlet tap on each inlet port
- Set up an outlet tap on each outlet port
- Build input data and send it to the inlet tap
- Run the test
- Get data from the outlet tap and assert
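import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.OutputMode
// Testkit and streamlet imports (SparkScalaTestSupport, SparkProcessor, ProcessorLogic, ...) are omitted;
// their packages depend on the framework version.

// Assumed definitions: the example only shows Data(Int, String) and Simple(String).
// They are declared at top level so that Spark can derive encoders for them.
case class Data(id: Int, name: String)
case class Simple(name: String)

// 1. Create the test class and extend it with `SparkScalaTestSupport`
class MySparkStreamletSpec extends SparkScalaTestSupport {

  "SparkProcessor" should {
    "process streaming data" in {
      // Encoders for Data and Simple, and the $"column" syntax used below
      val spark = session
      import spark.implicits._

      // 2. Create Spark streamlet testkit instance
      val testKit = SparkStreamletTestkit(session)

      // 3. Create Spark streamlet
      val processor = new SparkProcessor[Data, Simple] {
        override def createLogic(): ProcessorLogic[Data, Simple] =
          new ProcessorLogic[Data, Simple](OutputMode.Append) {
            override def process(inDataset: Dataset[Data]): Dataset[Simple] =
              inDataset.select($"name").as[Simple]
          }
      }

      // 4. Set up inlet tap(s) on inlet port(s)
      val in: SparkInletTap[Data] = testKit.inletAsTap[Data](processor.shape.inlet)

      // 5. Set up outlet tap(s) on outlet port(s)
      val out: SparkOutletTap[Simple] = testKit.outletAsTap[Simple](processor.shape.outlet)

      // 6. Prepare input data and send it to the inlet tap(s)
      val data = (1 to 10).map(i => Data(i, s"name$i"))
      in.addData(data)

      // 7. Run the test
      testKit.run(processor, Seq(in), Seq(out))

      // 8. Get data from outlet tap(s) and assert
      val results = out.asCollection(session)
      results should contain(Simple("name1"))
    }
  }
}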
Note: every test is executed against a SparkSession that is created and torn down as part of the test lifecycle methods.
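A session lifecycle of this kind can be sketched with ScalaTest's BeforeAndAfterAll: one local SparkSession per suite, created before the first test and stopped after the last. This is not the source of SparkScalaTestSupport; LocalSparkSessionSupport and SessionLifecycleSpec are hypothetical names, and the master URL and ScalaTest 3.1+ package layout are assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

// Hypothetical stand-in for a support trait such as SparkScalaTestSupport
trait LocalSparkSessionSupport extends AnyWordSpecLike with Matchers with BeforeAndAfterAll {
  // One session shared by every test in the suite
  implicit lazy val session: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName(getClass.getSimpleName)
    .getOrCreate()

  override def beforeAll(): Unit = {
    super.beforeAll()
    val _ = session // force creation before the first test runs
  }

  override def afterAll(): Unit = {
    try session.stop() // tear the session down after the last test
    finally super.afterAll()
  }
}

// Hypothetical suite using the trait
class SessionLifecycleSpec extends LocalSparkSessionSupport {
  "The shared session" should {
    "be usable inside a test" in {
      import session.implicits._
      Seq(1, 2, 3).toDS().count() shouldBe 3L
    }
  }
}
```

Keeping the session in a lazy val on the suite means every test in the class sees the same session, which is usually what a streaming test wants because starting Spark is slow.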
Instance Constructors
- new SparkStreamletTestkit(session: SparkSession, config: Config = ConfigFactory.empty, maxDuration: Duration = 30.seconds)
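Both config and maxDuration have defaults, so only session is required. A minimal sketch of overriding them, assuming the testkit is importable from cloudflow.spark.testkit (adjust the package to your framework version) and that maxDuration is a scala.concurrent.duration.Duration, as the 30.seconds default suggests. The config key my-app.greeting is hypothetical.

```scala
import scala.concurrent.duration._

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.SparkSession

// Assumption: package of the testkit; it may differ between versions
import cloudflow.spark.testkit.SparkStreamletTestkit

object TestkitConstructionSketch {
  def build(session: SparkSession): SparkStreamletTestkit = {
    // Hypothetical extra settings; the default is ConfigFactory.empty
    val extraConfig: Config = ConfigFactory.parseString("my-app.greeting = hello")
    // Replace the default maxDuration of 30.seconds with one minute
    SparkStreamletTestkit(session, extraConfig, 1.minute)
  }
}
```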
Type Members
- implicit class InletTapOps[T] extends AnyRef
- implicit class OutletTapOps[T] extends AnyRef
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##(): Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- val TestStreamletName: String
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native() @HotSpotIntrinsicCandidate()
- val config: Config
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def getClass(): Class[_]
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def inletAsTap[In](in: CodecInlet[In])(implicit arg0: Encoder[In]): SparkInletTap[In]
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- val maxDuration: Duration
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def outletAsTap[Out](out: CodecOutlet[Out])(implicit arg0: Encoder[Out]): SparkOutletTap[Out]
- def run(sparkStreamlet: SparkStreamlet, inletTap: SparkInletTap[_], outletTaps: Seq[SparkOutletTap[_]]): ExecutionReport
Runs the sparkStreamlet using inletTap as the source and outletTaps as the sinks. The inletTap abstracts a MemoryStream and an inlet port, into which the test input data is added. Each outletTap provides a port and a query name, which gives a handle to the Spark StreamingQuery that gets executed.
- sparkStreamlet
the Spark streamlet to run
- inletTap
the inlet stream and port
- outletTaps
the collection of outlet query names and ports
- returns
an ExecutionReport for the run
- def run(sparkStreamlet: SparkStreamlet, inletTaps: Seq[SparkInletTap[_]], outletTap: SparkOutletTap[_]): ExecutionReport
Runs the sparkStreamlet using inletTaps as the sources and outletTap as the sink. Each inletTap abstracts a MemoryStream and an inlet port, into which the test input data is added. The outletTap provides a port and a query name, which gives a handle to the Spark StreamingQuery that gets executed.
- sparkStreamlet
the Spark streamlet to run
- inletTaps
the collection of inlet streams and ports
- outletTap
the outlet query and port
- returns
an ExecutionReport for the run
- def run(sparkStreamlet: SparkStreamlet, inletTap: SparkInletTap[_], outletTap: SparkOutletTap[_]): ExecutionReport
Runs the sparkStreamlet using inletTap as the source and outletTap as the sink. The inletTap abstracts a MemoryStream and an inlet port, into which the test input data is added. The outletTap provides a port and a query name, which gives a handle to the Spark StreamingQuery that gets executed.
- sparkStreamlet
the Spark streamlet to run
- inletTap
the inlet stream and port
- outletTap
the outlet query and port
- returns
an ExecutionReport for the run
- def run(sparkStreamlet: SparkStreamlet, inletTaps: Seq[SparkInletTap[_]], outletTaps: Seq[SparkOutletTap[_]]): ExecutionReport
Runs the sparkStreamlet using inletTaps as the sources and outletTaps as the sinks. Each inletTap abstracts a MemoryStream and an inlet port, into which the test input data is added. Each outletTap provides a port and a query name, which gives a handle to the Spark StreamingQuery that gets executed.
- sparkStreamlet
the Spark streamlet to run
- inletTaps
the collection of inlet streams and ports
- outletTaps
the collection of outlet query names and ports
- returns
an ExecutionReport for the run
- val session: SparkSession
- implicit lazy val sqlCtx: SQLContext
- final def synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
- def withConfigParameterValues(configParameterValues: ConfigParameterValue*): SparkStreamletTestkit
Adds configuration parameters and their values to the configuration used in the test.
ConfigParameterValue takes a ConfigParameter and a string containing the value of the parameter.
- Annotations
- @varargs()
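Because withConfigParameterValues is varargs and returns a SparkStreamletTestkit, parameter overrides can be chained onto the testkit before any taps are created. A minimal sketch, assuming Cloudflow-style packages and an IntegerConfigParameter(key, description, defaultValue) shape; RecordsPerSecond and its key are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Assumptions: package names and the IntegerConfigParameter signature may differ by version
import cloudflow.spark.testkit.SparkStreamletTestkit
import cloudflow.streamlets.{ConfigParameterValue, IntegerConfigParameter}

object ConfigParameterSketch {
  // Hypothetical parameter, declared the way a streamlet would declare it
  val RecordsPerSecond = IntegerConfigParameter("records-per-second", "Records to emit per second", Some(10))

  def testkitWithOverride(session: SparkSession): SparkStreamletTestkit =
    SparkStreamletTestkit(session)
      // Each ConfigParameterValue pairs a ConfigParameter with its value as a string
      .withConfigParameterValues(ConfigParameterValue(RecordsPerSecond, "50"))
}
```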
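The run overloads describe each inletTap as wrapping a MemoryStream and each outletTap as naming a streaming query. The plain-Spark sketch below shows those two primitives on their own, without the testkit, so it illustrates the described mechanism rather than the testkit's actual code. It also shows why inletAsTap and outletAsTap need an implicit Encoder, and presumably why the testkit exposes an implicit SQLContext (sqlCtx): MemoryStream requires both. MemoryStream lives in Spark's internal org.apache.spark.sql.execution.streaming package; the object, type, and query names are hypothetical.

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object MemoryStreamTapSketch {
  // Hypothetical element types for the inlet and outlet
  case class Data(id: Int, name: String)
  case class Simple(name: String)

  def runSketch(spark: SparkSession): Array[Simple] = {
    import spark.implicits._                           // Encoder[Data], Encoder[Simple], $"col"
    implicit val sqlCtx: SQLContext = spark.sqlContext // MemoryStream needs an implicit SQLContext

    // Inlet side: an in-memory streaming source that test data is pushed into
    val input = MemoryStream[Data]
    input.addData((1 to 10).map(i => Data(i, s"name$i")))

    // The transformation under test (same as the processor in the class example)
    val transformed = input.toDS().select($"name").as[Simple]

    // Outlet side: a memory sink whose output is registered under a query name
    val query = transformed.writeStream
      .format("memory")
      .queryName("simple_out")
      .outputMode("append")
      .start()

    try {
      query.processAllAvailable()                    // wait until all added data is processed
      spark.table("simple_out").as[Simple].collect() // read the sink back by its query name
    } finally query.stop()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("memory-stream-sketch").getOrCreate()
    try runSketch(spark).foreach(println)
    finally spark.stop()
  }
}
```

Calling processAllAvailable before reading the sink matters: without it the table may still be empty when the assertion runs, since the streaming query processes data asynchronously.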
Deprecated Value Members
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( classOf[java.lang.Throwable] ) @Deprecated
- Deprecated