How to use S3 for backups

I was recently bored, so like any normal person I decided to write a custom backup solution for S3. I do not claim that this is the best solution (in fact, I know it is not), but I think it would be interesting to talk about the thoughts that went into it.

If you just want the script, you can find it here.

Requirements

Here are the things I wanted to achieve with this solution:

  • files are locally encrypted with my own keyfile
  • use cheapest possible storage
  • only upload files when they have actually changed
  • compress files as much as possible
  • allow restore of individual files, at least to a degree

Implementation – Terraform

Before we can start backing up things to S3, we first have to actually have an S3 bucket. The Terraform configuration used can be seen in the gist linked above. I just want to go into some of the specific configuration in more detail.

resource "aws_s3_bucket" "bucket" {
  bucket = "xyz"
  acl    = "private"
  region = "xyz"
  server_side_encryption_configuration {
    rule {
      apply_server_side_encryption_by_default {
        sse_algorithm = "AES256"
      }
    }
  }
  versioning {
    enabled = true
  }
  lifecycle_rule {
    enabled = true

    noncurrent_version_expiration {
      days = 180
    }
  }
  tags = {
    project = "backup"
  }
}

For the most part it should be self-explanatory. I believe there is no good reason to not enable encryption at rest on AWS, ever.

The first interesting part is versioning. I decided very early on that I would leave versioning up to AWS, instead of implementing it myself. The reason for this is that it simplifies checking for changes a lot: instead of having to find the latest version I uploaded, I can just check exactly the same filename. The downside is that this is more expensive if you have data that changes daily. If you keep 180 daily versions of a 1 GB file, you will pay for 180 GB, which is a whopping 0.72 USD a month on Glacier. I also do not have to handle deletion of files myself; that is done by the lifecycle rule.
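
The arithmetic behind that number, as a quick sketch (the 0.004 USD per GB-month Glacier price is an assumption based on list prices at the time and varies per region):

versions           = 180    # noncurrent versions kept by the lifecycle rule
size_gb            = 1      # the file changes every day, so every version is ~1 GB
price_per_gb_month = 0.004  # assumed Glacier price, check your region

print(f"{versions * size_gb * price_per_gb_month:.2f} USD / month")  # 0.72 USD / month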

resource "aws_s3_bucket_public_access_block" "public_access_block" {
  bucket = aws_s3_bucket.bucket.id

  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

If you do not want your data to be public, you should always enable the public_access_block. Basically the problem it solves is this: if you have a private bucket policy, it can still be overridden on a per-object level by setting an individual object to public-read. A lot of people accidentally set this and, as a result, expose data they do not want exposed. If you set public access block and try to put or update an object in a way that would make it accessible to the public, that request will get a 403 instead.

Implementation – Python

I will not go through the code line by line as that would be superfluous, but instead focus on a couple of key parts that I found interesting or that had to be done differently than expected.

Detecting file changes

The first important question is how file changes are actually detected. The answer, of course, is to use an MD5 hash. The first thing to realize here though is that if you use AES in any secure mode it will use an initialization vector (IV). This is basically a salt and means that if you encrypt the same file multiple times, the resulting encrypted file will have a different hash every time. That in turn means you have to get and store the MD5 hash before the file is encrypted.
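
As a sketch of what that looks like in Python (chunked, so large archives do not have to fit into memory; the function name is my own):

import hashlib

def md5_of_file(path, chunk_size=8 * 1024 * 1024):
    # hash the plaintext archive in chunks, before encryption adds its IV
    md5 = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            md5.update(chunk)
    return md5.hexdigest()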

Another note: if you use command-line tar and gzip, you have to do this:

tar cf - /my/dir | gzip --no-name > /myfile.tgz

If you use tar with the z option, the resulting file will have a different hash every time, since by default gzip puts a last-modified timestamp into the file.

Storing MD5 hashes on AWS

There are three main ways you can store hashes when using S3:

  • rely on the ETag
  • S3 metadata
  • DynamoDB

For files that are small enough that a multipart upload is not needed, the ETag of an S3 object is simply the MD5 hash of the content. For multipart uploads that is not the case: the ETag is instead derived from the hashes of the individual parts, and you would need to reconstruct it to compare against a local hash.
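
If you ever do need to compare against a multipart ETag, the commonly documented scheme is: MD5 over the concatenation of the binary MD5 digests of each part, followed by a dash and the part count. A sketch, assuming you know the part size that was used for the upload:

import hashlib

def multipart_etag(path, part_size=8 * 1024 * 1024):
    # md5 each part, then md5 the concatenation of those digests
    part_digests = []
    with open(path, 'rb') as f:
        for part in iter(lambda: f.read(part_size), b''):
            part_digests.append(hashlib.md5(part).digest())
    combined = hashlib.md5(b''.join(part_digests)).hexdigest()
    return f"{combined}-{len(part_digests)}"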

Using custom metadata makes this a little easier and cheaper. By just doing

s3.meta.client.upload_file(
        Bucket    = bucket_name,
        Config    = transfer_config,
        Filename  = path,
        Key       = s3_key,
        ExtraArgs = {
            'ACL': 'private',
            'Metadata': { 'md5': md5_local_filesystem },
            'StorageClass': 'GLACIER'
        }
    )

you can store the MD5 hash in custom metadata. This metadata can easily be retrieved with a HEAD request, for example:

aws s3api head-object --bucket $bucketname --key $keyname --query 'Metadata.md5'

The downside of using S3 for this at all is that S3 requests are relatively expensive, especially on Glacier. As such, it is much cheaper to store the hashes in a DynamoDB table instead. This is very easy, since we can use the – guaranteed unique – S3 key as the DynamoDB partition key. Both storing and retrieving this information is then very simple:

client.get_item(
        TableName       = dynamodb_table_name,
        Key             = {
            's3_key': {
                'S': s3_key
            }
        },
        AttributesToGet = [ 'md5' ]
    )

client.put_item(
        TableName = dynamodb_table_name,
        Item      = {
            's3_key': {
                'S': s3_key
            },
            'md5': {
                'S': md5_local_filesystem
            }
        }
    )
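
For completeness, a minimal sketch of the objects the snippets above assume (bucket, table and region names are placeholders):

import boto3
from boto3.s3.transfer import TransferConfig

s3              = boto3.resource('s3')
client          = boto3.client('dynamodb', region_name='eu-central-1')   # placeholder region
transfer_config = TransferConfig(multipart_chunksize=8 * 1024 * 1024)

bucket_name         = 'my-backup-bucket'      # placeholder
dynamodb_table_name = 'backup-md5-hashes'     # placeholder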

Encryption

I played around with implementing this myself; however, crypto is hard to get right and easy to get wrong. Furthermore, I wanted to make sure I could decrypt the data anywhere – given that I still had the key – so I wanted to use standard command-line tools. I settled on openssl, as it makes it very simple to use a key file:

openssl enc -aes-256-cbc -pbkdf2 -pass file:/home/myuser/keyfile -in /my/file.tar.bz2 -out /my/file.tar.bz2.aes

And it is very simple to create a strong keyfile, either using Python:

with open(keyfile, 'wb') as f:
    f.write(os.urandom(4096))

or bash:

dd if=/dev/urandom of=/home/myuser/keyfile bs=1K count=4
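
Since the backup script itself is Python and drives these command-line tools, a thin subprocess wrapper is one way to call openssl; restoring is the same command with -d added. A rough sketch (the function name and default keyfile path are my own):

import subprocess

def run_openssl(in_path, out_path, keyfile='/home/myuser/keyfile', decrypt=False):
    # encrypt by default; pass decrypt=True when restoring (adds the -d flag)
    cmd = ['openssl', 'enc', '-aes-256-cbc', '-pbkdf2',
           '-pass', f'file:{keyfile}', '-in', in_path, '-out', out_path]
    if decrypt:
        cmd.insert(2, '-d')
    subprocess.run(cmd, check=True)

# run_openssl('/my/file.tar.bz2', '/my/file.tar.bz2.aes')                # backup
# run_openssl('/my/file.tar.bz2.aes', '/my/file.tar.bz2', decrypt=True)  # restore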

Differential backups

I did not implement differential backups – 5 hours were not enough time. Also, I have no idea how to do that with encryption. So I settled on a slightly buggy solution. For every folder that I want to back up, I can define a ‘depth’. Given this folder structure:

/a/b/c/w
/a/b/d/x
/a/b/e/y
/a/b/e/z
/a/f

The following archives will be created, based on the depth setting:

depth=0: a
depth=1: b, f
depth=2: c, d, e
depth=3: w, x, y, z

The astute reader might notice that everything bigger than depth=1 will not back up f. This is due to this very concise, some might call it lazy, solution:

glob_pattern = path + '/*' * depth
return glob.glob(glob_pattern)

A proper solution here would be to write a recursive function that will only go up to depth levels, but only if there are subfolders to go into.
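
A rough sketch of what that could look like (the function name is mine; it descends at most depth levels but keeps anything it cannot descend into, so /a/f would no longer be lost):

import os

def backup_roots(path, depth):
    if depth == 0:
        return [path]
    roots = []
    for entry in os.scandir(path):
        if entry.is_dir():
            roots.extend(backup_roots(entry.path, depth - 1))
        else:
            roots.append(entry.path)   # plain files like /a/f are kept instead of skipped
    return roots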

In depth guide to running Graylog in production

In my last post, In depth guide to running Elasticsearch in production, I talked about common pitfalls and things you ought to know when it comes to running Elasticsearch. My main experience with ES is as a backend to Graylog. So in this post, I will try to write down everything I know about that. Like before, I do not make any guarantees for completeness. I will also not talk about how to install Graylog. This is covered well in the official documentation and other places.

The basics: Inputs, Extractors, Streams, Pipelines, Index Sets, Buffers and the Journal

Before I begin writing down my experiences with Graylog, I want to go through the basic architecture and nomenclature.

Inputs are listening services that you can send logs to. Graylog supports many different input formats such as Syslog, Beats, GELF, etc. From a user perspective, every input opens a port on your Graylog server that you can then send logs of this particular format to.

Extractors are logically part of an input. Architecturally they are part of the ProcessBufferProcessor. Every extractor of an input runs on every message received on that input. There are ways to limit this, which I will get to later. The goal of extractors is to get around a common issue of log formats such as Syslog. Syslog messages have some pre-defined fields such as facility, severity, date, etc., but that does not help you if you want to show all HTTP requests which took longer than 1000 ms, because ‘time to render’ is not a field defined in the syslog RFC. Graylog’s solution to this problem is to allow you to extract information like this into custom fields, which are then stored as fields in Elasticsearch and allow you to do searches like

source:web-prod* AND time_to_render:>1000

Streams allow you to route messages into a category in real time based on rules you can set. Streams can be used for many applications, the most common of which is to store specific messages in specific index sets. Other cool things you can do with streams include alerts, outputs and processing pipelines.

Pipelines attach to one or multiple streams and run custom code on every message in that stream. Common tasks are to normalize the names of fields or filter out specific messages.

Index Sets are sets of Elasticsearch Indices that Graylog uses to store its data in. They are sets because they are composed of one to many individual ES indices. The indices all share a common prefix and then have a number that simply counts up. If you store a lot of data this mechanism is necessary, because it limits the amount of data that is stored in an index. The more data you store in an ES index, the slower the searching will be. Furthermore, you can also use index sets to limit the total amount of data stored.

Buffers are an important part of Graylog. As it so happens, the amount of logs that arrives at a given time can fluctuate wildly, while the processing capacity of Graylog stays constant. So there must be some way to buffer messages at various parts of the Graylog architecture. There are three main buffers: Input, Process and Output buffer. You can check the state of buffers under system -> nodes, under a specific node. These buffers are implemented as ring buffers using LMAX Disruptor, an open-source library written by a high-speed trading company.

Journals are implemented using Apache Kafka. Like buffers, they temporarily store messages. Unlike buffers, they do so on disk and can in fact store many millions of messages. Their two purposes are to keep messages safe in case the server crashes, and to store them temporarily if an output, usually Elasticsearch, does not accept messages fast enough.

A more in-depth look at Graylog

Unfortunately, the Graylog documentation is somewhat opaque on architectural details, and the best insight into Graylog’s architecture today is a whiteboard sketch you can find here. I cannot code Java and did not look into Graylog’s source code, but here is what I gathered about how Graylog actually works under the hood.

As mentioned above, Graylog buffers messages in ring buffers. If you don’t know what a ring buffer is, look at this array:

my_array = [ 1, 2, 3, 4 ]

Now imagine you connect the last entry of the array back to the first one, my_array[0], using a pointer. That way, if you count up through the array and move past the last entry, you end up reading the first entry again. Ring buffers have cool properties, but in practical terms it means they have a limited capacity, and when that is full, the oldest entries get overwritten by new ones.
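
Just as an illustration of that wrap-around behaviour (Graylog’s actual buffers are LMAX Disruptor ring buffers in Java, not anything like this Python toy):

class RingBuffer:
    def __init__(self, size):
        self.slots = [None] * size   # fixed capacity, allocated once
        self.head = 0                # next write position

    def put(self, message):
        # once the buffer is full, the oldest entry gets overwritten
        self.slots[self.head] = message
        self.head = (self.head + 1) % len(self.slots)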

Graylog places a ring buffer in front of every important step in its architecture. We have the input, processing, and output buffer. All of these have limited size and all of these, ideally, should fit into the L3 cache of your CPU. The size of the ring buffer also has to be a power of two. The default here is 65536.

Processors in Graylog are the processes that do the actual work. Ideally, each wants a core to itself, so it does not make sense to raise the total number of processors higher than the number of cores in your system.

There are three kinds of processors: InputBufferProcessor, ProcessBufferProcessor and OutputBufferProcessor. Every one of these processors has the aforementioned ring buffer in front of it.

The InputBufferProcessor is not described anywhere. I will hazard a guess that it handles the initial conversion from a raw log message into the internal Graylog messaging format (‘message’).

The ProcessBufferProcessor does most of the heavy lifting including extractors and stream routing. This will usually be your bottleneck and the part you should tweak.

The OutputBufferProcessor handles writing messages to outputs, normally Elasticsearch. Increasing the number of OutputBufferProcessors will therefore not necessarily increase throughput; you will just get more parallel connections to Elasticsearch, which could actually be a bad thing if ES cannot handle it.

So how do I run Graylog in production?

Architectural considerations

If you are reading this blog, you are probably at a size where you want to deploy a multi-node setup of Graylog. If you do this there are a couple things to keep in mind. This will not touch on Elasticsearch as I talked about that in detail in my previous post.

MongoDB

This NoSQL database is what Graylog uses to store all of its settings. MongoDB, like Elasticsearch, uses a quorum-based algorithm to decide which server is the primary. This means you want 2n+1 MongoDB servers. Since Graylog settings do not change very often, it is safe to say that 3 MongoDB nodes should be sufficient.

Make sure to use a MongoDB replica set, enable authentication and encryption, and only allow connections from other MongoDB nodes as well as Graylog.

Master

One of your Graylog servers is the master. This is a setting in the config files, and does not happen automatically. The cluster can survive without a master node for a while, but not indefinitely. I have not found documentation about what exactly the master node actually does.

Load balancing

You want to load balance web and API requests to the servers, but more importantly, you want to balance logging inputs. This is currently only really possible using nginx, since HAProxy does not yet support UDP load balancing.

Resources and tuning

Honestly, there is not much to say here. You need around 8-12 G of RAM, up to 10 G of which should be heap for Graylog. Too much heap is not really a good thing since it increases garbage collection time.

You want a number of cores dependent on your workload with a minimum of four. One for the OS, and one for each processor:

processor_wait_strategy  = blocking
inputbuffer_processors   = 1
processbuffer_processors = 1
outputbuffer_processors  = 1

If you see a buffer fill up, you should give your node one more core and bump the processor in question up by one. Usually, this will be the ProcessBufferProcessor.

If your output buffer fills up, you should first check if Elasticsearch might be the bottleneck, since increasing the number of OutputBufferProcessors increases the number of simultaneous connections made to ES.

Theoretically you can increase the size of your ring buffers and the Graylog documentation helpfully states

Raise this if raising outputbuffer_processors does not help anymore.

https://docs.graylog.org/en/3.2/pages/configuration/server.conf.html

However, I question the validity of this. If your buffers fill up on a regular basis it means your processors cannot do work fast enough. The only solution is to use more processors or reduce the amount of work. Increasing ring buffer size only helps if you often have very bursty situations.

Inputs

There are four main issues you have to be aware of when using Inputs in production: network protocols, logging protocols, reverse DNS and encryption.

Network protocols

First of all, pretty much all inputs support running either on TCP or UDP. GELF also supports HTTP. As always, you have to be aware of the pros and cons here. UDP is faster, since it does not require a handshake, but it does not guarantee that your message arrives, and also it doesn’t support encryption. Finally, HAProxy cannot load balance UDP currently (it might when HTTP/3 is released).

However, a quirk of the Graylog architecture is that a message can arrive successfully and still not get stored. If you get a 200 from, say, a GELF HTTP input, it just means that the input has received the message. It does not mean that the message has been processed and stored; an error in one of those steps will still mean that the message is not available in Graylog.

As such, I cannot tell you what to do here. If the only reason you want to use TCP is the transmission guarantee, then in my mind that is not a good enough reason, since a message arriving does not mean it gets processed correctly by Graylog. If your organization requires encryption for log messages, which is definitely a good idea, you will have to use TCP. Personally, I have not run into performance issues from using TCP over UDP. Also keep in mind that you want to be able to load balance logging inputs.

Logging protocols

I am just going to come out and say it: GELF is better than syslog. The reason for that is that GELF is just JSON and as such supports arbitrary fields. This in turn means you should not need extractors on a properly used GELF input which in turn will speed up message processing significantly. As we will see, extractors can become a major bottleneck and pain point in larger Graylog installations, so this is a huge plus.

Reverse DNS

Syslog by default sends the IP of the machine as source. However, in Graylog, you get the hostname. The reason for that is that Graylog does a reverse DNS (rDNS) lookup against that IP to get the name of the machine – for every message that arrives. This can quickly result in tens of thousands of additional DNS queries per second.

My recommendation is this: configure syslog to send the hostname of the machine where possible. This depends on your logging software; in rsyslog you want something like

$PreserveFQDN on

and make sure that the hosts file has the correct entry for 127.0.0.1.

If you have clients that just cannot do this, install a DNS cache on the Graylog server, like dnsmasq. Even just a short TTL like 60 or 90 will significantly reduce the amount of DNS queries generated.

Encryption

Many logging protocols support encryption / authentication via TLS when you run them on TCP. You definitely should enable this if possible. This is very straightforward if you have ever configured certificates. Just keep a couple of things in mind:

  1. Certificate has to be X.509
  2. Private key has to be PKCS#8
  3. If you use multiple Graylog servers, make sure that the FQDN of the load balancer endpoint is a subject alternative name of the certificate.

Extractors

Extractors are amazing but can also become one of your biggest bottlenecks. Luckily, they can be optimized. To reiterate, here is what happens for every message that arrives at an input that has extractors:

  1. for all extractors, check if the run condition is met
  2. for all matching extractors, run the extractor

Since extractors are usually written in regex (Grok just being a fancy version of regex), it is very easy to write bad ones. Also, since most people don’t actually understand the impact of extractors, you often get cases where the run condition is the same regex as the extractor itself. Since the run condition is always evaluated against all incoming messages, this can drastically reduce throughput of your logging system.

Imagine you have ten extractors taking 500 µs each for their condition. In this theoretical case a single input processor could handle

1s / (500 µs * 10) = 200 msg / s

This is a very extreme example that usually does not happen. In fact, 95th percentile condition time for an extractor is usually single-digit µs. But it is easy to see how things like this can add up.

In case you didn’t know, you can get metrics per extractor either via the API or under system -> inputs -> extractors -> details.

As a rule of thumb you should work on every extractor that meets one of these conditions:

  1. the extractor regex does not contain ^ or $
  2. the condition is ‘field matches regular expression’ and that regex is just a copy of the extractor regex
  3. the condition is a regex that does not contain ^ or $
  4. high runtime

Fixing bad regexes

I know, learning regex sounds scary, but there are some very simple things you can do to speed your regexes up. The best one is to use ^ and/or $. ^ stands for ‘beginning of string’ and $ for ‘end of string’. If you do not use one of these, you are doing a full-text search. Imagine you have a log message like this:

my-awesome-application hello world message: 22

If, in this case, you wanted to extract the 22 into a field, it might be tempting to do:

\d+

But this will do a full text search through the entire message. If you instead did:

\d+$

The match is then anchored to the end of the message, so the engine can quickly discard all messages that do not have a digit at the end, which is a much faster process.
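
To make the difference concrete outside of Graylog (plain Python re here, the extractors themselves use Java regex; the sample message is made up), the anchor also protects you from grabbing the wrong digits:

import re

message = 'my-awesome-application handled 3 requests, message: 22'

print(re.search(r'\d+', message).group())    # '3'  - unanchored, grabs the first digit run it finds
print(re.search(r'\d+$', message).group())   # '22' - anchored to the end, extracts the right value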

Fixing bad conditions

First of all, if your condition is the same as the regex, your condition is officially useless and you would be better off just removing it. If your main regex is fast, a condition is unlikely to improve the overall run time of your extractor. As such you should really only use conditions if the extractor would otherwise produce wrong data or if your extractor is very complex and the condition very simple. So if you had these two messages

my-awesome-application hello world message: 22
my-second-favorite-application hello world message: 44

And you wanted the 22 and 44 to go into different fields, you would need to use a condition like this:

^my-second-favorite-application

The second case, where your extractor is very complex but you are able to write a simple and fast condition for it, is much rarer. I have really only seen it with grok extractors where all messages have a common prefix, as seen in the code box above. In those cases, filtering by the prefix will indeed help with run time.

GROK extractors

You often have the case that you want to extract multiple fields from a single message. Many people build multiple extractors in this case, often with very generic regexes resulting in false positives and slow overall performance. In these cases, it is usually better to invest time once to build one good extractor using grok, which allows you to populate many fields with one extractor.

Basically, grok is just a macro-language for regex. Instead of writing the regex from scratch, you define patterns that represent a regex. See this example from logstash:

MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME}
IPORHOST (?:%{HOSTNAME}|%{IP})
HOSTPORT %{IPORHOST}:%{POSINT}

You can, and should, install grok patterns in Graylog. You can do so under System -> Grok Patterns. You can then either define your own patterns, or import a pattern file, like this one from elastic.

Once you have done that you can use grok extractors in Graylog. As I said before, the great thing about them is that you can get all fields from a message using one extractor. This helps a lot with run time, but it also makes management much easier. For example, this parses iptables messages:

^%{DMESG_UPTIME}\s%{PROG:application_name}:\sIN=%{WORD:Input_int} OUT=%{DATA:Output_int} (?:MAC)=%{COMMONMAC:Sender_MAC}\:%{COMMONMAC:Receiver_MAC}\:[A-f0-9]{2}\:[A-f0-9]{2}\s+SRC=%{IP:SRCHost} DST=%{IP:DSTHost} %{GREEDYDATA:UNWANTED} PROTO=%{WORD:Protocol} SPT=%{NUMBER:SourcePort} DPT=%{NUMBER:DestinationPort}

Explaining grok is not the aim of this blog post; the really short version is this:

%{PATTERNAME:field_name}\s+ some regex here. I can also throw away stuff%{WORD:UNWANTED}

Index Sets

Choosing the right size

I would like to refer you to my previous post, In depth guide to running Elasticsearch in production. The size of an index depends on the amount of storage you have as well as the maximum number of shards you can put on your nodes, which is explained in that article.

In most cases, you should rotate indices by size. The reason is that this is the only way to make sure you have enough space to store logs. If you rotate by time, you simply do not know how many GB of data will accumulate in that time frame.

The only exception that I have seen is data that has to be legally retained for longer, or that is small. One could, for example, choose to store logs of commands entered on servers for multiple years. This data is usually very small, so storing it for longer is not an issue.

Replication and shards

In general, and I know I go against other popular blogs here, always set index replicas to at least 1. Yes, even if you don’t care about the data. The reason is that Graylog will simply not write to a cluster in cluster state red. Cluster state is red when data loss has occurred. So if you have an index set without replication and even one node that holds a shard of it dies, Graylog will stop writing to that index set altogether, and manual intervention is required. Don’t let that happen.

For shards, the short answer is that the number of shards depends on how much data you will write. Logging is a write-heavy workload, and every Elasticsearch node has a limited write capacity. With only a single primary shard, you only get the write capacity of one node; with n primary shards you can get the write capacity of n nodes. That means that for maximum performance, the number of primary shards should equal

number_of_nodes / (index_replicas + 1)

If you set index replicas to 1, every primary shard will have one replica, so you are writing twice the amount of data.

Indexer failures

An important point that I think many people ignore are indexer failures. You can check those under System -> Overview -> Indexer Failures.

An indexer failure means that your message has not been stored in Elasticsearch. It can have gone through your inputs, extractors, streams, pipelines, etc. just to end up not being written. As such you have to tightly control indexer failures and take them seriously.

One common failure is that the maximum number of fields of the ES index has been reached. By default, an ES index can have 1000 fields. If you try to PUT a document with new fields into an index that has already reached that limit, you will get this error. Yes, you can increase the maximum number of fields, but you really shouldn’t. Every field in ES takes up space, even when it is empty, although this behavior has gotten better since ES 6.

If you run into this problem, probably one of two things is happening:

  1. you store everything in one index set
  2. duplicate fields

If you run Graylog at a scale that requires you to read this blog post, you most likely want to have multiple index sets. How you divide these is up to you. You could divide them per application or per department. We have, for example, separate index sets for Debug logs, Windows, Firewall, Routing, CLI commands entered, Access logging (web-server etc.), big applications, and, most importantly, a dedicated ‘naughty’ index set.

Every application that does not understand Graylog and just generates new fields for every log message gets sent to the naughty index set, together with all the other misbehaving applications. This way they only destroy logging for each other, and let the well-behaved applications log in peace.

Once you have multiple index sets, you can control what data is sent where by using streams. Every stream allows you to set the index set to which the data is written.

Another problem that happens often, is that organizations have duplicate field names:

department
dept
department_name
render_time
rendertime
render_time_millis
render_time_seconds

This can very quickly use up a lot of fields. This is a management issue, because there should be standards on how to name these things. There are two technical solutions for this. One, as already mentioned, is to send different applications to different index sets. The other is to use processing pipelines to rename fields to a common standard.

Pipelines

Pipelines allow you to run code on your messages. Basically, a pipeline gets messages from one or many streams, and then sends them through a series of rules. Every rule is a piece of code you write yourself. This can be pretty simple stuff or really complicated. Not much to say here except to keep in mind that this runs on every message in a stream. That means it will slow throughput down. Doing things like renaming fields in Graylog is a crutch and the better way is to do that right in the application. In case that isn’t possible, you could for example use code like this to change field names around:

rule "standardize-juniper-fields"
when
  has_field("application_name") && (to_string($message.application_name) == "RT_FLOW" || to_string($message.application_name) == "RT_IDS")
then
  set_field("application_name", "firewall");
  rename_field("destination-address", "DSTHost");
  rename_field("source-address", "SRCHost");
  rename_field("destination-port", "DestinationPort");
  rename_field("source-port", "SourcePort");
  rename_field("source-address_city_name", "SRCHost_city_name");
  rename_field("source-address_country_code", "SRCHost_country_code");
  rename_field("source-address_geolocation", "SRCHost_geolocation");
end

Monitoring

Luckily, everything in Graylog is exposed through its REST API, which makes it very easy to monitor. You can see all available metrics under System -> Nodes -> Metrics. Here are a couple of metrics you should keep an eye on:

system/metrics/org.graylog2.journal.utilization-ratio
# if this goes over a couple percent, you have some
# sort of issue with writing log messages to ES
# that you have to look into.
system/indexer/failures
# every indexer failure means a message not written
# as such, every indexer failure is an error that
# needs to be fixed
system/throughput
# this shows messages/sec. a good historical metric
# to have
system/metrics/org.graylog2.shared.buffers.ProcessBufferProcessor.processTime
# the mean as well as the percentiles of this are
# useful. If you have any sudden spike, someone
# added an extractor or a pipeline that is breaking
# graylog. also good to see if your extractor
# tuning does something
system/metrics/org.apache.logging.log4j.core.Appender.warn
system/metrics/org.apache.logging.log4j.core.Appender.error
system/metrics/org.apache.logging.log4j.core.Appender.fatal
# rate of graylog-server log messages that are 
# WARN ERROR or FATAL this is good to see if
# something is wrong internally
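
A hedged sketch of pulling one of these metrics from the REST API with Python (base URL, credentials and the exact path are assumptions; check the API browser of your Graylog version):

import requests

GRAYLOG_API = 'https://graylog.example.com:9000/api'   # assumed base URL
AUTH = ('admin', 'password')                            # or an access token

r = requests.get(
    f'{GRAYLOG_API}/system/metrics/org.graylog2.journal.utilization-ratio',
    auth=AUTH,
    headers={'Accept': 'application/json'},
)
r.raise_for_status()
print(r.json())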

On top of that, like every Java program, use jstat to check heap and garbage collection data.

Help! Graylog does not write to Elasticsearch anymore

Go to System -> Nodes. Click on a node. Observe. Are the buffers full or is the journal filling up?

If the buffers are full then you need more ProcessBufferProcessors. If the journal is filling up you have an issue writing to Elasticsearch. In that case, check the output in the top right corner. Is it just a low number or does it constantly switch between 0 and whatever bulk size you set in Graylog? In the latter case your ES cluster is most likely not fast enough to handle writing the log messages. There is a back-pressure system built into ES, and Graylog will stop writing when ES is too busy.

You can also try to rotate the active write index. Sometimes Graylog gets stuck and is unable to write; rotating the write index fixes that.

If these steps do not help, restart graylog-server. We had a recurring bug which caused an entire Graylog node to stop processing messages. Our workaround was to automatically restart graylog-server whenever the journal utilization went over 3%.

If that still does not help, you might have a corrupted Kafka journal. In this case you will have to delete it, losing all messages within it:

service graylog-server stop
rm -rf /var/lib/graylog-server/journal
service graylog-server start

In depth guide to running Elasticsearch in production

If you are here, I do not need to tell you that Elasticsearch is awesome, fast and mostly just works.
If you are here, I also do not need to tell you that Elasticsearch can be opaque, confusing, and seems to break randomly for no reason. In this post I want to share my experiences and tips on how to set up Elasticsearch correctly and avoid common pitfalls.
I am not here to make money so I will mostly just jam everything into one post instead of doing a series. Feel free to skip sections.

The basics: Clusters, Nodes, Indices and Shards

If you are really new to Elasticsearch (ES) I want to explain some basic concepts first. This section will not explain best practices at all, and focuses mainly on explaining the nomenclature. Most people can probably skip this.

Elasticsearch is a management framework for running distributed installations of Apache Lucene, a Java-based search engine. Lucene is what actually holds the data and does all the indexing and searching. ES sits on top of this and allows you to run potentially many thousands of Lucene instances in parallel.

The highest level unit of ES is the cluster. A cluster is a collection of ES nodes and indices.

Nodes are instances of ES. These can be individual servers or just ES processes running on a server. Servers and nodes are not the same. A VM or physical server can hold many ES processes, each of which will be a node. Nodes can join exactly one cluster. There are different types of nodes, the two most interesting of which are the data node and the master-eligible node. A single node can be of multiple types at the same time. Data nodes run all data operations: storing, indexing and searching of data. Master-eligible nodes vote for a master that runs cluster and index management.

Indices are the high-level abstraction of your data. Indices do not hold data themselves. They are just another abstraction for the thing that actually holds data. Any action you perform on data, such as INSERTs, DELETEs, indexing and searching, runs against an index. An index belongs to exactly one cluster and is composed of shards.

Shards are instances of Apache Lucene. A shard can hold many Documents. Shards are what does the actual data storage, indexing and searching. A shard belongs to exactly one node and index. There are two types of shards: primary and replica. These are mostly the exact same. They hold the same data, and searches run against all shards in parallel. Of all the shards that hold the same data, one is the primary shard. This is the only shard that can accept indexing requests. Should the node that the primary shard resides on die, a replica shard will take over and become the primary. Then, ES will create a new replica shard and copy the data over.

At the end of the day, we end up with a hierarchy like this: the cluster holds nodes and indices, each index is made up of shards, and each shard lives on one of the nodes.

A more in-depth look at Elasticsearch

If you want to run a system, it is my belief that you need to understand the system. In this section I will explain the parts of Elasticsearch I believe you should understand if you want to manage it in production. This will not contain any recommendations; those come later. Instead, it aims purely at explaining the necessary background.

Quorum

It is very important to understand that Elasticsearch is a (flawed) democracy. Nodes vote on who should lead them, the master. The master runs a lot of cluster-management processes and has the last say in many matters. ES is a flawed democracy because only a subclass of citizens, the master-eligible nodes, are allowed to vote. Master-eligible are all nodes that have this in their configuration:

node.master: true

On cluster start, or when the master leaves the cluster, all master-eligible nodes start an election for the new master. For this to work, you need to have 2n+1 master-eligible nodes. Otherwise two partitions can each receive 50% of the votes, a split-brain scenario that will lead to the loss of all data in one of the two partitions. So don’t have this happen. You need 2n+1 master-eligible nodes.

How nodes join the cluster

When an ES process starts, it is alone in the big, wide world. How does it know what cluster it belongs to? There are different ways this can be done. However, these days the way it should be done is using what is called Seed Hosts.

Basically, Elasticsearch nodes talk with each other constantly about all the other nodes they have seen. Because of this, a node only needs to know a couple other nodes initially to learn about the whole cluster.

//EDIT, 25th Feb 2020:
Dave Turner mentioned in the comments:

This isn’t really a constant process: nodes only share information about other nodes they have discovered when they’re not part of a cluster. Once they’ve joined a cluster they stop this and rely on the cluster’s elected master to share any changes as they occur, which saves a bunch of unnecessary network chatter. Also in 7.x they only really talk about the master-eligible nodes they have seen — the discovery process ignores all the master-ineligible nodes.

davecturner

Let’s look at this example of a three-node cluster:

Initial state.

In the beginning, Node A and C just know B. B is the seed host. Seed hosts are either given to ES in the form of a config file or they are put directly into elasticsearch.yml.
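
For 7.x, a minimal sketch of what that looks like in elasticsearch.yml (host names are placeholders; older versions used discovery.zen.ping.unicast.hosts instead):

discovery.seed_hosts:
  - es-master-1.example.com
  - es-master-2.example.com
  - es-master-3.example.com

# only evaluated when bootstrapping a brand-new cluster
cluster.initial_master_nodes:
  - es-master-1
  - es-master-2
  - es-master-3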


Node A connects and exchanges information with B

As soon as node A connects to B, B now knows of the existence of A. For A, nothing changes.

Node C connects and shares information with B

Now, C connects. As soon as this happens, B tells C about the existence of A. C and B now know all nodes in the cluster. As soon as A connects to B again, it will also learn of the existence of C.

Segments and segment merging

Above I said that shards store data. This is only partially true. At the end of the day, your data is stored on a file system in the form of… files. In Lucene, and with that also Elasticsearch, these files are called segments. A shard can have anywhere from one to several thousand segments.

Again, a segment is an actual, real file you can look at in the data directory of your Elasticsearch installation. This means that using a segment is overhead. If you want to look into one, you have to find and open it. That means if you have to open many files, there will be a lot of overhead. The problem is that segments in Lucene are immutable. That is fancy language for saying they are only written once and cannot be changed. This in turn means that every document you put into ES will create a segment with only that single document in it. So clearly, a cluster that has a billion documents has a billion segments which means there are a literal billion files on the file system, right? Well, no.

In the background, Lucene does constant segment merging. It cannot change segments, but it can create new ones with the data of two smaller segments.

This way, Lucene constantly tries to keep the number of segments, and with it the number of files and the overhead, small. It is possible to force this process by using a force merge.

Message routing

In Elasticsearch, you can run any command against any node in a cluster and the result will be the same. That is interesting because at the end of the day a document will live in only one primary shard and its replicas, and ES does not know where. There is no mapping saying a specific document lives in a specific shard.

If you are searching, then the ES node that gets the request will broadcast it to all shards in the index. This means primary and replica. These shards then look into all their segments for that document.

If you are inserting, then the ES node that receives the request routes the document to one of the primary shards, based on a hash of its routing value (the document ID by default), and puts it there. It is then written to that primary shard and all of its replicas.

//EDIT, 25th Feb 2020:
Dave Turner mentioned in the comments:

The word “shard” is ambiguous here and although I think you understand this correctly it’d be easy for a reader to misinterpret how this is written. Each search only fans out to one copy of each shard in the index (either a primary or a replica, but not both). If that fails then the search will of course try again on another copy, but that’s normally very rare

davecturner

So how do I run Elasticsearch in production?

Finally, the practical part. I should mention that I managed ES mostly for logging. I will try to keep this bias out of this section, but will ultimately fail.

Sizing

The first question you need to ask, and subsequently answer yourself, is about sizing: what size of ES cluster do you actually need?

RAM

I am talking about RAM first, because your RAM will limit all other resources.

Heap

ES is written in Java. Java uses a heap. You can think of this as Java-reserved memory. There are all kinds of things that are important about the heap, which would triple this document in size, so I will get down to the most important part: heap size.

Use as much as possible, but no more than 30G of heap size.

Here is a dirty secret many people don’t know about the heap: every object in the heap needs a unique address, an object pointer. As long as the heap stays small enough, the JVM can use compressed object pointers; past a certain heap size it has to fall back to full-width, uncompressed pointers, so every object reference takes more memory, caches hold fewer of them, and memory access gets slower. You 100% do not want to get over this threshold, which is somewhere around 32G.

I once spent an entire week locked in a dark room doing nothing but using esrally to benchmark different combinations of file systems, heap sizes and BIOS settings for Elasticsearch. Long story short, here is what it had to say about heap size:

Index append latency, lower is better

The naming convention is fs_heapsize_biosflags. As you can see, starting at 32G of heap size performance suddenly starts getting worse. Same with throughput:

Index append median throughput. Higher is better.

Long story short: use 29G of heap, or 30 if you are feeling lucky; use XFS; and use hardwareprefetch and llc-prefetch if possible.

FS cache

Most people run Elasticsearch on Linux, and Linux uses RAM as file system cache. A common recommendation is to use 64G for your ES servers, with the idea that it will be half cache, half heap. I have not tested FS cache. However, it is not hard to see that large ES clusters, like for logging, can benefit greatly from having a big FS cache. If all your indices fit in heap, not so much.

//EDIT, 25th Feb 2020:
Dave Turner mentioned in the comments:

Elasticsearch 7.x uses a reasonable amount of direct memory on top of its heap and there are other overheads too which is why the recommendation is a heap size no more than 50% of your physical RAM. This is an upper bound rather than a target: a 32GB heap on a 64GB host may not leave very much space for the filesystem cache. Filesystem cache is pretty key to Elasticsearch/Lucene performance, and smaller heaps can sometimes yield better performance (they leave more space for the filesystem cache and can be cheaper to GC too).

davecturner

CPU

This depends on what you are doing with your cluster. If you do a lot of indexing, you need more and faster CPUs than if you just do logging. For logging, I found 8 cores to be more than sufficient, but you will find people out there using way more since their use case can benefit from it.

Disk

Not as straightforward as you might think. First of all, if your indices fit into RAM, your disk only matters when the node is cold. Secondly, the amount of data you can actually store depends on your index layout. Every shard is a Lucene instance, and they all have memory requirements. That means there is a maximum number of shards you can fit into your heap. I will talk more about this in the index layout section.

Generally, you can put all your data disks into a RAID 0. You should replicate on Elasticsearch level, so losing a node should not matter. Do not use LVM with multiple disks as that will write only to one disk at a time, not giving you the benefit of multiple disks at all.

Regarding file system and RAID settings, I have found the following things:

  • Scheduler: cfq and deadline outperform noop. Kyber might be good if you have nvme but I have not tested it
  • QueueDepth: as high as possible
  • Readahead: yes, please
  • Raid chunk size: no impact
  • FS block size: no impact
  • FS type: XFS > ext4

Index layout

This highly depends on your use case. I can only talk from a logging background, specifically using Graylog.

Shards

Short version:

  • for write heavy workloads, primary shards = number of nodes
  • for read heavy workloads, primary shards * replication = number of nodes
  • more replicas = higher search performance

Here is the thing. If you write stuff, the maximum write performance you can get is given by this equation:

node_throughput*number_of_primary_shards

The reason is very simple: if you have only one primary shard, then you can write data only as quickly as one node can write it, because a shard only ever lives on one node. If you really wanted to optimize write performance, you should make sure that every node only has exactly one shard on it, primary or replica, since replicas obviously get the same writes as the primary, and writes are largely dependent on disk IO. Note: if you have a lot of indexing this might not be true and the bottleneck could be something else.

If you want to optimize search performance, search performance is given by this equation:

node_throughput*(number_of_primary_shards + number_of_replicas)

For searching, primary and replica shards are basically identical. So if you want to increase search performance, you can just increase the number of replicas, which can be done on the fly.

Size

Much has been written about index size. Here is what I found:

30G of heap = 140 shards maximum per node

Using more than 140 shards, I had Elasticsearch processes crash with out-of-memory errors. This is because every shard is a Lucene instance, and every instance requires a certain amount of memory. That means there is a limit for how many shards you can have per node.

Given the number of nodes and your shard layout (primary shards times replication), here is how many indices you can fit:

number_of_indices = (140 * number_of_nodes) / (number_of_primary_shards * replication_factor)

From that and your disk size, you can easily calculate how big the indices have to be:

index_size = (number_of_nodes * disk_size) / number_of_indices
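
Plugging made-up numbers into the two formulas makes them a bit more tangible (every value here is hypothetical):

nodes              = 6
disk_size_gb       = 2000   # usable disk per node
primary_shards     = 6
replication_factor = 2      # one primary plus one replica per shard

number_of_indices = (140 * nodes) / (primary_shards * replication_factor)   # 70.0
index_size_gb     = (nodes * disk_size_gb) / number_of_indices              # ~171 GB

print(number_of_indices, index_size_gb)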

However, keep in mind that bigger indices are also slower. For logging this is fine to a degree, but for really search-heavy applications you should size more towards the amount of RAM you have.

Segment merging

Remember that every segment is an actual file on the file system. More segments = more overhead in reading. Basically for every search query, it goes to all the shards in the index, and from there to all the segments in the shards. Having many segments drastically increases read-IOPS of your cluster up to the point of it becoming unusable. Because of this it’s a good idea to keep the number of segments as low as possible.

There is a force_merge API that allows you to merge segments down to a certain number, like 1. If you do index rotation, for example because you use Elasticsearch for logging, it is a good idea to do regular force merges when the cluster is not in use. Force merging takes a lot of resources and will slow your cluster down significantly. Because of this, it is a good idea not to let Graylog (for example) do it for you, but to do it yourself when the cluster is used less. You definitely want to do this if you have many indices, though. Otherwise, your cluster will slowly crawl to a halt.
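
A hedged example of what such a scheduled merge could look like, using the plain REST API from Python (host and index name are placeholders; the call can block for a very long time on big indices):

import requests

ES = 'http://localhost:9200'   # placeholder
index = 'graylog_42'           # a rotated index that is no longer written to

r = requests.post(f'{ES}/{index}/_forcemerge', params={'max_num_segments': 1})
r.raise_for_status()
print(r.json())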

Cluster layout

For everything but the smallest setups it is a good idea to use dedicated master-eligible nodes. The main reason is that you should always have 2n+1 master-eligible nodes to ensure quorum. But for data nodes you just want to be able to add a new one at any time, without having to worry about this requirement. Also, you don’t want high load on the data nodes to impact your master nodes.

Finally, master nodes are ideal candidates for seed nodes. Remember that seed nodes are the easiest way to do node discovery in Elasticsearch. Since your master nodes will seldom change, they are the best choice for this, as they most likely already know all other nodes in the cluster.

//EDIT, 25th Feb 2020:
Dave Turner mentioned in the comments:

Master-eligible nodes are the only possible candidates for seed nodes in 7.x, because master-ineligible nodes are ignored during discovery.

davecturner

Master nodes can be pretty small, one core and maybe 4G of RAM is enough for most clusters. As always, keep an eye on actual usage and adjust accordingly.

Monitoring

I love monitoring, and I love monitoring Elasticsearch. ES gives you an absolute ton of metrics and it gives you all of them in the form of JSON, which makes it very easy to pass into monitoring tools. Here are some helpful things to monitor:

  • number of segments
  • heap usage
  • heap GC time
  • avg. search, index, merge time
  • IOPS
  • disk utilization

Conclusion

After around 5 hours of writing this, I think I dumped everything important about ES that is in my brain into this post. I hope it saves you many of the headaches I had to endure.

Resources

https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html
https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-discovery-quorums.html
https://github.com/elastic/rally
https://tech.ebayinc.com/engineering/elasticsearch-performance-tuning-practice-at-ebay/