Faster Data Integration Pipeline Execution using Spark-Jobserver

Download Slides

As you may already know, the open-source Spark Job Server offers a powerful platform for managing Spark jobs, jars, and contexts, turning Spark into a much more convenient and easy-to-use service. The Spark-Jobserver can keep a Spark context warmed up and readily available for accepting new jobs. At Informatica we are leveraging the Spark-Jobserver offerings to solve a data-visualization use-case. Data visualization of hierarchical data in a big data pipeline requires executing Spark jobs on Hadoop clusters. Reusing the Spark context, in combination with the jobserver, helped us achieve faster Spark task execution. We integrated Spark-Jobserver by using its REST APIs to create and manage the life-cycle of Spark contexts. Our product packages the customer’s data pipeline logic into a JAR and submits it to the Spark-Jobserver using the API.

Afterwards, Spark-Jobserver keeps the first context warmed up and submits subsequent jobs to the same context, which allows quicker execution because time is spent only on running the customer’s domain logic and not on resource allocation or other boilerplate infrastructure work. Our production use-cases require parallel job execution and job monitoring, which is readily provided by Spark-Jobserver on account of its smooth integration with Hadoop’s Job History server. We were introduced to Spark-Jobserver through this conference and community and adopted it, and we would like to pay it forward by talking about our journey adopting it in our data-integration product. The key takeaways will be the major configuration touch points for using Spark-Jobserver in YARN cluster mode and how we dealt with secure/SSL-enabled YARN clusters. We’ll continue with multiple Spark-Jobserver instances, managing jobs on the same or different clusters, concurrent job execution, and the APIs for resolving resource dependencies.


Video Transcript

– Thank you all for joining, and many thanks to the organizers for making this event possible. I’m happy to have this opportunity to address the Spark community. Today I’ll be talking about how we have used Spark Job Server for speeding up the data pipelines at Informatica. A little introduction to ourselves: I’m Sailee Jain, I have been working at Informatica for around six years, and I have worked on several flavors of data engineering products. Along with me I have Prabhakar Gouda; Prabhakar has been working in the industry for around eight years, and we have recently worked on integrating Spark Job Server into the Informatica Data Engineering product. We are going to share our experience with all of you. A little about our company: Informatica is a market leader in data engineering solutions.

Informatica

Data engineering often makes up the major share of data workloads, so the speed and code-conciseness advantages of Spark can benefit data engineering products as well. At Informatica, we adopted Spark very early on, starting with the 1.x releases. Here are a few of Informatica’s offerings: data integration and data management, Cloud Data Management, data quality and data governance, Master Data Management, data cataloguing, and data security. Informatica provides various data engineering solutions. With this introduction, I would like to move on to today’s agenda.

Agenda

There are a couple of important topics which we’ll cover today. We’ll start with a little introduction to data pipelines and ETL, then we’ll discuss complex data types and the challenges associated with handling them in ETL. Then we’ll take a look at one of our interesting ETL data preview use cases, which we have solved by leveraging Spark Job Server, and we’ll shed some light on the architecture of our product as well, focusing on how we have used Spark Job Server. Then we’ll delve into our journey towards integrating Spark Job Server into our product, under which we will cover how SJS integrates easily into the existing product architecture. We are also going to talk about the challenges associated with using SJS in YARN cluster mode. Finally, we’ll talk about how we have improved the performance of our data preview feature using Spark Job Server.

We are going to delve a little deeper into the monitoring, logging, and tweaking aspects of SJS. Making SJS work for your feature requirements is what we’ll aim at in the later part of the slides.

Let’s start, let’s take a look at the Informatica ETL pipeline.

Informatica ETL Pipeline

This is the typical view of our developer tool, which is used to create data pipelines. On the left-side pane, we see mappings. At Informatica, ETL pipelines are referred to as mappings. Let’s check the first mapping.

On the right-side pane, you’ll see the expanded mapping logic. There are sources from where we extract the data.

There are a bunch of transformations which can modify the data, or the schema, or both. After transforming the data as per your business logic, finally, we load the data into the targets. If I expand the transformation panel, you’ll see a long listing of supported transformations. With support for such a variety of transformations, you can imagine that production pipelines can get really complicated, and the pipeline logic can sometimes go wrong, resulting in incorrect data in the targets. Let’s take a look at one such buggy pipeline in the next slide.

Dealing with buggy pipelines

So, how are we going to deal with these buggy pipelines? We need to find out where the error is. Is it due to the wrong choice of data types? Is it due to incorrect usage of transformations? If yes, which transformation or transformations? If I have the ability to check the data after each midstream transformation, then I should be able to identify the source of the bug in this pipeline. So here comes the need for our next feature, which is Data Preview.

Let’s take a look at the detailed feature requirements for Data Preview.

Data Preview – Feature Requirements

Data Preview is supposed to have the ability to comprehend complex data types, like maps, structs, arrays. It should support a variety of data sources like Cassandra, HDFS, S3, Azure.

A very important requirement is faster execution.

We could trade off data size against execution time: we can go low on the data size in order to get faster execution. Another important point is to work with minimal changes to the existing code base. Since we already have a Spark-submit based architecture, we would want our data preview feature to work with minimal changes to that architecture. With these requirements, let’s take a look at the first approach that we followed, a Spark-submit based approach.

At Informatica, we already have infrastructure support for Spark on YARN, so we leveraged Spark-submit and implemented our data preview feature.

What Spark-submit based data preview achieved

This is what we achieved using the data preview approach based on Spark-submit.

Out of all the requirements that we had stated in the previous slides, we were able to achieve four items. However, we were not able to achieve faster execution.

Although execution with Spark was fast for our data pipelines, we wanted better. We wanted something that does not make our ETL developers wait for more than a few seconds in the best scenario.

So we looked at profiling our Informatica Spark jobs, and here are the profiling results for the execution of Spark-submit based Spark jobs.

Execution Profiling Results – Spark-submit

On the left-hand side, you will see the four phases an Informatica pipeline goes through. First is validation and optimization; then translation, where the mapping logic is translated to Scala code; then compilation of the Scala code; and finally you create the Spark-submit job request and submit it for execution to YARN.

If you look at the right-hand side, there is a pie chart which denotes the distribution of time taken by each of these phases, and you can see that the major share of execution time is taken by Spark execution.

Even though Spark is very promising and these jobs are supposed to run fast, a major portion of our time was still taken by Spark execution. So we started to look into alternatives to Spark-submit that could do the same work and still be faster.

We explored multiple open source alternatives and zeroed in on Spark Job Server.

Spark Job Server

So let’s see why in the next slide.

Spark Job Server, for those who don’t know about it, is a RESTful interface for submitting and managing Apache Spark jobs, application jars, and job contexts. Let me focus on the points that were really important for us in considering Spark Job Server. It is very well documented. It has an active community of contributors. It allows easy integration with existing architecture. It is suitable for low-latency queries. We are going to refer to Spark Job Server interchangeably as SJS or Job Server in the next slides.

Compare Spark-submit with Spark Job Server

Let’s compare Spark-submit and Spark Job Server on two important metrics. First is Spark context sharing: Spark-submit does not support Spark context sharing, and every job runs as a new YARN application, whereas Spark Job Server supports context sharing across jobs. Second is named object sharing: it is not supported by Spark-submit, and it is supported by Spark Job Server. These are the two really important metrics which made Spark Job Server the better and more promising approach in terms of execution time. So, to give a good perspective on where Spark Job Server fits into our architecture, I would like to walk you through the relevant portion of our product architecture in the next slide.
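For readers who want a concrete picture of what named-object sharing means, here is a minimal Scala sketch against the job server’s classic SparkJob/NamedRddSupport API. The job names, input path, and the isActive filter below are illustrative assumptions, and the exact trait names and signatures differ between job-server versions, so treat it as a sketch rather than the code used in our product.

```scala
import com.typesafe.config.Config
import org.apache.spark.SparkContext
import spark.jobserver.{NamedRddSupport, SparkJob, SparkJobValid, SparkJobValidation}

// First job on a shared context: computes an RDD once and registers it under a name.
object CacheContactsJob extends SparkJob with NamedRddSupport {
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any = {
    val contacts = sc.textFile(config.getString("input.path")) // hypothetical input location
    this.namedRdds.update("contacts", contacts)                // share the RDD by name
    contacts.count()
  }
}

// A later job submitted to the SAME context: reuses the named RDD without recomputing it.
object CountActiveContactsJob extends SparkJob with NamedRddSupport {
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid

  override def runJob(sc: SparkContext, config: Config): Any =
    this.namedRdds.get[String]("contacts")
      .map(_.filter(_.contains("\"isActive\":1")).count())
      .getOrElse(0L)
}
```

With Spark-submit, the second computation would have to re-read and re-derive the data in a brand-new YARN application; with a shared job-server context, it becomes a lookup.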

Spark-submit based Architecture

So let’s take a look at Spark-submit based Architecture. We have three components here, on the left hand side, we have Informatica Client, in the middle, we have Informatica Server, on the right hand side, we have Hadoop cluster. Informatica Client is used by the developers to create the ETL job pipelines, and the job is submitted for execution to the Informatica Server. Within the Informatica Server, you can see data integration service. This is the service which receives the job execution request.

Data Integration Service flattens the data pipelines and hands them over to the Data Transformation Manager. The Data Transformation Manager, also referred to as DTM, is the translation layer which takes care of optimizing the mapping logic and translating it into Scala code. The Scala code is then submitted to the Spark engine executor, which creates the Spark-submit request and hands it over to YARN. YARN takes the request and takes care of getting it executed on the master and worker nodes in the Hadoop cluster. Once the job is executed, the results of the job get staged onto HDFS, and the Spark engine executor is notified. Once the Spark engine executor gets to know that the job is done, it reads the results from HDFS and streams them back to the developer tool. So this is how a typical flow of an Informatica mapping looks using the Spark-submit approach. Here we are going to replace Spark-submit with Spark Job Server, so the change to this architecture will be to introduce Spark Job Server between the Spark engine executor and YARN. Let’s see that in the next slide.

This is the SJS-based architecture; let me show you the new component that got added.

Here we have added a Spark Job Server between the executor and YARN. The Spark engine executor will submit REST requests to the Spark Job Server, and the Spark Job Server will process these requests. These requests can be for uploading binaries, for creating the Spark context, or for job execution. Let’s see the execution flow of our mappings, or data pipeline jobs, in the next slide.

We have four components here, Informatica Client, Informatica Server, Spark Job Server and Hadoop cluster.

The user will create a data pipeline and submit the request to the Informatica Server.

From there on, for the first mapping run, the Informatica Server will start the Spark Job Server and then create the Spark context.

This is a REST request which gets submitted to the Spark Job Server; the job server then submits the corresponding request to the Hadoop cluster and creates the Spark context. Note that this is a persistent Spark context.

Once the Spark context is created, the Informatica Server will submit the job execution request to the Spark Job Server. The job server will then forward the request to the cluster, and the cluster will take care of executing the job on the already-running context, which is the persistent context created in the previous request. So this is how a typical execution flow for the first job run looks. Next we’ll see the subsequent job runs.

So you can see the difference here. In the first run, we started the Spark Job Server as well as created the Spark context. For the subsequent runs, we do not do the same, and this is how we save time in comparison to the first run, so all the jobs executed subsequently will be faster. An important point here is that we should have a mechanism to monitor the load, and if the Spark Job Server is idle, then we can go ahead and stop the running context on the Hadoop cluster as well as bring the Spark Job Server down.

With this execution flow, I hope I have given you a good view of the architecture; let’s take a look at the performance we achieved.

Spark Job Server vs Spark-submit

Let me compare the Spark Job Server against the Spark-submit approach. On the right-hand side of the slide, you’ll see a bar graph comparing Spark-submit with Spark Job Server: Spark-submit is represented with a solid bar and the job server with a dotted bar. We have compared the first run and the subsequent ones. For the first run, at the bottom you will see that the time taken by Spark-submit and by Spark Job Server is on par. However, for the subsequent runs, Spark Job Server has performed much better than Spark-submit. This is how we have achieved a significant gain in performance for Informatica jobs, and this is how we have solved the data preview use case, which required faster execution. Not only has this helped our customers, it has also helped developers like us get visual feedback on our data while handling production pipelines, which has ensured quicker root-cause analysis of our production bugs. With this, I would like to hand over to my colleague, Prabhakar, who will continue with the rest of the presentation and share our journey with Spark Job Server integration at Informatica. Thank you so much. – Hi, good morning. Thanks, Sailee, thanks all for joining the session, and big thanks to the organizers for making the Spark Summit possible, even in this unprecedented situation. I am Prabhakar Gouda, an engineer at Informatica. I have been using Spark since Spark 1.6.

Recently, I have worked on Spark Data Preview using Spark Job Server. For the next few minutes, I will talk about our overall experience and the learnings we have gathered while integrating Spark Job Server with Informatica ETL. I will focus more on our development process and share some debugging tips.

Setup Details

Well, when we started our POC, these were the major considerations. Spark Job Server needs to be started on the same host as the Informatica Server for better lifecycle management. We picked Spark Job Server 0.9.0 as we wanted to support the latest version; you can choose the Spark Job Server version based on the Spark version you want to support. And we targeted these Hadoop distributions, which cover the majority of our customers.

Getting started

So to begin with, we need to build the Spark Job Server source code. You can download the Spark Job Server source code from GitHub or clone it with Git.

It has an sbt dependency, so you need to install sbt on the machine where you are building. The next step is to configure the ENV and configuration files according to your setup. Both the ENV and config templates are available in the project; I will talk more about the ENV and configuration in the coming slides. The Spark Job Server jar will be generated upon executing server_package.sh.

Environment Variables (local.sh.template)

Well, in the previous slide, I mentioned the ENV variables. These are the few important ENV variables required to start the job server and execute jobs: PIDFILE stores the process ID of the job server; the job server memory and log directory can be configured; and the YARN_CONF directory is required if you’re using YARN cluster mode. You need to place all the *-site.xml files under the YARN_CONF directory. The SPARK_HOME directory is where all your Spark configuration files are placed.

Application Code Migration

Okay, if you’re already using Spark-submit, then there is a slight change in the execution flow: you need to modify your application to extend from SparkJob. Instead of using the Spark CLI, you will be using REST APIs to upload your jar and execute your jobs. Spark Job Server serializes the result into a JSON object; you can query the job server to get the status of a job and to get the results as well. It is better to use streaming if you’re dealing with a large set of data.

WordCount Example

Okay, here is an example with word count code, where your new application extends from SparkJob and you need to write two methods: runJob and validate. runJob is where the actual implementation logic lives, and validate is a quick validation of your input or your context configuration, to have better throughput. In order to enable Spark SQL and Hive, you need to create a Spark session context; this requires your application to extend from SparkSessionJob.
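For reference, here is a minimal word-count sketch in the shape described above, written against the classic SparkJob trait. Newer job-server releases also offer a typed API and exact signatures vary by version, so treat this as illustrative rather than the exact slide code:

```scala
import com.typesafe.config.Config
import org.apache.spark.SparkContext
import spark.jobserver.{SparkJob, SparkJobInvalid, SparkJobValid, SparkJobValidation}

object WordCountExample extends SparkJob {
  // Quick sanity check on the job configuration before any cluster work happens.
  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    if (config.hasPath("input.string")) SparkJobValid
    else SparkJobInvalid("No input.string config param")

  // The actual job logic; the returned value is what the job server serializes to JSON.
  override def runJob(sc: SparkContext, config: Config): Any =
    sc.parallelize(config.getString("input.string").split(" ").toSeq).countByValue()
}
```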

Running Jobs

Once you have your application archive ready, these are the steps you need to follow. Firstly, you need to create a Spark context; in fact, Spark Job Server supports both shared and non-shared Spark contexts. Then you upload your application binaries; Spark Job Server supports Java and Python archives. Then you use the jobs API to execute the job. In fact, you can choose either a synchronous or an asynchronous way of execution.

Okay, these are a few useful REST APIs available in the Spark Job Server to manage your ETL jobs. The data API is to manage all your dependent input files, and the binaries API is to upload your Java or Python binaries.

The jobs API is required to manage the complete job lifecycle, and the contexts API is to manage Spark contexts.
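To show how those APIs fit together end to end, here is a rough sketch that drives the binaries, contexts, and jobs endpoints by shelling out to curl from Scala. The host, port, jar path, application name, and class name are placeholders for your own deployment, and curl is assumed to be available on the machine:

```scala
import scala.sys.process._

object SubmitJobViaRest {
  val sjs = "http://localhost:8090" // assumed job-server endpoint

  def main(args: Array[String]): Unit = {
    // 1. Upload the application jar via the binaries API.
    Seq("curl", "-s", "-X", "POST", s"$sjs/binaries/etl-preview",
        "-H", "Content-Type: application/java-archive",
        "--data-binary", "@target/etl-preview.jar").!

    // 2. Create a long-lived (shared) context, fixing its resources up front.
    Seq("curl", "-s", "-X", "POST",
        s"$sjs/contexts/preview-context?num-cpu-cores=2&memory-per-node=2g").!

    // 3. Run a job on that context synchronously and print the JSON result.
    println(Seq("curl", "-s", "-X", "POST",
        s"$sjs/jobs?appName=etl-preview&classPath=com.example.WordCountExample&context=preview-context&sync=true",
        "-d", "input.string = spark job server spark").!!)

    // For asynchronous execution, drop sync=true and poll GET /jobs/<jobId> for status and result.
  }
}
```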

So let’s see how we resolve some of the challenges.

Handling Job Dependencies

In Spark-submit, we can resolve job dependencies using the files, archives, or jars options. In a shared context, if your applications share a common set of dependencies, then you can start your context with those common dependencies configured up front.

Job-specific dependencies can be resolved using context configuration parameters in the job server. It provides the dependent-jar-uris and cp context configuration parameters to handle job dependencies.
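As a sketch of the context-level variant, the common jars can be supplied when the shared context is created. The jar locations below are placeholders, and depending on the job-server version the same parameter can also be passed in the per-job configuration:

```scala
import java.net.URLEncoder
import scala.sys.process._

object CreateContextWithDependencies {
  val sjs = "http://localhost:8090" // assumed job-server endpoint

  def main(args: Array[String]): Unit = {
    // Comma-separated list of jars every job on this context depends on (illustrative paths).
    val deps = URLEncoder.encode(
      "hdfs:///apps/informatica/lib/udf-utils.jar,hdfs:///apps/informatica/lib/parsers.jar",
      "UTF-8")

    // Create a shared context whose classpath already contains the common dependencies.
    Seq("curl", "-s", "-X", "POST",
        s"$sjs/contexts/preview-context?num-cpu-cores=2&memory-per-node=2g&dependent-jar-uris=$deps").!
  }
}
```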

Multiple Spark Job Servers

Well, this is particular to Informatica: a single SJS instance can execute jobs against only one Hadoop cluster, whereas the Informatica Server is capable of running jobs against multiple Hadoop clusters simultaneously. In order to support multiple clusters, we need to bring up multiple job servers on one machine. We can configure these three dedicated ports to have multiple Spark Job Servers running on the same machine: the JMX port is for monitoring, the HTTP port is for the Spark Job Server web UI, and the H2 DB port is optional, since you can use MySQL or PostgreSQL for metadata management instead.

The next major part is concurrency: we can configure the maximum jobs per context in the configuration file.

Concurrency

If it is not configured, then the default value will be set to the number of cores available on the server machine. The next major part is Spark task-level concurrency. As you know, a single Spark job can split into multiple Spark tasks, and we need to set the parallelism count to run the Spark tasks in parallel. There is a simple rule: you can calculate the parallelism and partition count from the input data size and the partition size. In the case of shared contexts, you cannot configure a job-level parallelism count; the best approach is to take the maximum data size that you will be dealing with and set the parallelism and partition count accordingly, as sketched below.
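A minimal sketch of that sizing rule follows; the 128 MB target partition size and the tasks-per-core multiplier are illustrative assumptions, not job-server settings:

```scala
object ParallelismSizing {
  // Rough rule of thumb: roughly one partition per targetPartitionBytes of input data.
  def partitionCount(inputBytes: Long, targetPartitionBytes: Long = 128L * 1024 * 1024): Int =
    math.max(1, math.ceil(inputBytes.toDouble / targetPartitionBytes).toInt)

  // Keep parallelism at least a small multiple of the total executor cores,
  // and never below the partition count.
  def parallelism(totalExecutorCores: Int, partitions: Int, tasksPerCore: Int = 2): Int =
    math.max(partitions, totalExecutorCores * tasksPerCore)

  def main(args: Array[String]): Unit = {
    val partitions = partitionCount(inputBytes = 10L * 1024 * 1024 * 1024) // size for a 10 GB input
    println(s"partitions=$partitions, parallelism=${parallelism(totalExecutorCores = 16, partitions = partitions)}")
  }
}
```

Because a shared context is created once, you would compute these numbers for the largest data size you expect and apply them at context creation time.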

Dependency conflicts

So, Spark Job Server is an uber jar; it contains all its dependencies, for example Netty and Cassandra dependencies, among others. If you add this jar into your application classpath, you might get dependency version conflicts. We tried all three possible solutions shown here and picked jar shading, as it was simple and easy to manage.

Support for Kerberos

So, on the security aspect: we can execute jobs on a Kerberos-enabled cluster either using a user Kerberos principal and keytab or by using an impersonation user. If you are using a keytab and principal, then the Spark context will be started as the job server user. The job server in fact supports LDAP and Active Directory users as well, using Apache Shiro; you need to configure shiro.ini to have LDAP and Active Directory support. In the case of an impersonation user, the Spark context will be started as the impersonation user; for that, you need to have a valid Kerberos token before starting the Spark context. With a shared Spark context, we cannot support job-level user impersonation.
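As a rough sketch of the keytab path, the snippet below uses Hadoop’s UserGroupInformation to log in as the job-server service user and to build a proxy (impersonation) user. The principal, keytab path, and user names are placeholders, and a real setup may instead rely on kinit before the job server starts:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

object KerberosLoginSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("hadoop.security.authentication", "kerberos")
    UserGroupInformation.setConfiguration(conf)

    // Log in as the job-server service user before any Spark context is started.
    UserGroupInformation.loginUserFromKeytab(
      "sjs-service@EXAMPLE.COM",          // placeholder principal
      "/etc/security/keytabs/sjs.keytab") // placeholder keytab path

    // For impersonation, jobs then run as a proxy user on behalf of the service user.
    val proxy = UserGroupInformation.createProxyUser(
      "etl_developer", UserGroupInformation.getLoginUser)
    println(s"Logged in as ${UserGroupInformation.getLoginUser}, proxying as ${proxy.getShortUserName}")
  }
}
```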

For HTTPS/SSL, the job server is written using the spray-can HTTP server, which is an actor-based, embedded, low-latency, high-performance HTTP server. You can enable HTTPS on the spray-can HTTP server by configuring the keystore details, and client-side authentication can be enabled using a valid truststore file.

Another major aspect is logging. We can control both server-side and cluster-side logging using log4j.properties. I have shared a simple change that we made in order to have the job ID in the cluster logs, and these are the server-side log files that get generated when you execute Spark jobs using Spark Job Server on the server machine.

So, if you’re planning to use the job server, here are a few recommendations.

Key Takeaways

Especially in YARN cluster mode, there is a possibility that network delays or even cluster slowness can cause timeout exceptions; you need to increase the timeouts in the job server configuration file. If you are uploading multiple binaries to run jobs, then make sure that you use unique package names to avoid classpath issues. Resource configuration is a bit tricky, as the Spark context is created only once and the resources are set during Spark context creation. So you need to pick the right resources before starting the Spark context, such as executor memory, driver memory, cores, and so on. You can consider using dynamic allocation in order to have executors created dynamically based on the workload; to enable dynamic allocation, you need to have the shuffle service enabled.

If you are executing lots of jobs, then you might need to consider cleaning up your binaries, because you can keep the database light by removing the binaries that you don’t need anymore. Another important point related to the Spark context is that it is your responsibility to manage the Spark context lifecycle, because it is possible that the Spark Job Server goes down without shutting down the Spark context. In that case, you need to manually go and kill the Spark context.
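A minimal sketch of that cleanup, again driving the job-server REST API with curl from Scala; the endpoint, context name, and YARN application id are placeholders:

```scala
import scala.sys.process._

object ContextCleanupSketch {
  val sjs = "http://localhost:8090" // assumed job-server endpoint

  def main(args: Array[String]): Unit = {
    // List the contexts the job server still knows about.
    println(Seq("curl", "-s", s"$sjs/contexts").!!)

    // Stop an idle persistent context so it releases its YARN resources.
    Seq("curl", "-s", "-X", "DELETE", s"$sjs/contexts/preview-context").!

    // If the job server itself died without stopping the context, the orphaned
    // YARN application has to be killed directly, for example:
    //   yarn application -kill application_1590000000000_0042
  }
}
```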

Okay, these are a few important timeout configurations and their default values. As you will notice, the default values are pretty low, and they might cause timeout exceptions in YARN cluster mode; examples are the YARN context creation and deletion timeouts. There is also the spray-can server idle timeout, which should always be greater than the request timeout. You can update these timeout values in the job server configuration file.

Complex Data Representation in Informatica Developer Tool

Okay, this is a sample representation of complex data in the Informatica developer tool. I have highlighted both the primitive and the supported complex types. The requirement of using Spark for Data Preview came in pretty late; we were using a native engine all these days. In the POC, Spark did really well in terms of processing complex files. We’ll see this representation in more detail in the coming demo.

Monitoring: Binaries

Well, in terms of monitoring, Spark Job Server maintains the list of binaries that we have uploaded, and it maintains a list of running Spark contexts.

Monitoring: Spark Context

Okay, so Spark Job Server stores both the job result

Monitoring: Jobs

and job configuration in the underlying database. You can query the job status once the execution is completed.

Monitoring: YARN Job

So this is a typical view of the Resource Manager UI; you can see the long-running Spark context, along with the context name and the impersonation user, which you can check in the YARN Resource Manager UI. Okay, this is the new Resource Manager UI representation; you can see a long-running Spark context in there. So now you have heard enough theory, let’s go to the demo. – [Instructor] This is the Spark Job Server web interface. It contains a list of running, completed, and failed jobs. In the binaries section, we can see all the binaries that we have uploaded in order to run these jobs. Let’s see how to execute an ETL pipeline preview using Spark Job Server. This is the Informatica developer tool; I have created a simple ETL pipeline to process a contact list, where we have a JSON contact list, there is some transformation logic, and we are writing it to Parquet.

So let us take a look at the source.

It contains the contact information, where it has name, age, gender, company, email, poll, et cetera. As you notice, tags is of type array,

and friends is of type array of struct.

This is how complex objects are represented in Informatica developer tools. Let us check the data flowing through transformation objects.

All mappings are canvas-based, and we have a long list of supported transformations in order to design ETL pipelines more efficiently. Informatica developers need data visualization to understand the data very well.

As you’ll see, I have defined a filter transformation to filter only active contacts.

So the data coming out of this particular filter contains all the contacts having isActive set to one.

Say for some reason you have to modify the transformation logic; let’s take a look at this particular filter.

This is designed to filter eye color with green, so let us change it to blue.

We can quickly change the logic and check the correctness by previewing the transformation.

ETL developers need to modify the mapping logic if there is a change in requirements or a change in data. Faster Data Preview helps validate transformation logic changes more quickly.

With the change in logic, we are able to see the filtered rows with eye color set to blue.

Let us see how Data Preview helps us to identify the bug in the pipeline.

This is a buggy pipeline where the target is getting null rows. Let us check what data is flowing through each transformation. Well, there is some data flowing here, and similarly some data there.

Well, for this filter transformation here, there is no data going out of it, so it is evident that this filter is the buggy one; the developer can come and fix it.

We are using shared Spark context to run all of our Data Preview jobs.

Spark Job Server maintains the list of all running contexts. In our demo setup, this is the Spark context name, and this is the Spark history server URI. You can kill a Spark context using the kill option in the job server.

This is what I wanted to show for the demo.

About Sailee Jain

Informatica

Sailee is a Senior Software Engineer in Informatica's Big Data Management team, focused on data integration solutions in Hadoop environments. She recently worked on the data visualization project which allows previewing hierarchical data flowing through various transformations in a Spark data pipeline. Sailee received her Master's in Computer Science from the Indian Institute of Technology Bombay. She is currently working on Informatica's Elastic Cloud data integration project.

About Prabhakar Gouda

Informatica

Prabhakar Gouda works as a senior software engineer at Informatica, with 8+ years of experience in software development and design techniques. He is part of the Informatica core engineering team that works on various Informatica Big Data processing platforms, which leverage the power of Big Data processing and management using Apache Spark, Hadoop, YARN, HBase, HDFS, and Hive. Recently he worked on a data visualization project that utilizes Spark-jobserver to submit and manage Spark tasks on the Hadoop cluster.