package testkit
Type Members
- final case class ConfigParameterValue extends Product with Serializable
- case class ExecutionReport(totalRows: Long, totalQueries: Int, failures: Seq[String]) extends Product with Serializable
- class QueryExecutionMonitor extends StreamingQueryListener
- case class SparkInletTap[T](portName: String, instream: MemoryStream[T])(implicit evidence$5: Encoder[T]) extends Product with Serializable
- case class SparkOutletTap[T](portName: String, queryName: String)(implicit evidence$6: Encoder[T]) extends Product with Serializable
- trait SparkScalaTestSupport extends AnyWordSpecLike with Matchers with BeforeAndAfterAll
- final case class SparkStreamletTestkit(session: SparkSession, config: Config = ConfigFactory.empty, maxDuration: Duration = 30.seconds) extends Product with Serializable
Testkit for testing Spark streamlets.
The steps to write a test using the testkit are:
- Create the test class and extend it with SparkScalaTestSupport
- Create the Spark streamlet testkit instance
- Create the Spark streamlet
- Set up an inlet tap on each inlet port
- Set up an outlet tap on each outlet port
- Build the input data and send it to the inlet tap
- Run the test
- Get the data from the outlet tap and assert on it
// 1. Create the test class and extend it with `SparkScalaTestSupport`
class MySparkStreamletSpec extends SparkScalaTestSupport {

  "SparkProcessor" should {
    "process streaming data" in {

      // 2. Create Spark streamlet testkit instance
      val testKit = SparkStreamletTestkit(session)

      // 3. Create spark streamlet
      val processor = new SparkProcessor[Data, Simple] {
        override def createLogic(): ProcessorLogic[Data, Simple] =
          new ProcessorLogic[Data, Simple](OutputMode.Append) {
            override def process(inDataset: Dataset[Data]): Dataset[Simple] =
              inDataset.select($"name").as[Simple]
          }
      }

      // 4. Setup inlet(s) tap on inlet port(s)
      val in: SparkInletTap[Data] = testKit.inletAsTap[Data](processor.shape.inlet)

      // 5. Setup outlet tap(s) on outlet port(s)
      val out: SparkOutletTap[Simple] = testKit.outletAsTap[Simple](processor.shape.outlet)

      // 6. Prepare input data and send it to the inlet tap(s)
      val data = (1 to 10).map(i => Data(i, s"name$i"))
      in.addData(data)

      // 7. Run the test
      testKit.run(processor, Seq(in), Seq(out))

      // 8. Get data from outlet tap(s) and assert
      val results = out.asCollection(session)
      results should contain(Simple("name1"))
    }
  }
}
Note: Every test is executed against a SparkSession, which is created and removed as part of the test lifecycle methods.
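The note above about the session lifecycle can be pictured with a small sketch. This is not the testkit's own SparkScalaTestSupport; the trait name LocalSparkSessionLifecycle, the local[2] master and the use of the suite name as the application name are all assumptions made for illustration. It only shows one common way to tie a SparkSession to ScalaTest's beforeAll and afterAll hooks.

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.{ BeforeAndAfterAll, Suite }

// Hypothetical trait for illustration only; the real testkit trait may differ.
trait LocalSparkSessionLifecycle extends BeforeAndAfterAll { this: Suite =>

  // Assigned in beforeAll; tests read it through `session`.
  @transient private var _session: SparkSession = _

  def session: SparkSession = _session

  // Create a local session once, before any test in the suite runs.
  override def beforeAll(): Unit = {
    super.beforeAll()
    _session = SparkSession
      .builder()
      .master("local[2]") // assumption: a small local master is enough for unit tests
      .appName(suiteName)
      .getOrCreate()
  }

  // Stop the session after the last test, even if super.afterAll() throws.
  override def afterAll(): Unit = {
    try {
      if (_session != null) _session.stop()
    } finally {
      super.afterAll()
    }
  }
}
```

A suite that mixes in a trait of this shape can pass `session` to the testkit constructor, as the example above does with SparkStreamletTestkit(session).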
- case class TestContextException(portName: String, msg: String) extends RuntimeException with Product with Serializable
- class TestSparkStreamletContext extends SparkStreamletContext
An implementation of SparkCtx for unit testing.
readStream reads from a streaming data source (a csv in this case) and prepares a Dataset[In].
writeStream returns a StreamingQuery that pushes the input Dataset[Out] to a MemorySink.
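The in-memory round trip that the taps rely on can be sketched with plain Spark Structured Streaming, without any testkit classes. The object name MemoryRoundTripSketch, the method doubled and the query name doubled_sketch are hypothetical, and the sketch assumes Spark 3.x, where MemoryStream takes an implicit SQLContext. It feeds a MemoryStream, applies a transformation, and writes to the built-in "memory" sink, which exposes the results as a table named after the query.

```scala
import org.apache.spark.sql.{ SQLContext, SparkSession }
import org.apache.spark.sql.execution.streaming.MemoryStream

// Hypothetical helper for illustration; not part of the testkit.
object MemoryRoundTripSketch {

  def doubled(spark: SparkSession, values: Seq[Int]): Set[Int] = {
    import spark.implicits._
    // MemoryStream needs an implicit SQLContext (assumption: Spark 3.x API).
    implicit val sqlContext: SQLContext = spark.sqlContext

    // In-memory streaming source, playing the role of an inlet tap.
    val input = MemoryStream[Int]
    input.addData(values)

    // In-memory sink, playing the role of an outlet tap.
    val queryName = "doubled_sketch"
    val query = input
      .toDS()
      .map(_ * 2)
      .writeStream
      .format("memory")
      .queryName(queryName)
      .outputMode("append")
      .start()

    try {
      // Block until everything added so far has been processed.
      query.processAllAvailable()
      // The memory sink registers its output as a temporary table.
      spark.table(queryName).as[Int].collect().toSet
    } finally {
      query.stop()
    }
  }
}
```

processAllAvailable is intended for testing, which fits this use; a production query would normally rely on awaitTermination instead.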
Value Members
- object ConfigParameterValue extends Serializable