// This file is Free Software under the MIT License
// without warranty, see README.md and LICENSES/MIT.txt for details.
//
// SPDX-License-Identifier: MIT
//
// SPDX-FileCopyrightText: 2022 German Federal Office for Information Security (BSI) <https://www.bsi.bund.de>
// Software-Engineering: 2022 Intevation GmbH <https://intevation.de>

package main

import (
	"bytes"
	"crypto/sha256"
	"crypto/sha512"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/csaf-poc/csaf_distribution/csaf"
	"github.com/csaf-poc/csaf_distribution/util"
)

type interimJob struct {
	provider *provider
	err      error
}

// statusExpr is used as an expression to check the new status
// of an advisory which was interim before.
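// For illustration, it matches the "status" member of a (shortened,
// hypothetical) advisory like
//
//	{"document": {"tracking": {"status": "interim"}}}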
const statusExpr = `$.document.tracking.status`

// checkInterims checks the current status of the given
// interim advisories. It returns a slice of advisories
// which are not finished yet.
func (w *worker) checkInterims(
tx *lazyTransaction,
label string,
interims []interimsEntry,
) ([]interimsEntry, error) {
var data bytes.Buffer
labelPath := filepath.Join(tx.Src(), label)
	// Advisories which are still interim, i.e. not finalized yet.
var notFinalized []interimsEntry
for _, interim := range interims {
local := filepath.Join(labelPath, interim.path())
url := interim.url()
// Load local SHA256 of the advisory
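		// (the side car file is assumed to be in sha256sum style, i.e. a hex
		// digest followed by the file name, as written by
		// util.WriteHashSumToFile below).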
localHash, err := util.HashFromFile(local + ".sha256")
if err != nil {
return nil, err
}
res, err := w.client.Get(url)
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("fetching %s failed: Status code %d (%s)",
url, res.StatusCode, res.Status)
}
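		// Hash the remote advisory while decoding it and keep a copy of the
		// raw bytes in data in case the changed document has to be written
		// out below.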
s256 := sha256.New()
data.Reset()
hasher := io.MultiWriter(s256, &data)
var doc interface{}
if err := func() error {
defer res.Body.Close()
tee := io.TeeReader(res.Body, hasher)
return json.NewDecoder(tee).Decode(&doc)
}(); err != nil {
return nil, err
}
remoteHash := s256.Sum(nil)
		// If the hashes are equal the remote advisory has not changed;
		// keep it as interim and continue with the next one.
if bytes.Equal(localHash, remoteHash) {
notFinalized = append(notFinalized, interim)
continue
}
		validationErrors, err := csaf.ValidateCSAF(doc)
		if err != nil {
			return nil, fmt.Errorf("failed to validate %s: %v", url, err)
		}
		// XXX: Should we return an error here?
		for _, e := range validationErrors {
			log.Printf("validation error: %s: %v\n", url, e)
		}
		// We need to write the changed content.
		// This will start the transaction if it is not already started.
dst, err := tx.Dst()
if err != nil {
return nil, err
}
// Overwrite in the cloned folder.
nlocal := filepath.Join(dst, label, interim.path())
		content := data.Bytes()
		if err := os.WriteFile(nlocal, content, 0644); err != nil {
return nil, err
}
name := filepath.Base(nlocal)
if err := util.WriteHashToFile(
			nlocal+".sha512", name, sha512.New(), content,
); err != nil {
return nil, err
}
if err := util.WriteHashSumToFile(
nlocal+".sha256", name, remoteHash,
); err != nil {
return nil, err
}
		// Download the signature, or sign the document ourselves.
		sigURL := url + ".asc"
		ascFile := nlocal + ".asc"
		if err := w.downloadSignatureOrSign(sigURL, ascFile, content); err != nil {
return nil, err
}
// Check if we can remove this advisory as it is not interim any more.
var status string
if err := w.expr.Extract(statusExpr, util.StringMatcher(&status), true, doc); err != nil {
return nil, err
}
if status == "interim" {
notFinalized = append(notFinalized, interim)
}
}
return notFinalized, nil
}

// setupProviderInterim prepares the worker for a specific provider.
func (w *worker) setupProviderInterim(provider *provider) {
log.Printf("worker #%d: %s (%s)\n",
w.num, provider.Name, provider.Domain)
w.dir = ""
w.provider = provider
// Each job needs a separate client.
w.client = w.processor.cfg.httpClient(provider)
}

func (w *worker) interimWork(wg *sync.WaitGroup, jobs <-chan *interimJob) {
defer wg.Done()
path := filepath.Join(w.processor.cfg.Web, ".well-known", "csaf-aggregator")
tooOld := w.processor.cfg.tooOldForInterims()
for j := range jobs {
w.setupProviderInterim(j.provider)
providerPath := filepath.Join(path, j.provider.Name)
j.err = func() error {
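			// The lazy transaction clones the provider folder only when
			// something actually has to be written (see tx.Dst()).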
tx := newLazyTransaction(providerPath, w.processor.cfg.Folder)
defer tx.rollback()
// Try all the labels
for _, label := range []string{
csaf.TLPLabelUnlabeled,
csaf.TLPLabelWhite,
csaf.TLPLabelGreen,
csaf.TLPLabelAmber,
csaf.TLPLabelRed,
} {
label = strings.ToLower(label)
labelPath := filepath.Join(providerPath, label)
interCSV := filepath.Join(labelPath, interimsCSV)
interims, olds, err := readInterims(interCSV, tooOld)
if err != nil {
return err
}
// no interims found -> next label.
if len(interims) == 0 {
continue
}
// Compare locals against remotes.
notFinalized, err := w.checkInterims(tx, label, interims)
if err != nil {
return err
}
				// No advisory left the interim state, so the interims.csv
				// of this label does not need to be rewritten.
				if len(notFinalized) == len(interims) {
continue
}
				// Keep the old entries as well; the interims interval may
				// be re-configured to a larger value later.
				notFinalized = append(notFinalized, olds...)
// We want to write in the transaction folder.
dst, err := tx.Dst()
if err != nil {
return err
}
ninterCSV := filepath.Join(dst, label, interimsCSV)
if err := writeInterims(ninterCSV, notFinalized); err != nil {
return err
}
}
return tx.commit()
}()
}
}

// joinErrors creates one aggregated error from the messages
// of the given errors.
func joinErrors(errs []error) error {
if len(errs) == 0 {
return nil
}
var b strings.Builder
for i, err := range errs {
if i > 0 {
b.WriteString(", ")
}
b.WriteString(err.Error())
}
return errors.New(b.String())
}

// interim performs the short interim check/update.
func (p *processor) interim() error {
if !p.cfg.runAsMirror() {
return errors.New("interim in lister mode does not work")
}
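	// Fan the providers out to a pool of workers; each job records its
	// own error, and the errors are collected after all workers are done.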
queue := make(chan *interimJob)
var wg sync.WaitGroup
log.Printf("Starting %d workers.\n", p.cfg.Workers)
for i := 1; i <= p.cfg.Workers; i++ {
wg.Add(1)
w := newWorker(i, p)
go w.interimWork(&wg, queue)
}
jobs := make([]interimJob, len(p.cfg.Providers))
for i, p := range p.cfg.Providers {
jobs[i] = interimJob{provider: p}
queue <- &jobs[i]
}
close(queue)
wg.Wait()
var errs []error
for i := range jobs {
if err := jobs[i].err; err != nil {
errs = append(errs, err)
}
}
return joinErrors(errs)
}
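
// interimsEntry is one row of an interims.csv file:
// column 0 holds an RFC3339 timestamp (used for the age check),
// column 1 the advisory path relative to the TLP label folder,
// and column 2 the source URL of the advisory.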
type interimsEntry [3]string
// func (ie interimsEntry) date() string { return ie[0] }
func (ie interimsEntry) path() string { return ie[1] }
func (ie interimsEntry) url() string { return ie[2] }
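
// writeInterims writes the given interim entries to interimsCSV.
// An empty list removes the file instead.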
func writeInterims(interimsCSV string, interims []interimsEntry) error {
if len(interims) == 0 {
return os.RemoveAll(interimsCSV)
}
	// Overwrite the old file. This is safe because we are inside a transaction.
f, err := os.Create(interimsCSV)
if err != nil {
return err
}
c := csv.NewWriter(f)
for _, ie := range interims {
if err := c.Write(ie[:]); err != nil {
return err
}
}
c.Flush()
err1 := c.Error()
err2 := f.Close()
if err1 != nil {
return err1
}
return err2
}

// readInterims scans an interims.csv file for matching
// interim advisories. The file is sorted with the youngest
// entries first, so scanning can stop once entries get too old.
// It returns two slices: the advisories that are young enough
// and the advisories that are too old.
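//
// A minimal sketch of such a predicate (the 14 day cut-off below is
// made up for illustration; the real one comes from the aggregator
// configuration via tooOldForInterims):
//
//	tooOld := func(t time.Time) bool {
//		return time.Since(t) > 14*24*time.Hour
//	}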
func readInterims(
interimsCSV string,
tooOld func(time.Time) bool,
) ([]interimsEntry, []interimsEntry, error) {
interimsF, err := os.Open(interimsCSV)
if err != nil {
		// A non-existing file means there are no interims.
if os.IsNotExist(err) {
return nil, nil, nil
}
return nil, nil, err
}
defer interimsF.Close()
c := csv.NewReader(interimsF)
c.FieldsPerRecord = 3
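	// Every record must consist of exactly date, path and URL.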
var files, olds []interimsEntry
youngEnough := true
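	// The file is sorted youngest first, so after the first entry that is
	// too old all remaining entries are too old as well and their dates
	// do not need to be parsed again.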
for {
row, err := c.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, nil, err
}
if youngEnough {
t, err := time.Parse(time.RFC3339, row[0])
if err != nil {
return nil, nil, err
}
if tooOld(t) {
olds = []interimsEntry{{row[0], row[1], row[2]}}
youngEnough = false
} else {
files = append(files, interimsEntry{row[0], row[1], row[2]})
}
} else {
// These are too old.
olds = append(olds, interimsEntry{row[0], row[1], row[2]})
}
}
return files, olds, nil
}