Saturday, September 15, 2018

Connecting Databricks Spark cluster to Amazon Redshift



Use of the Redshift connector involves several network connections, as illustrated in the following diagram:



This library reads and writes data to S3 when transferring data to/from Redshift. As a result, it requires an IAM role with read and write access to an S3 bucket (specified using the tempdir configuration parameter) attached to the Spark cluster.

Setup Details :

Redshift :

1) Create an IAM role to attach to the Redshift cluster when we bring it up. This role should have access to write to the S3 bucket from the Redshift service.



2) Create a Redshift cluster from the AWS console after populating the basic info.

Spark Cluster :

3) Create a role with full S3 access, which will be used to write to the temp bucket during parallel writes and reads. Below is a screenshot of the role created.

4) Create a Spark cluster with an IAM role that has write access to the temp S3 bucket and a DBR version greater than 3.5.
5) Download and attach the JDBC JAR to the cluster as shown below.

Jar Download : https://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html



6) Import the notebook below into your Databricks workspace so that we can read from and write to Redshift from Databricks clusters.

https://docs.databricks.com/_static/notebooks/redshift.html     

6.1) Set the temp S3 location that the Databricks cluster can use, for example:
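For reference, a minimal Scala cell for this step could look like the following; the bucket name and path are placeholders and should point at the temp bucket that the cluster's IAM role can write to:

// Temporary S3 location the Redshift connector uses for staging data
val tempDir = "s3a://<your-temp-bucket>/redshift-temp/"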

                
6.2) Pass credentials for connecting to Redshift, for example:
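A sketch of this cell is shown below; the hostname, database, user, and password are placeholders for your own Redshift details (avoid hard-coding real credentials in production notebooks):

// Placeholders only; substitute your Redshift endpoint, database, and credentials
val jdbcUsername = "<redshift-user>"
val jdbcPassword = "<redshift-password>"
val jdbcHostname = "<your-cluster>.us-west-2.redshift.amazonaws.com"
val jdbcPort = 5439
val jdbcDatabase = "dev"
// JDBC URL used by the Redshift data source in the read/write cells below
val jdbcUrl = s"jdbc:redshift://$jdbcHostname:$jdbcPort/$jdbcDatabase?user=$jdbcUsername&password=$jdbcPassword"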




Now that the connection is established, we will run a write test and a read test to ensure everything works as expected.


Write Test : 

1a) We have a pre-populated table "diamonds" in the cluster, as seen below.



b) Running the code below in a Scala cell appends data from the table "diamonds" in the Spark cluster to Redshift.

import org.apache.spark.sql.SaveMode
sqlContext.sql("select * from diamonds limit 10")
  .write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("tempdir", tempDir)
  .option("dbtable", "diamonds")
  .option("aws_iam_role","arn:aws:iam::826763667205:role/Redshift_cust")
  .mode(SaveMode.Append) // <--- Append to the existing table
  .save()
         
Once the cell execution is successful, we can connect to the Redshift cluster via SQL Workbench and verify that the data was populated.


Read Test : 

2a) We'll load data from the Redshift table that we created in the previous write test, i.e. we'll create a DataFrame from an entire Redshift table.

Run the code below to create the DataFrame:

val diamonds_from_redshift = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl) // <--- JDBC URL that we configured earlier
  .option("tempdir", tempDir) // <--- temporary bucket that we created earlier
  .option("dbtable", "diamonds") // <--- name of the table in Redshift
  .option("aws_iam_role","arn:aws:iam::826763667205:role/Redshift_cust")
  .load()

Once the DataFrame is created, we can create a temp view and query it to confirm that reading from Redshift also works as expected. For example:
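A minimal sketch, with diamonds_rs as an example view name:

// Register the DataFrame as a temp view and query it to verify the read path
diamonds_from_redshift.createOrReplaceTempView("diamonds_rs")
display(sqlContext.sql("select count(*) from diamonds_rs"))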



Bingo, we are able to read from and write to Redshift from the Databricks cluster!



Sunday, September 9, 2018

Set up an external metastore for Databricks deployment (via Init script)



In my earlier blog we saw how to set up a MySQL DB and set up VPC peering so the Databricks VPC can communicate with the VPC MySQL is in.
http://abizeradenwala.blogspot.com/2018/09/vpc-peering-between-two-vpcs-in-same.html
Now we will use this MySQL instance as an external metastore for our Databricks Spark clusters. When you want your clusters to connect to your existing Hive metastore without explicitly setting the required configurations each time, setting this via init scripts is an easy way to have the Databricks cluster connect to the external metastore every time the cluster starts.
To set up an external metastore in local mode using an init script, open a notebook and execute the following snippet (connecting to the ExternalMetastoreTesting cluster, which is not yet using the external metastore). This snippet adds an init script external-metastore.sh (this name is not mandatory) to /databricks/init/<cluster-name>/ in the Databricks File System (DBFS).
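The cell follows this general pattern; this is only a sketch that reuses the MySQL endpoint, metastore database (abihive), and user from this setup, with the password left as a placeholder:

// Writes an init script that drops a metastore config file on every node at cluster start
dbutils.fs.put("/databricks/init/ExternalMetastoreTesting/external-metastore.sh",
  """#!/bin/sh
    |cat << 'EOF' > /databricks/driver/conf/00-custom-spark.conf
    |[driver] {
    |  "spark.hadoop.javax.jdo.option.ConnectionURL" = "jdbc:mariadb://externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com:3306/abihive?createDatabaseIfNotExist=true"
    |  "spark.hadoop.javax.jdo.option.ConnectionDriverName" = "org.mariadb.jdbc.Driver"
    |  "spark.hadoop.javax.jdo.option.ConnectionUserName" = "root"
    |  "spark.hadoop.javax.jdo.option.ConnectionPassword" = "<your-password>"
    |}
    |EOF
    |""".stripMargin, true)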

Note : 
The reason we use the "mariadb" driver is that Databricks comes with a MariaDB JDBC driver for MySQL out of the box. You can use any driver as long as the JDBC driver JAR is explicitly placed on the classpath.
  • Databricks Runtime 3.4 and above include the org.mariadb.jdbc driver.
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#connecting-to-sql-databases-using-jdbc
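As a quick sanity check from a Scala cell, you can confirm the driver class is on the cluster's classpath; the call below simply throws ClassNotFoundException if the JAR is missing:

// Throws ClassNotFoundException if the MariaDB JDBC driver is not on the classpath
Class.forName("org.mariadb.jdbc.Driver")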

This init script writes the required configuration options to a configuration file named 00-custom-spark.conf, in a JSON-like format, under /databricks/driver/conf/ on every node of the cluster whenever a cluster with the name specified as <cluster-name> starts.

Note: Databricks provides default Spark configurations in the /databricks/driver/conf/spark-branch.conf file. Configuration files in the /databricks/driver/conf directory apply in reverse alphabetical order. If you want to change the name of the 00-custom-spark.conf file, make sure that it continues to apply before the spark-branch.conf file.
Now edit the cluster and set the properties below in the Spark config.
spark.hadoop.hive.metastore.schema.verification false
datanucleus.fixedDatastore false
datanucleus.schema.autoCreateAll true
datanucleus.autoCreateSchema true



Verification: We can now see that the metadata is coming from the external metastore we configured.
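For example, a couple of Spark SQL calls from a notebook attached to the cluster should now be answered by the external metastore:

// Databases and tables listed here are served by the external Hive metastore
spark.sql("show databases").show()
spark.sql("show tables in default").show()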


Connecting directly to MySQL shows that the metastore database "abihive" exists and has the required tables.

C02WG59KHTD5:a2df71c3-a02a-11e8-821f-000d3a04560d abizeradenwala$ mysql -h externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com -P 3306 -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 3403
Server version: 5.7.22-log Source distribution

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| ExternalMetastore  |
| abihive            |
| innodb             |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
11 rows in set (0.08 sec)

mysql> use abihive;

Database changed
mysql> show tables;
+---------------------------+
| Tables_in_abihive         |
+---------------------------+
| BUCKETING_COLS            |
| CDS                       |
| COLUMNS_V2                |
| DATABASE_PARAMS           |
| DBS                       |
| FUNCS                     |
| FUNC_RU                   |
| GLOBAL_PRIVS              |
| PARTITIONS                |
| PARTITION_KEYS            |
| PARTITION_KEY_VALS        |
| PARTITION_PARAMS          |
| PART_COL_STATS            |
| ROLES                     |
| SDS                       |
| SD_PARAMS                 |
| SEQUENCE_TABLE            |
| SERDES                    |
| SERDE_PARAMS              |
| SKEWED_COL_NAMES          |
| SKEWED_COL_VALUE_LOC_MAP  |
| SKEWED_STRING_LIST        |
| SKEWED_STRING_LIST_VALUES |
| SKEWED_VALUES             |
| SORT_COLS                 |
| TABLE_PARAMS              |
| TAB_COL_STATS             |
| TBLS                      |
| VERSION                   |
+---------------------------+
29 rows in set (0.08 sec)

mysql> 

We can also review the driver logs when the cluster starts to confirm that all the required configs are picked up correctly.

18/09/09 20:07:13 INFO MetastoreMonitor$: Generic external metastore configurd (config=jdbc:mariadb://externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com:3306/abihive?createDatabaseIfNotExist=true) 
18/09/09 20:07:14 INFO DriverCorral: Creating the driver context 
18/09/09 20:07:14 INFO DatabricksILoop$: Class Server Dir: /tmp/spark-6467922a-76bc-45ff-885f-d1c1879c95a9 
18/09/09 20:07:14 INFO SparkConfUtils$: Customize spark config according to file /tmp/custom-spark.conf 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: datanucleus.autoCreateSchema -> true 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: spark.executor.tempDirectory -> /local_disk0/tmp 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: spark.hadoop.hive.metastore.schema.verification -> false 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: datanucleus.fixedDatastore -> false 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: datanucleus.schema.autoCreateAll -> true 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: spark.driver.tempDirectory -> /local_disk0/tmp 
18/09/09 20:07:14 WARN SparkConf: The configuration key 'spark.scheduler.listenerbus.eventqueue.size' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.scheduler.listenerbus.eventqueue.capacity' instead. 
18/09/09 20:07:14 INFO SparkContext: Running Spark version 2.3.1 
18/09/09 20:07:15 INFO SparkContext: Submitted application: Databricks Shell 
18/09/09 20:07:15 INFO SparkContext: Spark configuration: datanucleus.autoCreateSchema=true datanucleus.fixedDatastore=false datanucleus.schema.autoCreateAll=true eventLog.rolloverIntervalSeconds=3600 spark.akka.frameSize=256 spark.app.name=Databricks Shell spark.cleaner.referenceTracking.blocking=false spark.databricks.acl.client=com.databricks.spark.sql.acl.client.SparkSqlAclClient spark.databricks.acl.provider=com.databricks.sql.acl.ReflectionBackedAclProvider spark.databricks.cloudProvider=AWS spark.databricks.clusterSource=UI spark.databricks.clusterUsageTags.autoTerminationMinutes=120 spark.databricks.clusterUsageTags.clusterAllTags=[{"key":"Vendor","value":"Databricks"},{"key":"Creator","value":"abizer.adenwala@databricks.com"},{"key":"ClusterName","value":"ExternalMetastoreTesting"},{"key":"ClusterId","value":"0909-180303-tall49"},{"key":"Name","value":"cust-success-worker"}]

Yay, the external metastore is now configured.

Note:
The Google Doc link below has my notebook in HTML and DBC format for reviewers to review and easily use.
Troubleshooting :

In case the Databricks UI shows the database tables not loading, we can review the driver logs and check for errors, if any.

1) In the case below, it is looking for the "abihive" database in the external metastore, but this database is missing, hence the problem. To fix this, either create this database in the external metastore manually or add "?createDatabaseIfNotExist=true" to the metastore connection string, as below.

"spark.hadoop.javax.jdo.option.ConnectionURL" = "jdbc:mariadb://externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com:3306/abihive?createDatabaseIfNotExist=true"

================================================================
18/09/09 19:16:56 ERROR Schema: Failed initialising database.
Unable to open a test connection to the given database. JDBC url = jdbc:mariadb://externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com:3306/abihive, username = root. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
java.sql.SQLSyntaxErrorException: Unknown database 'abihive'
at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.get(ExceptionMapper.java:163)
------
org.datanucleus.exceptions.NucleusDataStoreException: Unable to open a test connection to the given database. JDBC url = jdbc:mariadb://externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com:3306/abihive, username = root. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
java.sql.SQLSyntaxErrorException: Unknown database 'abihive'
at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.get(ExceptionMapper.java:163)
at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.getException(ExceptionMapper.java:106)
================================================================



Saturday, September 8, 2018

NACL and Security Group settings in Databricks



AWS offers virtual firewalls to organizations, for filtering traffic that crosses their cloud network segments.  The AWS firewalls are managed using a concept called Security Groups. 


Security Groups are:

  1. Stateful  -- easier to manage, by just setting rules for one direction.
  2. VPC Scoped -- work in any AZ or Subnet
  3. Allow rules only -- everything is implicitly denied (Whitelisting only)

To further enhance and enrich its security filtering capabilities, AWS also offers a feature called Network Access Control Lists (NACLs). Like security groups, each NACL is a list of rules, but NACLs can also include deny rules, and the order in which rules are applied can be specified.
NACLs are:
  1. Stateless -- Inbound and Outbound rules must always be configured.
  2. Subnet Scoped -- Must be explicitly associated with one or more subnets
  3. Both Allow and Deny rules can be set
  4. Rules processed in order -- when a rule is matched, no rules further down the list are evaluated
  5. Rules processed at the subnet boundary

How are network rules applied to a specific EC2 instance? The answer: it's all about the order.

Since a NACL can contain both ‘allow’ rules and ‘deny’ rules, the order of the rules becomes important. If you switch the order between a ‘deny’ and an ‘allow’ rule, you potentially change your filtering policy quite dramatically. To manage this, AWS uses the concept of a ‘rule number’ within each NACL. By specifying the rule number, you can set the correct order of the rules for your needs. You can choose which traffic you deny at the outset, and which you then actively allow. As such, with NACLs you can manage security tasks in a way that you cannot do with security groups alone. However, an instance inherits security rules from both the security groups and the NACLs.
-  For inbound traffic, AWS’s infrastructure first assesses the NACL rules.  If traffic gets through the NACL, then all the security groups that are associated with that specific instance are evaluated, and the order in which this happens within and among the security groups is unimportant because they are all ‘allow’ rules.
-  For outbound traffic, this order is reversed:  the traffic is first evaluated against the security groups, and then finally against the NACL that is associated with the relevant subnet.
Now that we understand how the SG and NACL concepts work for AWS instances, let's see which default SGs and NACLs are applied when Databricks spins up instances for clusters in a newly provisioned shard.


Currently, every EC2 instance in Databricks that comes up in the configured VPC is associated with two security groups by default:

1) *-worker - This is managed by Databricks engineering (Has port ranges and source CIDR already specified and cannot be changed)
2) *-worker-unmanaged - This can be customized to whitelist ports and source traffic as appropriate.

Let's see, practically, how to get to the SGs from one of the Spark clusters we spun up.

1) Click on the Spark UI tab to get the hostname of the driver.



2) Details for the instance can be pulled up by searching in the EC2 dashboard.




SG ID (sg-a2614edc) for the managed SG.




Unmanaged SG

3) Security inbound rules at the instance level can be viewed as below.


Managed SG inbound rules



Unmanaged SG inbound rules

4) Security outbound rules at the instance level can be viewed as below.



                                                 Managed SG outbound rules

                                                         Un-managed SG outbound rules



Below is a list of the ports that are opened, why they are used, and which services run or communicate on them.

Port              Port use

22                SSH to core instance
2200              Used to SSH to Spark containers (driver or workers)
4040-4100         Used internally by Spark
7077              Spark driver port
10000             JDBC port for third-party applications such as Tableau and ZoomData
32768-61000       Ephemeral port range used to bind Spark UIs so they can be accessed by the webapp
6060              Cluster Manager and Webapp
7072              Cluster Manager: Data Daemon ops port
8080              Cluster Manager: Node daemon port
8081              Cluster Manager: Ganglia
8649, 8651        Cluster Manager: JDBC
10000             Cluster Manager: JDBC
29998-29999       Cluster Manager: Tachyon
32768-61000       WebApp: Ephemeral ports for UI
8649 (UDP)        WebApp: Ganglia


NACL :

The NACLs related to Databricks are wide open, and they are a good way to block traffic on any specific port. For example, I have worked on a few issues where all DNS traffic from a specific CIDR needed to be blocked; updating the inbound rules to block all traffic on port 53 from that source CIDR was sufficient.

                                            NACL Inbound rules allowing all traffic

                                            NACL Outbound rules allowing all traffic







Friday, September 7, 2018

VPC peering between two VPC's in same account


VPC peering allows your Databricks clusters to connect to your other AWS infrastructure (RDS, Redshift, Kafka, Cassandra, and so on) using private IP addresses within the internal AWS network. In order to establish a peering connection, both the Databricks VPC and the VPC hosting your other infrastructure must exist in the same AWS region. Also the VPC hosting the other infrastructure must have a CIDR range distinct from the Databricks VPC and any other CIDR range included as a destination in the Databricks VPC main route table.
In this blog we will see how to create VPC peering between the Databricks VPC, where the Databricks shard is set up and running, and a VPC where I will run a MySQL DB.

The table below is handy for keeping track of the details needed for this peering.

VPC                  Databricks VPC                 vpc-7f4c0d18             10.205.0.0/16
VPC                  Mysql VPC                      vpc-09848ffe9f7781e96    172.30.0.0/16
Route Table          Databricks Main Route Table    rtb-c1c629a7

1) Databricks VPC where databricks shard is setup/running.





2) Create a MySQL instance from Amazon RDS in a VPC with no overlapping IPs.


Mysql VPC details :

Once MySQL is up, we get the endpoint (externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com).



3)   Create a peering connection

  1. Navigate to the VPC Dashboard.
  2. Select Peering Connections.
  3. Click Create Peering Connection (Fill details as in screenshot)


Once the peering request succeeds, you should see the following.


4)  Record the ID of the peering connection and update our table.


VPC                  Databricks VPC                 vpc-7f4c0d18             10.205.0.0/16
VPC                  Mysql VPC                      vpc-09848ffe9f7781e96    172.30.0.0/16
Route Table          Databricks Main Route Table    rtb-c1c629a7
Peering Connection   Databricks VPC <> Mysql VPC    pcx-0f503173dab903f9d

5) Accept the peering connection request as seen in fig below.


6) Add DNS resolution to peering connection

    7) Add destination to Databricks VPC main route table
  1. Select Route Tables in the VPC Dashboard.
  2. Search for the Databricks VPC ID.
  3. Click the Edit button under the Routes tab.
  4. Click Add another route.
  5. Enter the CIDR range of the Mysql VPC for the Destination.
  6. Enter the ID of the peering connection for the Target.
    8) Add destination to Mysql VPC main route table
    1. Select Route Tables in the VPC Dashboard.
    2. Search for the Mysql VPC ID.
    3. Click the Edit button under the Routes tab.
    4. Click Add another route.
    5. Enter the CIDR range of the Databricks VPC for the Destination.
    6. Enter the ID of the peering connection for the Target.













  9) Test connectivity :


  1. Create a Databricks cluster and attach a notebook.
  2. Check to see if you can connect to the database with the following netcat command:
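A Scala cell along the following lines performs the netcat check against the MySQL endpoint we created earlier (assuming nc is installed on the driver; an exit code of 0 means port 3306 is reachable over the peering connection):

import scala.sys.process._
// Probe the MySQL port through the peering connection; prints nc's output to the driver log
val exitCode = Seq("nc", "-zv", "externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com", "3306").!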

    Yay!! The connection was successful.