Review code of migrateTimestamps.pl and add write to influxdb

This commit is contained in:
Christoph Kluge 2022-06-22 12:05:41 +02:00
parent bdcb4544f6
commit 9a7f750dd2

View File

@ -9,56 +9,58 @@ use File::Slurp;
use Data::Dumper; use Data::Dumper;
use Time::Piece; use Time::Piece;
use Sort::Versions; use Sort::Versions;
use REST::Client;
### JOB-ARCHIVE ### JOB-ARCHIVE
my $localtime = localtime; my $localtime = localtime;
my $epochtime = $localtime->epoch; my $epochtime = $localtime->epoch;
my $targetDir = './cc-backend/var/job-archive'; my $archiveTarget = './cc-backend/var/job-archive';
my @Clusters; my $archiveSrc = './data/job-archive';
my $src = './data/job-archive'; my @ArchiveClusters;
# Get clusters by folder # Get clusters by job-archive/$subfolder
opendir my $dh, $src or die "can't open directory: $!"; opendir my $dh, $archiveSrc or die "can't open directory: $!";
while ( readdir $dh ) { while ( readdir $dh ) {
chomp; next if $_ eq '.' or $_ eq '..' or $_ eq 'job-archive'; chomp; next if $_ eq '.' or $_ eq '..' or $_ eq 'job-archive';
my $cluster = $_; my $cluster = $_;
push @Clusters, $cluster; push @ArchiveClusters, $cluster;
} }
# start for jobarchive # start for jobarchive
foreach my $cluster ( @Clusters ) { foreach my $cluster ( @ArchiveClusters ) {
print "Starting to update startTime in job-archive for $cluster\n"; print "Starting to update startTime in job-archive for $cluster\n";
opendir my $dhLevel1, "$src/$cluster" or die "can't open directory: $!"; opendir my $dhLevel1, "$archiveSrc/$cluster" or die "can't open directory: $!";
while ( readdir $dhLevel1 ) { while ( readdir $dhLevel1 ) {
chomp; next if $_ eq '.' or $_ eq '..'; chomp; next if $_ eq '.' or $_ eq '..';
my $level1 = $_; my $level1 = $_;
if ( -d "$src/$cluster/$level1" ) { if ( -d "$archiveSrc/$cluster/$level1" ) {
opendir my $dhLevel2, "$src/$cluster/$level1" or die "can't open directory: $!"; opendir my $dhLevel2, "$archiveSrc/$cluster/$level1" or die "can't open directory: $!";
while ( readdir $dhLevel2 ) { while ( readdir $dhLevel2 ) {
chomp; next if $_ eq '.' or $_ eq '..'; chomp; next if $_ eq '.' or $_ eq '..';
my $level2 = $_; my $level2 = $_;
my $src = "$src/$cluster/$level1/$level2"; my $jobSource = "$archiveSrc/$cluster/$level1/$level2";
my $target = "$targetDir/$cluster/$level1/$level2/"; my $jobTarget = "$archiveTarget/$cluster/$level1/$level2/";
my $oldsrc = $src; my $jobOrigin = $jobSource;
# check if files are directly accessible (old format) else get subfolders as file and update path
if ( ! -e "$src/meta.json") { if ( ! -e "$jobSource/meta.json") {
my @files = read_dir($src); my @folders = read_dir($jobSource);
if (!@files) { if (!@folders) {
next; next;
} }
$src = "$src/".$files[0]; # Only use first subfolder for now TODO
$jobSource = "$jobSource/".$folders[0];
} }
# check if subfolder contains file, else remove source and skip
if ( ! -e "$src/meta.json") { if ( ! -e "$jobSource/meta.json") {
rmtree $oldsrc; rmtree $jobOrigin;
next; next;
} }
my $str = read_file("$src/meta.json"); my $rawstr = read_file("$jobSource/meta.json");
my $json = decode_json($str); my $json = decode_json($rawstr);
# NOTE Start meta.json iteration here # NOTE Start meta.json iteration here
# my $random_number = int(rand(UPPERLIMIT)) + LOWERLIMIT; # my $random_number = int(rand(UPPERLIMIT)) + LOWERLIMIT;
@ -68,21 +70,22 @@ foreach my $cluster ( @Clusters ) {
$json->{startTime} = $epochtime - (int(rand(432000)) + 86400); $json->{startTime} = $epochtime - (int(rand(432000)) + 86400);
$json->{stopTime} = $json->{startTime} + $json->{duration}; $json->{stopTime} = $json->{startTime} + $json->{duration};
$target .= $json->{startTime}; # Add starttime subfolder to target path
$jobTarget .= $json->{startTime};
if ( not -d $target ){ # target is not directory
if ( not -d $jobTarget ){
# print "Writing files\n"; # print "Writing files\n";
# print "$cluster/$level1/$level2\n"; # print "$cluster/$level1/$level2\n";
make_path($target); make_path($jobTarget);
$str = encode_json($json); my $outstr = encode_json($json);
write_file("$target/meta.json", $str); write_file("$jobTarget/meta.json", $outstr);
$str = read_file("$src/data.json"); my $datstr = read_file("$jobSource/data.json");
write_file("$target/data.json", $str); write_file("$jobTarget/data.json", $datstr);
} else { } else {
#rmtree $src; # rmtree $jobSource;
} }
} }
} }
@ -94,88 +97,157 @@ sleep(2);
## CHECKPOINTS ## CHECKPOINTS
chomp(my $checkpointStart=`date --date 'TZ="Europe/Berlin" 0:00 7 days ago' +%s`); chomp(my $checkpointStart=`date --date 'TZ="Europe/Berlin" 0:00 7 days ago' +%s`);
my $halfday = 43200; my $halfday = 43200;
my $targetDirCheckpoints = './data/cc-metric-store_new'; my $checkpTarget = './data/cc-metric-store_new';
my $srcCheckpoints = './data/cc-metric-store'; my $checkpSource = './data/cc-metric-store';
my @ClustersCheckpoints; my @CheckpClusters;
# Get clusters by folder # Get clusters by cc-metric-store/$subfolder
opendir my $dhc, $srcCheckpoints or die "can't open directory: $!"; opendir my $dhc, $checkpSource or die "can't open directory: $!";
while ( readdir $dhc ) { while ( readdir $dhc ) {
chomp; next if $_ eq '.' or $_ eq '..' or $_ eq 'job-archive'; chomp; next if $_ eq '.' or $_ eq '..' or $_ eq 'job-archive';
my $cluster = $_; my $cluster = $_;
push @ClustersCheckpoints, $cluster; push @CheckpClusters, $cluster;
} }
# start for checkpoints # start for checkpoints
foreach my $cluster ( @ClustersCheckpoints ) { foreach my $cluster ( @CheckpClusters ) {
print "Starting to update startTime in checkpoint-files for $cluster\n"; print "Starting to update startTime in checkpoint-files for $cluster\n";
opendir my $dhLevel1, "$srcCheckpoints/$cluster" or die "can't open directory: $!"; opendir my $dhLevel1, "$checkpSource/$cluster" or die "can't open directory: $!";
while ( readdir $dhLevel1 ) { while ( readdir $dhLevel1 ) {
chomp; next if $_ eq '.' or $_ eq '..'; chomp; next if $_ eq '.' or $_ eq '..';
# Nodename as level1-folder
my $level1 = $_; my $level1 = $_;
if ( -d "$srcCheckpoints/$cluster/$level1" ) { if ( -d "$checkpSource/$cluster/$level1" ) {
my $srcCheckpoints = "$srcCheckpoints/$cluster/$level1/"; my $nodeSource = "$checkpSource/$cluster/$level1/";
my $target = "$targetDirCheckpoints/$cluster/$level1/"; my $nodeTarget = "$checkpTarget/$cluster/$level1/";
my $oldsrc = $srcCheckpoints; my $nodeOrigin = $nodeSource;
my @files; my @files;
if ( -e "$srcCheckpoints/1609459200.json") { # 1609459200 == First Checkpoint time in latest dump if ( -e "$nodeSource/1609459200.json") { # 1609459200 == First Checkpoint time in latest dump
@files = read_dir($srcCheckpoints); @files = read_dir($nodeSource);
my $length = @files; my $length = @files;
if (!@files || $length != 14) { # needs 14 files == 7 days worth of data if (!@files || $length != 14) { # needs 14 files == 7 days worth of data
next; next;
} }
} } else {
# rmtree $nodeOrigin;
if ( ! -e "$srcCheckpoints/1609459200.json") {
# rmtree $oldsrc;
next; next;
} }
my @sortedFiles = sort { versioncmp($a,$b) } @files; # sort alphanumerically: _Really_ start with index == 0 == 1609459200.json my @sortedFiles = sort { versioncmp($a,$b) } @files; # sort alphanumerically: _Really_ start with index == 0 == 1609459200.json
if ( not -d $nodeTarget ){
# print "processing files for $level1 \n";
make_path($nodeTarget);
while (my ($index, $file) = each(@sortedFiles)) { while (my ($index, $file) = each(@sortedFiles)) {
# print "$file\n"; # print "$file\n";
my $str = read_file("$srcCheckpoints/$file"); my $rawstr = read_file("$nodeSource/$file");
my $json = decode_json($str); my $json = decode_json($rawstr);
my $timestamp = $checkpointStart + ($index * $halfday); my $newTimestamp = $checkpointStart + ($index * $halfday);
my $oldTimestamp = $json->{from}; # Get Diff from old Timestamp
my $timeDiff = $newTimestamp - $json->{from};
# print "$oldTimestamp -> $timestamp in $srcCheckpoints\n"; # Set new timestamp
$json->{from} = $newTimestamp;
$json->{from} = $timestamp;
foreach my $metric (keys %{$json->{metrics}}) { foreach my $metric (keys %{$json->{metrics}}) {
$json->{metrics}->{$metric}->{start} -= $oldTimestamp; $json->{metrics}->{$metric}->{start} += $timeDiff;
$json->{metrics}->{$metric}->{start} += $timestamp;
} }
my $targetFile = "$target/$timestamp.json"; my $outstr = encode_json($json);
make_path($target); write_file("$nodeTarget/$newTimestamp.json", $outstr);
$str = encode_json($json); }
write_file("$targetFile", $str); } else {
# rmtree $nodeSource;
} }
# if ( not -d $target ){
# print "Writing files\n";
#
# print "$cluster/$level1/$level2\n";
# make_path($target);
#
# $str = encode_json($json);
# write_file("$target/meta.json", $str);
#
# $str = read_file("$srcCheckpoints/data.json");
# write_file("$target/data.json", $str);
# } else {
# #rmtree $src;
# }
} }
} }
} }
print "Done for checkpoints\n"; print "Done for checkpoints\n";
sleep(2);

### INFLUXDB
# Re-read the freshly migrated checkpoint files (written by the section above
# into cc-metric-store_new) and push their samples to an InfluxDB v2 instance
# as line protocol via the /api/v2/write endpoint.
my $newCheckpoints = './data/cc-metric-store_new';
my $verbose = 1;

# Configure the REST client for the local Influx instance.
# NOTE(review): auth token and host are hardcoded — fine for a one-off
# migration script, but should come from env/config before reuse.
my $restClient = REST::Client->new();
$restClient->setHost('http://localhost:8087');
$restClient->addHeader('Authorization', "Token 74008ea2a8dad5e6f856838a90c6392e");
$restClient->addHeader('Content-Type', 'text/plain; charset=utf-8');
$restClient->addHeader('Accept', 'application/json');
$restClient->getUseragent()->ssl_opts(SSL_verify_mode => 0); # Temporary: Disable Cert Check
$restClient->getUseragent()->ssl_opts(verify_hostname => 0); # Temporary: Disable Cert Check

# Get clusters by folder: Reuse from above
# start to read checkpoints for influx
# (@CheckpClusters is populated earlier in this script from the checkpoint
#  source directory listing.)
foreach my $cluster ( @CheckpClusters ) {
    print "Starting to read checkpoint-files into influx for $cluster\n";
    opendir my $dhLevel1, "$newCheckpoints/$cluster" or die "can't open directory: $!";
    while ( readdir $dhLevel1 ) {
        chomp; next if $_ eq '.' or $_ eq '..';
        # Each level1 subfolder is one node/hostname of the cluster.
        my $level1 = $_;
        if ( -d "$newCheckpoints/$cluster/$level1" ) {
            my $nodeSource = "$newCheckpoints/$cluster/$level1/";
            my @files = read_dir($nodeSource);
            my $length = @files;
            # Skip nodes without a complete week of half-day checkpoints.
            if (!@files || $length != 14) { # needs 14 files == 7 days worth of data
                next;
            }
            my @sortedFiles = sort { versioncmp($a,$b) } @files; # sort alphanumerically: _Really_ start with index == 0 == 1609459200.json

            # Accumulates the full line-protocol payload for this node
            # (one POST per node, all 14 files concatenated).
            my $nodeMeasurement;
            foreach my $file (@sortedFiles) {
                # print "$file\n";
                my $rawstr = read_file("$nodeSource/$file");
                my $json = decode_json($rawstr);
                my $fileMeasurement;
                foreach my $metric (keys %{$json->{metrics}}) {
                    # Sample timestamps are reconstructed from the series start
                    # plus index * frequency (seconds precision, see POST URL).
                    my $start = $json->{metrics}->{$metric}->{start};
                    my $timestep = $json->{metrics}->{$metric}->{frequency};
                    my $data = $json->{metrics}->{$metric}->{data};
                    my $length = @$data; # shadows outer $length; only used in the debug print below
                    my $measurement;
                    while (my ($index, $value) = each(@$data)) {
                        # NOTE(review): this truthiness test drops undef/null
                        # samples, but also drops legitimate 0 and "0" values —
                        # confirm that is intended.
                        if ($value) {
                            my $timestamp = $start + ($timestep * $index);
                            $measurement .= "$metric,cluster=$cluster,hostname=$level1,type=node value=".$value." $timestamp"."\n";
                        }
                    }
                    # Use v2 API for Influx2
                    if ($measurement) {
                        # print "Adding: #VALUES $length KEY $metric"."\n";
                        $fileMeasurement .= $measurement;
                    }
                }
                if ($fileMeasurement) {
                    $nodeMeasurement .= $fileMeasurement;
                }
            }

            # NOTE(review): if no values were collected, $nodeMeasurement is
            # undef and this POSTs an empty body (with an uninitialized-value
            # warning from the interpolation) — consider guarding the POST.
            $restClient->POST("/api/v2/write?org=ClusterCockpit&bucket=ClusterCockpit&precision=s", "$nodeMeasurement");
            my $responseCode = $restClient->responseCode();
            # 204 No Content is Influx's success status for /api/v2/write.
            if ( $responseCode eq '204') {
                if ( $verbose ) {
                    print "INFLUX API WRITE: CLUSTER $cluster HOST $level1"."\n";
                };
            } else {
                if ( $responseCode ne '422' ) { # Exclude High Frequency Error 422 - Temporary!
                    my $response = $restClient->responseContent();
                    print "INFLUX API WRITE ERROR CODE ".$responseCode.": ".$response."\n";
                };
            };
        }
    }
}
print "Done for influx\n";