Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/ssh-watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/kelseyhightower/envconfig"
"github.com/mgla96/ssh-watcher/internal/linetracker"
"github.com/mgla96/ssh-watcher/internal/notifier"
"github.com/mgla96/ssh-watcher/internal/watcher"
"github.com/rs/zerolog"
Expand All @@ -26,6 +27,9 @@ type Config struct {
WatchFailedLogin bool `envconfig:"WATCH_SETTINGS_FAILED_LOGIN" default:"true"`
WatchFailedLoginInvalidUsername bool `envconfig:"WATCH_SETTINGS_FAILED_LOGIN_INVALID_USERNAME" default:"false"`
WatchSleepIntervalSeconds int `envconfig:"WATCH_SETTINGS_SLEEP_INTERVAL_SECONDS" default:"2"`
// StateFilePath is location of file that keeps track of the last processed line
// by ssh watcher so restarts of the service do not reprocess all ssh history.
StateFilePath string `envconfig:"STATE_FILE_PATH" default:"/var/lib/ssh-watcher/authlog-state"`
}

func loadConfig() (*Config, error) {
Expand All @@ -46,6 +50,7 @@ func main() {
}

notifier := notifier.NewSlackNotifier(config.Slack.WebhookUrl, config.Slack.Channel, config.Slack.Username, config.Slack.Icon)
processedLineTracker := linetracker.NewFileProcessedLineTracker(config.StateFilePath)
watcher := watcher.NewLogWatcher(
config.LogFileLocation,
notifier,
Expand All @@ -56,6 +61,7 @@ func main() {
WatchFailedLoginInvalidUsername: config.WatchFailedLoginInvalidUsername,
WatchSleepInterval: time.Duration(config.WatchSleepIntervalSeconds) * time.Second,
},
processedLineTracker,
)

log.Info().Msg(fmt.Sprintf("starting watcher, webhook url: %s, logfile: %s", config.Slack.WebhookUrl, config.LogFileLocation))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/mgla96/ssh-watcher

go 1.21.4
go 1.21.5

require (
github.com/kelseyhightower/envconfig v1.4.0
Expand Down
74 changes: 74 additions & 0 deletions internal/linetracker/line_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package linetracker

import (
"bufio"
"fmt"
"os"
"path/filepath"
"strconv"

"github.com/rs/zerolog/log"
)

func NewFileProcessedLineTracker(stateFilePath string) FileProcessedLineTracker {
return FileProcessedLineTracker{
StateFilePath: stateFilePath,
}
}

func handleError(message string, err error) error {
log.Error().Err(err).Msg(message)
return fmt.Errorf("%v: %w", message, err)
}

type FileProcessedLineTracker struct {
StateFilePath string
}

// GetLastProcessedLine reads the statefile and extracts the last processed line number
// in the ssh log file.
func (f FileProcessedLineTracker) GetLastProcessedLine() (int, error) {
dir := filepath.Dir(f.StateFilePath)
if err := os.MkdirAll(dir, 0755); err != nil {
return 0, handleError("failed to create directory for state file", err)
}

state, err := os.OpenFile(f.StateFilePath, os.O_CREATE|os.O_RDONLY, 0644)
if err != nil {
return 0, handleError("failed opening or creating state file", err)
}
defer state.Close()

scanner := bufio.NewScanner(state)
var currentLine int
for scanner.Scan() {
currentLine, err = strconv.Atoi(scanner.Text())
if err != nil {
return 0, handleError("failed converting state file line to int", err)
}
}

if err := scanner.Err(); err != nil {
return 0, fmt.Errorf("%v: %w", "error while reading state file", err)
}

return currentLine, nil
}

func (f FileProcessedLineTracker) UpdateLastProcessedLine(lineNumber int) error {
state, err := os.Create(f.StateFilePath)
if err != nil {
return handleError("failed to create or truncate state file", err)
}
defer state.Close()

if _, err := state.WriteString(fmt.Sprintf("%d", lineNumber)); err != nil {
return handleError("failed to write to state file", err)
}

if err := state.Sync(); err != nil {
return handleError("failed to syc state file", err)
}

return nil
}
File renamed without changes.
File renamed without changes.
140 changes: 45 additions & 95 deletions internal/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io/fs"
"os"
"strconv"
"strings"
"time"

Expand All @@ -14,33 +13,34 @@ import (
"github.com/rs/zerolog/log"
)

const (
// stateFile keeps track of the last processed line by ssh watcher so restarts
// of the service do not reprocess all ssh history.
stateFile = "/var/lib/ssh-watcher/authlog-state"
)

type WatchSettings struct {
WatchAcceptedLogins bool
WatchFailedLogins bool
WatchFailedLoginInvalidUsername bool
WatchSleepInterval time.Duration
}

func NewLogWatcher(logFile string, notifier notifier.Notifier, hostMachine string, watchSettings WatchSettings) LogWatcher {
func NewLogWatcher(logFile string, notifier notifier.Notifier, hostMachine string, watchSettings WatchSettings, processedLineTracker ProcessedLineTracker) LogWatcher {
return LogWatcher{
LogFile: logFile,
Notifier: notifier,
HostMachine: hostMachine,
WatchSettings: watchSettings,
LogFile: logFile,
Notifier: notifier,
HostMachine: hostMachine,
WatchSettings: watchSettings,
ProcessedLineTracker: processedLineTracker,
}
}

type ProcessedLineTracker interface {
GetLastProcessedLine() (int, error)
UpdateLastProcessedLine(lineNumber int) error
}

type LogWatcher struct {
LogFile string
Notifier notifier.Notifier
HostMachine string
WatchSettings WatchSettings
LogFile string
Notifier notifier.Notifier
HostMachine string
WatchSettings WatchSettings
ProcessedLineTracker ProcessedLineTracker
}

func (w LogWatcher) shouldSendMessage(eventType notifier.EventType) bool {
Expand All @@ -56,88 +56,34 @@ func (w LogWatcher) shouldSendMessage(eventType notifier.EventType) bool {
}
}

// getLastProcessedLine reads the statefile and extracts the last processed line number
// in the ssh log file.
func (w LogWatcher) getLastProcessedLine() int {
if _, err := os.Stat(stateFile); os.IsNotExist(err) {
return 0
}

state, err := os.Open(stateFile)
if err != nil {
log.Error().Err(err).Msg("Failed opening state file")
return 0
}
defer state.Close()

scanner := bufio.NewScanner(state)
var currentLine int
for scanner.Scan() {
currentLine, err = strconv.Atoi(scanner.Text())
if err != nil {
log.Error().Err(err).Msg("Failed converting state file line to int")
return 0
}
}

if err := scanner.Err(); err != nil {
log.Error().Err(err).Msg("Error while reading state file")
return 0
}

return currentLine

}

func (w LogWatcher) updateLastProcessedLine(lineNumber int) error {
state, err := os.Create(stateFile)
if err != nil {
message := "failed to create or truncate state file"
log.Error().Err(err).Msg(message)
return fmt.Errorf("%v: %w", message, err)
}
defer state.Close()

if _, err := state.WriteString(fmt.Sprintf("%d", lineNumber)); err != nil {
message := "failed to write to state file"
log.Error().Err(err).Msg(message)
return fmt.Errorf("%v: %w", message, err)
func (w LogWatcher) parseLogLine(line string) notifier.LogLine {
logLine := notifier.LogLine{}
if !strings.Contains(line, "sshd") {
return logLine
}

if err := state.Sync(); err != nil {
message := "failed to syc state file"
log.Error().Err(err).Msg(message)
return fmt.Errorf("%v: %w", message, err)
switch {
case strings.Contains(line, "Accepted password"), strings.Contains(line, "Accepted publickey"):
logLine.EventType = notifier.LoggedIn
case strings.Contains(strings.ToLower(line), "invalid user"):
logLine.EventType = notifier.FailedLoginAttemptInvalidUsername
case strings.Contains(line, "Failed password"), strings.Contains(line, "Connection closed by authenticating user"):
logLine.EventType = notifier.FailedLoginAttempt
}

return nil
}

func (w LogWatcher) parseLogLine(line string) notifier.LogLine {
logLine := notifier.LogLine{}
if strings.Contains(line, "sshd") {
switch {
case strings.Contains(line, "Accepted password"), strings.Contains(line, "Accepted publickey"):
logLine.EventType = notifier.LoggedIn
case strings.Contains(strings.ToLower(line), "invalid user"):
logLine.EventType = notifier.FailedLoginAttemptInvalidUsername
case strings.Contains(line, "Failed password"), strings.Contains(line, "Connection closed by authenticating user"):
logLine.EventType = notifier.FailedLoginAttempt
}

if logLine.EventType != "" {
parts := strings.Split(line, " ")
logLine.LoginTime = parts[0] + " " + parts[1]
for i, part := range parts {
if part == "from" {
logLine.IpAddress = parts[i+1]
}
if part == "user" || part == "for" {
logLine.Username = parts[i+1]
}
if logLine.EventType != "" {
parts := strings.Split(line, " ")
logLine.LoginTime = parts[0] + " " + parts[1]
for i, part := range parts {
if part == "from" {
logLine.IpAddress = parts[i+1]
}
if part == "user" || part == "for" {
logLine.Username = parts[i+1]
}
}
}

return logLine
}

Expand All @@ -161,10 +107,10 @@ func (w LogWatcher) processNewLogLines(file *os.File, lastProcessedLine int) err
}
}

err := w.updateLastProcessedLine(lineNumber)
err := w.ProcessedLineTracker.UpdateLastProcessedLine(lineNumber)
if err != nil {
log.Error().Err(err)
return err
return fmt.Errorf("%v: %w", "failed updating last processed line", err)
}
}
return nil
Expand Down Expand Up @@ -207,8 +153,12 @@ func (w LogWatcher) Watch() error {
}

if stat.Size() > lastProcessedOffset {
lastProcessedLine := w.getLastProcessedLine()
err := w.processNewLogLines(file, lastProcessedLine)
lastProcessedLine, err := w.ProcessedLineTracker.GetLastProcessedLine()
if err != nil {
return fmt.Errorf("error getting last processed line: %w", err)
}

err = w.processNewLogLines(file, lastProcessedLine)
if err != nil {
return fmt.Errorf("error processing new log lines: %w", err)
}
Expand Down