mirror of
https://github.com/ClusterCockpit/cc-metric-store.git
synced 2025-07-06 13:33:49 +02:00
commit
c3f6b9e33e
@ -21,6 +21,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/api"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/avro"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/config"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/runtimeEnv"
|
||||
@ -68,7 +69,7 @@ func main() {
|
||||
|
||||
restoreFrom := startupTime.Add(-d)
|
||||
log.Printf("Loading checkpoints newer than %s\n", restoreFrom.Format(time.RFC3339))
|
||||
files, err := ms.FromCheckpoint(config.Keys.Checkpoints.RootDir, restoreFrom.Unix())
|
||||
files, err := ms.FromCheckpointFiles(config.Keys.Checkpoints.RootDir, restoreFrom.Unix())
|
||||
loadedData := ms.SizeInBytes() / 1024 / 1024 // In MB
|
||||
if err != nil {
|
||||
log.Fatalf("Loading checkpoints failed: %s\n", err.Error())
|
||||
@ -90,11 +91,12 @@ func main() {
|
||||
ctx, shutdown := context.WithCancel(context.Background())
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(3)
|
||||
wg.Add(4)
|
||||
|
||||
memorystore.Retention(&wg, ctx)
|
||||
memorystore.Checkpointing(&wg, ctx)
|
||||
memorystore.Archiving(&wg, ctx)
|
||||
avro.DataStaging(&wg, ctx)
|
||||
|
||||
r := http.NewServeMux()
|
||||
api.MountRoutes(r)
|
||||
@ -132,9 +134,9 @@ func main() {
|
||||
MinVersion: tls.VersionTLS12,
|
||||
PreferServerCipherSuites: true,
|
||||
})
|
||||
fmt.Printf("HTTPS server listening at %s...", config.Keys.HttpConfig.Address)
|
||||
fmt.Printf("HTTPS server listening at %s...\n", config.Keys.HttpConfig.Address)
|
||||
} else {
|
||||
fmt.Printf("HTTP server listening at %s...", config.Keys.HttpConfig.Address)
|
||||
fmt.Printf("HTTP server listening at %s...\n", config.Keys.HttpConfig.Address)
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
|
@ -7,10 +7,10 @@ rm sample_alex.txt
|
||||
|
||||
while [ true ]; do
|
||||
echo "Alex Metrics for hwthread types and type-ids"
|
||||
timestamp="$(date '+%s%N')"
|
||||
timestamp="$(date '+%s')"
|
||||
echo "Timestamp : "+$timestamp
|
||||
for metric in cpu_load cpu_user;do # flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in a0603 a0903; do # a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do
|
||||
for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in a0603 a0903 a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do
|
||||
for id in {0..127}; do
|
||||
echo "$metric,cluster=alex,hostname=$hostname,type=hwthread,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_alex.txt
|
||||
done
|
||||
@ -20,8 +20,8 @@ while [ true ]; do
|
||||
curl -X 'POST' 'http://localhost:8082/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" --data-binary @sample_alex.txt
|
||||
|
||||
echo "Fritz Metrics for hwthread types and type-ids"
|
||||
for metric in cpu_load cpu_user; do # flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in f0201 f0202; do # f0203 f0204 f0205 f0206 f0207 f0208 f0209 f0210 f0211 f0212 f0213 f0214 f0215 f0217 f0218 f0219 f0220 f0221 f0222 f0223 f0224 f0225 f0226 f0227 f0228 f0229 f0230 f0231 f0232 f0233 f0234 f0235 f0236 f0237 f0238 f0239 f0240 f0241 f0242 f0243 f0244 f0245 f0246 f0247 f0248 f0249 f0250 f0251 f0252 f0253 f0254 f0255 f0256 f0257 f0258 f0259 f0260 f0261 f0262 f0263 f0264 f0378; do
|
||||
for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in f0201 f0202 f0203 f0204 f0205 f0206 f0207 f0208 f0209 f0210 f0211 f0212 f0213 f0214 f0215 f0217 f0218 f0219 f0220 f0221 f0222 f0223 f0224 f0225 f0226 f0227 f0228 f0229 f0230 f0231 f0232 f0233 f0234 f0235 f0236 f0237 f0238 f0239 f0240 f0241 f0242 f0243 f0244 f0245 f0246 f0247 f0248 f0249 f0250 f0251 f0252 f0253 f0254 f0255 f0256 f0257 f0258 f0259 f0260 f0261 f0262 f0263 f0264 f0378; do
|
||||
for id in {0..71}; do
|
||||
echo "$metric,cluster=fritz,hostname=$hostname,type=hwthread,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_fritz.txt
|
||||
done
|
||||
@ -34,8 +34,8 @@ while [ true ]; do
|
||||
rm sample_alex.txt
|
||||
|
||||
echo "Alex Metrics for accelerator types and type-ids"
|
||||
for metric in cpu_load cpu_user; do # flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in a0603 a0903; do # a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do
|
||||
for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in a0603 a0903 a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do
|
||||
for id in 00000000:49:00.0 00000000:0E:00.0 00000000:D1:00.0 00000000:90:00.0 00000000:13:00.0 00000000:96:00.0 00000000:CC:00.0 00000000:4F:00.0; do
|
||||
echo "$metric,cluster=alex,hostname=$hostname,type=accelerator,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_alex.txt
|
||||
done
|
||||
@ -47,8 +47,8 @@ while [ true ]; do
|
||||
rm sample_alex.txt
|
||||
|
||||
echo "Alex Metrics for memoryDomain types and type-ids"
|
||||
for metric in cpu_load cpu_user; do # flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in a0603 a0903; do # a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do
|
||||
for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in a0603 a0903 a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do
|
||||
for id in {0..7}; do
|
||||
echo "$metric,cluster=alex,hostname=$hostname,type=memoryDomain,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_alex.txt
|
||||
done
|
||||
@ -60,8 +60,8 @@ while [ true ]; do
|
||||
rm sample_alex.txt
|
||||
|
||||
echo "Alex Metrics for socket types and type-ids"
|
||||
for metric in cpu_load cpu_user; do # flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in a0603 a0903; do # a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do
|
||||
for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in a0603 a0903 a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do
|
||||
for id in {0..1}; do
|
||||
echo "$metric,cluster=alex,hostname=$hostname,type=socket,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_alex.txt
|
||||
done
|
||||
@ -71,8 +71,8 @@ while [ true ]; do
|
||||
curl -X 'POST' 'http://localhost:8082/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" --data-binary @sample_alex.txt
|
||||
|
||||
echo "Fritz Metrics for socket types and type-ids"
|
||||
for metric in cpu_load cpu_user; do # flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in f0201 f0202; do # f0203 f0204 f0205 f0206 f0207 f0208 f0209 f0210 f0211 f0212 f0213 f0214 f0215 f0217 f0218 f0219 f0220 f0221 f0222 f0223 f0224 f0225 f0226 f0227 f0228 f0229 f0230 f0231 f0232 f0233 f0234 f0235 f0236 f0237 f0238 f0239 f0240 f0241 f0242 f0243 f0244 f0245 f0246 f0247 f0248 f0249 f0250 f0251 f0252 f0253 f0254 f0255 f0256 f0257 f0258 f0259 f0260 f0261 f0262 f0263 f0264 f0378; do
|
||||
for metric in cpu_load cpu_user flops_any cpu_irq cpu_system ipc cpu_idle cpu_iowait core_power clock; do
|
||||
for hostname in f0201 f0202 f0203 f0204 f0205 f0206 f0207 f0208 f0209 f0210 f0211 f0212 f0213 f0214 f0215 f0217 f0218 f0219 f0220 f0221 f0222 f0223 f0224 f0225 f0226 f0227 f0228 f0229 f0230 f0231 f0232 f0233 f0234 f0235 f0236 f0237 f0238 f0239 f0240 f0241 f0242 f0243 f0244 f0245 f0246 f0247 f0248 f0249 f0250 f0251 f0252 f0253 f0254 f0255 f0256 f0257 f0258 f0259 f0260 f0261 f0262 f0263 f0264 f0378; do
|
||||
for id in {0..1}; do
|
||||
echo "$metric,cluster=fritz,hostname=$hostname,type=socket,type-id=$id value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_fritz.txt
|
||||
done
|
||||
@ -85,8 +85,8 @@ while [ true ]; do
|
||||
rm sample_alex.txt
|
||||
|
||||
echo "Alex Metrics for nodes"
|
||||
for metric in cpu_irq cpu_load; do # mem_cached net_bytes_in cpu_user cpu_idle nfs4_read mem_used nfs4_write nfs4_total ib_xmit ib_xmit_pkts net_bytes_out cpu_iowait ib_recv cpu_system ib_recv_pkts; do
|
||||
for hostname in a0603 a0903; do # a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do
|
||||
for metric in cpu_irq cpu_load mem_cached net_bytes_in cpu_user cpu_idle nfs4_read mem_used nfs4_write nfs4_total ib_xmit ib_xmit_pkts net_bytes_out cpu_iowait ib_recv cpu_system ib_recv_pkts; do
|
||||
for hostname in a0603 a0903 a0832 a0329 a0702 a0122 a1624 a0731 a0224 a0704 a0631 a0225 a0222 a0427 a0603 a0429 a0833 a0705 a0901 a0601 a0227 a0804 a0322 a0226 a0126 a0129 a0605 a0801 a0934 a1622 a0902 a0428 a0537 a1623 a1722 a0228 a0701 a0326 a0327 a0123 a0321 a1621 a0323 a0124 a0534 a0931 a0324 a0933 a0424 a0905 a0128 a0532 a0805 a0521 a0535 a0932 a0127 a0325 a0633 a0831 a0803 a0426 a0425 a0229 a1721 a0602 a0632 a0223 a0422 a0423 a0536 a0328 a0703 anvme7 a0125 a0221 a0604 a0802 a0522 a0531 a0533 a0904; do
|
||||
echo "$metric,cluster=alex,hostname=$hostname,type=node value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_alex.txt
|
||||
done
|
||||
done
|
||||
@ -94,8 +94,8 @@ while [ true ]; do
|
||||
curl -X 'POST' 'http://localhost:8082/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" --data-binary @sample_alex.txt
|
||||
|
||||
echo "Fritz Metrics for nodes"
|
||||
for metric in cpu_irq cpu_load; do # mem_cached net_bytes_in cpu_user cpu_idle nfs4_read mem_used nfs4_write nfs4_total ib_xmit ib_xmit_pkts net_bytes_out cpu_iowait ib_recv cpu_system ib_recv_pkts; do
|
||||
for hostname in f0201 f0202; do # f0203 f0204 f0205 f0206 f0207 f0208 f0209 f0210 f0211 f0212 f0213 f0214 f0215 f0217 f0218 f0219 f0220 f0221 f0222 f0223 f0224 f0225 f0226 f0227 f0228 f0229 f0230 f0231 f0232 f0233 f0234 f0235 f0236 f0237 f0238 f0239 f0240 f0241 f0242 f0243 f0244 f0245 f0246 f0247 f0248 f0249 f0250 f0251 f0252 f0253 f0254 f0255 f0256 f0257 f0258 f0259 f0260 f0261 f0262 f0263 f0264 f0378; do
|
||||
for metric in cpu_irq cpu_load mem_cached net_bytes_in cpu_user cpu_idle nfs4_read mem_used nfs4_write nfs4_total ib_xmit ib_xmit_pkts net_bytes_out cpu_iowait ib_recv cpu_system ib_recv_pkts; do
|
||||
for hostname in f0201 f0202 f0203 f0204 f0205 f0206 f0207 f0208 f0209 f0210 f0211 f0212 f0213 f0214 f0215 f0217 f0218 f0219 f0220 f0221 f0222 f0223 f0224 f0225 f0226 f0227 f0228 f0229 f0230 f0231 f0232 f0233 f0234 f0235 f0236 f0237 f0238 f0239 f0240 f0241 f0242 f0243 f0244 f0245 f0246 f0247 f0248 f0249 f0250 f0251 f0252 f0253 f0254 f0255 f0256 f0257 f0258 f0259 f0260 f0261 f0262 f0263 f0264 f0378; do
|
||||
echo "$metric,cluster=fritz,hostname=$hostname,type=node value=$((1 + RANDOM % 100)).0 $timestamp" >>sample_fritz.txt
|
||||
done
|
||||
done
|
||||
@ -107,4 +107,4 @@ while [ true ]; do
|
||||
|
||||
sleep 1m
|
||||
done
|
||||
# curl -X 'POST' 'http://localhost:8081/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" -d $'cpu_load,cluster=alex,hostname=a042,type=hwthread,type-id=0 value=35.0 1725827464642231296'
|
||||
# curl -X 'POST' 'http://localhost:8081/api/write/?cluster=alex' -H "Authorization: Bearer $JWT" -d $'cpu_load,cluster=alex,hostname=a042,type=hwthread,type-id=0 value=35.0 1725827464642231296'
|
2
go.mod
2
go.mod
@ -18,8 +18,10 @@ require (
|
||||
github.com/go-openapi/jsonreference v0.21.0 // indirect
|
||||
github.com/go-openapi/spec v0.21.0 // indirect
|
||||
github.com/go-openapi/swag v0.23.0 // indirect
|
||||
github.com/golang/snappy v0.0.1 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/klauspost/compress v1.17.9 // indirect
|
||||
github.com/linkedin/goavro/v2 v2.13.1 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
|
10
go.sum
10
go.sum
@ -3,6 +3,7 @@ github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6Xge
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
|
||||
@ -19,6 +20,8 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr
|
||||
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
|
||||
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
|
||||
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
|
||||
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
|
||||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
@ -44,6 +47,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/linkedin/goavro/v2 v2.13.1 h1:4qZ5M0QzQFDRqccsroJlgOJznqAS/TpdvXg55h429+I=
|
||||
github.com/linkedin/goavro/v2 v2.13.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
|
||||
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
|
||||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/nats-io/nats.go v1.36.0 h1:suEUPuWzTSse/XhESwqLxXGuj8vGRuPRoG7MoRN/qyU=
|
||||
@ -59,6 +64,10 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN
|
||||
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
|
||||
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/swaggo/files v1.0.1 h1:J1bVJ4XHZNq0I46UU90611i9/YzdrF7x92oX1ig5IdE=
|
||||
@ -117,6 +126,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/avro"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/config"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/memorystore"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/util"
|
||||
@ -329,7 +330,19 @@ func decodeLine(dec *lineprotocol.Decoder,
|
||||
return fmt.Errorf("host %s: timestamp : %#v with error : %#v", host, t, err.Error())
|
||||
}
|
||||
|
||||
if err := ms.WriteToLevel(lvl, selector, t.Unix(), []memorystore.Metric{metric}); err != nil {
|
||||
time := t.Unix()
|
||||
|
||||
if config.Keys.Checkpoints.FileFormat != "json" {
|
||||
avro.LineProtocolMessages <- &avro.AvroStruct{
|
||||
MetricName: string(metricBuf),
|
||||
Cluster: cluster,
|
||||
Node: host,
|
||||
Selector: append([]string{}, selector...),
|
||||
Value: metric.Value,
|
||||
Timestamp: time}
|
||||
}
|
||||
|
||||
if err := ms.WriteToLevel(lvl, selector, time, []memorystore.Metric{metric}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
474
internal/avro/avroCheckpoint.go
Normal file
474
internal/avro/avroCheckpoint.go
Normal file
@ -0,0 +1,474 @@
|
||||
package avro
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/config"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/util"
|
||||
"github.com/linkedin/goavro/v2"
|
||||
)
|
||||
|
||||
var NumWorkers int = 4
|
||||
|
||||
var ErrNoNewData error = errors.New("no data in the pool")
|
||||
|
||||
func (as *AvroStore) ToCheckpoint(dir string, dumpAll bool) (int, error) {
|
||||
levels := make([]*AvroLevel, 0)
|
||||
selectors := make([][]string, 0)
|
||||
as.root.lock.RLock()
|
||||
// Cluster
|
||||
for sel1, l1 := range as.root.children {
|
||||
l1.lock.RLock()
|
||||
// Node
|
||||
for sel2, l2 := range l1.children {
|
||||
l2.lock.RLock()
|
||||
// Frequency
|
||||
for sel3, l3 := range l2.children {
|
||||
levels = append(levels, l3)
|
||||
selectors = append(selectors, []string{sel1, sel2, sel3})
|
||||
}
|
||||
l2.lock.RUnlock()
|
||||
}
|
||||
l1.lock.RUnlock()
|
||||
}
|
||||
as.root.lock.RUnlock()
|
||||
|
||||
type workItem struct {
|
||||
level *AvroLevel
|
||||
dir string
|
||||
selector []string
|
||||
}
|
||||
|
||||
n, errs := int32(0), int32(0)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(NumWorkers)
|
||||
work := make(chan workItem, NumWorkers*2)
|
||||
for range NumWorkers {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
for workItem := range work {
|
||||
var from int64 = getTimestamp(workItem.dir)
|
||||
|
||||
if err := workItem.level.toCheckpoint(workItem.dir, from, dumpAll); err != nil {
|
||||
if err == ErrNoNewData {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("error while checkpointing %#v: %s", workItem.selector, err.Error())
|
||||
atomic.AddInt32(&errs, 1)
|
||||
} else {
|
||||
atomic.AddInt32(&n, 1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for i := range len(levels) {
|
||||
dir := path.Join(dir, path.Join(selectors[i]...))
|
||||
work <- workItem{
|
||||
level: levels[i],
|
||||
dir: dir,
|
||||
selector: selectors[i],
|
||||
}
|
||||
}
|
||||
|
||||
close(work)
|
||||
wg.Wait()
|
||||
|
||||
if errs > 0 {
|
||||
return int(n), fmt.Errorf("%d errors happend while creating avro checkpoints (%d successes)", errs, n)
|
||||
}
|
||||
return int(n), nil
|
||||
}
|
||||
|
||||
// getTimestamp returns the timestamp from the directory name
|
||||
func getTimestamp(dir string) int64 {
|
||||
// Extract the resolution and timestamp from the directory name
|
||||
// The existing avro file will be in epoch timestamp format
|
||||
// iterate over all the files in the directory and find the maximum timestamp
|
||||
// and return it
|
||||
|
||||
resolution := path.Base(dir)
|
||||
dir = path.Dir(dir)
|
||||
|
||||
files, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
var maxTs int64 = 0
|
||||
|
||||
if len(files) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
if file.IsDir() {
|
||||
continue
|
||||
}
|
||||
name := file.Name()
|
||||
|
||||
if len(name) < 5 || !strings.HasSuffix(name, ".avro") || !strings.HasPrefix(name, resolution+"_") {
|
||||
continue
|
||||
}
|
||||
|
||||
ts, err := strconv.ParseInt(name[strings.Index(name, "_")+1:len(name)-5], 10, 64)
|
||||
if err != nil {
|
||||
fmt.Printf("error while parsing timestamp: %s\n", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
if ts > maxTs {
|
||||
maxTs = ts
|
||||
}
|
||||
}
|
||||
|
||||
interval, _ := time.ParseDuration(config.Keys.Checkpoints.Interval)
|
||||
updateTime := time.Unix(maxTs, 0).Add(interval).Add(time.Duration(CheckpointBufferMinutes-1) * time.Minute).Unix()
|
||||
|
||||
if updateTime < time.Now().Unix() {
|
||||
return 0
|
||||
}
|
||||
|
||||
return maxTs
|
||||
}
|
||||
|
||||
func (l *AvroLevel) toCheckpoint(dir string, from int64, dumpAll bool) error {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
// fmt.Printf("Checkpointing directory: %s\n", dir)
|
||||
// filepath contains the resolution
|
||||
int_res, _ := strconv.Atoi(path.Base(dir))
|
||||
|
||||
// find smallest overall timestamp in l.data map and delete it from l.data
|
||||
var minTs int64 = int64(1<<63 - 1)
|
||||
for ts, dat := range l.data {
|
||||
if ts < minTs && len(dat) != 0 {
|
||||
minTs = ts
|
||||
}
|
||||
}
|
||||
|
||||
if from == 0 && minTs != int64(1<<63-1) {
|
||||
from = minTs
|
||||
}
|
||||
|
||||
if from == 0 {
|
||||
return ErrNoNewData
|
||||
}
|
||||
|
||||
var schema string
|
||||
var codec *goavro.Codec
|
||||
record_list := make([]map[string]interface{}, 0)
|
||||
|
||||
var f *os.File
|
||||
|
||||
filePath := dir + fmt.Sprintf("_%d.avro", from)
|
||||
|
||||
var err error
|
||||
|
||||
fp_, err_ := os.Stat(filePath)
|
||||
if errors.Is(err_, os.ErrNotExist) {
|
||||
err = os.MkdirAll(path.Dir(dir), 0o755)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create directory: %v", err)
|
||||
}
|
||||
} else if fp_.Size() != 0 {
|
||||
f, err = os.Open(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open existing avro file: %v", err)
|
||||
}
|
||||
|
||||
br := bufio.NewReader(f)
|
||||
|
||||
reader, err := goavro.NewOCFReader(br)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create OCF reader: %v", err)
|
||||
}
|
||||
codec = reader.Codec()
|
||||
schema = codec.Schema()
|
||||
|
||||
f.Close()
|
||||
}
|
||||
|
||||
time_ref := time.Now().Add(time.Duration(-CheckpointBufferMinutes+1) * time.Minute).Unix()
|
||||
|
||||
if dumpAll {
|
||||
time_ref = time.Now().Unix()
|
||||
}
|
||||
|
||||
// Empty values
|
||||
if len(l.data) == 0 {
|
||||
// we checkpoint avro files every 60 seconds
|
||||
repeat := 60 / int_res
|
||||
|
||||
for range repeat {
|
||||
record_list = append(record_list, make(map[string]interface{}))
|
||||
}
|
||||
}
|
||||
|
||||
readFlag := true
|
||||
|
||||
for ts := range l.data {
|
||||
flag := false
|
||||
if ts < time_ref {
|
||||
data := l.data[ts]
|
||||
|
||||
schema_gen, err := generateSchema(data)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
flag, schema, err = compareSchema(schema, schema_gen)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to compare read and generated schema: %v", err)
|
||||
}
|
||||
if flag && readFlag && !errors.Is(err_, os.ErrNotExist) {
|
||||
|
||||
f.Close()
|
||||
|
||||
f, err = os.Open(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open Avro file: %v", err)
|
||||
}
|
||||
|
||||
br := bufio.NewReader(f)
|
||||
|
||||
ocfReader, err := goavro.NewOCFReader(br)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create OCF reader while changing schema: %v", err)
|
||||
}
|
||||
|
||||
for ocfReader.Scan() {
|
||||
record, err := ocfReader.Read()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read record: %v", err)
|
||||
}
|
||||
|
||||
record_list = append(record_list, record.(map[string]interface{}))
|
||||
}
|
||||
|
||||
f.Close()
|
||||
|
||||
err = os.Remove(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete file: %v", err)
|
||||
}
|
||||
|
||||
readFlag = false
|
||||
}
|
||||
codec, err = goavro.NewCodec(schema)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create codec after merged schema: %v", err)
|
||||
}
|
||||
|
||||
record_list = append(record_list, generateRecord(data))
|
||||
delete(l.data, ts)
|
||||
}
|
||||
}
|
||||
|
||||
if len(record_list) == 0 {
|
||||
return ErrNoNewData
|
||||
}
|
||||
|
||||
f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to append new avro file: %v", err)
|
||||
}
|
||||
|
||||
// fmt.Printf("Codec : %#v\n", codec)
|
||||
|
||||
writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
|
||||
W: f,
|
||||
Codec: codec,
|
||||
CompressionName: goavro.CompressionDeflateLabel,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create OCF writer: %v", err)
|
||||
}
|
||||
|
||||
// Append the new record
|
||||
if err := writer.Append(record_list); err != nil {
|
||||
return fmt.Errorf("failed to append record: %v", err)
|
||||
}
|
||||
|
||||
f.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func compareSchema(schemaRead, schemaGen string) (bool, string, error) {
|
||||
var genSchema, readSchema AvroSchema
|
||||
|
||||
if schemaRead == "" {
|
||||
return false, schemaGen, nil
|
||||
}
|
||||
|
||||
// Unmarshal the schema strings into AvroSchema structs
|
||||
if err := json.Unmarshal([]byte(schemaGen), &genSchema); err != nil {
|
||||
return false, "", fmt.Errorf("failed to parse generated schema: %v", err)
|
||||
}
|
||||
if err := json.Unmarshal([]byte(schemaRead), &readSchema); err != nil {
|
||||
return false, "", fmt.Errorf("failed to parse read schema: %v", err)
|
||||
}
|
||||
|
||||
sort.Slice(genSchema.Fields, func(i, j int) bool {
|
||||
return genSchema.Fields[i].Name < genSchema.Fields[j].Name
|
||||
})
|
||||
|
||||
sort.Slice(readSchema.Fields, func(i, j int) bool {
|
||||
return readSchema.Fields[i].Name < readSchema.Fields[j].Name
|
||||
})
|
||||
|
||||
// Check if schemas are identical
|
||||
schemasEqual := true
|
||||
if len(genSchema.Fields) <= len(readSchema.Fields) {
|
||||
|
||||
for i := range genSchema.Fields {
|
||||
if genSchema.Fields[i].Name != readSchema.Fields[i].Name {
|
||||
schemasEqual = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// If schemas are identical, return the read schema
|
||||
if schemasEqual {
|
||||
return false, schemaRead, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Create a map to hold unique fields from both schemas
|
||||
fieldMap := make(map[string]AvroField)
|
||||
|
||||
// Add fields from the read schema
|
||||
for _, field := range readSchema.Fields {
|
||||
fieldMap[field.Name] = field
|
||||
}
|
||||
|
||||
// Add or update fields from the generated schema
|
||||
for _, field := range genSchema.Fields {
|
||||
fieldMap[field.Name] = field
|
||||
}
|
||||
|
||||
// Create a union schema by collecting fields from the map
|
||||
var mergedFields []AvroField
|
||||
for _, field := range fieldMap {
|
||||
mergedFields = append(mergedFields, field)
|
||||
}
|
||||
|
||||
// Sort fields by name for consistency
|
||||
sort.Slice(mergedFields, func(i, j int) bool {
|
||||
return mergedFields[i].Name < mergedFields[j].Name
|
||||
})
|
||||
|
||||
// Create the merged schema
|
||||
mergedSchema := AvroSchema{
|
||||
Type: "record",
|
||||
Name: genSchema.Name,
|
||||
Fields: mergedFields,
|
||||
}
|
||||
|
||||
// Check if schemas are identical
|
||||
schemasEqual = len(mergedSchema.Fields) == len(readSchema.Fields)
|
||||
if schemasEqual {
|
||||
for i := range mergedSchema.Fields {
|
||||
if mergedSchema.Fields[i].Name != readSchema.Fields[i].Name {
|
||||
schemasEqual = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if schemasEqual {
|
||||
return false, schemaRead, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Marshal the merged schema back to JSON
|
||||
mergedSchemaJson, err := json.Marshal(mergedSchema)
|
||||
if err != nil {
|
||||
return false, "", fmt.Errorf("failed to marshal merged schema: %v", err)
|
||||
}
|
||||
|
||||
return true, string(mergedSchemaJson), nil
|
||||
}
|
||||
|
||||
func generateSchema(data map[string]util.Float) (string, error) {
|
||||
|
||||
// Define the Avro schema structure
|
||||
schema := map[string]interface{}{
|
||||
"type": "record",
|
||||
"name": "DataRecord",
|
||||
"fields": []map[string]interface{}{},
|
||||
}
|
||||
|
||||
fieldTracker := make(map[string]struct{})
|
||||
|
||||
for key := range data {
|
||||
if _, exists := fieldTracker[key]; !exists {
|
||||
key = correctKey(key)
|
||||
|
||||
field := map[string]interface{}{
|
||||
"name": key,
|
||||
"type": "double",
|
||||
"default": -1.0,
|
||||
}
|
||||
schema["fields"] = append(schema["fields"].([]map[string]interface{}), field)
|
||||
fieldTracker[key] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
schemaString, err := json.Marshal(schema)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to marshal schema: %v", err)
|
||||
}
|
||||
|
||||
return string(schemaString), nil
|
||||
}
|
||||
|
||||
func generateRecord(data map[string]util.Float) map[string]interface{} {
|
||||
|
||||
record := make(map[string]interface{})
|
||||
|
||||
// Iterate through each map in data
|
||||
for key, value := range data {
|
||||
key = correctKey(key)
|
||||
|
||||
// Set the value in the record
|
||||
record[key] = value.Double()
|
||||
}
|
||||
|
||||
return record
|
||||
}
|
||||
|
||||
func correctKey(key string) string {
|
||||
// Replace any invalid characters in the key
|
||||
// For example, replace spaces with underscores
|
||||
key = strings.ReplaceAll(key, ":", "___")
|
||||
key = strings.ReplaceAll(key, ".", "__")
|
||||
|
||||
return key
|
||||
}
|
||||
|
||||
func ReplaceKey(key string) string {
|
||||
// Replace any invalid characters in the key
|
||||
// For example, replace spaces with underscores
|
||||
key = strings.ReplaceAll(key, "___", ":")
|
||||
key = strings.ReplaceAll(key, "__", ".")
|
||||
|
||||
return key
|
||||
}
|
80
internal/avro/avroHelper.go
Normal file
80
internal/avro/avroHelper.go
Normal file
@ -0,0 +1,80 @@
|
||||
package avro
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/config"
|
||||
)
|
||||
|
||||
func DataStaging(wg *sync.WaitGroup, ctx context.Context) {
|
||||
|
||||
// AvroPool is a pool of Avro writers.
|
||||
go func() {
|
||||
if config.Keys.Checkpoints.FileFormat == "json" {
|
||||
wg.Done() // Mark this goroutine as done
|
||||
return // Exit the goroutine
|
||||
}
|
||||
|
||||
defer wg.Done()
|
||||
|
||||
var avroLevel *AvroLevel
|
||||
oldSelector := make([]string, 0)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case val := <-LineProtocolMessages:
|
||||
//Fetch the frequency of the metric from the global configuration
|
||||
freq, err := config.Keys.GetMetricFrequency(val.MetricName)
|
||||
if err != nil {
|
||||
fmt.Printf("Error fetching metric frequency: %s\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
metricName := ""
|
||||
|
||||
for _, selector_name := range val.Selector {
|
||||
metricName += selector_name + Delimiter
|
||||
}
|
||||
|
||||
metricName += val.MetricName
|
||||
|
||||
// Create a new selector for the Avro level
|
||||
// The selector is a slice of strings that represents the path to the
|
||||
// Avro level. It is created by appending the cluster, node, and metric
|
||||
// name to the selector.
|
||||
var selector []string
|
||||
selector = append(selector, val.Cluster, val.Node, strconv.FormatInt(freq, 10))
|
||||
|
||||
if !testEq(oldSelector, selector) {
|
||||
// Get the Avro level for the metric
|
||||
avroLevel = avroStore.root.findAvroLevelOrCreate(selector)
|
||||
|
||||
// If the Avro level is nil, create a new one
|
||||
if avroLevel == nil {
|
||||
fmt.Printf("Error creating or finding the level with cluster : %s, node : %s, metric : %s\n", val.Cluster, val.Node, val.MetricName)
|
||||
}
|
||||
oldSelector = append([]string{}, selector...)
|
||||
}
|
||||
|
||||
avroLevel.addMetric(metricName, val.Value, val.Timestamp, int(freq))
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func testEq(a, b []string) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
for i := range a {
|
||||
if a[i] != b[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
161
internal/avro/avroStruct.go
Normal file
161
internal/avro/avroStruct.go
Normal file
@ -0,0 +1,161 @@
|
||||
package avro
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/util"
|
||||
)
|
||||
|
||||
var LineProtocolMessages = make(chan *AvroStruct)
|
||||
var Delimiter = "ZZZZZ"
|
||||
|
||||
// CheckpointBufferMinutes should always be in minutes.
|
||||
// Its controls the amount of data to hold for given amount of time.
|
||||
var CheckpointBufferMinutes = 3
|
||||
|
||||
type AvroStruct struct {
|
||||
MetricName string
|
||||
Cluster string
|
||||
Node string
|
||||
Selector []string
|
||||
Value util.Float
|
||||
Timestamp int64
|
||||
}
|
||||
|
||||
type AvroStore struct {
|
||||
root AvroLevel
|
||||
}
|
||||
|
||||
var avroStore AvroStore
|
||||
|
||||
type AvroLevel struct {
|
||||
children map[string]*AvroLevel
|
||||
data map[int64]map[string]util.Float
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
type AvroField struct {
|
||||
Name string `json:"name"`
|
||||
Type interface{} `json:"type"`
|
||||
Default interface{} `json:"default,omitempty"`
|
||||
}
|
||||
|
||||
type AvroSchema struct {
|
||||
Type string `json:"type"`
|
||||
Name string `json:"name"`
|
||||
Fields []AvroField `json:"fields"`
|
||||
}
|
||||
|
||||
func (l *AvroLevel) findAvroLevelOrCreate(selector []string) *AvroLevel {
|
||||
if len(selector) == 0 {
|
||||
return l
|
||||
}
|
||||
|
||||
// Allow concurrent reads:
|
||||
l.lock.RLock()
|
||||
var child *AvroLevel
|
||||
var ok bool
|
||||
if l.children == nil {
|
||||
// Children map needs to be created...
|
||||
l.lock.RUnlock()
|
||||
} else {
|
||||
child, ok := l.children[selector[0]]
|
||||
l.lock.RUnlock()
|
||||
if ok {
|
||||
return child.findAvroLevelOrCreate(selector[1:])
|
||||
}
|
||||
}
|
||||
|
||||
// The level does not exist, take write lock for unqiue access:
|
||||
l.lock.Lock()
|
||||
// While this thread waited for the write lock, another thread
|
||||
// could have created the child node.
|
||||
if l.children != nil {
|
||||
child, ok = l.children[selector[0]]
|
||||
if ok {
|
||||
l.lock.Unlock()
|
||||
return child.findAvroLevelOrCreate(selector[1:])
|
||||
}
|
||||
}
|
||||
|
||||
child = &AvroLevel{
|
||||
data: make(map[int64]map[string]util.Float, 0),
|
||||
children: nil,
|
||||
}
|
||||
|
||||
if l.children != nil {
|
||||
l.children[selector[0]] = child
|
||||
} else {
|
||||
l.children = map[string]*AvroLevel{selector[0]: child}
|
||||
}
|
||||
l.lock.Unlock()
|
||||
return child.findAvroLevelOrCreate(selector[1:])
|
||||
}
|
||||
|
||||
func (l *AvroLevel) addMetric(metricName string, value util.Float, timestamp int64, Freq int) {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
KeyCounter := int(CheckpointBufferMinutes * 60 / Freq)
|
||||
|
||||
// Create keys in advance for the given amount of time
|
||||
if len(l.data) != KeyCounter {
|
||||
if len(l.data) == 0 {
|
||||
for i := range KeyCounter {
|
||||
l.data[timestamp+int64(i*Freq)] = make(map[string]util.Float, 0)
|
||||
}
|
||||
} else {
|
||||
//Get the last timestamp
|
||||
var lastTs int64
|
||||
for ts := range l.data {
|
||||
if ts > lastTs {
|
||||
lastTs = ts
|
||||
}
|
||||
}
|
||||
// Create keys for the next KeyCounter timestamps
|
||||
l.data[lastTs+int64(Freq)] = make(map[string]util.Float, 0)
|
||||
}
|
||||
}
|
||||
|
||||
closestTs := int64(0)
|
||||
minDiff := int64(Freq) + 1 // Start with diff just outside the valid range
|
||||
found := false
|
||||
|
||||
// Iterate over timestamps and choose the one which is within range.
|
||||
// Since its epoch time, we check if the difference is less than 60 seconds.
|
||||
for ts, dat := range l.data {
|
||||
// Check if timestamp is within range
|
||||
diff := timestamp - ts
|
||||
if diff < -int64(Freq) || diff > int64(Freq) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Metric already present at this timestamp — skip
|
||||
if _, ok := dat[metricName]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if this is the closest timestamp so far
|
||||
if Abs(diff) < minDiff {
|
||||
minDiff = Abs(diff)
|
||||
closestTs = ts
|
||||
found = true
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
l.data[closestTs][metricName] = value
|
||||
}
|
||||
}
|
||||
|
||||
func GetAvroStore() *AvroStore {
|
||||
return &avroStore
|
||||
}
|
||||
|
||||
// Abs returns the absolute value of x.
|
||||
func Abs(x int64) int64 {
|
||||
if x < 0 {
|
||||
return -x
|
||||
}
|
||||
return x
|
||||
}
|
@ -81,9 +81,10 @@ type Config struct {
|
||||
Metrics map[string]MetricConfig `json:"metrics"`
|
||||
HttpConfig *HttpConfig `json:"http-api"`
|
||||
Checkpoints struct {
|
||||
Interval string `json:"interval"`
|
||||
RootDir string `json:"directory"`
|
||||
Restore string `json:"restore"`
|
||||
FileFormat string `json:"file-format"`
|
||||
Interval string `json:"interval"`
|
||||
RootDir string `json:"directory"`
|
||||
Restore string `json:"restore"`
|
||||
} `json:"checkpoints"`
|
||||
Debug struct {
|
||||
DumpToFile string `json:"dump-to-file"`
|
||||
@ -113,3 +114,10 @@ func Init(file string) {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Config) GetMetricFrequency(metricName string) (int64, error) {
|
||||
if metric, ok := c.Metrics[metricName]; ok {
|
||||
return metric.Frequency, nil
|
||||
}
|
||||
return 0, fmt.Errorf("metric %s not found", metricName)
|
||||
}
|
||||
|
@ -122,7 +122,8 @@ func archiveCheckpoints(dir string, archiveDir string, from int64, deleteInstead
|
||||
return 0, err
|
||||
}
|
||||
|
||||
files, err := findFiles(entries, from, false)
|
||||
extension := config.Keys.Checkpoints.FileFormat
|
||||
files, err := findFiles(entries, from, extension, false)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -19,8 +19,10 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/avro"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/config"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/util"
|
||||
"github.com/linkedin/goavro/v2"
|
||||
)
|
||||
|
||||
// Whenever changed, update MarshalJSON as well!
|
||||
@ -41,42 +43,78 @@ var lastCheckpoint time.Time
|
||||
|
||||
func Checkpointing(wg *sync.WaitGroup, ctx context.Context) {
|
||||
lastCheckpoint = time.Now()
|
||||
ms := GetMemoryStore()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
d, err := time.ParseDuration(config.Keys.Checkpoints.Interval)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if d <= 0 {
|
||||
return
|
||||
}
|
||||
if config.Keys.Checkpoints.FileFormat == "json" {
|
||||
ms := GetMemoryStore()
|
||||
|
||||
ticks := func() <-chan time.Time {
|
||||
if d <= 0 {
|
||||
return nil
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
d, err := time.ParseDuration(config.Keys.Checkpoints.Interval)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if d <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
ticks := func() <-chan time.Time {
|
||||
if d <= 0 {
|
||||
return nil
|
||||
}
|
||||
return time.NewTicker(d).C
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticks:
|
||||
log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
|
||||
now := time.Now()
|
||||
n, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir,
|
||||
lastCheckpoint.Unix(), now.Unix())
|
||||
if err != nil {
|
||||
log.Printf("checkpointing failed: %s\n", err.Error())
|
||||
} else {
|
||||
log.Printf("done: %d checkpoint files created\n", n)
|
||||
lastCheckpoint = now
|
||||
}
|
||||
}
|
||||
}
|
||||
return time.NewTicker(d).C
|
||||
}()
|
||||
for {
|
||||
} else {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
d, _ := time.ParseDuration("1m")
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticks:
|
||||
log.Printf("start checkpointing (starting at %s)...\n", lastCheckpoint.Format(time.RFC3339))
|
||||
now := time.Now()
|
||||
n, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir,
|
||||
lastCheckpoint.Unix(), now.Unix())
|
||||
if err != nil {
|
||||
log.Printf("checkpointing failed: %s\n", err.Error())
|
||||
} else {
|
||||
log.Printf("done: %d checkpoint files created\n", n)
|
||||
lastCheckpoint = now
|
||||
case <-time.After(time.Duration(avro.CheckpointBufferMinutes) * time.Minute):
|
||||
// This is the first tick untill we collect the data for given minutes.
|
||||
avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, false)
|
||||
// log.Printf("Checkpointing %d avro files", count)
|
||||
|
||||
}
|
||||
|
||||
ticks := func() <-chan time.Time {
|
||||
if d <= 0 {
|
||||
return nil
|
||||
}
|
||||
return time.NewTicker(d).C
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticks:
|
||||
// Regular ticks of 1 minute to write data.
|
||||
avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, false)
|
||||
// log.Printf("Checkpointing %d avro files", count)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// As `Float` implements a custom MarshalJSON() function,
|
||||
@ -264,19 +302,7 @@ func (l *Level) toCheckpoint(dir string, from, to int64, m *MemoryStore) error {
|
||||
return bw.Flush()
|
||||
}
|
||||
|
||||
// Metrics stored at the lowest 2 levels are not loaded (root and cluster)!
|
||||
// This function can only be called once and before the very first write or read.
|
||||
// Different host's data is loaded to memory in parallel.
|
||||
func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
|
||||
if _, err := os.Stat(dir); os.IsNotExist(err) {
|
||||
// The directory does not exist, so create it using os.MkdirAll()
|
||||
err := os.MkdirAll(dir, 0755) // 0755 sets the permissions for the directory
|
||||
if err != nil {
|
||||
log.Fatalf("Error creating directory: %#v\n", err)
|
||||
}
|
||||
fmt.Printf("%#v Directory created successfully.\n", dir)
|
||||
}
|
||||
|
||||
func (m *MemoryStore) FromCheckpoint(dir string, from int64, extension string) (int, error) {
|
||||
var wg sync.WaitGroup
|
||||
work := make(chan [2]string, NumWorkers)
|
||||
n, errs := int32(0), int32(0)
|
||||
@ -287,7 +313,7 @@ func (m *MemoryStore) FromCheckpoint(dir string, from int64) (int, error) {
|
||||
defer wg.Done()
|
||||
for host := range work {
|
||||
lvl := m.root.findLevelOrCreate(host[:], len(m.Metrics))
|
||||
nn, err := lvl.fromCheckpoint(filepath.Join(dir, host[0], host[1]), from, m)
|
||||
nn, err := lvl.fromCheckpoint(m, filepath.Join(dir, host[0], host[1]), from, extension)
|
||||
if err != nil {
|
||||
log.Fatalf("error while loading checkpoints: %s", err.Error())
|
||||
atomic.AddInt32(&errs, 1)
|
||||
@ -344,6 +370,234 @@ done:
|
||||
return int(n), nil
|
||||
}
|
||||
|
||||
// Metrics stored at the lowest 2 levels are not loaded (root and cluster)!
|
||||
// This function can only be called once and before the very first write or read.
|
||||
// Different host's data is loaded to memory in parallel.
|
||||
func (m *MemoryStore) FromCheckpointFiles(dir string, from int64) (int, error) {
|
||||
|
||||
if _, err := os.Stat(dir); os.IsNotExist(err) {
|
||||
// The directory does not exist, so create it using os.MkdirAll()
|
||||
err := os.MkdirAll(dir, 0755) // 0755 sets the permissions for the directory
|
||||
if err != nil {
|
||||
log.Fatalf("Error creating directory: %#v\n", err)
|
||||
}
|
||||
fmt.Printf("%#v Directory created successfully.\n", dir)
|
||||
}
|
||||
|
||||
// Config read (replace with your actual config read)
|
||||
fileFormat := config.Keys.Checkpoints.FileFormat
|
||||
if fileFormat == "" {
|
||||
fileFormat = "avro"
|
||||
}
|
||||
|
||||
// Map to easily get the fallback format
|
||||
oppositeFormat := map[string]string{
|
||||
"json": "avro",
|
||||
"avro": "json",
|
||||
}
|
||||
|
||||
// First, attempt to load the specified format
|
||||
if found, err := checkFilesWithExtension(dir, fileFormat); err != nil {
|
||||
return 0, fmt.Errorf("error checking files with extension: %v", err)
|
||||
} else if found {
|
||||
log.Printf("Loading %s files because fileformat is %s\n", fileFormat, fileFormat)
|
||||
return m.FromCheckpoint(dir, from, fileFormat)
|
||||
}
|
||||
|
||||
// If not found, attempt the opposite format
|
||||
altFormat := oppositeFormat[fileFormat]
|
||||
if found, err := checkFilesWithExtension(dir, altFormat); err != nil {
|
||||
return 0, fmt.Errorf("error checking files with extension: %v", err)
|
||||
} else if found {
|
||||
log.Printf("Loading %s files but fileformat is %s\n", altFormat, fileFormat)
|
||||
return m.FromCheckpoint(dir, from, altFormat)
|
||||
}
|
||||
|
||||
log.Println("No valid checkpoint files found in the directory.")
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func checkFilesWithExtension(dir string, extension string) (bool, error) {
|
||||
found := false
|
||||
|
||||
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("error accessing path %s: %v", path, err)
|
||||
}
|
||||
if !info.IsDir() && filepath.Ext(info.Name()) == "."+extension {
|
||||
found = true
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error walking through directories: %s", err)
|
||||
}
|
||||
|
||||
return found, nil
|
||||
}
|
||||
|
||||
func (l *Level) loadAvroFile(m *MemoryStore, f *os.File, from int64) error {
|
||||
br := bufio.NewReader(f)
|
||||
|
||||
fileName := f.Name()[strings.LastIndex(f.Name(), "/")+1:]
|
||||
resolution, err := strconv.ParseInt(fileName[0:strings.Index(fileName, "_")], 10, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while reading avro file (resolution parsing) : %s", err)
|
||||
}
|
||||
|
||||
from_timestamp, err := strconv.ParseInt(fileName[strings.Index(fileName, "_")+1:len(fileName)-5], 10, 64)
|
||||
|
||||
// Same logic according to lineprotocol
|
||||
from_timestamp -= (resolution / 2)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("error converting timestamp from the avro file : %s", err)
|
||||
}
|
||||
|
||||
// fmt.Printf("File : %s with resolution : %d\n", fileName, resolution)
|
||||
|
||||
var recordCounter int64 = 0
|
||||
|
||||
// Create a new OCF reader from the buffered reader
|
||||
ocfReader, err := goavro.NewOCFReader(br)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
metricsData := make(map[string]util.FloatArray)
|
||||
|
||||
for ocfReader.Scan() {
|
||||
datum, err := ocfReader.Read()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while reading avro file : %s", err)
|
||||
}
|
||||
|
||||
record, ok := datum.(map[string]interface{})
|
||||
if !ok {
|
||||
panic("failed to assert datum as map[string]interface{}")
|
||||
}
|
||||
|
||||
for key, value := range record {
|
||||
metricsData[key] = append(metricsData[key], util.ConvertToFloat(value.(float64)))
|
||||
}
|
||||
|
||||
recordCounter += 1
|
||||
}
|
||||
|
||||
to := (from_timestamp + (recordCounter / (60 / resolution) * 60))
|
||||
if to < from {
|
||||
return nil
|
||||
}
|
||||
|
||||
for key, floatArray := range metricsData {
|
||||
metricName := avro.ReplaceKey(key)
|
||||
|
||||
if strings.Contains(metricName, avro.Delimiter) {
|
||||
subString := strings.Split(metricName, avro.Delimiter)
|
||||
|
||||
lvl := l
|
||||
|
||||
for i := 0; i < len(subString)-1; i++ {
|
||||
|
||||
sel := subString[i]
|
||||
|
||||
if lvl.children == nil {
|
||||
lvl.children = make(map[string]*Level)
|
||||
}
|
||||
|
||||
child, ok := lvl.children[sel]
|
||||
if !ok {
|
||||
child = &Level{
|
||||
metrics: make([]*buffer, len(m.Metrics)),
|
||||
children: nil,
|
||||
}
|
||||
lvl.children[sel] = child
|
||||
}
|
||||
lvl = child
|
||||
}
|
||||
|
||||
leafMetricName := subString[len(subString)-1]
|
||||
err = lvl.createBuffer(m, leafMetricName, floatArray, from_timestamp, resolution)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while creating buffers from avroReader : %s", err)
|
||||
}
|
||||
} else {
|
||||
err = l.createBuffer(m, metricName, floatArray, from_timestamp, resolution)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error while creating buffers from avroReader : %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Level) createBuffer(m *MemoryStore, metricName string, floatArray util.FloatArray, from int64, resolution int64) error {
|
||||
n := len(floatArray)
|
||||
b := &buffer{
|
||||
frequency: resolution,
|
||||
start: from,
|
||||
data: floatArray[0:n:n],
|
||||
prev: nil,
|
||||
next: nil,
|
||||
archived: true,
|
||||
}
|
||||
b.close()
|
||||
|
||||
minfo, ok := m.Metrics[metricName]
|
||||
if !ok {
|
||||
return nil
|
||||
// return errors.New("Unkown metric: " + name)
|
||||
}
|
||||
|
||||
prev := l.metrics[minfo.Offset]
|
||||
if prev == nil {
|
||||
l.metrics[minfo.Offset] = b
|
||||
} else {
|
||||
if prev.start > b.start {
|
||||
return errors.New("wooops")
|
||||
}
|
||||
|
||||
b.prev = prev
|
||||
prev.next = b
|
||||
|
||||
missingCount := ((int(b.start) - int(prev.start)) - len(prev.data)*int(b.frequency))
|
||||
if missingCount > 0 {
|
||||
missingCount /= int(b.frequency)
|
||||
|
||||
for range missingCount {
|
||||
prev.data = append(prev.data, util.NaN)
|
||||
}
|
||||
|
||||
prev.data = prev.data[0:len(prev.data):len(prev.data)]
|
||||
}
|
||||
}
|
||||
l.metrics[minfo.Offset] = b
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Level) loadJsonFile(m *MemoryStore, f *os.File, from int64) error {
|
||||
br := bufio.NewReader(f)
|
||||
cf := &CheckpointFile{}
|
||||
if err := json.NewDecoder(br).Decode(cf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cf.To != 0 && cf.To < from {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := l.loadFile(cf, m); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
|
||||
for name, metric := range cf.Metrics {
|
||||
n := len(metric.Data)
|
||||
@ -399,7 +653,7 @@ func (l *Level) loadFile(cf *CheckpointFile, m *MemoryStore) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, error) {
|
||||
func (l *Level) fromCheckpoint(m *MemoryStore, dir string, from int64, extension string) (int, error) {
|
||||
direntries, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
@ -409,7 +663,7 @@ func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
jsonFiles := make([]fs.DirEntry, 0)
|
||||
allFiles := make([]fs.DirEntry, 0)
|
||||
filesLoaded := 0
|
||||
for _, e := range direntries {
|
||||
if e.IsDir() {
|
||||
@ -418,25 +672,32 @@ func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
|
||||
children: make(map[string]*Level),
|
||||
}
|
||||
|
||||
files, err := child.fromCheckpoint(path.Join(dir, e.Name()), from, m)
|
||||
files, err := child.fromCheckpoint(m, path.Join(dir, e.Name()), from, extension)
|
||||
filesLoaded += files
|
||||
if err != nil {
|
||||
return filesLoaded, err
|
||||
}
|
||||
|
||||
l.children[e.Name()] = child
|
||||
} else if strings.HasSuffix(e.Name(), ".json") {
|
||||
jsonFiles = append(jsonFiles, e)
|
||||
} else if strings.HasSuffix(e.Name(), "."+extension) {
|
||||
allFiles = append(allFiles, e)
|
||||
} else {
|
||||
return filesLoaded, errors.New("unexpected file: " + dir + "/" + e.Name())
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
files, err := findFiles(jsonFiles, from, true)
|
||||
files, err := findFiles(allFiles, from, extension, true)
|
||||
if err != nil {
|
||||
return filesLoaded, err
|
||||
}
|
||||
|
||||
loaders := map[string]func(*MemoryStore, *os.File, int64) error{
|
||||
"json": l.loadJsonFile,
|
||||
"avro": l.loadAvroFile,
|
||||
}
|
||||
|
||||
loader := loaders[extension]
|
||||
|
||||
for _, filename := range files {
|
||||
f, err := os.Open(path.Join(dir, filename))
|
||||
if err != nil {
|
||||
@ -444,17 +705,7 @@ func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
br := bufio.NewReader(f)
|
||||
cf := &CheckpointFile{}
|
||||
if err = json.NewDecoder(br).Decode(cf); err != nil {
|
||||
return filesLoaded, err
|
||||
}
|
||||
|
||||
if cf.To != 0 && cf.To < from {
|
||||
continue
|
||||
}
|
||||
|
||||
if err = l.loadFile(cf, m); err != nil {
|
||||
if err = loader(m, f, from); err != nil {
|
||||
return filesLoaded, err
|
||||
}
|
||||
|
||||
@ -467,10 +718,14 @@ func (l *Level) fromCheckpoint(dir string, from int64, m *MemoryStore) (int, err
|
||||
// This will probably get very slow over time!
|
||||
// A solution could be some sort of an index file in which all other files
|
||||
// and the timespan they contain is listed.
|
||||
func findFiles(direntries []fs.DirEntry, t int64, findMoreRecentFiles bool) ([]string, error) {
|
||||
func findFiles(direntries []fs.DirEntry, t int64, extension string, findMoreRecentFiles bool) ([]string, error) {
|
||||
nums := map[string]int64{}
|
||||
for _, e := range direntries {
|
||||
ts, err := strconv.ParseInt(strings.TrimSuffix(e.Name(), ".json"), 10, 64)
|
||||
if !strings.HasSuffix(e.Name(), "."+extension) {
|
||||
continue
|
||||
}
|
||||
|
||||
ts, err := strconv.ParseInt(e.Name()[strings.Index(e.Name(), "_")+1:len(e.Name())-5], 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/avro"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/config"
|
||||
"github.com/ClusterCockpit/cc-metric-store/internal/util"
|
||||
"github.com/ClusterCockpit/cc-metric-store/pkg/resampler"
|
||||
@ -76,13 +77,94 @@ func GetMemoryStore() *MemoryStore {
|
||||
}
|
||||
|
||||
func Shutdown() {
|
||||
ms := GetMemoryStore()
|
||||
log.Printf("Writing to '%s'...\n", config.Keys.Checkpoints.RootDir)
|
||||
files, err := ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
|
||||
var files int
|
||||
var err error
|
||||
|
||||
ms := GetMemoryStore()
|
||||
|
||||
if config.Keys.Checkpoints.FileFormat == "json" {
|
||||
files, err = ms.ToCheckpoint(config.Keys.Checkpoints.RootDir, lastCheckpoint.Unix(), time.Now().Unix())
|
||||
} else {
|
||||
files, err = avro.GetAvroStore().ToCheckpoint(config.Keys.Checkpoints.RootDir, true)
|
||||
close(avro.LineProtocolMessages)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Writing checkpoint failed: %s\n", err.Error())
|
||||
}
|
||||
log.Printf("Done! (%d files written)\n", files)
|
||||
|
||||
// ms.PrintHeirarchy()
|
||||
}
|
||||
|
||||
// func (m *MemoryStore) PrintHeirarchy() {
|
||||
// m.root.lock.Lock()
|
||||
// defer m.root.lock.Unlock()
|
||||
|
||||
// fmt.Printf("Root : \n")
|
||||
|
||||
// for lvl1, sel1 := range m.root.children {
|
||||
// fmt.Printf("\t%s\n", lvl1)
|
||||
// for lvl2, sel2 := range sel1.children {
|
||||
// fmt.Printf("\t\t%s\n", lvl2)
|
||||
// if lvl1 == "fritz" && lvl2 == "f0201" {
|
||||
|
||||
// for name, met := range m.Metrics {
|
||||
// mt := sel2.metrics[met.Offset]
|
||||
|
||||
// fmt.Printf("\t\t\t\t%s\n", name)
|
||||
// fmt.Printf("\t\t\t\t")
|
||||
|
||||
// for mt != nil {
|
||||
// // if name == "cpu_load" {
|
||||
// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data)
|
||||
// // }
|
||||
// mt = mt.prev
|
||||
// }
|
||||
// fmt.Printf("\n")
|
||||
|
||||
// }
|
||||
// }
|
||||
// for lvl3, sel3 := range sel2.children {
|
||||
// if lvl1 == "fritz" && lvl2 == "f0201" && lvl3 == "hwthread70" {
|
||||
|
||||
// fmt.Printf("\t\t\t\t\t%s\n", lvl3)
|
||||
|
||||
// for name, met := range m.Metrics {
|
||||
// mt := sel3.metrics[met.Offset]
|
||||
|
||||
// fmt.Printf("\t\t\t\t\t\t%s\n", name)
|
||||
|
||||
// fmt.Printf("\t\t\t\t\t\t")
|
||||
|
||||
// for mt != nil {
|
||||
// // if name == "clock" {
|
||||
// fmt.Printf("%d(%d) -> %#v", mt.start, len(mt.data), mt.data)
|
||||
|
||||
// mt = mt.prev
|
||||
// }
|
||||
// fmt.Printf("\n")
|
||||
|
||||
// }
|
||||
|
||||
// // for i, _ := range sel3.metrics {
|
||||
// // fmt.Printf("\t\t\t\t\t%s\n", getName(configmetrics, i))
|
||||
// // }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
func getName(m *MemoryStore, i int) string {
|
||||
for key, val := range m.Metrics {
|
||||
if val.Offset == i {
|
||||
return key
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func Retention(wg *sync.WaitGroup, ctx context.Context) {
|
||||
|
@ -28,6 +28,10 @@ func (f Float) MarshalJSON() ([]byte, error) {
|
||||
return strconv.AppendFloat(make([]byte, 0, 10), float64(f), 'f', 3, 64), nil
|
||||
}
|
||||
|
||||
func (f Float) Double() float64 {
|
||||
return float64(f)
|
||||
}
|
||||
|
||||
func (f *Float) UnmarshalJSON(input []byte) error {
|
||||
if string(input) == "null" {
|
||||
*f = NaN
|
||||
@ -45,6 +49,14 @@ func (f *Float) UnmarshalJSON(input []byte) error {
|
||||
// Same as `[]Float`, but can be marshaled to JSON with less allocations.
|
||||
type FloatArray []Float
|
||||
|
||||
func ConvertToFloat(input float64) Float {
|
||||
if input == -1.0 {
|
||||
return NaN
|
||||
} else {
|
||||
return Float(input)
|
||||
}
|
||||
}
|
||||
|
||||
func (fa FloatArray) MarshalJSON() ([]byte, error) {
|
||||
buf := make([]byte, 0, 2+len(fa)*8)
|
||||
buf = append(buf, '[')
|
||||
|
Loading…
x
Reference in New Issue
Block a user