
Log Aggregation to Elasticsearch


This tutorial explains how to aggregate various types of logs from the Curity Identity Server and save them as structured JSON documents in the open source Elasticsearch system. The Kibana tool then enables real-time visualization and analysis of logs. The tutorial ends with a deployment example for a Kubernetes cluster that demonstrates end-to-end log aggregation.

There are various ways to deploy Elastic Stack components. This tutorial explains log aggregation logic using the following components.

  • Elasticsearch is the data storage engine, which ingests logs at its API endpoints.
  • Kibana is a frontend where authorized users can query and analyze log data.
  • Filebeat is a log shipping component that uploads logging events to the Elasticsearch API.

The following sections explain how to aggregate system, request and audit logs from the Curity Identity Server to Elasticsearch indices. The log aggregation flow is customizable in various ways.

Prepare Curity Identity Server Logs

Before deploying the Curity Identity Server, ensure that suitable log files are available for aggregation:

  • Configure logs for the Curity Identity Server to use a JSON Log Format.
  • Configure the Helm chart to Tail Log Files so that all logs are available for shipping.
  • Configure the Curity Identity Server to Use OpenTelemetry if you want to include trace IDs and span IDs in logs.

The following example logging event shows the source format for system logs; request and audit logs use a similar JSON structure.

The system log's message field can include a text message or key-value pairs. The following logging event resulted from an intentional misconfiguration of a data source connection. Exception events can lead to multiple logging events: some may contain a stack trace and others key-value pairs with additional details.

json
{
  "timeMillis": 1741275691000,
  "thread": "req-189",
  "level": "WARN",
  "loggerName": "se.curity.identityserver.errors.SystemRuntimeException",
  "message": "se.curity.identityserver.errors.SystemRuntimeException@8b67fc[_httpStatus=500,_errorMessageId=system.status.internal.error,_errorCode=generic_error,_errorDescription=FATAL: database \"idsvr2\" does not exist]",
  "endOfBatch": true,
  "loggerFqcn": "org.apache.logging.slf4j.Log4jLogger",
  "contextMap": {
    "LongSessionId": "84aeadea-01cb-4431-bd55-282196a98202",
    "RequestId": "kCPDgMRP",
    "SessionId": "84aeadea",
    "SpanId": "ca8578c5279899bb",
    "TraceId": "2dddaab2003090620ad72cc856b93b9b"
  },
  "threadId": 42,
  "threadPriority": 5,
  "hostname": "curity-idsvr-runtime-6ccbf9799f-rgrrk"
}

Create an Index Template

Elasticsearch log storage uses indices that store documents. Each document contains fields with data types. Filter conditions and sort expressions can use some data types, like keywords, numbers and dates. To help ensure that data saves with the correct types, define an index template with your preferred JSON document structure.

The following example provides an index template creation command to run in Kibana Dev Tools. When Elasticsearch first receives a logging event, it creates an index that uses data types from the template. If a field is not found in the template, Elasticsearch still saves it but in some cases may use a suboptimal data type.

json
POST /_index_template/curity
{
  "index_patterns": ["curity-*"],
  "template":
  {
    "mappings":
    {
      "properties":
      {
        "eventTime": {
          "type": "date"
        },
        "hostname": {
          "type": "keyword"
        },
        "level": {
          "type": "keyword"
        },
        "loggerName": {
          "type": "keyword"
        },
        "loggerFqcn": {
          "type": "keyword"
        },
        "message": {
          "type": "keyword"
        },
        "thread": {
          "type": "keyword"
        },
        "threadId": {
          "type": "integer"
        },
        "threadPriority": {
          "type": "integer"
        },
        "thrown": {
          "type": "object",
          "enabled": false
        },
        "contextMap":
        {
          "properties":
          {
            "RequestId":
            {
              "type": "keyword"
            },
            "SessionId":
            {
              "type": "keyword"
            },
            "LongSessionId":
            {
              "type": "keyword"
            },
            "TraceId":
            {
              "type": "keyword"
            },
            "SpanId":
            {
              "type": "keyword"
            }
          }
        },
        "system": {
          "properties":
          {
          }
        },
        "request": {
          "properties":
          {
          }
        },
        "audit": {
          "properties":
          {
          }
        }
      }
    }
  }
}
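
To confirm that Elasticsearch registered the template, you can read it back in Kibana Dev Tools, or preview the mappings that a matching index would receive. The following requests are a minimal sketch of such a check; the index name in the simulate request is only an illustration.

json
GET /_index_template/curity

POST /_index_template/_simulate_index/curity-system-2025.03.06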

Each logger can include its own fields in addition to the base fields. Therefore, you could replace the system, request and audit placeholders in the index template with definitions like the following for the extra fields.

json
"system":
{
"properties":
{
"type":
{
"type": "keyword"
},
"httpStatus":
{
"type": "integer"
},
"errorMessageId":
{
"type": "keyword"
},
"errorCode":
{
"type": "keyword"
},
"errorDescription":
{
"type": "keyword"
}
}
}
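
You could fill in the request placeholder in a similar way. The following sketch assumes request logs that include a URI, an HTTP method, a status code and a duration. The uri and duration fields are used by the SQL query later in this tutorial, while the other names are illustrative, so adapt them to match your own request log events.

json
"request":
{
  "properties":
  {
    "uri":
    {
      "type": "keyword"
    },
    "method":
    {
      "type": "keyword"
    },
    "statusCode":
    {
      "type": "integer"
    },
    "duration":
    {
      "type": "integer"
    }
  }
}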

Create an Ingest Pipeline

When you first receive log events, let Elasticsearch save them and apply its own defaults. Typically though, logging components may add extra fields that are not useful, you may receive some empty events, or some fields may have incorrect data types.

To resolve these types of issues, do some extra work to create an Ingest Pipeline that runs one or more Processors. Run the following command in Kibana Dev Tools to create an example ingest pipeline:

json
PUT /_ingest/pipeline/curity
{
  "description": "Curity Logs Ingest Pipeline",
  "processors": [
    {
      "drop": {
        "if": "ctx.level == null"
      }
    },
    {
      "script": {
        "lang": "painless",
        "description": "Transform timeMillis to an eventTime date",
        "source": "ctx['eventTime'] = Instant.ofEpochMilli(ctx['timeMillis'])"
      }
    },
    {
      "remove": {
        "field": ["log", "marker", "stream", "fields"],
        "ignore_missing": true
      }
    }
  ]
}

This example ingest pipeline performs the following tasks:

  • The drop processor removes malformed logging events that do not contain the expected level field.
  • The script processor translates log4j2's timeMillis epoch time to a date type for use in time-based queries.
  • The remove processor deletes fields that should not be saved, such as those that Filebeat adds.

For more powerful transformations, write scripts in the Painless Scripting Language to customize the content or structure of the Elasticsearch data.

The following example script processes the key-value pairs specific to a system logging event and saves the error fields in a structured format:

javascript
// Only process events that Filebeat tagged as system logs
def fields = ctx['fields'];
if (fields != null) {
    String logtype = fields['logtype'];
    if (logtype == 'system') {
        String message = ctx['message'];
        Map system = [:];

        // Key value pairs are enclosed in square brackets after the exception type
        int start = message.indexOf('[');
        int end = message.indexOf(']');
        if (start >= 0 && end >= 0 && end > start) {
            String type = message.substring(0, start);
            if (type.length() >= 1) {
                system['type'] = type;
            }

            // Parse each key value pair and strip the leading underscore from keys
            String messageFieldText = message.substring(start + 1, end);
            String[] fieldPairs = messageFieldText.splitOnToken(',');
            if (fieldPairs.length > 0) {
                for (int i = 0; i < fieldPairs.length; i++) {
                    String[] parts = fieldPairs[i].splitOnToken('=');
                    if (parts.length == 2) {
                        String key = parts[0];
                        String value = parts[1];
                        if (key.substring(0, 1) == '_') {
                            key = key.substring(1, key.length());
                        }
                        if (key == 'httpStatus') {
                            system[key] = Integer.parseInt(value);
                        } else {
                            system[key] = value;
                        }
                    }
                }

                // Save the structured fields and drop the raw message
                ctx['system'] = system;
                ctx.remove('message');
            }
        }
    }
}
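
Before shipping real logs, you can check the pipeline's behavior with the Elasticsearch simulate API. The following request is a minimal sketch that runs one sample document through the curity pipeline; the field values are illustrative.

json
POST /_ingest/pipeline/curity/_simulate
{
  "docs": [
    {
      "_source": {
        "timeMillis": 1741275691000,
        "level": "WARN",
        "message": "Example message",
        "fields": {
          "logtype": "system"
        }
      }
    }
  ]
}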

Prepare the Log Shipper

Create the index template and ingestion pipeline in Elasticsearch before log shipping begins. Then configure the Filebeat component to read logs from Kubernetes nodes. For each type of log file, use file patterns to set a logtype that Filebeat sends to Elasticsearch in each logging event.

yaml
filebeat.inputs:
- type: container
  paths:
    - /var/log/containers/curity-idsvr-runtime*runtime*.log
    - /var/log/containers/tokenhandler-runtime*-runtime*.log
  json:
    keys_under_root: true
    add_error_key: false
  fields:
    logtype: 'system'
- type: container
  paths:
    - /var/log/containers/curity-idsvr-runtime*request*.log
    - /var/log/containers/tokenhandler-runtime*request*.log
  json:
    keys_under_root: true
    add_error_key: false
  fields:
    logtype: 'request'
- type: container
  paths:
    - /var/log/containers/curity-idsvr-runtime*audit*.log
  json:
    keys_under_root: true
    add_error_key: false
  fields:
    logtype: 'audit'

output.elasticsearch:
  hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
  username: ${ELASTICSEARCH_USERNAME}
  password: ${ELASTICSEARCH_PASSWORD}
  index: "curity-%{[fields.logtype]}-%{+yyyy.MM.dd}"
  pipelines:
    - pipeline: curity

setup:
  ilm:
    enabled: false
  template:
    name: "curity"
    pattern: "curity-*"
    enabled: false

The output.elasticsearch section references the ingest pipeline, and its index setting uses the logtype to save each type of log data to the correct index. In this example, each index name has the following format. Because each index contains a single day's log data, it is straightforward to remove old logs after a retention period.

text
curity-system-2025.03.05
curity-request-2025.03.05
curity-audit-2025.03.05
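
Since each index holds a single day's data, removing old log data is a matter of deleting the indices for expired dates. For example, a scheduled job could issue a delete request like the following, where the date is illustrative; alternatively, Index Lifecycle Management policies could automate retention.

json
DELETE /curity-system-2025.03.05,curity-request-2025.03.05,curity-audit-2025.03.05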

The following example structured logging event shows one possible format for clean, type-safe log data after ingestion.

json
{
  "eventTime": "2025-03-06T15:41:31.000Z",
  "loggerFqcn": "org.apache.logging.slf4j.Log4jLogger",
  "level": "WARN",
  "endOfBatch": true,
  "thread": "req-189",
  "threadPriority": 5,
  "threadId": 42,
  "hostname": "curity-idsvr-runtime-6ccbf9799f-rgrrk",
  "system": {
    "errorCode": "generic_error",
    "errorMessageId": "system.status.internal.error",
    "errorDescription": "FATAL: database idsvr2 does not exist",
    "type": "se.curity.identityserver.errors.SystemRuntimeException@8b67fc",
    "httpStatus": 500
  },
  "contextMap": {
    "LongSessionId": "84aeadea-01cb-4431-bd55-282196a98202",
    "RequestId": "kCPDgMRP",
    "SessionId": "84aeadea",
    "SpanId": "ca8578c5279899bb",
    "TraceId": "2dddaab2003090620ad72cc856b93b9b"
  }
}

Use Kibana to Analyze Logs

Use Kibana Role Management to assign groups of users permissions to indices. For example, you might grant engineers access to system and request logs and compliance teams access to audit logs. Those users can then visualize logs in Kibana. To do so, create Data Views with the following patterns:

  • curity-system-*
  • curity-request-*
  • curity-audit-*

Then use the Discover feature to quickly sort and filter logs or display them in dashboards.
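
To implement the role assignments described above, you could define Elasticsearch roles whose index privileges are limited to the relevant index patterns. The following sketch creates a hypothetical role for engineers with read access to system and request logs; the role name and privilege list are illustrative, and you would still pair the role with Kibana feature privileges and your own role mappings.

json
PUT /_security/role/curity_engineer
{
  "indices": [
    {
      "names": ["curity-system-*", "curity-request-*"],
      "privileges": ["read", "view_index_metadata"]
    }
  ]
}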

Kibana Live Analysis

Kibana Dev Tools also provides powerful ways to query and partition log data. The following query returns documents from the system, request and audit logs that match a particular OpenTelemetry trace ID:

json
GET curity-*/_search
{
  "query":
  {
    "match":
    {
      "contextMap.TraceId": "2dddaab2003090620ad72cc856b93b9b"
    }
  }
}

The following example returns up to 100 JSON documents from system logs that contain a particular error code:

json
GET curity-system*/_search
{
  "query":
  {
    "match":
    {
      "system.errorCode": "external_service_error"
    }
  },
  "sort": [{
    "eventTime": "desc"
  }],
  "size": 100
}

Alternatively, use SQL Queries to perform operations like selecting particular fields or grouping results. The following example SQL query returns the 10 slowest OAuth requests for a particular time period:

text
POST _sql?format=txt
{"query": """
  SELECT TOP 10
    request.uri,
    request.duration
  FROM
    "curity-request*"
  WHERE
    eventTime BETWEEN '2025-03-05' AND '2025-03-07'
  ORDER BY
    request.duration DESC
"""}

Example Deployment

The GitHub link at the top of this page provides a worked example deployment of Elastic Stack components to a Kubernetes cluster. Study the deployment and adapt it to your own requirements. The example runs on a local computer, where you can complete an end-to-end flow with the following resources:

  • Run the Curity Identity Server base Kubernetes deployment.
  • Run the Curity Token Handler base Kubernetes deployment to deploy a working web application.
  • Run the Elastic Stack deployment and use the web application to generate logs.
  • Use Kibana to visualize and query log data.

Conclusion

The Curity Identity Server provides detailed logs as part of its observability and reliability design. To enable real-time troubleshooting and analysis, follow Logging Best Practices to ship logs to a log aggregation system. For best results, do some extra work to transform logs and ensure that all log fields have the correct data types. Teams can then use logs to their full potential when they need to troubleshoot or analyze OAuth related behaviors.
