Using KQL to Ingest External Data In Azure Sentinel

by Adrian Grigorof, Sorin Ciulpan

February 20, 2021

One of the most sought-after features of a modern SIEM is the ability to read data from various Internet-based sources and use it to enrich the analysis of the raw logs. Such resources could come in various “packages” – they could be freely downloadable files or they may require authentication tokens/paid subscriptions, some are CSV, other JSON, or some may be just a poorly formatted mess. What they have in common is that the information can be used to improve the quality of an alert rule.

In this article, we will look at some typical situations and how we can use the KQL magic to get this data ready to enrich alerts. Kusto is a very powerful query language that provides us with many possibilities to approach a task so what we present are examples that we used in our Sentinel deployments.

The KQL command that we will look at is externaldata(). This is considered a “tabular operator” meaning that it processes tables rather than scalars. The syntax is:

externaldata ( ColumnName : ColumnType [, …] )
[ StorageConnectionString [, …] ]
[with ( PropertyName = PropertyValue [, …] )]

In plain language, it means that we can use this command to get an external file, specify how do we want the data assigned to columns in a Kusto table and what type of file format we exect that the file has (so the command can parse it accordingly, to make our full parsing easier).

The most basic example is to get a publicly available CSV and convert it to a Kusto table. There are many free CSV files available but let’s use a very simple one, a list of COVID-related, potentially malicious IP addresses that we published back in March 2020. The file is a simple list of IPs:

103.142.24.39
103.57.211.14
104.154.60.82
104.18.49.185

For such simple data, the command to convert it to a Kusto table is:

