Friday, November 2, 2018

Set up an external metastore for Azure Databricks



Set up an external metastore using the web UI


  1. Click the Clusters button on the sidebar.
  2. Click Create Cluster.
  3. Click Show advanced settings, and navigate to the Spark tab.
  4. Enter the following Spark configuration options under Spark Config.
Note: <mssql-username> and <mssql-password> specify the username and password of your Azure SQL Database account that has read/write access to the database; the connection URL and username below are from my test setup.
```
javax.jdo.option.ConnectionURL jdbc:sqlserver://abizerdb.database.windows.net:1433;database=test_abizerDB
javax.jdo.option.ConnectionPassword <mssql-password>
datanucleus.schema.autoCreateAll true
spark.hadoop.hive.metastore.schema.verification false
datanucleus.autoCreateSchema true
spark.sql.hive.metastore.jars maven
javax.jdo.option.ConnectionDriverName com.microsoft.sqlserver.jdbc.SQLServerDriver
spark.sql.hive.metastore.version 1.2.0
javax.jdo.option.ConnectionUserName abizer@abizerdb
datanucleus.fixedDatastore false
```

5. Continue your cluster configuration, then click Create Cluster to create the cluster.
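Optionally, before or after creating the cluster, a quick JDBC connectivity check from a notebook on any running cluster can rule out network or credential issues. This is a sketch, not part of the original setup: it reuses the host, database, and user from the config above, assumes the Microsoft SQL Server JDBC driver is available on the cluster (it is the one referenced in the Spark config), and leaves the password as a placeholder.

```
// Hedged sanity check: confirm the Azure SQL database used as the metastore
// is reachable over JDBC from a notebook before (or after) wiring it in.
val jdbcUrl = "jdbc:sqlserver://abizerdb.database.windows.net:1433;database=test_abizerDB"

val testDf = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("user", "abizer@abizerdb")
  .option("password", "<mssql-password>")                    // placeholder
  .option("dbtable", "(SELECT 1 AS ok) connectivity_check")  // derived table as the "table"
  .load()

testDf.show()  // a single row with ok = 1 means the endpoint is reachable
```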


Once the cluster is up, you should see the following details logged in the driver logs:


18/11/02 20:34:29 INFO SparkConfUtils$: Set spark config: spark.sql.hive.metastore.version -> 1.2.0
18/11/02 20:34:29 INFO SparkConfUtils$: Set spark config: datanucleus.autoCreateSchema -> true
18/11/02 20:34:29 INFO SparkConfUtils$: Set spark config: spark.hadoop.hive.metastore.schema.verification -> false
18/11/02 20:34:29 INFO SparkConfUtils$: Set spark config: javax.jdo.option.ConnectionURL -> jdbc:sqlserver://abizerdb.database.windows.net:1433;database=test_abizerDB
18/11/02 20:34:29 INFO SparkConfUtils$: Set spark config: datanucleus.schema.autoCreateAll -> true
18/11/02 20:34:29 INFO SparkConfUtils$: Set spark config: javax.jdo.option.ConnectionUserName -> abizer@abizerdb
18/11/02 20:34:29 INFO SparkConfUtils$: Set spark config: spark.databricks.delta.preview.enabled -> true
18/11/02 20:34:29 INFO SparkConfUtils$: Set spark config: spark.driver.tempDirectory -> /local_disk0/tmp
18/11/02 20:34:29 INFO SparkConfUtils$: Set spark config: spark.sql.hive.metastore.jars -> maven
18/11/02 20:34:29 INFO SparkConfUtils$: Set spark config: datanucleus.fixedDatastore -> false
18/11/02 20:34:29 INFO SparkConfUtils$: Set spark config: javax.jdo.option.ConnectionDriverName -> com.microsoft.sqlserver.jdbc.SQLServerDriver


Once you confirm everything looks fine, attach a notebook and try to create a test database and tables, as shown below.
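For example, a minimal Scala cell along these lines exercises the new metastore end to end (the database and table names here are my own illustrative choices):

```
// Illustrative smoke test: create a database and a table through the new metastore,
// then list the tables to confirm the metadata round-trips.
spark.sql("CREATE DATABASE IF NOT EXISTS metastore_test")
spark.sql("""
  CREATE TABLE IF NOT EXISTS metastore_test.sample_table (
    id INT,
    name STRING
  )
  USING parquet
""")
spark.sql("SHOW TABLES IN metastore_test").show()
```

If the external metastore is wired up correctly, the new database and table also show up in the Azure SQL metastore tables (DBS and TBLS).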


I cross-checked via SQL Workbench and can see all the metastore tables as expected.



The new Spark table metadata is also present, so the external metastore is set up correctly!





Monday, October 1, 2018

How to get cluster id, workspace id, notebook url or job id in Databricks



How do you get your cluster ID, workspace ID, notebook URL, or job ID? It's quite simple to get all of this from the Databricks UI in a few clicks, but it's important to understand what to look for and where to find the relevant info in the URLs.

Let's first get the URL of interest for the test cluster we created, to extract the cluster ID and workspace ID.

-> Click the Clusters tab in the left pane and select the test cluster you created; this takes you to the cluster configuration URL, as below.

https://dbc-4fbccf38-2120.cloud.databricks.com/?o=4437076263937401#/setting/clusters/1002-003809-rally984/configuration



The URL contains some important info which Databricks would need for debugging:

https://<Shard-endpoint-name>/?o=<workspaceID>#/setting/clusters/<clusterID>/configuration


Shard-endpoint-name - The shard name/instance name assigned to each unique customer deployment. Customers usually create separate shards for Dev, Stage, and Production to segregate workloads and grant access only to the relevant users.

workspaceID - The number after "o=" is the Databricks workspace ID. A Databricks workspace is a deployment of the Databricks platform where users are onboarded to spin up Spark clusters on demand or schedule their workloads. Every workspace has a unique workspace ID.

clusterID - The key made up of numbers and letters is the cluster ID, i.e. 1002-003809-rally984 in this example. Databricks needs this cluster ID to troubleshoot issues such as a cluster not starting, terminating abruptly, or not terminating at all.

Notebook URL - The HTTPS URL that can be shared with anyone on the Databricks platform who has the proper access, so they can open the notebook, make modifications, debug, etc. Every notebook cell has a different URL.

(Notebook URL - https://dbc-4fbccf38-2120.cloud.databricks.com/?o=4437076263937401#notebook/2416703801683744/command/2416703801683746)
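As a side note, the same identifiers can usually be pulled programmatically from inside a running notebook instead of from the URL. The sketch below is a hedged example: the clusterUsageTags key names are taken from the driver-log output shown elsewhere in this blog, and the getContext call and the clusterOwnerOrgId key are assumptions about Databricks internals that may differ per runtime.

```
// Sketch only: these keys/APIs are Databricks internals and may change between runtimes.
val clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
// Assumed key for the workspace (org) ID; verify on your shard before relying on it.
val workspaceId = spark.conf.get("spark.databricks.clusterUsageTags.clusterOwnerOrgId", "unknown")
val notebookPath = dbutils.notebook.getContext.notebookPath.getOrElse("unknown")

println(s"clusterId=$clusterId, workspaceId=$workspaceId, notebook=$notebookPath")
```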




Job ID - Click the Jobs tab in the left pane, find the job of interest, and click the job name; this takes you to the job details URL, as below. This job URL is a critical piece of info needed to troubleshoot failed job runs whose root cause needs to be investigated.


Jobs listing :



Job Details :








Saturday, September 15, 2018

Connecting Databricks Spark cluster to Amazon Redshift



Use of the Redshift connector involves several network connections, illustrated in the following diagram:



This library reads and writes data in S3 when transferring data to/from Redshift. As a result, it requires an IAM role with read and write access to an S3 bucket (specified using the tempdir configuration parameter) attached to the Spark cluster.

Setup Details :

Redshift :

1) Create an IAM role to attach to the Redshift cluster when we bring it up. This role should allow the Redshift service to write to the S3 bucket.



2) Create a Redshift cluster from the AWS console after populating the basic info.

Spark Cluster :

3) Create a role with full S3 access, which will be used to write to the temp bucket during parallel writes and reads. Below is a screenshot of the role created.

4) Create a Spark cluster with the IAM role that has write access to the temp S3 bucket, and a DBR version greater than 3.5.
5) Download and attach the Redshift JDBC jar to the cluster as shown below.

Jar Download : https://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html



6) Import the notebook below into your Databricks workspace so we can read from and write to Redshift from Databricks clusters.

https://docs.databricks.com/_static/notebooks/redshift.html     

6.1) Set the temp S3 location which the Databricks cluster can use.

                
6.2) Pass credentials for connecting to Redshift.
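A minimal sketch of what steps 6.1 and 6.2 look like in Scala, with purely illustrative placeholder values (the endpoint, database, bucket, and credentials below are not real):

```
// Placeholder connection settings for the imported notebook (values are illustrative).
val tempDir = "s3a://my-redshift-temp-bucket/tmp/"  // temp S3 path the connector stages data through

val jdbcHostname = "my-cluster.abc123xyz.us-west-2.redshift.amazonaws.com"
val jdbcPort     = 5439
val jdbcDatabase = "dev"
val jdbcUsername = "awsuser"
val jdbcPassword = "<redshift-password>"

// Standard Redshift JDBC URL format with user/password passed as query parameters.
val jdbcUrl = s"jdbc:redshift://$jdbcHostname:$jdbcPort/$jdbcDatabase" +
  s"?user=$jdbcUsername&password=$jdbcPassword"
```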




Now that the connection is established, we will do a write and a read test to ensure everything works as expected.


Write Test : 

1 a) We have a pre-populated table "diamonds" in the cluster, as seen below.



b) Running the code below in a Scala cell appends data from the table "diamonds" in the Spark cluster to Redshift.

import org.apache.spark.sql.SaveMode
sqlContext.sql("select * from diamonds limit 10")
  .write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("tempdir", tempDir)
  .option("dbtable", "diamonds")
  .option("aws_iam_role","arn:aws:iam::826763667205:role/Redshift_cust")
  .mode(SaveMode.Append) // <--- Append to the existing table
  .save()
         
Once the cell executes successfully, we can connect to the Redshift cluster via SQL Workbench and verify the data was populated.


Read Test : 

2 a) We'll load data from the Redshift table we populated in the previous write test, i.e. we'll create a DataFrame from an entire Redshift table.

Run the code below to create the DataFrame:

val diamonds_from_redshift = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl) // <--- JDBC URL that we configured earlier
  .option("tempdir", tempDir) // <--- temporary bucket that we created earlier
  .option("dbtable", "diamonds") // <--- name of the table in Redshift
  .option("aws_iam_role","arn:aws:iam::826763667205:role/Redshift_cust")
  .load()

Once the DataFrame is created, we can create a temp view and query it to confirm that reads from Redshift also work as expected.
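For instance, a short Scala cell like this (the column names assume the standard diamonds sample dataset):

```
// Register the Redshift-backed DataFrame as a temp view and query it.
diamonds_from_redshift.createOrReplaceTempView("diamonds_from_redshift")

// display() is the Databricks notebook helper; spark.sql works the same outside notebooks.
display(spark.sql("SELECT carat, cut, price FROM diamonds_from_redshift LIMIT 10"))
```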



Bingo, we are able to read from and write to Redshift from the Databricks cluster!



Sunday, September 9, 2018

Set up an external metastore for Databricks deployment (via Init script)



In my earlier blog post we saw how to set up a MySQL DB and configure VPC peering so the Databricks VPC can communicate with the VPC the MySQL instance is in.
http://abizeradenwala.blogspot.com/2018/09/vpc-peering-between-two-vpcs-in-same.html
Now we will use this MySQL instance as an external metastore for our Databricks Spark clusters. When you want your clusters to connect to your existing Hive metastore without explicitly setting the required configurations every time, an init script is an easy way to have the cluster connect to the external metastore on every start.
To set up an external metastore in local mode using an init script, open a notebook (attached to the ExternalMetastoreTesting cluster, which is not yet using the external metastore) and execute a snippet along the lines of the one sketched below. The snippet adds an init script external-metastore.sh (this name is not mandatory) to /databricks/init/<cluster-name>/ in the Databricks File System (DBFS).
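The exact snippet isn't reproduced in this post, but a minimal sketch modeled on the Databricks documentation pattern looks roughly like this (the MySQL host, database name, username, and password are placeholders):

```
// Sketch of the init-script snippet (placeholders for host, database, user, password).
// Run once from a notebook; it writes the script into DBFS for the named cluster.
dbutils.fs.put(
  "/databricks/init/<cluster-name>/external-metastore.sh",
  """#!/bin/sh
    |# Quoting the label 'EOF' disables variable interpolation inside the heredoc.
    |cat << 'EOF' > /databricks/driver/conf/00-custom-spark.conf
    |[driver] {
    |  # Hive connection options for a metastore running in local mode.
    |  "spark.hadoop.javax.jdo.option.ConnectionURL" = "jdbc:mariadb://<mysql-host>:3306/<metastore-db>"
    |  "spark.hadoop.javax.jdo.option.ConnectionDriverName" = "org.mariadb.jdbc.Driver"
    |  "spark.hadoop.javax.jdo.option.ConnectionUserName" = "<mysql-username>"
    |  "spark.hadoop.javax.jdo.option.ConnectionPassword" = "<mysql-password>"
    |}
    |EOF
    |""".stripMargin,
  overwrite = true
)
```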

Note:
The reason we use the "mariadb" driver is that it is the MySQL JDBC library Databricks ships out of the box. You can use any driver as long as the JDBC driver jar is explicitly put on the classpath.
  • Databricks Runtime 3.4 and above include the org.mariadb.jdbc driver.
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#connecting-to-sql-databases-using-jdbc

This init script writes the required configuration options to a configuration file named 00-custom-spark.conf in a JSON-like format under /databricks/driver/conf/ on every node of the cluster, whenever a cluster with the name specified as <cluster-name> starts.

Note: Databricks provides default Spark configurations in the /databricks/driver/conf/spark-branch.conf file. Configuration files in the /databricks/driver/conf directory apply in reverse alphabetical order. If you want to change the name of the 00-custom-spark.conf file, make sure that it continues to apply before the spark-branch.conf file.
Now edit the cluster and set the properties below in the Spark config.
spark.hadoop.hive.metastore.schema.verification false
datanucleus.fixedDatastore false
datanucleus.schema.autoCreateAll true
datanucleus.autoCreateSchema true



Verification: We can now see that the metadata is coming from the external metastore we configured.


Directly connecting to MySQL shows that the metastore database "abihive" exists and has the required tables.

C02WG59KHTD5:a2df71c3-a02a-11e8-821f-000d3a04560d abizeradenwala$ mysql -h externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com -P 3306 -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 3403
Server version: 5.7.22-log Source distribution

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| ExternalMetastore  |
| abihive            |
| innodb             |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
11 rows in set (0.08 sec)

mysql> use abihive;

Database changed
mysql> show tables;
+---------------------------+
| Tables_in_abihive         |
+---------------------------+
| BUCKETING_COLS            |
| CDS                       |
| COLUMNS_V2                |
| DATABASE_PARAMS           |
| DBS                       |
| FUNCS                     |
| FUNC_RU                   |
| GLOBAL_PRIVS              |
| PARTITIONS                |
| PARTITION_KEYS            |
| PARTITION_KEY_VALS        |
| PARTITION_PARAMS          |
| PART_COL_STATS            |
| ROLES                     |
| SDS                       |
| SD_PARAMS                 |
| SEQUENCE_TABLE            |
| SERDES                    |
| SERDE_PARAMS              |
| SKEWED_COL_NAMES          |
| SKEWED_COL_VALUE_LOC_MAP  |
| SKEWED_STRING_LIST        |
| SKEWED_STRING_LIST_VALUES |
| SKEWED_VALUES             |
| SORT_COLS                 |
| TABLE_PARAMS              |
| TAB_COL_STATS             |
| TBLS                      |
| VERSION                   |
+---------------------------+
29 rows in set (0.08 sec)

mysql> 

We can also review the driver logs when the cluster starts, to confirm all the required configs are picked up correctly.

18/09/09 20:07:13 INFO MetastoreMonitor$: Generic external metastore configurd (config=jdbc:mariadb://externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com:3306/abihive?createDatabaseIfNotExist=true) 
18/09/09 20:07:14 INFO DriverCorral: Creating the driver context 
18/09/09 20:07:14 INFO DatabricksILoop$: Class Server Dir: /tmp/spark-6467922a-76bc-45ff-885f-d1c1879c95a9 
18/09/09 20:07:14 INFO SparkConfUtils$: Customize spark config according to file /tmp/custom-spark.conf 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: datanucleus.autoCreateSchema -> true 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: spark.executor.tempDirectory -> /local_disk0/tmp 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: spark.hadoop.hive.metastore.schema.verification -> false 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: datanucleus.fixedDatastore -> false 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: datanucleus.schema.autoCreateAll -> true 
18/09/09 20:07:14 INFO SparkConfUtils$: Set spark config: spark.driver.tempDirectory -> /local_disk0/tmp 
18/09/09 20:07:14 WARN SparkConf: The configuration key 'spark.scheduler.listenerbus.eventqueue.size' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.scheduler.listenerbus.eventqueue.capacity' instead. 
18/09/09 20:07:14 INFO SparkContext: Running Spark version 2.3.1 
18/09/09 20:07:15 INFO SparkContext: Submitted application: Databricks Shell 
18/09/09 20:07:15 INFO SparkContext: Spark configuration: datanucleus.autoCreateSchema=true datanucleus.fixedDatastore=false datanucleus.schema.autoCreateAll=true eventLog.rolloverIntervalSeconds=3600 spark.akka.frameSize=256 spark.app.name=Databricks Shell spark.cleaner.referenceTracking.blocking=false spark.databricks.acl.client=com.databricks.spark.sql.acl.client.SparkSqlAclClient spark.databricks.acl.provider=com.databricks.sql.acl.ReflectionBackedAclProvider spark.databricks.cloudProvider=AWS spark.databricks.clusterSource=UI spark.databricks.clusterUsageTags.autoTerminationMinutes=120 spark.databricks.clusterUsageTags.clusterAllTags=[{"key":"Vendor","value":"Databricks"},{"key":"Creator","value":"abizer.adenwala@databricks.com"},{"key":"ClusterName","value":"ExternalMetastoreTesting"},{"key":"ClusterId","value":"0909-180303-tall49"},{"key":"Name","value":"cust-success-worker"}]

Yay, the external metastore is now configured.

Note:
The Google Doc link below has my notebook in HTML and DBC format for reviewers to review and easily use.
Troubleshooting :

In case the Databricks UI shows that the database tables are not loading, we can review the driver logs and check for errors, if any.

1) In the case below, it is looking for the "abihive" database in the external metastore, but that database is missing, hence the problem. To fix this, either create the database in the external metastore manually or add "?createDatabaseIfNotExist=true" to the metastore connection string, as below.

"spark.hadoop.javax.jdo.option.ConnectionURL" = "jdbc:mariadb://externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com:3306/abihive?createDatabaseIfNotExist=true"

================================================================
18/09/09 19:16:56 ERROR Schema: Failed initialising database.
Unable to open a test connection to the given database. JDBC url = jdbc:mariadb://externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com:3306/abihive, username = root. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
java.sql.SQLSyntaxErrorException: Unknown database 'abihive'
at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.get(ExceptionMapper.java:163)
------
org.datanucleus.exceptions.NucleusDataStoreException: Unable to open a test connection to the given database. JDBC url = jdbc:mariadb://externalmetastore.cj11tymkwz5w.us-west-2.rds.amazonaws.com:3306/abihive, username = root. Terminating connection pool (set lazyInit to true if you expect to start your database after your app). Original Exception: ------
java.sql.SQLSyntaxErrorException: Unknown database 'abihive'
at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.get(ExceptionMapper.java:163)
at org.mariadb.jdbc.internal.util.exceptions.ExceptionMapper.getException(ExceptionMapper.java:106)
================================================================



Saturday, September 8, 2018

NACL and Security Group settings in Databricks



AWS offers virtual firewalls to organizations, for filtering traffic that crosses their cloud network segments.  The AWS firewalls are managed using a concept called Security Groups. 


Security Groups are:

  1. Stateful  -- easier to manage, by just setting rules for one direction.
  2. VPC Scoped -- work in any AZ or Subnet
  3. Allow rules only -- everything is implicitly denied (Whitelisting only)

To further enhance its security filtering capabilities, AWS also offers a feature called Network Access Control Lists (NACLs). Like security groups, each NACL is a list of rules, but NACLs also support Deny rules, and the order in which the rules are applied can be specified.
NACLs are:
  1. Stateless -- Inbound and Outbound rules must always be configured. 
  2. Subnet Scoped -- Must be explicitly associated with one or more subnets
  3. Allow and Deny both rules can be set
  4. Rules processed in order -- when a rule is matched, no rules further down the list are evaluated
  5. Rules processed at the subnet boundary

How are network rules applied to a specific EC2 instance? The answer: "It's all about the order."

Since a NACL can contain both 'allow' rules and 'deny' rules, the order of the rules becomes important. If you switch the order between a 'deny' and an 'allow' rule, you potentially change your filtering policy quite dramatically. To manage this, AWS uses the concept of a 'rule number' within each NACL. By specifying the rule number, you can set the correct order of rules for your needs: you choose which traffic to deny at the outset, and which to actively allow. As such, NACLs let you manage security tasks in a way that you cannot with security groups alone. However, an instance inherits security rules from both the security groups and the NACLs.
-  For inbound traffic, AWS’s infrastructure first assesses the NACL rules.  If traffic gets through the NACL, then all the security groups that are associated with that specific instance are evaluated, and the order in which this happens within and among the security groups is unimportant because they are all ‘allow’ rules.
-  For outbound traffic, this order is reversed:  the traffic is first evaluated against the security groups, and then finally against the NACL that is associated with the relevant subnet.
Now that we understand how SG and NACL concepts work for AWS instances, let's see which default SGs and NACLs are applied when Databricks spins up instances for clusters in a newly provisioned shard.


Currently, every EC2 instance that Databricks brings up in the configured VPC is associated with two security groups by default:

1) *-worker - Managed by Databricks engineering (port ranges and source CIDRs are already specified and cannot be changed).
2) *-worker-unmanaged - Can be customized to whitelist ports and source traffic as appropriate.

Let's see in practice how to find the SGs for one of our running Spark clusters.

1) Click the Spark UI tab to get the hostname of the driver.



2) Details for the instance can be pulled up by searching the EC2 dashboard.




SG ID (sg-a2614edc) for the managed SG.




UnManaged SG

3) Security group inbound rules at the instance level can be viewed as below.


Managed SG inbound rules



Un-Managed SG inbound rules

4) Security group outbound rules at the instance level can be viewed as below.



                                                 Managed SG outbound rules

                                                         Un-managed SG outbound rules



Below is a list of the ports that are opened, and which services run or communicate on them.

Port              Port use

22                SSH to core instance
2200              Used to SSH to Spark containers (driver or workers)
4040-4100         Used internally by Spark
7077              Spark driver port
10000             JDBC port for third-party applications such as Tableau and ZoomData
32768-61000       Ephemeral port range used to bind Spark UIs so they can be accessed by the webapp
6060              Cluster Manager and webapp
7072              Cluster Manager: data daemon ops port
8080              Cluster Manager: node daemon port
8081              Cluster Manager: Ganglia
8649, 8651        Cluster Manager: JDBC
10000             Cluster Manager: JDBC
29998-29999       Cluster Manager: Tachyon
32768-61000       WebApp: ephemeral ports for UI
8649 (UDP)        WebApp: Ganglia


NACL :

The NACLs related to Databricks are wide open, and they are a good place to block traffic on any specific port. For example, I have worked on a few issues where all DNS traffic from a specific CIDR needed to be blocked; updating the inbound rules to deny all traffic on port 53 from that source CIDR was sufficient.

                                            NACL Inbound rules allowing all traffic

                                            NACL Outbound rules allowing all traffic