1
0
Fork 0
mirror of https://github.com/gocsaf/csaf.git synced 2025-12-22 05:40:11 +01:00

Merge pull request #530 from oxisto/slog

Added support for structured logging in `csaf_aggregator`
This commit is contained in:
Bernhard Herzog 2024-04-25 13:13:11 +02:00 committed by GitHub
commit 617deb4c17
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 135 additions and 91 deletions

View file

@ -12,7 +12,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"runtime"
@ -178,9 +178,11 @@ func (p *provider) ageAccept(c *config) func(time.Time) bool {
}
if c.Verbose {
log.Printf(
"Setting up filter to accept advisories within time range %s to %s\n",
r[0].Format(time.RFC3339), r[1].Format(time.RFC3339))
slog.Debug(
"Setting up filter to accept advisories within time range",
"from", r[0].Format(time.RFC3339),
"to", r[1].Format(time.RFC3339),
)
}
return r.Contains
}
@ -393,6 +395,17 @@ func (c *config) setDefaults() {
}
}
// prepareLogging sets up the structured logging.
func (c *config) prepareLogging() error {
ho := slog.HandlerOptions{
Level: slog.LevelDebug,
}
handler := slog.NewTextHandler(os.Stdout, &ho)
logger := slog.New(handler)
slog.SetDefault(logger)
return nil
}
// compileIgnorePatterns compiles the configured patterns to be ignored.
func (p *provider) compileIgnorePatterns() error {
pm, err := filter.NewPatternMatcher(p.IgnorePattern)

View file

@ -11,7 +11,7 @@ package main
import (
"errors"
"fmt"
"log"
"log/slog"
"os"
"path/filepath"
"strings"
@ -29,11 +29,13 @@ type fullJob struct {
err error
}
// setupProviderFull fetches the provider-metadate.json for a specific provider.
// setupProviderFull fetches the provider-metadata.json for a specific provider.
func (w *worker) setupProviderFull(provider *provider) error {
log.Printf("worker #%d: %s (%s)\n",
w.num, provider.Name, provider.Domain)
w.log.Info("Setting up provider",
"provider", slog.GroupValue(
slog.String("name", provider.Name),
slog.String("domain", provider.Domain),
))
w.dir = ""
w.provider = provider
@ -55,7 +57,7 @@ func (w *worker) setupProviderFull(provider *provider) error {
"provider-metadata.json has %d validation issues", len(errors))
}
log.Printf("provider-metadata: %s\n", w.loc)
w.log.Info("Using provider-metadata", "url", w.loc)
return nil
}
@ -79,7 +81,7 @@ func (w *worker) fullWork(wg *sync.WaitGroup, jobs <-chan *fullJob) {
func (p *processor) full() error {
if p.cfg.runAsMirror() {
log.Println("Running in aggregator mode")
p.log.Info("Running in aggregator mode")
// check if we need to setup a remote validator
if p.cfg.RemoteValidatorOptions != nil {
@ -96,16 +98,18 @@ func (p *processor) full() error {
}()
}
} else {
log.Println("Running in lister mode")
p.log.Info("Running in lister mode")
}
queue := make(chan *fullJob)
var wg sync.WaitGroup
log.Printf("Starting %d workers.\n", p.cfg.Workers)
p.log.Info("Starting workers...", "num", p.cfg.Workers)
for i := 1; i <= p.cfg.Workers; i++ {
wg.Add(1)
w := newWorker(i, p)
go w.fullWork(&wg, queue)
}
@ -135,12 +139,22 @@ func (p *processor) full() error {
for i := range jobs {
j := &jobs[i]
if j.err != nil {
log.Printf("error: '%s' failed: %v\n", j.provider.Name, j.err)
p.log.Error("Job execution failed",
slog.Group("job",
slog.Group("provider"),
"name", j.provider.Name,
),
"err", j.err,
)
continue
}
if j.aggregatorProvider == nil {
log.Printf(
"error: '%s' does not produce any result.\n", j.provider.Name)
p.log.Error("Job did not produce any result",
slog.Group("job",
slog.Group("provider"),
"name", j.provider.Name,
),
)
continue
}

View file

@ -12,7 +12,6 @@ import (
"bufio"
"encoding/csv"
"fmt"
"log"
"os"
"path/filepath"
"sort"
@ -377,7 +376,7 @@ func (w *worker) writeIndices() error {
}
for label, summaries := range w.summaries {
log.Printf("%s: %d\n", label, len(summaries))
w.log.Debug("Writing indices", "label", label, "summaries.num", len(summaries))
if err := w.writeInterims(label, summaries); err != nil {
return err
}

View file

@ -17,7 +17,6 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
@ -102,12 +101,12 @@ func (w *worker) checkInterims(
// XXX: Should we return an error here?
for _, e := range errors {
log.Printf("validation error: %s: %v\n", url, e)
w.log.Error("validation error", "url", url, "err", e)
}
// We need to write the changed content.
// This will start the transcation if not already started.
// This will start the transaction if not already started.
dst, err := tx.Dst()
if err != nil {
return nil, err
@ -159,8 +158,7 @@ func (w *worker) checkInterims(
// setupProviderInterim prepares the worker for a specific provider.
func (w *worker) setupProviderInterim(provider *provider) {
log.Printf("worker #%d: %s (%s)\n",
w.num, provider.Name, provider.Domain)
w.log.Info("Setting up worker", provider.Name, provider.Domain)
w.dir = ""
w.provider = provider
@ -262,7 +260,7 @@ func (p *processor) interim() error {
queue := make(chan *interimJob)
var wg sync.WaitGroup
log.Printf("Starting %d workers.\n", p.cfg.Workers)
p.log.Info("Starting workers...", "num", p.cfg.Workers)
for i := 1; i <= p.cfg.Workers; i++ {
wg.Add(1)
w := newWorker(i, p)

View file

@ -9,7 +9,7 @@
package main
import (
"log"
"log/slog"
"os"
"path/filepath"
@ -85,7 +85,8 @@ func (lt *lazyTransaction) commit() error {
os.RemoveAll(lt.dst)
return err
}
log.Printf("Move %q -> %q\n", symlink, lt.src)
slog.Debug("Moving directory", "from", symlink, "to", lt.src)
if err := os.Rename(symlink, lt.src); err != nil {
os.RemoveAll(lt.dst)
return err

View file

@ -11,10 +11,12 @@ package main
import (
"fmt"
"log/slog"
"os"
"path/filepath"
"github.com/csaf-poc/csaf_distribution/v3/internal/options"
"github.com/gofrs/flock"
)
@ -44,8 +46,9 @@ func lock(lockFile *string, fn func() error) error {
func main() {
_, cfg, err := parseArgsConfig()
options.ErrorCheck(err)
options.ErrorCheck(cfg.prepare())
p := processor{cfg: cfg}
options.ErrorCheck(lock(cfg.LockFile, p.process))
cfg.prepareLogging()
options.ErrorCheckStructured(err)
options.ErrorCheckStructured(cfg.prepare())
p := processor{cfg: cfg, log: slog.Default()}
options.ErrorCheckStructured(lock(cfg.LockFile, p.process))
}

View file

@ -16,7 +16,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"net/url"
"os"
@ -47,7 +47,7 @@ func (w *worker) mirror() (*csaf.AggregatorCSAFProvider, error) {
if err != nil && w.dir != "" {
// If something goes wrong remove the debris.
if err := os.RemoveAll(w.dir); err != nil {
log.Printf("error: %v\n", err)
w.log.Error("Could not remove directory", "path", w.dir, "err", err)
}
}
return result, err
@ -166,7 +166,7 @@ func (w *worker) writeProviderMetadata() error {
{Expr: `$.public_openpgp_keys`, Action: util.ReMarshalMatcher(&pm.PGPKeys)},
}, w.metadataProvider); err != nil {
// only log the errors
log.Printf("extracting data from orignal provider failed: %v\n", err)
w.log.Error("Extracting data from original provider failed", "err", err)
}
// We are mirroring the remote public keys, too.
@ -196,11 +196,11 @@ func (w *worker) mirrorPGPKeys(pm *csaf.ProviderMetadata) error {
for i := range pm.PGPKeys {
pgpKey := &pm.PGPKeys[i]
if pgpKey.URL == nil {
log.Printf("ignoring PGP key without URL: %s\n", pgpKey.Fingerprint)
w.log.Warn("Ignoring PGP key without URL", "fingerprint", pgpKey.Fingerprint)
continue
}
if _, err := hex.DecodeString(string(pgpKey.Fingerprint)); err != nil {
log.Printf("ignoring PGP with invalid fingerprint: %s\n", *pgpKey.URL)
w.log.Warn("Ignoring PGP key with invalid fingerprint", "url", *pgpKey.URL)
continue
}
@ -344,7 +344,7 @@ func (w *worker) doMirrorTransaction() error {
// Check if there is a sysmlink already.
target := filepath.Join(w.processor.cfg.Folder, w.provider.Name)
log.Printf("target: '%s'\n", target)
w.log.Debug("Checking for path existance", "path", target)
exists, err := util.PathExists(target)
if err != nil {
@ -359,7 +359,7 @@ func (w *worker) doMirrorTransaction() error {
}
}
log.Printf("sym link: %s -> %s\n", w.dir, target)
w.log.Debug("Creating sym link", "from", w.dir, "to", target)
// Create a new symlink
if err := os.Symlink(w.dir, target); err != nil {
@ -368,7 +368,7 @@ func (w *worker) doMirrorTransaction() error {
}
// Move the symlink
log.Printf("Move: %s -> %s\n", target, webTarget)
w.log.Debug("Moving sym link", "from", target, "to", webTarget)
if err := os.Rename(target, webTarget); err != nil {
os.RemoveAll(w.dir)
return err
@ -499,14 +499,14 @@ func (w *worker) mirrorFiles(tlpLabel csaf.TLPLabel, files []csaf.AdvisoryFile)
u, err := url.Parse(file.URL())
if err != nil {
log.Printf("error: %s\n", err)
w.log.Error("Could not parse advisory file URL", "err", err)
continue
}
// Should we ignore this advisory?
if w.provider.ignoreURL(file.URL(), w.processor.cfg) {
if w.processor.cfg.Verbose {
log.Printf("Ignoring %s: %q\n", w.provider.Name, file.URL())
w.log.Info("Ignoring advisory", slog.Group("provider", "name", w.provider.Name), "file", file)
}
continue
}
@ -514,7 +514,7 @@ func (w *worker) mirrorFiles(tlpLabel csaf.TLPLabel, files []csaf.AdvisoryFile)
// Ignore not conforming filenames.
filename := filepath.Base(u.Path)
if !util.ConformingFileName(filename) {
log.Printf("Not conforming filename %q. Ignoring.\n", filename)
w.log.Warn("Ignoring advisory because of non-conforming filename", "filename", filename)
continue
}
@ -531,19 +531,18 @@ func (w *worker) mirrorFiles(tlpLabel csaf.TLPLabel, files []csaf.AdvisoryFile)
}
if err := downloadJSON(w.client, file.URL(), download); err != nil {
log.Printf("error: %v\n", err)
w.log.Error("Error while downloading JSON", "err", err)
continue
}
// Check against CSAF schema.
errors, err := csaf.ValidateCSAF(advisory)
if err != nil {
log.Printf("error: %s: %v", file, err)
w.log.Error("Error while validating CSAF schema", "err", err)
continue
}
if len(errors) > 0 {
log.Printf("CSAF file %s has %d validation errors.\n",
file, len(errors))
w.log.Error("CSAF file has validation errors", "num.errors", len(errors), "file", file)
continue
}
@ -551,29 +550,27 @@ func (w *worker) mirrorFiles(tlpLabel csaf.TLPLabel, files []csaf.AdvisoryFile)
if rmv := w.processor.remoteValidator; rmv != nil {
rvr, err := rmv.Validate(advisory)
if err != nil {
log.Printf("Calling remote validator failed: %s\n", err)
w.log.Error("Calling remote validator failed", "err", err)
continue
}
if !rvr.Valid {
log.Printf(
"CSAF file %s does not validate remotely.\n", file)
w.log.Error("CSAF file does not validate remotely", "file", file.URL())
continue
}
}
sum, err := csaf.NewAdvisorySummary(w.expr, advisory)
if err != nil {
log.Printf("error: %s: %v\n", file, err)
w.log.Error("Error while creating new advisory", "file", file, "err", err)
continue
}
if util.CleanFileName(sum.ID) != filename {
log.Printf("ID %q does not match filename %s",
sum.ID, filename)
w.log.Error("ID mismatch", "id", sum.ID, "filename", filename)
}
if err := w.extractCategories(label, advisory); err != nil {
log.Printf("error: %s: %v\n", file, err)
w.log.Error("Could not extract categories", "file", file, "err", err)
continue
}
@ -624,7 +621,7 @@ func (w *worker) downloadSignatureOrSign(url, fname string, data []byte) error {
if err != nil {
if err != errNotFound {
log.Printf("error: %s: %v\n", url, err)
w.log.Error("Could not find signature URL", "url", url, "err", err)
}
// Sign it our self.
if sig, err = w.sign(data); err != nil {

View file

@ -10,14 +10,14 @@ package main
import (
"fmt"
"log"
"log/slog"
"os"
"path/filepath"
"github.com/ProtonMail/gopenpgp/v2/crypto"
"github.com/csaf-poc/csaf_distribution/v3/csaf"
"github.com/csaf-poc/csaf_distribution/v3/util"
"github.com/ProtonMail/gopenpgp/v2/crypto"
)
type processor struct {
@ -26,6 +26,9 @@ type processor struct {
// remoteValidator is a globally configured remote validator.
remoteValidator csaf.RemoteValidator
// log is the structured logger for the whole processor.
log *slog.Logger
}
type summary struct {
@ -48,6 +51,7 @@ type worker struct {
dir string // Directory to store data to.
summaries map[string][]summary // the summaries of the advisories.
categories map[string]util.Set[string] // the categories per label.
log *slog.Logger // the structured logger, supplied with the worker number.
}
func newWorker(num int, processor *processor) *worker {
@ -55,6 +59,7 @@ func newWorker(num int, processor *processor) *worker {
num: num,
processor: processor,
expr: util.NewPathEval(),
log: processor.log.With(slog.Int("worker", num)),
}
}
@ -86,9 +91,10 @@ func (w *worker) locateProviderMetadata(domain string) error {
if w.processor.cfg.Verbose {
for i := range lpmd.Messages {
log.Printf(
"Loading provider-metadata.json of %q: %s\n",
domain, lpmd.Messages[i].Message)
w.log.Info(
"Loading provider-metadata.json",
"domain", domain,
"message", lpmd.Messages[i].Message)
}
}
@ -141,7 +147,7 @@ func (p *processor) removeOrphans() error {
fi, err := entry.Info()
if err != nil {
log.Printf("error: %v\n", err)
p.log.Error("Could not retrieve file info", "err", err)
continue
}
@ -153,13 +159,13 @@ func (p *processor) removeOrphans() error {
d := filepath.Join(path, entry.Name())
r, err := filepath.EvalSymlinks(d)
if err != nil {
log.Printf("error: %v\n", err)
p.log.Error("Could not evaluate symlink", "err", err)
continue
}
fd, err := os.Stat(r)
if err != nil {
log.Printf("error: %v\n", err)
p.log.Error("Could not retrieve file stats", "err", err)
continue
}
@ -169,18 +175,18 @@ func (p *processor) removeOrphans() error {
}
// Remove the link.
log.Printf("removing link %s -> %s\n", d, r)
p.log.Info("Removing link", "path", fmt.Sprintf("%s -> %s", d, r))
if err := os.Remove(d); err != nil {
log.Printf("error: %v\n", err)
p.log.Error("Could not remove symlink", "err", err)
continue
}
// Only remove directories which are in our folder.
if rel, err := filepath.Rel(prefix, r); err == nil &&
rel == filepath.Base(r) {
log.Printf("removing directory %s\n", r)
p.log.Info("Remove directory", "path", r)
if err := os.RemoveAll(r); err != nil {
log.Printf("error: %v\n", err)
p.log.Error("Could not remove directory", "err", err)
}
}
}

View file

@ -13,13 +13,12 @@ import (
"fmt"
"io"
"log"
"log/slog"
"net/http"
"os"
"path/filepath"
"time"
"golang.org/x/exp/slog"
"github.com/csaf-poc/csaf_distribution/v3/internal/certs"
"github.com/csaf-poc/csaf_distribution/v3/internal/filter"
"github.com/csaf-poc/csaf_distribution/v3/internal/models"

View file

@ -19,6 +19,7 @@ import (
"fmt"
"hash"
"io"
"log/slog"
"net/http"
"net/url"
"os"
@ -29,8 +30,6 @@ import (
"sync"
"time"
"golang.org/x/exp/slog"
"github.com/ProtonMail/gopenpgp/v2/crypto"
"golang.org/x/time/rate"

View file

@ -12,14 +12,13 @@ import (
"bytes"
"crypto/tls"
"io"
"log/slog"
"mime/multipart"
"net/http"
"os"
"path/filepath"
"strings"
"golang.org/x/exp/slog"
"github.com/csaf-poc/csaf_distribution/v3/internal/misc"
"github.com/csaf-poc/csaf_distribution/v3/util"
)

View file

@ -14,6 +14,7 @@ import (
"encoding/json"
"errors"
"io"
"log/slog"
"mime"
"mime/multipart"
"net/http"
@ -22,8 +23,6 @@ import (
"strings"
"testing"
"golang.org/x/exp/slog"
"github.com/csaf-poc/csaf_distribution/v3/internal/options"
"github.com/csaf-poc/csaf_distribution/v3/util"
)

View file

@ -11,11 +11,10 @@ package main
import (
"context"
"log/slog"
"os"
"os/signal"
"golang.org/x/exp/slog"
"github.com/csaf-poc/csaf_distribution/v3/internal/options"
)

View file

@ -8,7 +8,7 @@
package main
import "golang.org/x/exp/slog"
import "log/slog"
// stats contains counters of the downloads.
type stats struct {

View file

@ -11,9 +11,8 @@ package main
import (
"bytes"
"encoding/json"
"log/slog"
"testing"
"golang.org/x/exp/slog"
)
func TestStatsAdd(t *testing.T) {