Saturday, 27 May 2017

Azure Data Factory Spark Activity

Introduction

Spark Activity is one of the data transformation activities supported by Azure Data Factory. This activity runs the specified Spark program on your Apache Spark cluster in Azure HDInsight.

Prerequisite for ADF Spark Activity
1.    Create an Azure Storage account and select the account type as Blob storage.
2.    Create an Apache Spark cluster in Azure HDInsight and associate the Azure Storage account (Blob storage) with it.
While creating the HDInsight Spark cluster, select Azure Storage as the primary storage type and select the storage account that you created.
Note: for the ADF Spark activity, Blob storage must be the primary storage. You can add Azure Data Lake (ADL) as secondary storage if you want to access input data from ADL.
3.      Create the folder structure for the Spark job in Blob storage.
Once you create a Spark cluster with Blob storage as its primary storage, a container named after the cluster is created in that storage account. Create the Spark job folder structure in this container.
The folder structure should look like the following:
SparkWordCount                      -- [folder]
    WordCount-0.0.1-SNAPSHOT.jar    -- [file]
    files                           -- [folder]
        input1.txt                  -- [file]
        input2.txt                  -- [file]
        SparkWordCount.properties   -- [file]
        -- put all files to be passed to the Spark job here
    jars                            -- [folder]
        package1.jar                -- [file]
        package2.jar                -- [file]
        -- put dependency jars here
    logs                            -- [folder]
    Output                          -- [folder]

How to create a folder in Blob storage
There are two ways to create a folder:
1.      Using Azure Storage Explorer, installed on the VM from which you want to access the storage account.
2.      Using PuTTY. If you are connected to the cluster over SSH, create the directory with:
hadoop fs -mkdir wasbs://<containername>@<accountname>.blob.core.windows.net/<path>
hadoop fs -mkdir wasbs://mycontainer@myaccount.blob.core.windows.net/SparkWordCount

The directory will not show up as a folder in the Azure portal until you put at least one file in it.

How to create a pipeline with Spark activity.

1.      Create a data factory if one does not already exist.
2.      Create an Azure Storage linked service to link your Azure storage that is associated with your HDInsight Spark cluster to the data factory.
3.      Create an Azure HDInsight linked service to link your Apache Spark cluster in Azure HDInsight to the data factory.
4.      Create a dataset that refers to the Azure Storage linked service. Currently, you must specify an output dataset for an activity even if there is no output being produced.
5.      Create a pipeline with a Spark activity that refers to the HDInsight linked service created in step 3. The activity is configured with the dataset you created in the previous step as an output dataset.
The output dataset is what drives the schedule (hourly, daily, etc.). Therefore, you must specify the output dataset even though the activity does not really produce an output.

Create Data Factory

1.      Log in to the Azure portal.
2.      Click NEW on the left menu, click Data + Analytics, and click Data Factory.
3.      In the New data factory blade, enter SparkDF for the Name.
4.      Select the Azure subscription where you want the data factory to be created.
5.      Select an existing resource group or create an Azure resource group.
6.      Select the Pin to dashboard option.
7.      Click Create on the New data factory blade.
8.      You see the data factory being created on the dashboard of the Azure portal.
9.      After the data factory has been created successfully, you see the data factory page, which shows you the contents of the data factory. If you do not see the data factory page, click the tile for your data factory on the dashboard.

Create linked services
In this step, you create two linked services, one to link your Spark cluster to your data factory, and the other to link your Azure storage to your data factory.

Create Azure Storage linked service
In this step, you link your Azure Storage account to your data factory. A dataset you create in a step later in this walkthrough refers to this linked service. The HDInsight linked service that you define in the next step refers to this linked service too.

1.      Click Author and deploy on the Data Factory blade for your data factory. You should see the Data Factory Editor.
2.      Click New data store and choose Azure storage.



3.      You should see the JSON script for creating an Azure Storage linked service in the editor.


4.      Replace account name and account key with the name and access key of your Azure storage account.
To get the account name and account key:

Azure Storage account (Blob storage) > Access keys


5.      To deploy the linked service, click Deploy on the command bar. After the linked service is deployed successfully, the Draft-1 window should disappear and you see AzureStorageLinkedService in the tree view on the left.

JSON Script ( AzureStorageLinkedService )

{
    "name": "AzureStorageLinkedService",
    "properties": {
        "description": "",
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=abc;AccountKey=****"
        }
    }
}
Create HDInsight linked service
In this step, you create Azure HDInsight linked service to link your HDInsight Spark cluster to the data factory. The HDInsight cluster is used to run the Spark program specified in the Spark activity of the pipeline in this sample.

1.      Click ... More on the toolbar, click New compute, and then click HDInsight cluster.

2.      Copy and paste the following snippet to the Draft-1 window. In the JSON editor, do the following steps:
1. Specify the URI for the HDInsight Spark cluster. For example: https://<sparkclustername>.azurehdinsight.net/.
2. Specify the name of the user who has access to the Spark cluster.
3. Specify the password for the user.
4. Specify the Azure Storage linked service that is associated with the HDInsight Spark cluster. In this example, it is: AzureStorageLinkedService.

JSON Script ( HDInsightLinkedService)

{
    "name": "HDInsightLinkedService",
    "properties": {
        "description": "",
        "type": "HDInsight",
        "typeProperties": {
            "clusterUri": "https://<sparkclustername>.azurehdinsight.net",
            "userName": "abc",
            "password": "*****",
            "linkedServiceName": "AzureStorageLinkedService"
        }
    }
}
Create output dataset
The output dataset is what drives the schedule (hourly, daily, etc.). Therefore, you must specify an output dataset for the spark activity in the pipeline even though the activity does not really produce any output. Specifying an input dataset for the activity is optional.
1.      In the Data Factory Editor, click ... More on the command bar, click New dataset, and select Azure Blob storage.
2.      Copy and paste the following snippet to the Draft-1 window. The JSON snippet defines a dataset called WordCountOutputDataset.
Blob container created by the Spark cluster: adfspark
Spark job root folder: SparkWordCount
Job output folder: Output

JSON Script ( WordCountOutputDataset)

{
    "name": "WordCountOutputDataset",
    "properties": {
        "published": false,
        "type": "AzureBlob",
        "linkedServiceName": "AzureStorageLinkedService",
        "typeProperties": {
            "fileName": "WordCountOutput.txt",
            "folderPath": "adfspark/SparkWordCount/Output",
            "format": {
                "type": "TextFormat",
                "columnDelimiter": "\t"
            }
        },
        "availability": {
            "frequency": "Day",
            "interval": 1,
            "offset": "15:25:00"
        }
    }
}
Create pipeline
1.      In the Data Factory Editor, click … More on the command bar, and then click New pipeline.
2.      Replace the script in the Draft-1 window with the following script:
3.      The type property is set to HDInsightSpark.
4.      The rootPath is set to adfspark/SparkWordCount, where adfspark is the Azure Blob container and SparkWordCount is a folder in that container. In this example, the Azure Blob storage is the one associated with the Spark cluster. You can upload the file to a different Azure Storage account; if you do so, create an Azure Storage linked service to link that storage account to the data factory, and then specify the name of that linked service as the value of the sparkJobLinkedService property.
5.      The entryFilePath is set to WordCount-0.0.1-SNAPSHOT.jar, which is the Spark program jar file.
6.      The getDebugInfo property is set to Always, which means the log files are always generated (success or failure).
The outputs section has one output dataset. You must specify an output dataset even if the spark program does not produce any output. The output dataset drives the schedule for the pipeline (hourly, daily, etc.).

JSON Script ( WordCountPipeline)

{
    "name": "WordCountPipeline",
    "properties": {
        "activities": [
            {
                "type": "HDInsightSpark",
                "typeProperties": {
                    "rootPath": "adfspark/SparkWordCount",
                    "entryFilePath": "WordCount-0.0.1-SNAPSHOT.jar",
                    "arguments": [ "arg1", "arg2" ],
                    "sparkConfig": {
                        "spark.executor.memory": "512m"
                    },
                    "className": "wordcount.WordCount",
                    "getDebugInfo": "Always"
                },
                "outputs": [
                    {
                        "name": "WordCountOutputDataset"
                    }
                ],
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1,
                    "offset": "15:25:00"
                },
                "name": "MySparkActivity",
                "description": "This activity invokes the Spark program",
                "linkedServiceName": "HDInsightLinkedService"
            }
        ],
        "start": "2017-05-23T15:24:00Z",
        "end": "2017-05-27T00:00:00Z",
        "isPaused": false,
        "pipelineMode": "Scheduled"
    }
}
Monitor pipeline
Click X to close Data Factory Editor blades and to navigate back to the Data Factory home page. Click Monitor and Manage to launch the monitoring application in another tab.



Saturday, 28 January 2017

RECOMMENDATION ENGINE - CONTENT-BASED FILTERING & COLLABORATIVE FILTERING

Recommendation engines are probably among the best types of machine learning model known to the general public. Even if people do not know exactly what a recommendation engine is, they have most likely experienced one through the use of popular websites such as Amazon, Netflix, YouTube, Twitter, LinkedIn, and Facebook. Recommendations are a core part of all these businesses, and in some cases, they drive significant percentages of their revenue.
The idea behind recommendation engines is to predict what people might like and to uncover relationships between items to aid in the discovery process (in this way, it is similar and, in fact, often complementary to search engines, which also play a role in discovery). However, unlike search engines, recommendation engines try to present people with relevant content that they did not necessarily search for or that they might not even have heard of.
Typically, a recommendation engine tries to model the connections between users and some type of item. If we can do a good job of showing our users movies related to a given movie, we could aid in discovery and navigation on our site, again improving our users' experience, engagement, and the relevance of our content to them.
However, recommendation engines are not limited to movies, books, or products. The techniques we will explore in this article can be applied to just about any user-to-item relationship as well as user-to-user connections, such as those found on social networks, allowing us to make recommendations such as people you may know or who to follow.
In the immortal words of Steve Jobs: “a lot of times, people don’t know what they want until you show it to them.”
The customer personalization journeys of Amazon and Netflix demonstrate just how powerful recommendation engines can be. See how these online giants built cutting edge recommendation engines that keep subscribers coming back for more.

Amazon

Netflix

Google Image Search
·        A recommendation engine can engage audiences with the right content
·        A recommendation engine can customize ads or sponsored content for a user based on their preferences
·        A recommendation engine for a publishing website
Types of recommendation models
Recommender systems are widely studied, and there are many approaches used, but there are two that are probably most prevalent:
·        Content-based filtering
·        Collaborative filtering
o   Item-based collaborative filtering
o   User-based collaborative filtering
Content-based filtering
Assume a “real world” case: “John’s favourite cake is Napoleon (left picture below). He went to a shop for it, but such cakes were sold out. John asked a marketer to recommend something similar and was recommended a Napoleon torte (right picture below) that has the same ingredients. John bought it.”

This is an example of pure content-based filtering in the real world. The marketer has recommended the torte considering the ingredients similarity. A content-based filtering system has similar intuition behind it.
Content-based (CB) filtering systems are systems recommending items similar to items a user liked in the past.
Before we proceed, let me define a couple of terms:
  • Item refers to content whose attributes are used in the recommender models. These could be movies, documents, books, etc.
  • Attribute refers to a characteristic of an item. A movie tag or the words in a document are examples.
These systems rely on algorithms that assemble users' preferences into user profiles and item information into item profiles, and then recommend the items whose profiles are most similar to the user's profile.
A user profile can be seen as a set of assigned keywords (terms, features) collected by the algorithm from items the user found relevant (or interesting).
An item profile is a set of assigned keywords (terms, features) of the item itself.
The actual profile-building process is handled by various information retrieval or machine learning techniques. For instance, the most frequent terms in the document describing an item can represent the item's profile.
Now the example can be reformulated in recommender terms: John liked the Napoleon cake, and its ingredients formed John's user profile. The system reviewed the other available item profiles and found that the most similar one was the "torte Napoleon" item profile. The similarity is high because both the cake and the torte have the same ingredients; this was the reason for the recommendation.
The principal advantage of the content-based filtering approach lies in its nature: it can start to recommend as soon as information about the items is available, which means the recommender does not need ratings from other users to produce recommendations.
How do Content Based Recommender Systems work?
A content-based recommender works with data that the user provides, either explicitly (ratings) or implicitly (clicking on a link). Based on that data, a user profile is generated, which is then used to make suggestions to the user. As the user provides more input or takes action on the recommendations, the engine becomes more and more accurate.
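To make the idea concrete, here is a minimal, illustrative R sketch (it is not part of the recommenderlab script later in this post; the item names, attributes and the cosine helper are invented for the example): item profiles are attribute vectors, the user profile is built from the items the user liked, and unseen items are ranked by cosine similarity to that profile.

# Hypothetical item profiles: rows = items, columns = attributes (ingredients)
item_profiles <- matrix(c(1, 1, 0, 1,    # cake Napoleon
                          1, 1, 0, 1,    # torte Napoleon (same ingredients)
                          0, 0, 1, 0),   # fruit salad
                        nrow = 3, byrow = TRUE,
                        dimnames = list(c("cake_napoleon", "torte_napoleon", "fruit_salad"),
                                        c("puff_pastry", "custard", "fruit", "sugar")))

# The user liked only the Napoleon cake, so his profile is the (average of the)
# profiles of the items he liked
liked <- c("cake_napoleon")
user_profile <- colMeans(item_profiles[liked, , drop = FALSE])

# Cosine similarity between the user profile and every item profile
cosine <- function(a, b) sum(a * b) / sqrt(sum(a^2) * sum(b^2))
scores <- apply(item_profiles, 1, cosine, b = user_profile)

# Recommend the most similar items the user has not tried yet
sort(scores[setdiff(names(scores), liked)], decreasing = TRUE)

In this toy example the torte gets the top score because it shares all of the cake's ingredients, which is exactly the marketer's reasoning above.
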
Collaborative filtering
This is the Collaborative Filtering (CF) approach: recommendations come from other people who had similar tastes in the past and who have already experienced an item that is still unknown to the current user.
Collaborative filtering systems require users to express opinions on items. They collect these opinions and recommend items based on the similarity of people's opinions. The people who agree with the current user the most contribute the most to the recommendation.
Now the example can be reformulated again: John asked for a recommendation for the "best fit" drink. The collaborative filtering system reviewed only the opinions of people who had tried and liked the Napoleon torte in the past. The recommended "Mint tea" is simply the most highly rated item among those people.
Collaborative filtering systems usually review more than just one common item to define the set of users who influence the results. For example, John should have tried many different cakes, and his friends must also have tried the same cakes in the past, to get a better recommendation (MovieLens requires at least 20 movies to be rated before it produces recommendations [Movielens.org]).
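A minimal user-based sketch in R, under the same caveat (the ratings matrix and helper function below are invented for illustration; the full recommenderlab-based script appears later in this post): users are compared by the cosine similarity of their ratings on co-rated items, and a missing rating is predicted as a similarity-weighted average of the ratings given by the other users.

# Hypothetical user-item ratings (NA = not rated)
ratings <- matrix(c(5,  4, NA,  1,
                    4,  5,  1, NA,
                    1,  1,  5,  4,
                   NA,  1,  4,  5),
                  nrow = 4, byrow = TRUE,
                  dimnames = list(paste0("u", 1:4), paste0("m", 1:4)))

# Cosine similarity between two users over the items both have rated
user_sim <- function(a, b) {
  common <- !is.na(ratings[a, ]) & !is.na(ratings[b, ])
  va <- ratings[a, common]; vb <- ratings[b, common]
  sum(va * vb) / sqrt(sum(va^2) * sum(vb^2))
}

# Predict u1's rating for m3 from the users who did rate m3,
# weighted by how similar they are to u1
target <- "u1"; item <- "m3"
raters <- rownames(ratings)[!is.na(ratings[, item]) & rownames(ratings) != target]
sims   <- sapply(raters, function(u) user_sim(target, u))
sum(sims * ratings[raters, item]) / sum(sims)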


Item-based collaborative filtering
Item-based collaborative filtering is a model-based algorithm for recommender engines. In item-based collaborative filtering, similarities between items are calculated from the rating matrix, and based on these similarities a user's preference for an item he has not rated is predicted. Here is a step-by-step worked example for four users and three items. We will use the following sample data of the preferences of four users for three items:
ID     user   item   rating
241    u1     m1     2
222    u1     m3     3
276    u2     m1     5
273    u2     m2     2
200    u3     m1     3
229    u3     m2     3
231    u3     m3     1
239    u4     m2     2
286    u4     m3     2

Step 1: Write the user-item ratings data in matrix form. The table above gets rewritten as follows:

        m1    m2    m3
u1       2     -     3
u2       5     2     -
u3       3     3     1
u4       -     2     2

Here the rating of user u1 for item m3 is 3. There is no rating for item m2 by user u1, and no rating for item m3 by user u2.

Step 2: We will now create an item-to-item similarity matrix. The idea is to calculate how similar one item is to another. There are a number of ways of calculating this; we will use the cosine similarity measure. To calculate the similarity between items m1 and m2, for example, look at all the users who have rated both items. In our case, both m1 and m2 have been rated by users u2 and u3. We create two item vectors, v1 for item m1 and v2 for item m2, in the user space of (u2, u3), and then find the cosine of the angle between these vectors. A zero angle (overlapping vectors) with a cosine value of 1 means total similarity (each common user gave both items the same rating), while an angle of 90 degrees means a cosine of 0, i.e. no similarity. Thus, the two item vectors would be:

            v1 = 5 u2 + 3 u3
            v2 = 2 u2 + 3 u3

The cosine similarity between the two vectors, v1 and v2, would then be:

             cos (v1,v2) = (5*2 + 3*3)/sqrt[(25 + 9)*(4 + 9)] = 0.90

Similarly, to calculate similarity between m1 and m3, we consider only users u1 and u3 who have rated both these items. The two item vectors, v1 for item m1 and v3 for item m3, in the user-space would be as follows:

             v1 = 2 u1 + 3 u3
             v3 = 3 u1 + 1 u3

The cosine similarity measure between v1 and v3 is:
             cos (v1,v3) = (2*3 + 3*1)/sqrt[(4 + 9)*(9+1)] = 0.78

We can similarly calculate the similarity between items m2 and m3 using the ratings given to both by users u3 and u4. The two item vectors v2 and v3 would be:

             v2 = 3 u3 + 2 u4
             v3 = 1 u3 + 2 u4

And cosine similarity between them is:

             cos (v2,v3) = (3*1 + 2*2)/sqrt[(9 + 4)*(1 + 4)] = 0.86

We now have the complete item-to-item similarity matrix:

          m1      m2      m3
m1      1.00    0.90    0.78
m2      0.90    1.00    0.86
m3      0.78    0.86    1.00

Step 3: For each user, we next predict his ratings for the items he has not rated. We will calculate the rating of user u1 for item m2 (the target item). To calculate this, we weight the just-calculated similarity measures between the target item and the other items the user has already rated; the weights are the ratings the user has given to those items. We then scale this weighted sum by the sum of the similarity measures so that the calculated rating stays within predefined limits. Thus, the predicted rating for item m2 for user u1 is calculated using the similarity measures between (m2, m1) and (m2, m3), weighted by the respective ratings for m1 and m3:

                Rating = (2 * 0.90 + 3 * 0.86)/ (0.90 + 0.86) = 2.49
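
To double-check the arithmetic, here is a small base-R sketch that reproduces this worked example (the object names rmat, item_sim and pred_u1_m2 are just for illustration); the similarities and the predicted rating it prints agree with the hand calculation above up to rounding.

# Sample ratings from the table above (NA = not rated)
rmat <- matrix(c(2, NA, 3,
                 5,  2, NA,
                 3,  3, 1,
                NA,  2, 2),
               nrow = 4, byrow = TRUE,
               dimnames = list(c("u1", "u2", "u3", "u4"), c("m1", "m2", "m3")))

# Cosine similarity of two item columns, using only the users who rated both
item_sim <- function(i, j) {
  common <- !is.na(rmat[, i]) & !is.na(rmat[, j])
  vi <- rmat[common, i]; vj <- rmat[common, j]
  sum(vi * vj) / sqrt(sum(vi^2) * sum(vj^2))
}

sims <- c(m1_m2 = item_sim("m1", "m2"),
          m1_m3 = item_sim("m1", "m3"),
          m2_m3 = item_sim("m2", "m3"))
round(sims, 2)         # roughly 0.90, 0.79, 0.87

# Predicted rating of u1 for m2: similarity-weighted average of u1's
# ratings for the items u1 has already rated (m1 and m3)
pred_u1_m2 <- (rmat["u1", "m1"] * sims["m1_m2"] + rmat["u1", "m3"] * sims["m2_m3"]) /
              (sims["m1_m2"] + sims["m2_m3"])
round(pred_u1_m2, 2)   # about 2.49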

A recommender engine using item-based collaborative filtering can be constructed with the R package recommenderlab.
############### Collaborative filtering in R (Recommendation Engine) ################

R Script
Train Data Set for Model
Test Data Set for Model

# Set data path as per your data file (for example: "c://abc//" )
setwd("F:/Data Science/Data Science/Ashish/Recommendation Engine Dataset")

# If not installed, first install the following three packages in R
#install.packages("recommenderlab")
#install.packages("reshape2")
#install.packages("ggplot2")
library(recommenderlab)
library(reshape2)
library(ggplot2)
# Read training file along with header
tr<-read.csv("train_v2.csv",header=TRUE)
# Just look at first few lines of this file
head(tr)
# Remove 'id' column. We do not need it
tr<-tr[,-c(1)]
# Check, if removed
tr[tr$user==1,]
# Using acast to convert above data as follows:
# m1  m2   m3   m4
# u1    3   4    2    5
# u2    1   6    5
# u3    4   4    2    5
g<-acast(tr, user ~ movie)
# Check the class of g
class(g)

# Convert it as a matrix
R<-as.matrix(g)

# Convert R into realRatingMatrix data structure
#   realRatingMatrix is a recommenderlab sparse-matrix like data-structure
r <- as(R, "realRatingMatrix")
r

# view r in other possible ways
as(r, "list")     # A list
as(r, "matrix")   # A sparse matrix

# I can turn it into data-frame
head(as(r, "data.frame"))

# normalize the rating matrix
r_m <- normalize(r)
r_m
as(r_m, "list")

# Draw an image plot of raw-ratings & normalized ratings
#  A column represents one specific movie and ratings by users
#   are shaded.
#   Note that some items are always rated 'black' by most users
#    while some items are not rated by many users
#     On the other hand a few users always give high ratings
#      as in some cases a series of black dots cut across items
image(r, main = "Raw Ratings")      
image(r_m, main = "Normalized Ratings")

# Can also turn the matrix into a 0-1 binary matrix
r_b <- binarize(r, minRating=1)
as(r_b, "matrix")

# Create a recommender object (model)
#   Run anyone of the following four code lines.
#     Do not run all four
#       They pertain to four different algorithms.
#        UBCF: User-based collaborative filtering
#        IBCF: Item-based collaborative filtering
#      Parameter 'method' decides similarity measure
#        Cosine or Jaccard
rec=Recommender(r[1:nrow(r)],method="UBCF", param=list(normalize = "Z-score",method="Cosine",nn=5, minRating=1))
rec=Recommender(r[1:nrow(r)],method="UBCF", param=list(normalize = "Z-score",method="Jaccard",nn=5, minRating=1))
rec=Recommender(r[1:nrow(r)],method="IBCF", param=list(normalize = "Z-score",method="Jaccard",minRating=1))
rec=Recommender(r[1:nrow(r)],method="POPULAR")

# Depending upon your selection, examine what you got
print(rec)
names(getModel(rec))
getModel(rec)$nn

############Create predictions#############################
# This prediction does not predict movie ratings for test.
#   But it fills up the user 'X' item matrix so that
#    for any userid and movieid, I can find predicted rating
#     dim(r) shows there are 6040 users (rows)
#      'type' parameter decides whether you want ratings or top-n items
#         get top-10 recommendations for a user, as:
#             predict(rec, r[1:nrow(r)], type="topNList", n=10)
recom <- predict(rec, r[1:nrow(r)], type="ratings")
recom

########## Examination of model & experimentation #############
########## This section can be skipped #########################

# Convert prediction into list, user-wise
as(recom, "list")
# Study and Compare the following:
as(r, "matrix")[1:10,1:10]      # Has lots of NAs. 'r' is the original matrix
as(recom, "matrix") # Is full of ratings. NAs disappear
as(recom, "matrix")[1:10,1:10] # Show ratings for all users for items 1 to 10
as(recom, "matrix")[5,3]   # Rating for user 5 for item at index 3
as.integer(as(recom, "matrix")[5,3]) # Just get the integer value
as.integer(round(as(recom, "matrix")[6039,8])) # Just get the correct integer value
as.integer(round(as(recom, "matrix")[368,3717]))

# Convert all your recommendations to list structure
rec_list<-as(recom,"list")
head(summary(rec_list))
# Access this list. User 2, item at index 2
rec_list[[2]][2]
rec_list[[1837]][4]
# Convert to data frame all recommendations for user 1
u1<-as.data.frame(rec_list[[1]])
attributes(u1)
class(u1)
head(u1)
# Create a column by name of id in data frame u1 and populate it with row names
u1$id<-row.names(u1)
# Check movie ratings are in column 1 of u1
u1
# Now access movie ratings in column 1 for u1
u1[u1$id==3952,]

########## Create submission File from model #######################
# Read test file
test<-read.csv("test_v2.csv",header=TRUE)
head(test)
# Get ratings list
rec_list<-as(recom,"list")
head(summary(rec_list))
ratings<-NULL
# For all lines in test file, one by one
for ( u in 1:length(test[,2]))
{
  # Read userid and movieid from columns 2 and 3 of test data
  userid <- test[u,2]
  movieid<-test[u,3]
 
  # Get as list & then convert to data frame all recommendations for user: userid
  u1<-as.data.frame(rec_list[[userid]])
  # Create a (second column) column-id in the data-frame u1 and populate it with row-names
  # Remember (or check) that the rownames of u1 are movie ids
  # We use row.names() function
  u1$id<-row.names(u1)
  # Now access movie ratings in column 1 of u1
  x= u1[u1$id==movieid,1]
  # print(u)
  # print(length(x))
  # If no ratings were found, assign 0. You could also
  #   assign user-average
  if (length(x)==0)
  {
    ratings[u] <- 0
  }
  else
  {
    ratings[u] <-x
  }
 
}
length(ratings)
tx<-cbind(test[,1],round(ratings))
# Write to a csv file: submitfile.csv in your folder
write.table(tx,file="submitfile.csv",row.names=FALSE,col.names=FALSE,sep=',')