externaldata (IPAddress: string) [h’http://managedsentinel.com/downloads/covid19_ipaddresses.txt’]

Running it in your Sentinel you get:

externaldata example 1

The (IPAddress: string) part simply instructs the operator to consider each line of the file a string and call it “IPAddress”.

Just running that command will simply list the IPs, not very helpful – we want to be able to use this list in subsequent queries against our data, such as firewall logs. So, to assign this to a reusable table, we need something like this:

let CovidIPIOCs = externaldata (IPAddress: string) [h’http://managedsentinel.com/downloads/covid19_ipaddresses.txt’];

Now we can use the CovidIPIOCs (Covid IP addresses Indicators of Compromise) table in other queries. The most common way to use such list is to use a KQL join command to identify log entries that have IPs that match some of these IOCs. CommonSecurityLog is one Sentinel table that records source and destination IPs. So, let’s say we want to check if any of our internal hosts were recorded by our Palo Alto firewall communicating with remote IPs on that list in the last hour. The command would be:

let CovidIPIOCs = externaldata (IPAddress: string) [h’http://managedsentinel.com/downloads/covid19_ipaddresses.txt’];
CommonSecurityLog
| where TimeGenerated > ago(1h)
| join kind=inner (CovidIPIOCs) on $left.DestinationIP == $right.IPAddress
| project TimeGenerated, SourceIP, DestinationIP, DestinationPort, ApplicationProtocol

So here we define the CovidIPIOCs table then perform an inner join with the CommonSecurityLog based on the CommonSecurityLog DestinationIP field matching the IPAddress field in CovidIPIOCs (that’s the “on $left.DestinationIP == $right.IPAddress” part).

externaldata 2

And it turns out that we have 3 internal hosts that communicated with 185.165.123.36 using SSL. This may require investigation on those internal hosts and maybe a doublecheck on that remote IP.

This was probably as simple as it can get, with the external data conveniently clean and easy to read. How about other formats such as XML or JSON? Well, such formats require a bit more work and a good understanding of the schema used by those files. For example, a sample JSON file like https://filesamples.com/samples/code/json/sample1.json:

{
“fruit”: “Apple”,
“size”: “Large”,
“color”: “Red”
}

can be retrieved and parsed using the following command:

externaldata(fruit: string, size: string, color: string)
[
[email protected]’https://filesamples.com/samples/code/json/sample1.json’
]
with(format=’multijson’)

with the following result:

Kusto externaldata 5

We can see that the externaldata() operator now requires a “with” that describes the format of the file. The argument of the externalcommand – (fruit: string, size: string, color: string) – instructs Kusto to “read” the json file and assume that it has 3 types of data fruit, size and color, all of type string. The “with” arguments specifies that the file is a json with potentially multiple lines.

We can also extract only some fields and add additional ones by providing a mapping of the data. For example, we may want to ignore size and add an additional column called “Orchad”:

Kusto externaldata 6

In a similar fashion one can extract data from CSV, XML, Parquet, Avro and others. The source of the data itself is not limited to text-based http/https downloadable files but it can also point to Azure blobs, zipped files, etc.

What we found is that the real world is not always Kusto-friendly and some of the data that we want to use was not intended for Kusto consumption. In the following section we will use an example that is based on a request from one our customers.

In this particular Azure Sentinel instance, we were getting a large number of alerts on potential password spraying attacks in Azure AD SignInLogs. Further investigation on the source IPs for these potential attacks revealed that many of them were coming from known Zscaler IP addresses and were not actual attacks. The alert tuning request was to identify Zscaler IPs and exclude them from the alerts. One challenge was that these IPs were not guaranteed to be the same all the time and not only they could be from a wide list of Zscaler IPs but they could also change without notice. Zscaler however provides a list that can be retrieved by applications interested in knowing the latest list of IPs used. The list is available at https://api.config.zscaler.com/zscalerthree.net/cenr/json. Simply running the externaldata() command like we did with the nicely formatted source data will not work:

externaldata (RawData: string) [h’https://api.config.zscaler.com/zscalerthree.net/cenr/json’]

This will give us an error such as “Some aspects of the query had errors so the results are not complete. If issue persists, please open a support ticket. Request id: 1121d28c-9c06-4ec8-9d14-7e6c2b87b498“.

The content of this file is a rather complex JSON file:

Kusto externaldata 7

We can see that the file is not a simple list of json objects, but each city is a distinct field name, i.e. is not a name “city” with value “Abu Dhabi I” but rather a name “city : Abu Dhabi I” with the value being a list of objects, each with a “range”, “vpn”, etc.. values. That makes the explicit mapping of such data very impractical. The “root” field name is “zscalarthree.net” so we can attempt to retrieve it using the JSON format:

externaldata ([‘zscalerthree.net’]: string) [h’https://api.config.zscaler.com/zscalerthree.net/cenr/json’]
with(format=’json’)

The result is not exactly what we want, but rather one field with the whole file content:

Kusto externaldata 8

So, how can we get an actual list of subnets used by Zscaler proxies that we can use to discard the IPs that connect through them?

Since we are interested just in the “range” field (the subnets used by Zscaler clients), we could go for a quick and dirty, brute-force extraction of these values, by using Kusto’s extract_all() scalar function:

extract_all(regex, [captureGroups,] text)

If we can specify a regular expression to match the data that we need, we can run it against that single field and get a list of subnets. In this case, the format is consistent so we can extract those subnets using the following query:

externaldata ([‘zscalerthree.net’]: string) [h’https://api.config.zscaler.com/zscalerthree.net/cenr/json’]
with(format=’json’)
| extend Subnets = extract_all(‘range”:”(.+?)”‘, [‘zscalerthree.net’])
| mv-expand Subnets
| project Subnets

The result is a list of subnets that we can use to verify if the IP address captured in Azure SigninInLogs belongs to Zscaler (so we can exclude them from the alert detection):

Kusto externaldata 9

In a similar fashion one can extract other data from files that don’t appear easy to parse at first sight.

As I mentioned, this is a quick and dirty method but what if we wanted to flatten such nested JSON file and obtain a tabular, easy to use data structure with all the fields? I actually asked one of our senior engineers (Sorin Ciulpan) to take a look and within 20 minutes he provided this more elegant solution:

externaldata ([‘zscalerthree.net’]: string) [h’https://api.config.zscaler.com/zscalerthree.net/cenr/json’]
with(format=’json’)
| mv-expand parse_json([‘zscalerthree.net’])
| parse [‘zscalerthree.net’] with * ‘: ‘ Continent ‘”‘ *
| extend Cities=iif([‘zscalerthree.net’] contains Continent,[‘zscalerthree.net’].[strcat(“continent : “,Continent)],”)
| project-away [‘zscalerthree.net’]
| mv-expand parse_json(Cities)
| parse Cities with * ‘: ‘ City ‘”‘ *
| extend CityData=iif(Cities contains City,Cities.[strcat(“city : “,City)],”)
| extend CityData=parse_json(CityData)
| project-away Cities
| mv-expand CityData
| parse CityData with * ‘range”:”‘ Range ‘”‘ * ‘vpn”:”‘ VPN ‘”‘ * ‘gre”:”‘ GRE ‘”‘ * ‘hostname”:”‘ HostName ‘”‘ * ‘latitude”:”‘ Latitude ‘”‘ * ‘longitude”:”‘ Longitude ‘”‘ *
| project-away CityData

This produces a flat, ready to use table that we can apply in subsequent queries:

Kusto externaldata

The “trick” here is the building of a dynamic index such as “continent : EMEA” in order to extract the values associated with it and eliminate one level of nesting. This is done first for the “continent” and then for the “city” (the two “iff” functions on lines 5 and 9 followed by “mv-expand” on lines 7 and 12). The final “raw” data is parsed using the “parse” operator.

So having now a list of subnets used by Zscaler (the “range” column), how can we use them to ignore the IP addresses that match them? The short answer is by using the Kusto ipv4_is_match() function. A full example will be provided in a future article.

As a conclusion, while extracting data from complex files, available on various sites may seem a very difficult exercise, with a bit of patience, trial and error and a fair amount of web searches, one can rest assured that Kusto provides the tools available to make that data usable.