mirror of
				https://github.com/ClusterCockpit/cc-metric-collector.git
				synced 2025-10-20 21:05:06 +02:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
			v0.7.2
			...
			app_metric
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 1ce40aea16 | 
							
								
								
									
										164
									
								
								receivers/appMetricReceiver.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										164
									
								
								receivers/appMetricReceiver.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,164 @@ | |||||||
|  | package receivers | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"bufio" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"fmt" | ||||||
|  | 	"net" | ||||||
|  | 	"os" | ||||||
|  | 	"sync" | ||||||
|  |  | ||||||
|  | 	cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger" | ||||||
|  | 	lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric" | ||||||
|  | 	influx "github.com/influxdata/line-protocol" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // SampleReceiver configuration: receiver type, listen address, port | ||||||
|  | type AppMetricReceiverConfig struct { | ||||||
|  | 	Type       string `json:"type"` | ||||||
|  | 	SocketFile string `json:"socket_file"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type AppMetricReceiver struct { | ||||||
|  | 	receiver | ||||||
|  | 	config AppMetricReceiverConfig | ||||||
|  |  | ||||||
|  | 	// Storage for static information | ||||||
|  | 	meta map[string]string | ||||||
|  | 	// Use in case of own go routine | ||||||
|  | 	done chan bool | ||||||
|  | 	wg   sync.WaitGroup | ||||||
|  | 	// Influx stuff | ||||||
|  | 	handler *influx.MetricHandler | ||||||
|  | 	parser  *influx.Parser | ||||||
|  | 	// WaitGroup for individual connections | ||||||
|  | 	connWg   sync.WaitGroup | ||||||
|  | 	listener net.Listener | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (r *AppMetricReceiver) newConnection(conn net.Conn) { | ||||||
|  | 	//defer conn.Close() | ||||||
|  | 	//defer wg.Done() | ||||||
|  |  | ||||||
|  | 	buffer, err := bufio.NewReader(conn).ReadBytes('\n') | ||||||
|  | 	if err != nil { | ||||||
|  | 		conn.Close() | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	metrics, err := r.parser.Parse(buffer) | ||||||
|  | 	if err != nil { | ||||||
|  | 		cclog.ComponentError(r.name, "failed to parse received metrics") | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	for _, m := range metrics { | ||||||
|  | 		y := lp.FromInfluxMetric(m) | ||||||
|  | 		for k, v := range r.meta { | ||||||
|  | 			y.AddMeta(k, v) | ||||||
|  | 		} | ||||||
|  | 		if r.sink != nil { | ||||||
|  | 			r.sink <- y | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	r.newConnection(conn) | ||||||
|  |  | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (r *AppMetricReceiver) newAccepter(listenSocket net.Listener) { | ||||||
|  | accept_loop: | ||||||
|  | 	for { | ||||||
|  | 		select { | ||||||
|  | 		case <-r.done: | ||||||
|  | 			break accept_loop | ||||||
|  | 		default: | ||||||
|  | 			conn, err := listenSocket.Accept() | ||||||
|  | 			if err == nil { | ||||||
|  | 				r.connWg.Add(1) | ||||||
|  | 				go func() { | ||||||
|  | 					r.newConnection(conn) | ||||||
|  | 					r.connWg.Done() | ||||||
|  | 				}() | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	r.wg.Done() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Implement functions required for Receiver interface | ||||||
|  | // Start(), Close() | ||||||
|  | // See: metricReceiver.go | ||||||
|  |  | ||||||
|  | func (r *AppMetricReceiver) Start() { | ||||||
|  | 	var err error = nil | ||||||
|  | 	cclog.ComponentDebug(r.name, "START") | ||||||
|  |  | ||||||
|  | 	r.listener, err = net.Listen("unix", r.config.SocketFile) | ||||||
|  | 	if err != nil { | ||||||
|  | 		cclog.ComponentError(r.name, "failed to listen at socket", r.config.SocketFile) | ||||||
|  | 	} | ||||||
|  | 	if _, err := os.Stat(r.config.SocketFile); err != nil { | ||||||
|  | 		cclog.ComponentError(r.name, "failed to create socket", r.config.SocketFile) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	r.done = make(chan bool) | ||||||
|  | 	r.wg.Add(1) | ||||||
|  | 	go r.newAccepter(r.listener) | ||||||
|  |  | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Close receiver: close network connection, close files, close libraries, ... | ||||||
|  | func (r *AppMetricReceiver) Close() { | ||||||
|  | 	cclog.ComponentDebug(r.name, "CLOSE") | ||||||
|  |  | ||||||
|  | 	if _, err := os.Stat(r.config.SocketFile); err == nil { | ||||||
|  | 		if err := os.RemoveAll(r.config.SocketFile); err != nil { | ||||||
|  | 			cclog.ComponentError(r.name, "Failed to remove UNIX socket", r.config.SocketFile) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// in case of own go routine, send the signal and wait | ||||||
|  | 	r.listener.Close() | ||||||
|  | 	r.done <- true | ||||||
|  | 	close(r.done) | ||||||
|  | 	r.connWg.Wait() | ||||||
|  | 	r.wg.Wait() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // New function to create a new instance of the receiver | ||||||
|  | // Initialize the receiver by giving it a name and reading in the config JSON | ||||||
|  | func NewAppMetricReceiver(name string, config json.RawMessage) (Receiver, error) { | ||||||
|  | 	r := new(AppMetricReceiver) | ||||||
|  |  | ||||||
|  | 	// Set name of SampleReceiver | ||||||
|  | 	// The name should be chosen in such a way that different instances of SampleReceiver can be distinguished | ||||||
|  | 	r.name = fmt.Sprintf("AppMetricReceiver(%s)", name) | ||||||
|  |  | ||||||
|  | 	// Set static information | ||||||
|  | 	r.meta = map[string]string{"source": r.name} | ||||||
|  |  | ||||||
|  | 	// Set defaults in r.config | ||||||
|  | 	// Allow overwriting these defaults by reading config JSON | ||||||
|  | 	r.config.SocketFile = "/tmp/cc.sock" | ||||||
|  |  | ||||||
|  | 	// Read the sample receiver specific JSON config | ||||||
|  | 	if len(config) > 0 { | ||||||
|  | 		err := json.Unmarshal(config, &r.config) | ||||||
|  | 		if err != nil { | ||||||
|  | 			cclog.ComponentError(r.name, "Error reading config:", err.Error()) | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if len(r.config.SocketFile) == 0 { | ||||||
|  | 		cclog.ComponentError(r.name, "Invalid socket_file setting:", r.config.SocketFile) | ||||||
|  | 		return nil, fmt.Errorf("invalid socket_file setting: %s", r.config.SocketFile) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Check that all required fields in the configuration are set | ||||||
|  | 	// Use 'if len(r.config.Option) > 0' for strings | ||||||
|  | 	r.handler = influx.NewMetricHandler() | ||||||
|  | 	r.parser = influx.NewParser(r.handler) | ||||||
|  | 	r.parser.SetTimeFunc(DefaultTime) | ||||||
|  |  | ||||||
|  | 	return r, nil | ||||||
|  | } | ||||||
							
								
								
									
										23
									
								
								receivers/appMetricReceiver.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								receivers/appMetricReceiver.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,23 @@ | |||||||
|  | ## `appmetrics` receiver | ||||||
|  |  | ||||||
|  | The `appmetrics` receiver can be used to submit metrics from an application into the monitoring system. It listens for incoming connections on a UNIX socket. | ||||||
|  |  | ||||||
|  | ### Configuration structure | ||||||
|  |  | ||||||
|  | ```json | ||||||
|  | { | ||||||
|  |   "<name>": { | ||||||
|  |     "type": "appmetrics", | ||||||
|  |     "socket_file" : "/tmp/cc.sock", | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | - `type`: makes the receiver a `appmetrics` receiver | ||||||
|  | - `socket_file`: Listen UNIX socket | ||||||
|  |  | ||||||
|  | ### Inputs from applications | ||||||
|  |  | ||||||
|  | Applcations can connect to the `appmetrics` socket and provide metric in the [InfluxDB line protocol](https://github.com/influxdata/line-protocol). It is currently not possible to submit meta information as the Influx line protocol does not know them. | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -14,6 +14,7 @@ var AvailableReceivers = map[string]func(name string, config json.RawMessage) (R | |||||||
| 	"ipmi":       NewIPMIReceiver, | 	"ipmi":       NewIPMIReceiver, | ||||||
| 	"nats":       NewNatsReceiver, | 	"nats":       NewNatsReceiver, | ||||||
| 	"redfish":    NewRedfishReceiver, | 	"redfish":    NewRedfishReceiver, | ||||||
|  | 	"appmetrics": NewAppMetricReceiver, | ||||||
| } | } | ||||||
|  |  | ||||||
| type receiveManager struct { | type receiveManager struct { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user